From cd731ca97cbe81fdda97e5358abbbfc258c72bec Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Wed, 17 Oct 2018 19:23:11 +0200 Subject: [PATCH] [enh] Add post/get to '/network/peering' --- .../main/assembly/config/elasticsearch.yml | 45 +- .../src/test/misc/test_post_peering.sh | 12 + .../org/duniter/elasticsearch/PluginInit.java | 22 +- .../duniter/elasticsearch/PluginSettings.java | 102 +++- .../elasticsearch/dao/impl/PeerDaoImpl.java | 6 +- .../elasticsearch/rest/RestModule.java | 8 +- .../network/RestNetworkPeeringGetAction.java | 89 +++ .../network/RestNetworkPeeringPostAction.java | 117 ++++ .../service/AbstractService.java | 2 +- .../service/BlockchainService.java | 5 + .../elasticsearch/service/NetworkService.java | 540 ++++++++++++++++++ .../elasticsearch/service/PeerService.java | 10 +- .../elasticsearch/service/ServiceLocator.java | 2 +- .../elasticsearch/synchro/SynchroService.java | 188 +----- .../cesium-plus-pod-core_en_GB.properties | 1 + .../cesium-plus-pod-core_fr_FR.properties | 1 + .../subscription/PluginSettings.java | 15 + .../service/NetworkServiceConfiguration.java | 22 + .../subscription/service/ServiceModule.java | 3 + .../service/SubscriptionService.java | 4 +- .../plugin-descriptor.properties | 2 +- .../elasticsearch/user/PluginSettings.java | 57 +- .../service/NetworkServiceConfiguration.java | 22 + .../user/service/ServiceModule.java | 3 +- .../user/service/UserService.java | 12 +- .../user/synchro/SynchroModule.java | 16 +- pom.xml | 2 +- 27 files changed, 1047 insertions(+), 261 deletions(-) create mode 100755 cesium-plus-pod-assembly/src/test/misc/test_post_peering.sh create mode 100644 cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringGetAction.java create mode 100644 cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java create mode 100644 cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java create mode 100644 cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/NetworkServiceConfiguration.java create mode 100644 cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/NetworkServiceConfiguration.java diff --git a/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml b/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml index 875d104d..1071c343 100644 --- a/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml +++ b/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml @@ -180,12 +180,35 @@ duniter.p2p.includes.endpoints: [ # # Pass a list of pubkeys to always synchronize (default: <empty>) # -# duniter.p2p.includes.pubkeys: [""] +# duniter.p2p.includes.pubkeys: [ +# "38MEAZN68Pz1DTvT3tqgxx4yQP6snJCQhPqEFxbDk4aE" +# ] # -# Need to full resync using P2P endpoints, at startup. Useful when new pods has been added +# Enable a full synchro. This will compare each documents from other peers. # # duniter.p2p.fullResyncAtStartup: true # +# Enable publishing of pod endpoints to the network (see the peer document in Duniter protocol). (Default: '${duniter.p2p.enable}') +# +# duniter.p2p.peering.enable: false +# +# Define targeted API (for peers slection) where to send the peer document (if peering is enable). (Default: ["BASIC_MERKLED_API", "BMAS"]) +# This API should accept a POST request to '/network/peering' (will send a see the Duniter protocol) +# +# duniter.p2p.peering.targetedApis: [ +# "BASIC_MERKLED_API", "BMAS" +# ] +# +# Define cluster API to publish (if peering is enable). By default, all compatible API +# +# duniter.p2p.peering.publishedApis: [ +# "ES_CORE_API", "ES_USER_API", "ES_SUBSCRIPTION_API" +# ] +# +# Interval for publishing peer document to the network, in seconds. (Default: 7200 =2h) +# +# duniter.p2p.peering.interval: 7200 +# # ---------------------------------- Duniter4j document moderation --------------- # # Filter too old document, if time older that 'maxPastDelta' (in seconds). (default: 7200 =2h) @@ -255,25 +278,25 @@ duniter.subscription.enable: false # # Email subscription: URL to a Cesium site, for links in the email content (default: https://g1.duniter.fr) # -# duniter.subscription.email.link.url: 'http://domain.com/cesium' +# duniter.subscription.email.link.url: 'https://domain.com/cesium' # -# ---------------------------------- Duniter4j User (profile, message) module ------------------- +# ---------------------------------- Duniter4j Share module ------------------- # # -# Share link: `og:site_name` (default: 'Cesium') +# Share title: `og:site_name` (default: 'Cesium') # # duniter.share.site.name: 'Cesium - Ğ1' # -# Share page link : URL to a web site, for links to a page (default: https://g1.duniter.fr/#/app/page/view/{id}/{title} ) -# Usable variables are: {id} and {title} +# URL to a page (default: https://g1.duniter.fr/#/app/page/view/{id}/{title} ) +# Note: available variables are {id} and {title} # # duniter.share.page.link.url: 'https://domain.com/cesium/#/app/page/view/{id}/{title}' # -# Share user link : URL to a web site, for links to a user (default: https://g1.duniter.fr/#https://g1.duniter.fr/#/app/wot/{pubkey}/{title} ) -# Usable variables are: {pubkey} and {title} +# URL to a user profile (default: https://g1.duniter.fr/#/app/wot/{pubkey}/{title} ) +# Note: available variables are {pubkey} and {title} # # duniter.share.user.link.url: 'https://domain.com/cesium/#/app/wot/{pubkey}/{title}' # -# Share default image: URL of an image (min size of 200x200px) to use as default image for `og:image` (default: https://g1.duniter.fr/img/logo_200px.png) +# Default image to share (min size of 200x200px) for `og:image` (default: https://g1.duniter.fr/img/logo_200px.png) # -# duniter.share.image.default.url: https://g1.duniter.fr/img/logo_200px.png \ No newline at end of file +# duniter.share.image.default.url: 'https://domain.com/cesium/img/logo_200px.png' \ No newline at end of file diff --git a/cesium-plus-pod-assembly/src/test/misc/test_post_peering.sh b/cesium-plus-pod-assembly/src/test/misc/test_post_peering.sh new file mode 100755 index 00000000..28b1d37d --- /dev/null +++ b/cesium-plus-pod-assembly/src/test/misc/test_post_peering.sh @@ -0,0 +1,12 @@ +#!/bin/sh + +curl -XPOST 'http://localhost:9200/network/peering' -d 'Version: 10 +Type: Peer +Currency: g1 +PublicKey: G2CBgZBPLe6FSFUgpx2Jf1Aqsgta6iib3vmDRA1yLiqU +Block: 162921-000001698A08C8877FAF02A3C4547CD932765CF3994FF4747F3C1EC0EA303C7E +Endpoints: +ES_USER_API localhost 9201 +ES_SUBSCRIPTION_API localhost 9201 +ES_CORE_API localhost 9201 +YzUtzvZEzcaKvrb5TCWnR7+J2L+AUkp9JX0EnKzbw4RstVzT4tYXMBUCfMgQm2TwkbZPk/SCnQ38aixv+CfZBQ==' \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 21073f65..21b83e1b 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -26,10 +26,7 @@ import org.duniter.core.client.model.elasticsearch.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.elasticsearch.dao.*; import org.duniter.elasticsearch.rest.security.RestSecurityController; -import org.duniter.elasticsearch.service.BlockchainService; -import org.duniter.elasticsearch.service.CurrencyService; -import org.duniter.elasticsearch.service.DocStatService; -import org.duniter.elasticsearch.service.PeerService; +import org.duniter.elasticsearch.service.*; import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -255,10 +252,15 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .indexLastBlocks(peer) .listenAndIndexNewBlock(peer); - // Index peers (and listen if new peer appear) - injector.getInstance(PeerService.class) - .listenAndIndexPeers(peer); + if (logger.isInfoEnabled()) { + logger.info(String.format("[%s] Indexing blockchain [OK]", currencyName)); + } + // Index peers (and listen if new peer appear) + if (pluginSettings.enableSynchroDiscovery()) { + injector.getInstance(PeerService.class) + .listenAndIndexPeers(peer); + } // Start synchro if (pluginSettings.enableSynchro()) { @@ -266,8 +268,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .startScheduling(); } - if (logger.isInfoEnabled()) { - logger.info(String.format("[%s] Indexing blockchain [OK]", currencyName)); + // Start publish peering + if (pluginSettings.enablePeering()) { + injector.getInstance(NetworkService.class) + .startPublishingPeerDocumentToNetwork(); } } catch(Throwable e){ diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 8018e4ca..81de4f67 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -28,9 +28,14 @@ import org.apache.commons.io.FileUtils; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.config.ConfigurationOption; import org.duniter.core.client.config.ConfigurationProvider; +import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.StringUtils; +import org.duniter.core.util.crypto.CryptoUtils; +import org.duniter.core.util.crypto.KeyPair; import org.duniter.elasticsearch.i18n.I18nInitializer; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -43,10 +48,8 @@ import org.nuiton.i18n.I18n; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.Set; +import java.util.*; +import java.util.stream.Collectors; import static org.nuiton.i18n.I18n.t; @@ -57,11 +60,14 @@ import static org.nuiton.i18n.I18n.t; */ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { - protected final Settings settings; + private static KeyPair nodeKeyPair; + private static boolean isRandomNodeKeyPair; + private static String nodePubkey; + protected final Settings settings; private List<String> i18nBundleNames = new ArrayList<>(); // Default - private String clusterRemoteUrl; + private final CryptoService cryptoService; /** * Delegate application config. @@ -70,10 +76,12 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { protected final org.duniter.core.client.config.Configuration clientConfig; @Inject - public PluginSettings(org.elasticsearch.common.settings.Settings settings) { + public PluginSettings(org.elasticsearch.common.settings.Settings settings, + CryptoService cryptoService) { super(settings); this.settings = settings; + this.cryptoService = cryptoService; this.applicationConfig = new ApplicationConfig(); // Cascade the application config to the client module @@ -265,6 +273,44 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsBoolean("duniter.p2p.ws.enable", true); } + public boolean enablePeering() { + return this.settings.getAsBoolean("duniter.p2p.peering.enable", enableSynchro()); + } + + /** + * Endpoint API to publish, in the emitted peer document. By default, plugins will defined their own API + * @return + */ + public List<EndpointApi> getPeeringPublishedApis() { + String[] targetedApis = settings.getAsArray("duniter.p2p.peering.publishedApis"); + if (CollectionUtils.isEmpty(targetedApis)) return null; + + return Arrays.stream(targetedApis).map(EndpointApi::valueOf).collect(Collectors.toList()); + } + + /** + * Targeted API where to send the peer document. + * This API should accept a POST request to '/network/peering' (like Duniter node, but can also be a pod) + * @return + */ + public List<EndpointApi> getPeeringTargetedApis() { + String[] targetedApis = settings.getAsArray("duniter.p2p.peering.targetedApis", new String[]{ + EndpointApi.BASIC_MERKLED_API.name(), + EndpointApi.BMAS.name() + }); + if (CollectionUtils.isEmpty(targetedApis)) return null; + + return Arrays.stream(targetedApis).map(EndpointApi::valueOf).collect(Collectors.toList()); + } + + /** + * Interval (in seconds) between publications of the peer document + * @return + */ + public int getPeeringInterval() { + return this.settings.getAsInt("duniter.p2p.peering.interval", 7200 /*=2h*/); + } + public boolean fullResyncAtStartup() { return settings.getAsBoolean("duniter.p2p.fullResyncAtStartup", false); } @@ -293,6 +339,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsInt("duniter.retry.count", 5); } + /** + * Time before retry (in millis) + * @return + */ public int getNodeRetryWaitDuration() { return settings.getAsInt("duniter.retry.waitDuration", 5000); } @@ -432,4 +482,42 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { implementationVersion); } } + + + public KeyPair getNodeKeypair() { + initNodeKeyring(); + return this.nodeKeyPair; + } + + public boolean isRandomNodeKeypair() { + initNodeKeyring(); + return this.isRandomNodeKeyPair; + } + + public String getNodePubkey() { + initNodeKeyring(); + return this.nodePubkey; + } + + protected synchronized void initNodeKeyring() { + if (this.nodeKeyPair != null) return; + if (StringUtils.isNotBlank(getKeyringSalt()) && + StringUtils.isNotBlank(getKeyringPassword())) { + this.nodeKeyPair = cryptoService.getKeyPair(getKeyringSalt(), getKeyringPassword()); + this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey()); + this.isRandomNodeKeyPair = false; + } + else { + // Use a ramdom keypair + this.nodeKeyPair = cryptoService.getRandomKeypair(); + this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey()); + this.isRandomNodeKeyPair = true; + + logger.warn(String.format("No keyring in config. salt/password (or keyring) is need to signed user event documents. Will use a generated key [%s]", this.nodePubkey)); + if (logger.isDebugEnabled()) { + logger.debug(String.format(" salt: " + getKeyringSalt().replaceAll(".", "*"))); + logger.debug(String.format("password: " + getKeyringPassword().replaceAll(".", "*"))); + } + } + } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index dcf605ec..bc9c7761 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -219,10 +219,10 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { } @Override - public void updatePeersAsDown(String currencyName, long upTimeLimit) { + public void updatePeersAsDown(String currencyName, long upTimeLimitInSec) { if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] Setting peers as DOWN, if older than [%s]...", currencyName, new Date(upTimeLimit*1000))); + logger.debug(String.format("[%s] Setting peers as DOWN, if older than [%s]...", currencyName, new Date(upTimeLimitInSec*1000))); } SearchRequestBuilder searchRequest = client.prepareSearch(currencyName) @@ -232,7 +232,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Query = filter on lastUpTime BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() // where lastUpTime < upTimeLimit - .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimit)) + .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimitInSec)) // AND status = UP .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name())); searchRequest.setQuery(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.constantScoreQuery(boolQuery))); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java index fa523902..0a872252 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java @@ -23,7 +23,8 @@ package org.duniter.elasticsearch.rest; */ import org.duniter.elasticsearch.rest.attachment.RestImageAttachmentAction; -import org.duniter.elasticsearch.rest.currency.RestCurrencyIndexAction; +import org.duniter.elasticsearch.rest.network.RestNetworkPeeringGetAction; +import org.duniter.elasticsearch.rest.network.RestNetworkPeeringPostAction; import org.duniter.elasticsearch.rest.node.RestNodeSummaryGetAction; import org.duniter.elasticsearch.rest.security.RestSecurityAuthAction; import org.duniter.elasticsearch.rest.security.RestSecurityController; @@ -51,5 +52,10 @@ public class RestModule extends AbstractModule implements Module { // Currency //bind(RestCurrencyIndexAction.class).asEagerSingleton(); + // Network + bind(RestNetworkPeeringGetAction.class).asEagerSingleton(); + bind(RestNetworkPeeringPostAction.class).asEagerSingleton(); + + } } \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringGetAction.java new file mode 100644 index 00000000..7822dbf4 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringGetAction.java @@ -0,0 +1,89 @@ +package org.duniter.elasticsearch.rest.network; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.entity.ContentType; +import org.duniter.core.client.model.bma.NetworkPeering; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.NetworkService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.*; +import org.nuiton.i18n.I18n; + +import java.io.IOException; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestNetworkPeeringGetAction extends BaseRestHandler { + + + private NetworkService networkService; + + @Inject + public RestNetworkPeeringGetAction(Settings settings, PluginSettings pluginSettings, RestController controller, Client client, RestSecurityController securityController, + NetworkService networkService) { + super(settings, controller, client); + + if (StringUtils.isBlank(pluginSettings.getClusterRemoteHost())) { + logger.warn(I18n.t("duniter.p2p.error.noRemoteUrl")); + } + else { + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/network/peering"); + + controller.registerHandler(RestRequest.Method.GET, "/network/peering", this); + controller.registerHandler(RestRequest.Method.GET, "/{currency}/network/peering", this); + } + + this.networkService = networkService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("currency"); + NetworkPeering peering = networkService.getLastPeering(currency); + + try { + channel.sendResponse(new BytesRestResponse(RestStatus.OK, + ContentType.APPLICATION_JSON.toString(), + getObjectMapper() + .writerWithDefaultPrettyPrinter() + .writeValueAsString(peering))); + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/network/peering]: %s", ioe.getMessage()), ioe); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java new file mode 100644 index 00000000..399a179f --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java @@ -0,0 +1,117 @@ +package org.duniter.elasticsearch.rest.network; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.apache.http.entity.ContentType; +import org.duniter.core.client.model.bma.NetworkPeering; +import org.duniter.core.client.model.bma.NetworkPeerings; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.rest.XContentThrowableRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.NetworkService; +import org.duniter.elasticsearch.service.PeerService; +import org.duniter.elasticsearch.service.ServiceLocator; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.*; +import org.nuiton.i18n.I18n; +import org.yaml.snakeyaml.util.UriEncoder; + +import java.io.IOException; +import java.io.StringReader; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestNetworkPeeringPostAction extends BaseRestHandler { + + + private NetworkService networkService; + + @Inject + public RestNetworkPeeringPostAction(Settings settings, PluginSettings pluginSettings, RestController controller, Client client, + RestSecurityController securityController, + NetworkService networkService) { + super(settings, controller, client); + + if (StringUtils.isBlank(pluginSettings.getClusterRemoteHost())) { + logger.warn(I18n.t("duniter.p2p.error.noRemoteUrl")); + } + else { + securityController.allow(RestRequest.Method.POST, "/network/peering"); + controller.registerHandler(RestRequest.Method.POST, "/network/peering", this); + } + + this.networkService = networkService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + + try { + Properties content = new Properties(); + content.load(new StringReader(request.content().toUtf8())); + + String peerDocument = content.getProperty("peer"); + if (StringUtils.isBlank(peerDocument)) { + throw new TechnicalException("Inavlid request: 'peer' property not found"); + } + + // Decode content + peerDocument = UriEncoder.decode(peerDocument); + logger.debug("Received peer document: " + peerDocument); + + NetworkPeering peering = networkService.checkAndSavePeering(peerDocument); + + channel.sendResponse(new BytesRestResponse( + RestStatus.OK, + ContentType.APPLICATION_JSON.toString(), + getObjectMapper() + .writerWithDefaultPrettyPrinter() // enable pretty printer + .writeValueAsBytes(peering))); + } + catch(Exception e) { + logger.debug("Error while parsing peer document: " + e.getMessage()); + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index aa5d0fb0..139ace81 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -104,7 +104,7 @@ public abstract class AbstractService implements Bean { protected void waitReady() { try { while (!ready) { - Thread.sleep(500); + Thread.sleep(1000 /*1sec*/); } } catch (InterruptedException e){ // Silent diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 59f7baeb..a0dc8c6c 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -48,10 +48,15 @@ import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequestBuilder; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfoAction; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction; import org.nuiton.i18n.I18n; import java.io.IOException; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java new file mode 100644 index 00000000..ebf295c6 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java @@ -0,0 +1,540 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.commons.lang3.ArrayUtils; +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; +import org.duniter.core.client.model.bma.*; +import org.duniter.core.client.model.local.Currency; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.HttpService; +import org.duniter.core.client.service.bma.NetworkRemoteService; +import org.duniter.core.client.util.KnownBlocks; +import org.duniter.core.client.util.KnownCurrencies; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.CurrencyExtendDao; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.inject.Inject; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Created by Benoit on 30/03/2015. + */ +public class NetworkService extends AbstractService { + + private static final BlockchainBlock DEFAULT_BLOCK = KnownBlocks.getFirstBlock(KnownCurrencies.G1); + + private CurrencyExtendDao currencyDao; + private BlockchainService blockchainService; + private Map<String, NetworkPeering> peeringByCurrencyCache = Maps.newHashMap(); + + // API where to send the peer document + private final static Set<EndpointApi> targetPeersEndpointApis = Sets.newHashSet(); + // API to include inside the peer document + private final static Set<EndpointApi> publishedEndpointApis = Sets.newHashSet(); + + private final ThreadPool threadPool; + private final PeerDao peerDao; + private HttpService httpService; + private NetworkRemoteService networkRemoteService; + private PeerService peerService; + + @Inject + public NetworkService(Duniter4jClient client, + PluginSettings settings, + CryptoService cryptoService, + CurrencyDao currencyDao, + PeerDao peerDao, + BlockchainService blockchainService, + PeerService peerService, + ThreadPool threadPool, + final ServiceLocator serviceLocator + ) { + super("duniter.network", client, settings, cryptoService); + this.peerDao = peerDao; + this.currencyDao = (CurrencyExtendDao)currencyDao; + this.blockchainService = blockchainService; + this.peerService = peerService; + this.threadPool = threadPool; + threadPool.scheduleOnStarted(() -> { + this.httpService = serviceLocator.getHttpService(); + this.networkRemoteService = serviceLocator.getNetworkRemoteService(); + setIsReady(true); + }); + + // If published API defined in settings, use this list + if (CollectionUtils.isNotEmpty(pluginSettings.getPeeringPublishedApis())) { + addAllPublishEndpointApis(pluginSettings.getPeeringPublishedApis()); + } + // Else (nothing in settings), register ES_CORE_API as published API + else { + addPublishEndpointApi(EndpointApi.ES_CORE_API); + } + + // If targeted API defined in settings, use this list + if (CollectionUtils.isNotEmpty(pluginSettings.getPeeringTargetedApis())) { + addAllTargetPeerEndpointApis(pluginSettings.getPeeringTargetedApis()); + } + } + + + protected List<Peer> getConfigIncludesPeers(final String currencyId, final EndpointApi api) { + Preconditions.checkNotNull(currencyId); + String[] endpoints = pluginSettings.getSynchroIncludesEndpoints(); + if (ArrayUtils.isEmpty(endpoints)) return null; + + List<Peer> peers = Lists.newArrayList(); + for (String endpoint: endpoints) { + try { + String[] endpointPart = endpoint.split(":"); + if (endpointPart.length > 2) { + logger.warn(String.format("Error in config: Unable to parse P2P endpoint [%s]: %s", endpoint)); + } + String epCurrencyId = (endpointPart.length == 2) ? endpointPart[0] : null /*optional*/; + + NetworkPeering.Endpoint ep = (endpointPart.length == 2) ? Endpoints.parse(endpointPart[1]) : Endpoints.parse(endpoint); + if (ep != null && ep.api == api && (epCurrencyId == null || currencyId.equals(epCurrencyId))) { + Peer peer = Peer.newBuilder() + .setEndpoint(ep) + .setCurrency(currencyId) + .build(); + + String hash = cryptoService.hash(peer.computeKey()); + peer.setHash(hash); + peer.setId(hash); + + peers.add(peer); + } + + } catch (IOException e) { + if (logger.isDebugEnabled()) { + logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage()), e); + } + else { + logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage())); + } + } + } + return peers; + } + + public boolean hasSomePeers(Set<EndpointApi> peerApiFilters) { + + List<String> currencyIds = currencyDao.getCurrencyIds(); + if (CollectionUtils.isEmpty(currencyIds)) return false; + + for (String currencyId: currencyIds) { + boolean hasSome = peerDao.hasPeersUpWithApi(currencyId, peerApiFilters); + if (hasSome) return true; + } + + return false; + } + + public boolean waitPeersReady(Set<EndpointApi> peerApiFilters) throws InterruptedException{ + + waitReady(); + + final int sleepTime = 30 * 1000 /*30s*/; + + int maxWaitingDuration = 5 * 6 * sleepTime; // 5 min + int waitingDuration = 0; + while (!isReady() && !hasSomePeers(peerApiFilters)) { + // Wait + Thread.sleep(sleepTime); + waitingDuration += sleepTime; + if (waitingDuration >= maxWaitingDuration) { + logger.warn(String.format("Could not start to publish peering. No Peer found (after waiting %s min).", waitingDuration/60/1000)); + return false; // stop here + } + } + + // Wait again, to make sure all peers have been saved by NetworkService + Thread.sleep(sleepTime*2); + + return true; + } + + public Collection<Peer> getPeersFromApis(final String currencyId, final Collection<EndpointApi> apis) { + + return apis.stream().flatMap(api -> getPeersFromApi(currencyId, api).stream()).collect(Collectors.toList()); + } + + public Collection<Peer> getPeersFromApi(final String currencyId, final EndpointApi api) { + Preconditions.checkNotNull(api); + Preconditions.checkArgument(StringUtils.isNotBlank(currencyId)); + + try { + + // Use map by URL, to avoid duplicated peer + Map<String, Peer> peersByUrls = Maps.newHashMap(); + + // Get peers from config + List<Peer> configPeers = getConfigIncludesPeers(currencyId, api); + if (CollectionUtils.isNotEmpty(configPeers)) { + configPeers.forEach(p -> peersByUrls.put(p.getUrl(), p)); + } + + // Get peers by pubkeys, from config + String[] includePubkeys = pluginSettings.getSynchroIncludesPubkeys(); + if (ArrayUtils.isNotEmpty(includePubkeys)) { + + // Get from DAO, by API and pubkeys + List<Peer> pubkeysPeers = peerDao.getPeersByCurrencyIdAndApiAndPubkeys(currencyId, api.name(), includePubkeys); + if (CollectionUtils.isNotEmpty(pubkeysPeers)) { + pubkeysPeers.stream() + .filter(Objects::nonNull) + .forEach(p -> peersByUrls.put(p.getUrl(), p)); + } + } + + // Add discovered peers + if (pluginSettings.enableSynchroDiscovery()) { + List<Peer> discoveredPeers = peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()); + if (CollectionUtils.isNotEmpty(discoveredPeers)) { + discoveredPeers.stream() + .filter(Objects::nonNull) + .forEach(p -> peersByUrls.put(p.getUrl(), p)); + } + } + + return peersByUrls.values(); + } + catch (Exception e) { + logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); + return null; + } + } + + public boolean isEsNodeAliveAndValid(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getCurrency()); + + try { + // TODO: check version is compatible + //String version = networkService.getVersion(peer); + + Currency currency = currencyDao.getById(peer.getCurrency()); + if (currency == null) return false; + + BlockchainBlock block = httpService.executeRequest(peer, String.format("/%s/block/0/_source", peer.getCurrency()), BlockchainBlock.class); + + return Objects.equals(block.getCurrency(), peer.getCurrency()) && + Objects.equals(block.getSignature(), currency.getFirstBlockSignature()); + + } + catch(Exception e) { + logger.debug(String.format("[%s] [%s] Peer not alive or invalid: %s", peer.getCurrency(), peer, e.getMessage())); + return false; + } + } + + public void addTargetPeerEndpointApi(EndpointApi api) { + Preconditions.checkNotNull(api); + + if (!targetPeersEndpointApis.contains(api)) { + targetPeersEndpointApis.add(api); + } + } + + + public void addPublishEndpointApi(EndpointApi api) { + Preconditions.checkNotNull(api); + + if (!publishedEndpointApis.contains(api)) { + if (pluginSettings.enablePeering()) { + logger.debug(String.format("Adding {%s} as published endpoint", api.name())); + } + publishedEndpointApis.add(api); + } + } + + public NetworkPeering getPeering(String currency, boolean useCache) { + + waitReady(); + + // Retrieve the currency to use + boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && currencyDao.existsIndex(); + if (StringUtils.isBlank(currency)) { + List<String> currencyIds = enableBlockchainIndexation ? currencyDao.getCurrencyIds() : null; + if (CollectionUtils.isNotEmpty(currencyIds)) { + currency = currencyIds.get(0); + } else { + currency = DEFAULT_BLOCK.getCurrency(); + } + } + + // Get result from cache, is allow + if (useCache) { + NetworkPeering result = peeringByCurrencyCache.get(currency); + if (result != null) return result; + } + + // create and fill a new peering object + NetworkPeering result = new NetworkPeering(); + + // Get current block + BlockchainBlock currentBlock = enableBlockchainIndexation ? blockchainService.getCurrentBlock(currency) : null; + if (currentBlock == null) { + currentBlock = DEFAULT_BLOCK; + currency = currentBlock.getCurrency(); + } + + result.setVersion(Protocol.VERSION); + result.setCurrency(currency); + result.setBlock(String.format("%s-%s", currentBlock.getNumber(), currentBlock.getHash())); + result.setPubkey(pluginSettings.getNodePubkey()); + result.setStatus("UP"); + + // Add endpoints + if (CollectionUtils.isNotEmpty(publishedEndpointApis)) { + List<NetworkPeering.Endpoint> endpoints = Lists.newArrayList(); + for (EndpointApi endpointApi: publishedEndpointApis) { + NetworkPeering.Endpoint ep = new NetworkPeering.Endpoint(); + ep.setDns(pluginSettings.getClusterRemoteHost()); + ep.setApi(endpointApi); + ep.setPort(pluginSettings.getClusterRemotePort()); + endpoints.add(ep); + } + result.setEndpoints(endpoints.toArray(new NetworkPeering.Endpoint[endpoints.size()])); + } + + + // Compute raw, then sign it + String raw = result.toString(); + String signature = cryptoService.sign(raw, pluginSettings.getNodeKeypair().getSecKey()); + raw += signature + "\n"; + + result.setRaw(raw); + result.setSignature(signature); + + // Add result to cache + peeringByCurrencyCache.put(currency, result); + + return result; + } + + public NetworkPeering getLastPeering(String currency) { + return getPeering(currency, true); + } + + public NetworkService startPublishingPeerDocumentToNetwork() { + + if (CollectionUtils.isEmpty(publishedEndpointApis)) { + logger.debug("Skipping peer document publishing (No endpoint API to publish)"); + return this; + } + if (CollectionUtils.isEmpty(targetPeersEndpointApis)) { + logger.debug("Skipping peer document publishing (No endpoint API to target)"); + return this; + } + + // Launch once, at startup (after a delay) + threadPool.schedule(() -> { + logger.info(String.format("Publishing endpoints %s to targeted peers %s", publishedEndpointApis, targetPeersEndpointApis)); + boolean launchAtStartup; + try { + // wait for some peers + launchAtStartup = waitPeersReady(targetPeersEndpointApis); + } catch (InterruptedException e) { + return; // stop + } + + if (launchAtStartup) { + publishPeerDocumentToNetwork(); + } + + // Schedule next execution + threadPool.scheduleAtFixedRate( + this::publishPeerDocumentToNetwork, + pluginSettings.getPeeringInterval() * 1000, + pluginSettings.getPeeringInterval() * 1000 /* convert in ms */, + TimeUnit.MILLISECONDS); + }, + 30 * 1000 /*wait 30 s */ , + TimeUnit.MILLISECONDS + ); + + return this; + } + + public NetworkPeering checkAndSavePeering(String peeringDocument) { + Preconditions.checkNotNull(peeringDocument); + NetworkPeering peering; + try { + peering = NetworkPeerings.parse(peeringDocument); + } + catch(Exception e) { + throw new TechnicalException("Inavlid peer document: " + e.getMessage(), e); + } + + // Check validity then save + return checkAndSavePeering(peering); + + } + + public NetworkPeering checkAndSavePeering(NetworkPeering peering) { + + if (CollectionUtils.isEmpty(peering.getEndpoints())) { + logger.debug("Ignoring peer document (no endpoint to process)"); + return peering; + } + + // Check signature + checkSignature(peering); + + // Transform endpoint to peers + List<Peer> peers = Lists.newArrayList(); + for (NetworkPeering.Endpoint ep : peering.getEndpoints()) { + Peer peer = Peer.newBuilder() + .setCurrency(peering.getCurrency()) + .setPubkey(peering.getPubkey()) + .setEndpoint(ep).build(); + EndpointApi api = EndpointApi.valueOf(peer.getApi()); + peers.add(peer); + + // TODO: filter to keep only useful API ? + //if (targetPeersEndpointApis.contains(api)) { + // peers.add(peer); + //} + //else { + // logger.debug(String.format("Ignoring endpoint {%s}: not a targeted API", peer)); + //} + } + + // Save peers + if (CollectionUtils.isEmpty(peers)) { + peerService.save(peering.getCurrency(), peers, false); + } + + return peering; + } + + /* -- protected -- */ + + public void checkSignature(NetworkPeering peering) { + Preconditions.checkNotNull(peering); + Preconditions.checkNotNull(peering.getSignature()); + + String signature = peering.getSignature(); + + try { + // Generate raw document + peering.setSignature(null); + String raw = peering.toString(); + + // Check signature + if (!cryptoService.verify(raw, signature, peering.getPubkey())) { + throw new TechnicalException("Invalid document signature"); + } + } + finally { + peering.setSignature(signature); // Restore the signature + } + } + + protected void publishPeerDocumentToNetwork() { + List<String> currencyIds; + try { + currencyIds = currencyDao.getCurrencyIds(); + } + catch (Exception e) { + logger.error("Could not retrieve indexed currencies", e); + currencyIds = null; + } + if (CollectionUtils.isEmpty(currencyIds)) { + logger.warn("Skipping the publication of peer document (no indexed currency)"); + return; + } + + if (CollectionUtils.isEmpty(targetPeersEndpointApis) || + CollectionUtils.isEmpty(publishedEndpointApis)) { + logger.warn("Skipping the publication of peer document (no targeted API, or no API to publish)"); + return; + } + + // For each currency + currencyIds.forEach(currencyId -> { + + logger.debug(String.format("[%s] Publishing peer document to network... {peers discovery: %s}", currencyId, pluginSettings.enableSynchroDiscovery())); + + // Create a new peer document (will add it to cache) + String peerDocument = getPeering(currencyId, false/*force new peering*/).toString(); + + // Get peers for targeted APIs + Collection<Peer> peers = getPeersFromApis(currencyId, targetPeersEndpointApis); + + if (CollectionUtils.isNotEmpty(peers)) { + // Send document to every peers + long count = peers.stream().map(p -> this.safePublishPeerDocumentToPeer(currencyId, p, peerDocument)).filter(Boolean.TRUE::equals).count(); + + logger.info(String.format("[%s] Peer document sent to %s/%s peers", currencyId, count, peers.size())); + } else { + logger.debug(String.format("[%s] No peer document sent (no targeted peer found)", currencyId)); + } + }); + } + + protected boolean safePublishPeerDocumentToPeer(final String currencyId, final Peer peer, final String peerDocument) { + Preconditions.checkNotNull(currencyId); + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getApi()); + Preconditions.checkNotNull(peer.getUrl()); + Preconditions.checkNotNull(peerDocument); + + try { + networkRemoteService.postPeering(peer, peerDocument); + return true; + } + catch(Exception e) { + logger.error(String.format("[%s] [%s] Error when sending peer document: %s", currencyId, peer, e.getMessage())); + return false; + } + + } + + protected void addAllTargetPeerEndpointApis(List<EndpointApi> apis) { + Preconditions.checkNotNull(apis); + apis.forEach(this::addTargetPeerEndpointApi); + } + + protected void addAllPublishEndpointApis(List<EndpointApi> apis) { + Preconditions.checkNotNull(apis); + apis.forEach(this::addPublishEndpointApi); + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 87d2bbba..166d4117 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -53,7 +53,7 @@ public class PeerService extends AbstractService { private ThreadPool threadPool; // Define endpoint API to include - private List<String> includeEndpointApis = Lists.newArrayList( + private static List<String> includeEndpointApis = Lists.newArrayList( EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name(), EndpointApi.WS2P.name()); @@ -133,6 +133,14 @@ public class PeerService extends AbstractService { return this; } + public void save(final Peer peer) { + delegate.save(peer); + } + + public void save(final String currencyId, final List<Peer> peers, boolean isFullList) { + delegate.save(currencyId, peers, isFullList); + } + public void listenAndIndexPeers(final Peer mainPeer) { // Get the blockchain name from node BlockchainParameters parameter = blockchainRemoteService.getParameters(mainPeer); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java index cdee4cd1..a59de688 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java @@ -32,6 +32,7 @@ import org.duniter.core.client.service.HttpServiceImpl; import org.duniter.core.client.service.bma.*; import org.duniter.core.client.service.local.CurrencyService; import org.duniter.core.client.service.local.*; +import org.duniter.core.client.service.local.NetworkService; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.service.Ed25519CryptoServiceImpl; @@ -43,7 +44,6 @@ import org.duniter.elasticsearch.dao.impl.BlockDaoImpl; import org.duniter.elasticsearch.dao.impl.CurrencyDaoImpl; import org.duniter.elasticsearch.dao.impl.PeerDaoImpl; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java index a1794c14..ae966abf 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java @@ -22,23 +22,18 @@ package org.duniter.elasticsearch.synchro; * #L% */ -import com.google.common.collect.*; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.ArrayUtils; import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.dao.PeerDao; -import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.EndpointApi; -import org.duniter.core.client.model.bma.Endpoints; -import org.duniter.core.client.model.bma.NetworkPeering; -import org.duniter.core.client.model.local.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.HttpService; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.DateUtils; import org.duniter.core.util.Preconditions; -import org.duniter.core.util.StringUtils; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; @@ -46,6 +41,7 @@ import org.duniter.elasticsearch.dao.SynchroExecutionDao; import org.duniter.elasticsearch.model.SynchroExecution; import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.service.NetworkService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeEvents; @@ -53,7 +49,6 @@ import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; -import java.io.IOException; import java.text.DateFormat; import java.util.*; import java.util.concurrent.TimeUnit; @@ -65,16 +60,16 @@ import java.util.stream.Collectors; public class SynchroService extends AbstractService { private static final String WS_CHANGES_URL = "/ws/_changes"; + private final static Set<EndpointApi> includeEndpointApis = Sets.newHashSet(); + private static List<WebsocketClientEndpoint> wsClientEndpoints = Lists.newArrayList(); + private static List<SynchroAction> actions = Lists.newArrayList(); private HttpService httpService; - //private NetworkService networkService; - private final Set<EndpointApi> peerApiFilters = Sets.newHashSet(); private final ThreadPool threadPool; - private final PeerDao peerDao; private final CurrencyDao currencyDao; private final SynchroExecutionDao synchroExecutionDao; - private List<WebsocketClientEndpoint> wsClientEndpoints = Lists.newArrayList(); - private List<SynchroAction> actions = Lists.newArrayList(); + private final NetworkService networkService; + private boolean forceFullResync = false; @Inject @@ -83,17 +78,16 @@ public class SynchroService extends AbstractService { CryptoService cryptoService, ThreadPool threadPool, CurrencyDao currencyDao, - PeerDao peerDao, SynchroExecutionDao synchroExecutionDao, + NetworkService networkService, final ServiceLocator serviceLocator) { super("duniter.p2p", client, settings, cryptoService); this.threadPool = threadPool; this.currencyDao = currencyDao; - this.peerDao = peerDao; this.synchroExecutionDao = synchroExecutionDao; + this.networkService = networkService; threadPool.scheduleOnStarted(() -> { httpService = serviceLocator.getHttpService(); - //networkService = serviceLocator.getNetworkService(); setIsReady(true); }); } @@ -102,8 +96,8 @@ public class SynchroService extends AbstractService { Preconditions.checkNotNull(action); Preconditions.checkNotNull(action.getEndPointApi()); - if (!peerApiFilters.contains(action.getEndPointApi())) { - peerApiFilters.add(action.getEndPointApi()); + if (!includeEndpointApis.contains(action.getEndPointApi())) { + includeEndpointApis.add(action.getEndPointApi()); } actions.add(action); } @@ -118,7 +112,7 @@ public class SynchroService extends AbstractService { boolean launchAtStartup; try { // wait for some peers - launchAtStartup = waitPeersReady(); + launchAtStartup = networkService.waitPeersReady(includeEndpointApis); } catch (InterruptedException e) { return; // stop } @@ -174,22 +168,22 @@ public class SynchroService extends AbstractService { currencyIds = null; } - if (CollectionUtils.isEmpty(currencyIds) || CollectionUtils.isEmpty(peerApiFilters)) { + if (CollectionUtils.isEmpty(currencyIds) || CollectionUtils.isEmpty(includeEndpointApis)) { logger.warn("Skipping synchronization: no indexed currency or no API configured"); return; } - currencyIds.forEach(currencyId -> peerApiFilters.forEach(peerApiFilter -> { + currencyIds.forEach(currencyId -> includeEndpointApis.forEach(peerApiFilter -> { - logger.info(String.format("[%s] [%s] Starting synchronization... {discovery: %s}", currencyId, peerApiFilter.name(), pluginSettings.enableSynchroDiscovery())); + logger.info(String.format("[%s] [%s] Starting synchronization from network... {peers discovery: %s}", currencyId, peerApiFilter.name(), pluginSettings.enableSynchroDiscovery())); // Get peers for currencies and API - Collection<Peer> peers = getPeersFromApi(currencyId, peerApiFilter); + Collection<Peer> peers = networkService.getPeersFromApi(currencyId, peerApiFilter); if (CollectionUtils.isNotEmpty(peers)) { peers.forEach(p -> synchronizePeer(p, enableSynchroWebsocket)); logger.info(String.format("[%s] [%s] Synchronization [OK]", currencyId, peerApiFilter.name())); } else { - logger.info(String.format("[%s] [%s] Synchronization [OK] - no endpoint to synchronize", currencyId, peerApiFilter.name())); + logger.debug(String.format("[%s] [%s] Synchronization [OK] (no source peer found)", currencyId, peerApiFilter.name())); } } )); @@ -199,7 +193,7 @@ public class SynchroService extends AbstractService { long startExecutionTime = System.currentTimeMillis(); // Check if peer alive and valid - boolean isAliveAndValid = isAliveAndValid(peer); + boolean isAliveAndValid = networkService.isEsNodeAliveAndValid(peer); if (!isAliveAndValid) { logger.warn(String.format("[%s] [%s] Not reachable, or not running on this currency. Skipping.", peer.getCurrency(), peer)); return null; @@ -263,127 +257,6 @@ public class SynchroService extends AbstractService { /* -- protected methods -- */ - protected List<Peer> getConfigIncludesPeers(final String currencyId, final EndpointApi api) { - Preconditions.checkNotNull(currencyId); - String[] endpoints = pluginSettings.getSynchroIncludesEndpoints(); - if (ArrayUtils.isEmpty(endpoints)) return null; - - List<Peer> peers = Lists.newArrayList(); - for (String endpoint: endpoints) { - try { - String[] endpointPart = endpoint.split(":"); - if (endpointPart.length > 2) { - logger.warn(String.format("Error in config: Unable to parse P2P endpoint [%s]: %s", endpoint)); - } - String epCurrencyId = (endpointPart.length == 2) ? endpointPart[0] : null /*optional*/; - - NetworkPeering.Endpoint ep = (endpointPart.length == 2) ? Endpoints.parse(endpointPart[1]) : Endpoints.parse(endpoint); - if (ep.api == api && (epCurrencyId == null || currencyId.equals(epCurrencyId))) { - Peer peer = Peer.newBuilder() - .setEndpoint(ep) - .setCurrency(currencyId) - .build(); - - String hash = cryptoService.hash(peer.computeKey()); - peer.setHash(hash); - peer.setId(hash); - - peers.add(peer); - } - - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage()), e); - } - else { - logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage())); - } - } - } - return peers; - } - - protected Collection<Peer> getPeersFromApi(final String currencyId, final EndpointApi api) { - Preconditions.checkNotNull(api); - Preconditions.checkArgument(StringUtils.isNotBlank(currencyId)); - - try { - - // Use map by URL, to avoid duplicated peer - Map<String, Peer> peersByUrls = Maps.newHashMap(); - - // Get peers from config - List<Peer> configPeers = getConfigIncludesPeers(currencyId, api); - if (CollectionUtils.isNotEmpty(configPeers)) { - configPeers.forEach(p -> peersByUrls.put(p.getUrl(), p)); - } - - // Get peers by pubkeys, from config - String[] includePubkeys = pluginSettings.getSynchroIncludesPubkeys(); - if (ArrayUtils.isNotEmpty(includePubkeys)) { - - // Get from DAO, by API and pubkeys - List<Peer> pubkeysPeers = peerDao.getPeersByCurrencyIdAndApiAndPubkeys(currencyId, api.name(), includePubkeys); - if (CollectionUtils.isNotEmpty(pubkeysPeers)) { - pubkeysPeers.stream() - .filter(Objects::nonNull) - .forEach(p -> peersByUrls.put(p.getUrl(), p)); - } - } - - // Add discovered peers - if (pluginSettings.enableSynchroDiscovery()) { - List<Peer> discoveredPeers = peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()); - if (CollectionUtils.isNotEmpty(discoveredPeers)) { - discoveredPeers.stream() - .filter(Objects::nonNull) - .forEach(p -> peersByUrls.put(p.getUrl(), p)); - } - } - - return peersByUrls.values(); - } - catch (Exception e) { - logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); - return null; - } - } - - protected boolean hasSomePeers() { - - List<String> currencyIds = currencyDao.getCurrencyIds(); - if (CollectionUtils.isEmpty(currencyIds)) return false; - - for (String currencyId: currencyIds) { - boolean hasSome = peerDao.hasPeersUpWithApi(currencyId, peerApiFilters); - if (hasSome) return true; - } - - return false; - } - - protected boolean waitPeersReady() throws InterruptedException{ - final int sleepTime = 10 * 1000 /*10s*/; - - int maxWaitingDuration = 5 * 6 * sleepTime; // 5 min - int waitingDuration = 0; - while (!isReady() && !hasSomePeers()) { - // Wait 10s - Thread.sleep(sleepTime); - waitingDuration += sleepTime; - if (waitingDuration >= maxWaitingDuration) { - logger.warn(String.format("Could not start data synchronisation. No Peer found (after waiting %s min).", waitingDuration/60/1000)); - return false; // stop here - } - } - - // Wait again, to make sure all peers have been saved by NetworkService - Thread.sleep(sleepTime*2); - - return true; - } - - protected long getLastExecutionTime(Peer peer) { Preconditions.checkNotNull(peer); @@ -484,26 +357,5 @@ public class SynchroService extends AbstractService { } } - protected boolean isAliveAndValid(Peer peer) { - Preconditions.checkNotNull(peer); - Preconditions.checkNotNull(peer.getCurrency()); - - try { - // TODO: check version is compatible - //String version = networkService.getVersion(peer); - - Currency currency = currencyDao.getById(peer.getCurrency()); - if (currency == null) return false; - - BlockchainBlock block = httpService.executeRequest(peer, String.format("/%s/block/0/_source", peer.getCurrency()), BlockchainBlock.class); - - return Objects.equals(block.getCurrency(), peer.getCurrency()) && - Objects.equals(block.getSignature(), currency.getFirstBlockSignature()); - } - catch(Exception e) { - logger.debug(String.format("[%s] [%s] Peer not alive or invalid: %s", peer.getCurrency(), peer, e.getMessage())); - return false; - } - } } diff --git a/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_en_GB.properties b/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_en_GB.properties index 51a8a10c..7071316d 100644 --- a/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_en_GB.properties +++ b/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_en_GB.properties @@ -1,4 +1,5 @@ cesium-plus-pod-core.config= +duniter.p2p.error.noRemoteUrl=The cluster address can not be published on the network. /\!\\ Fill in the options [cluster.remote.xxx] in the configuration (recommended). duniter4j.blockIndexerService.detectFork.invalidBlock=[%s] [%s] Detecting fork\: block \#%s -> new hash [%s] duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping block \#%s - hash [%s]. duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s diff --git a/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_fr_FR.properties b/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_fr_FR.properties index ad94de22..f5e39f34 100644 --- a/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_fr_FR.properties +++ b/cesium-plus-pod-core/src/main/resources/i18n/cesium-plus-pod-core_fr_FR.properties @@ -1,4 +1,5 @@ cesium-plus-pod-core.config= +duniter.p2p.error.noRemoteUrl=L'adresse publique du cluster ne peut pas être publiée sur le réseau. /\!\\ Renseignez les options [cluster.remote.xxx] dans la configuration (conseillé). duniter4j.blockIndexerService.detectFork.invalidBlock=[%s] [%s] Detecting fork\: block \#%s -> new hash [%s] duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping block \#%s - hash [%s]. duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java index 401fb9ca..413150bb 100644 --- a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java @@ -23,10 +23,13 @@ package org.duniter.elasticsearch.subscription; */ +import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.util.crypto.KeyPair; import org.elasticsearch.common.component.*; import org.elasticsearch.common.inject.Inject; +import java.util.List; + /** * Access to configuration options * @author Benoit Lavenier <benoit.lavenier@e-is.pro> @@ -135,6 +138,18 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return delegate.enableSynchro(); } + public boolean enablePeering() { + return delegate.enablePeering(); + } + + public List<EndpointApi> getPeeringTargetedApis() { + return this.delegate.getPeeringTargetedApis(); + } + + public List<EndpointApi> getPeeringPublishedApis() { + return this.delegate.getPeeringPublishedApis(); + } + public int getSynchroTimeOffset() { return delegate.getSynchroTimeOffset(); } diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/NetworkServiceConfiguration.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/NetworkServiceConfiguration.java new file mode 100644 index 00000000..bb7f8433 --- /dev/null +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/NetworkServiceConfiguration.java @@ -0,0 +1,22 @@ +package org.duniter.elasticsearch.subscription.service; + +import org.duniter.core.beans.Bean; +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.util.CollectionUtils; +import org.duniter.elasticsearch.service.NetworkService; +import org.duniter.elasticsearch.subscription.PluginSettings; +import org.elasticsearch.common.inject.Inject; + +public class NetworkServiceConfiguration implements Bean { + + + @Inject + public NetworkServiceConfiguration(PluginSettings pluginSettings, + NetworkService networkService) { + // Register ES_USER_API, if list of APIs has not already defined in settings + if (CollectionUtils.isEmpty(pluginSettings.getPeeringPublishedApis()) + && pluginSettings.enableSubscription()) { + networkService.addPublishEndpointApi(EndpointApi.ES_SUBSCRIPTION_API); + } + } +} diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java index 72c4223c..c1cfcef0 100644 --- a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java @@ -30,5 +30,8 @@ public class ServiceModule extends AbstractModule implements Module { @Override protected void configure() { // Subscription services bind(SubscriptionService.class).asEagerSingleton(); + + // Configure network + bind(NetworkServiceConfiguration.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java index 5f93f052..45d10486 100644 --- a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java @@ -325,7 +325,7 @@ public class SubscriptionService extends AbstractService { senderName, profileTitles, userEvents, - pluginSettings.getCesiumUrl()) + pluginSettings.getEmailLinkUrl()) .render(issuerLocale); // Compute HTML content @@ -336,7 +336,7 @@ public class SubscriptionService extends AbstractService { senderName, profileTitles, userEvents, - pluginSettings.getCesiumUrl()) + pluginSettings.getEmailLinkUrl()) .render(issuerLocale); final String object = emailSubjectPrefix + I18n.t("duniter4j.es.subscription.email.subject", userEvents.size()); diff --git a/cesium-plus-pod-user/src/main/filtered-resources/plugin-descriptor.properties b/cesium-plus-pod-user/src/main/filtered-resources/plugin-descriptor.properties index 6f6d3fa7..5a18474c 100644 --- a/cesium-plus-pod-user/src/main/filtered-resources/plugin-descriptor.properties +++ b/cesium-plus-pod-user/src/main/filtered-resources/plugin-descriptor.properties @@ -1,5 +1,5 @@ name=cesium-plus-pod-user -description=Plugin to manage user date in Cesium+ pod +description=Plugin to manage user data in Cesium+ pod version=${project.version} site=false jvm=true diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index f3b81354..823bae9d 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -23,14 +23,13 @@ package org.duniter.elasticsearch.user; */ -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.StringUtils; -import org.duniter.core.util.crypto.CryptoUtils; +import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.util.crypto.KeyPair; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import java.util.List; import java.util.Locale; /** @@ -41,19 +40,12 @@ import java.util.Locale; public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { private org.duniter.elasticsearch.PluginSettings delegate; - private CryptoService cryptoService; - - private KeyPair nodeKeyPair; - private boolean isRandomNodeKeyPair; - private String nodePubkey; @Inject public PluginSettings(Settings settings, - org.duniter.elasticsearch.PluginSettings delegate, - CryptoService cryptoService) { + org.duniter.elasticsearch.PluginSettings delegate) { super(settings); this.delegate = delegate; - this.cryptoService = cryptoService; // Add i18n bundle name delegate.addI18nBundleName(getI18nBundleName()); @@ -102,6 +94,18 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return delegate.enableSynchro(); } + public boolean enablePeering() { + return this.delegate.enablePeering(); + } + + public List<EndpointApi> getPeeringTargetedApis() { + return this.delegate.getPeeringTargetedApis(); + } + + public List<EndpointApi> getPeeringPublishedApis() { + return this.delegate.getPeeringPublishedApis(); + } + public int getSynchroTimeOffset() { return settings.getAsInt("duniter.synchro.timeOffsetInSec", 60*60 /*1 hour*/ ); } @@ -214,18 +218,15 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public KeyPair getNodeKeypair() { - initNodeKeyring(); - return this.nodeKeyPair; + return delegate.getNodeKeypair(); } public boolean isRandomNodeKeypair() { - initNodeKeyring(); - return this.isRandomNodeKeyPair; + return delegate.isRandomNodeKeypair(); } public String getNodePubkey() { - initNodeKeyring(); - return this.nodePubkey; + return delegate.getNodePubkey(); } /** @@ -270,26 +271,6 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return "cesium-plus-pod-user-i18n"; } - protected synchronized void initNodeKeyring() { - if (this.nodeKeyPair != null) return; - if (StringUtils.isNotBlank(getKeyringSalt()) && - StringUtils.isNotBlank(getKeyringPassword())) { - this.nodeKeyPair = cryptoService.getKeyPair(getKeyringSalt(), getKeyringPassword()); - this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey()); - this.isRandomNodeKeyPair = false; - } - else { - // Use a ramdom keypair - this.nodeKeyPair = cryptoService.getRandomKeypair(); - this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey()); - this.isRandomNodeKeyPair = true; - - logger.warn(String.format("No keyring in config. salt/password (or keyring) is need to signed user event documents. Will use a generated key [%s]", this.nodePubkey)); - if (logger.isDebugEnabled()) { - logger.debug(String.format(" salt: " + getKeyringSalt().replaceAll(".", "*"))); - logger.debug(String.format("password: " + getKeyringPassword().replaceAll(".", "*"))); - } - } - } + } diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/NetworkServiceConfiguration.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/NetworkServiceConfiguration.java new file mode 100644 index 00000000..b82ab86c --- /dev/null +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/NetworkServiceConfiguration.java @@ -0,0 +1,22 @@ +package org.duniter.elasticsearch.user.service; + +import org.duniter.core.beans.Bean; +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.util.CollectionUtils; +import org.duniter.elasticsearch.service.NetworkService; +import org.duniter.elasticsearch.user.PluginSettings; +import org.elasticsearch.common.inject.Inject; + +public class NetworkServiceConfiguration implements Bean { + + + @Inject + public NetworkServiceConfiguration(PluginSettings pluginSettings, + NetworkService networkService) { + + // Register ES_USER_API, if list of APIs has not already defined in settings + if (CollectionUtils.isEmpty(pluginSettings.getPeeringPublishedApis())) { + networkService.addPublishEndpointApi(EndpointApi.ES_USER_API); + } + } +} diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java index 86a95b9a..15c1fe22 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java @@ -44,7 +44,8 @@ public class ServiceModule extends AbstractModule implements Module { bind(BlockchainUserEventService.class).asEagerSingleton(); - //bind(SynchroService.class).asEagerSingleton(); + // Configure the P2P network + bind(NetworkServiceConfiguration.class).asEagerSingleton(); } /* protected methods */ diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java index 2d080a88..013747d9 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java @@ -28,6 +28,7 @@ import org.apache.lucene.queryparser.flexible.core.util.StringUtils; import org.duniter.core.util.Preconditions; import org.apache.commons.collections4.MapUtils; import org.duniter.core.client.model.ModelUtils; +import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.user.model.Attachment; import org.duniter.elasticsearch.user.model.UserProfile; import org.duniter.core.service.CryptoService; @@ -100,7 +101,7 @@ public class UserService extends AbstractService { /** * * Index an user profile - * @param profileJson + * @param json * @return the profile id */ public String indexProfileFromJson(String json) { @@ -184,7 +185,14 @@ public class UserService extends AbstractService { } // Check time is valid - fix #27 - verifyTimeForUpdate(INDEX, SETTINGS_TYPE, id, actualObj); + try { + verifyTimeForUpdate(INDEX, SETTINGS_TYPE, id, actualObj); + } + catch (NotFoundException e) { + // Settings not exists yet (can occur when user change node in the app settings) + indexSettingsFromJson(json); + return; + } if (logger.isDebugEnabled()) { logger.debug(String.format("Indexing a user settings from issuer [%s]", issuer.substring(0, 8))); diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java index 981f1fd8..8c477c2d 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java @@ -23,6 +23,8 @@ package org.duniter.elasticsearch.user.synchro; */ import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.util.CollectionUtils; +import org.duniter.elasticsearch.service.NetworkService; import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.synchro.group.SynchroGroupCommentAction; @@ -35,28 +37,14 @@ import org.duniter.elasticsearch.user.synchro.page.SynchroPageCommentAction; import org.duniter.elasticsearch.user.synchro.page.SynchroPageRecordAction; import org.duniter.elasticsearch.user.synchro.user.SynchroUserProfileAction; import org.duniter.elasticsearch.user.synchro.user.SynchroUserSettingsAction; -import org.duniter.elasticsearch.user.websocket.WebsocketUserEventEndPoint; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; public class SynchroModule extends AbstractModule implements Module { - public static class Init { - - @Inject - public Init(PeerService peerService, PluginSettings pluginSettings) { - if (pluginSettings.enableSynchro()) { - // Make sure PeerService will index ES_USER_API peers - peerService.addIncludeEndpointApi(EndpointApi.ES_USER_API); - } - } - } @Override protected void configure() { - - bind(Init.class).asEagerSingleton(); - // History bind(SynchroHistoryIndexAction.class).asEagerSingleton(); diff --git a/pom.xml b/pom.xml index 004536de..39e83347 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ <signatureVersion>1.0</signatureVersion> <!-- Commons versions --> - <duniter4j.version>1.0.4-SNAPSHOT</duniter4j.version> + <duniter4j.version>1.1.0</duniter4j.version> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.5</slf4j.version> <guava.version>22.0</guava.version> -- GitLab