From 01d0748395008cd0b73434b482c13c49de20ea7b Mon Sep 17 00:00:00 2001
From: blavenie <benoit.lavenier@e-is.pro>
Date: Tue, 3 Oct 2017 22:22:32 +0200
Subject: [PATCH] [fix] P2P Synchro: Error Refresh is not supported on an item
 request bug - fix #25 [enh] Allow to disable network discovery

---
 .../bma/jackson/EndpointDeserializer.java     |   2 +-
 .../websocket/WebsocketClientEndpoint.java    |  12 +-
 .../main/assembly/config/elasticsearch.yml    |  16 +-
 .../src/test/es-home/config/elasticsearch.yml |  29 ++--
 .../src/test/misc/test_synchro.sh             |  37 +++++
 .../duniter/elasticsearch/PluginSettings.java |   7 +-
 .../dao/impl/SynchroExecutionDaoImpl.java     |   5 +
 .../elasticsearch/model/SynchroExecution.java |  10 ++
 .../synchro/AbstractSynchroAction.java        |  16 +-
 .../elasticsearch/synchro/SynchroService.java | 148 ++++++++++++------
 .../websocket/WebSocketServer.java            |   2 +-
 11 files changed, 201 insertions(+), 83 deletions(-)
 create mode 100755 duniter4j-es-assembly/src/test/misc/test_synchro.sh

diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java
index e8926159..c739b25f 100644
--- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java
@@ -58,7 +58,7 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
                 log.warn(e.getMessage(), e); // link the exception
             }
             else {
-                log.warn(e.getMessage());
+                log.debug(e.getMessage());
             }
             return null;
         }
diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
index 71f3c546..3f91af8e 100644
--- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
+++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
@@ -64,11 +64,13 @@ public class WebsocketClientEndpoint implements Closeable {
 
     @Override
     public void close() throws IOException {
-        if (log.isDebugEnabled()) {
-            log.debug(String.format("[%s] Closing WebSocket session...", endpointURI));
+        if (userSession != null) {
+            if (log.isDebugEnabled()) {
+                log.debug(String.format("[%s] Closing WebSocket session...", endpointURI));
+            }
+            userSession.close();
+            userSession = null;
         }
-        userSession.close();
-        userSession = null;
     }
 
     /**
@@ -116,7 +118,7 @@ public class WebsocketClientEndpoint implements Closeable {
                 log.debug("[%s] Received message: " + message);
             }
             synchronized (messageListeners) {
-                messageListeners.stream().forEach(messageListener -> {
+                messageListeners.forEach(messageListener -> {
                     try {
                         messageListener.onMessage(message);
                     } catch (Exception e) {
diff --git a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml
index 4db776bf..995bbfa7 100644
--- a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml
+++ b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml
@@ -156,19 +156,23 @@ duniter.security.enable: true
 #
 # duniter.p2p.enable: false
 #
-# Enable P2P websocket direct synchronisation ? (default: true)
+# Enable P2P synchronisation using websocket ? (default: true)
 #
 # duniter.p2p.ws.enable: false
 #
-# Max peer time offset, in seconds (max peer's clock error) - use to request peer's data. (default: 3600 = 1hour)
+# Time delay (in seconds) to request last documents to peer (e.g. if peer's clock is late). (default: 3600s = 1h)
 #
 # duniter.p2p.peerTimeOffset: 3600
 #
-# Pass an initial list of hosts to perform synchronization when new node is started:
+# Enable discovery on network peers, to automatically synchronize this peers (default: true)
 #
-duniter.p2p.ping.endpoints: [
-   "g1:ES_USER_API g1.data.duniter.fr 443",
-   "g1:ES_SUBSCRIPTION_API g1.data.duniter.fr 443"
+# duniter.p2p.discovery.enable: false
+#
+# Pass a list of hosts to always synchronize (default: <empty>)
+#
+duniter.p2p.includes.endpoints: [
+   "ES_USER_API g1.data.duniter.fr 443",
+   "ES_SUBSCRIPTION_API g1.data.duniter.fr 443"
 ]
 #
 # ---------------------------------- Duniter4j Mail module -----------------------
diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml
index 3b541e42..9752b291 100644
--- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml
+++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml
@@ -14,13 +14,13 @@
 #
 # Use a descriptive name for your cluster:
 #
-cluster.name: duniter4j-es-assembly-test
+cluster.name: g1-es-data
 #
 # ------------------------------------ Node ------------------------------------
 #
 # Use a descriptive name for the node:
 #
-# node.name: node-1
+node.name: EIS-DEV
 #
 # Add custom attributes to the node:
 #
@@ -130,9 +130,9 @@ duniter.blockchain.enable: true
 #
 # Duniter node address
 #
-duniter.host: g1-test.duniter.org
-duniter.port: 10900
-# duniter.useSsl: true
+duniter.host: g1.duniter.fr
+duniter.port: 443
+duniter.useSsl: true
 #
 # Compute statistics on indices (each hour) ? (default: true)
 #
@@ -162,23 +162,26 @@ duniter.security.enable: true
 #
 # Enable P2P synchronize between ES peers ? (default: true)
 #
-# duniter.p2p.enable: true
+# duniter.p2p.enable: false
 #
-# Enable P2P websocket direct synchronisation ? (default: true)
+# Enable P2P synchronisation using websocket ? (default: true)
 #
 # duniter.p2p.ws.enable: false
 #
-# Max peer time offset, in seconds (max peer's clock error) - use to request peer's data. (default: 3600 = 1hour)
+# Time delay (in seconds) to request last documents to peer (e.g. if peer's clock is late). (default: 3600s = 1h)
 #
 # duniter.p2p.peerTimeOffset: 3600
 #
-# Pass an initial list of hosts to perform synchronization when new node is started:
+# Enable discovery on network peers, to automatically synchronize this peers (default: true)
 #
-duniter.p2p.ping.endpoints: [
-  "g1-test:ES_USER_API g1-test.data.duniter.fr 443",
-  "g1-test:ES_SUBSCRIPTION_API g1-test.data.duniter.fr 443"
+# duniter.p2p.discovery.enable: false
+#
+# Pass a list of hosts to always synchronize (default: <empty>)
+#
+duniter.p2p.includes.endpoints: [
+   "ES_USER_API g1.data.duniter.fr 443",
+   "ES_SUBSCRIPTION_API g1.data.duniter.fr 443"
 ]
-
 #
 # ---------------------------------- Duniter4j Mail module -----------------------
 #
diff --git a/duniter4j-es-assembly/src/test/misc/test_synchro.sh b/duniter4j-es-assembly/src/test/misc/test_synchro.sh
new file mode 100755
index 00000000..7f564cb0
--- /dev/null
+++ b/duniter4j-es-assembly/src/test/misc/test_synchro.sh
@@ -0,0 +1,37 @@
+
+curl -XPOST 'https://g1.data.le-sou.org/g1/synchro/_search?pretty' -d '
+   {
+      "size": 0,
+      "aggs": {
+         "range": {
+           "range": {
+              "field": "time",
+              "ranges": [
+                {"from":0, "to": 9996178800 }
+              ]
+           },
+           "aggs": {
+             "peer": {
+               "terms": {
+                 "field": "peer",
+                 "size": 0
+               },
+               "aggs" : {
+                 "result": {
+                   "nested": {
+                      "path": "result"
+                    },
+                    "aggs" : {
+                        "inserts" : {
+                        "stats": {
+                          "field" : "result.inserts"
+                        }
+                      }
+                    }
+                 }
+               }
+             }
+           }
+         }
+      }
+   }'
\ No newline at end of file
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java
index 36a30cba..e1d6eaf7 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java
@@ -230,10 +230,13 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
         return settings.getAsInt("duniter.p2p.peerTimeOffset", 60*60/*=1hour*/);
     }
 
-    public String[] getSynchroPingEndpoints()  {
-        return settings.getAsArray("duniter.p2p.ping.endpoints");
+    public String[] getSynchroIncludesEndpoints()  {
+        return settings.getAsArray("duniter.p2p.includes.endpoints");
     }
 
+    public boolean enableSynchroDiscovery()  {
+        return settings.getAsBoolean("duniter.p2p.discovery.enable", true);
+    }
 
     public boolean isDevMode() {
         return settings.getAsBoolean("duniter.dev.enable", false);
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java
index c91ecfa5..45a8ae75 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java
@@ -157,6 +157,11 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
                     .field("index", "not_analyzed")
                     .endObject()
 
+                    // execution time
+                    .startObject(SynchroExecution.PROPERTY_EXECUTION_TIME)
+                    .field("type", "long")
+                    .endObject()
+
                     // result
                     .startObject(SynchroExecution.PROPERTY_RESULT)
                     .field("type", "nested")
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java
index 9ce4b4df..e64d3410 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java
@@ -8,11 +8,13 @@ public class SynchroExecution extends Record {
     public static final String PROPERTY_PEER = "peer";
     public static final String PROPERTY_API = "api";
     public static final String PROPERTY_RESULT = "result";
+    public static final String PROPERTY_EXECUTION_TIME = "executionTime";
 
 
     private String currency;
     private String peer;
     private String api;
+    private Long executionTime;
     private SynchroResult result;
 
     public String getCurrency() {
@@ -46,4 +48,12 @@ public class SynchroExecution extends Record {
     public void setApi(String api) {
         this.api = api;
     }
+
+    public Long getExecutionTime() {
+        return executionTime;
+    }
+
+    public void setExecutionTime(Long executionTime) {
+        this.executionTime = executionTime;
+    }
 }
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java
index a0563444..8e427a9c 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java
@@ -43,8 +43,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.joda.time.format.DateTimeFormat;
 
 import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
 import java.util.*;
 
 public abstract class AbstractSynchroAction extends AbstractService implements SynchroAction {
@@ -119,7 +122,14 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
         Preconditions.checkNotNull(result);
 
         if (logger.isDebugEnabled()) {
-            logger.debug(String.format("[%s] [%s/%s] Synchronizing where [%s > %s]...", peer, toIndex, toType, versionFieldName, fromTime));
+            if (Record.PROPERTY_TIME.equals(versionFieldName)) {
+                logger.debug(String.format("[%s] [%s] [%s/%s] Synchronization {since %s}...", peer.getCurrency(), peer, toIndex, toType,
+                        DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM)
+                                .format(new Date(fromTime * 1000))));
+            }
+            else {
+                logger.debug(String.format("[%s] [%s] [%s/%s] Synchronization {where %s > %s}...", peer.getCurrency(), peer, toIndex, toType, versionFieldName, fromTime));
+            }
         }
 
         try {
@@ -489,12 +499,12 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
 
                     // Execute update
                     UpdateRequestBuilder request = client.prepareUpdate(toIndex, toType, id)
-                            .setRefresh(true)
-                            .setSource(objectMapper.writeValueAsBytes(source));
+                            .setDoc(objectMapper.writeValueAsBytes(source));
                     if (bulkRequest != null) {
                         bulkRequest.add(request);
                     }
                     else {
+                        request.setRefresh(true);
                         client.safeExecuteRequest(request, false);
                     }
 
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java
index 0c126106..a5b172e3 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java
@@ -36,11 +36,11 @@ import org.duniter.core.client.model.bma.NetworkPeering;
 import org.duniter.core.client.model.local.Currency;
 import org.duniter.core.client.model.local.Peer;
 import org.duniter.core.client.service.HttpService;
-import org.duniter.core.client.service.local.NetworkService;
 import org.duniter.core.service.CryptoService;
 import org.duniter.core.util.CollectionUtils;
 import org.duniter.core.util.DateUtils;
 import org.duniter.core.util.Preconditions;
+import org.duniter.core.util.StringUtils;
 import org.duniter.core.util.websocket.WebsocketClientEndpoint;
 import org.duniter.elasticsearch.PluginSettings;
 import org.duniter.elasticsearch.client.Duniter4jClient;
@@ -56,6 +56,8 @@ import org.duniter.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.common.inject.Inject;
 
 import java.io.IOException;
+import java.text.DateFormat;
+import java.util.Date;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -154,7 +156,6 @@ public class SynchroService extends AbstractService {
     }
 
     public void synchronize() {
-        logger.info("Starting synchronization...");
 
         final boolean enableSynchroWebsocket = pluginSettings.enableSynchroWebsocket();
 
@@ -163,24 +164,37 @@ public class SynchroService extends AbstractService {
             closeWsClientEndpoints();
         }
 
-        if (CollectionUtils.isNotEmpty(peerApiFilters)) {
-
-            peerApiFilters.forEach(peerApiFilter -> {
+        List<String> currencyIds;
+        try {
+            currencyIds = currencyDao.getCurrencyIds();
+        }
+        catch (Exception e) {
+            logger.error("Could not retrieve indexed currencies", e);
+            currencyIds = null;
+        }
 
-                // Get peers
-                List<Peer> peers = getPeersFromApi(peerApiFilter);
-                if (CollectionUtils.isNotEmpty(peers)) {
-                    peers.forEach(p -> synchronizePeer(p, enableSynchroWebsocket));
-                    logger.info("Synchronization [OK]");
-                } else {
-                    logger.info(String.format("Synchronization [OK] - no endpoint found for API [%s]", peerApiFilter.name()));
-                }
-            });
+        if (CollectionUtils.isEmpty(currencyIds) || CollectionUtils.isEmpty(peerApiFilters)) {
+            logger.warn("Skipping synchronization: no indexed currency or no API configured");
+            return;
         }
+
+        currencyIds.forEach(currencyId -> peerApiFilters.forEach(peerApiFilter -> {
+
+            logger.info(String.format("[%s] [%s] Starting synchronization... {discovery: %s}", currencyId, peerApiFilter.name(), pluginSettings.enableSynchroDiscovery()));
+
+            // Get peers for currencies and API
+            List<Peer> peers = getPeersFromApi(currencyId, peerApiFilter);
+            if (CollectionUtils.isNotEmpty(peers)) {
+                peers.forEach(p -> synchronizePeer(p, enableSynchroWebsocket));
+                logger.info(String.format("[%s] [%s] Synchronization [OK]", currencyId, peerApiFilter.name()));
+            } else {
+                logger.info(String.format("[%s] [%s] Synchronization [OK] - no endpoint to synchronize", currencyId, peerApiFilter.name()));
+            }
+            }));
     }
 
     public SynchroResult synchronizePeer(final Peer peer, boolean enableSynchroWebsocket) {
-        long now = System.currentTimeMillis();
+        long startExecutionTime = System.currentTimeMillis();
 
         // Check if peer alive and valid
         boolean isAliveAndValid = isAliveAndValid(peer);
@@ -195,9 +209,29 @@ public class SynchroService extends AbstractService {
         // If not the first synchro, add a delay to last execution time
         // to avoid missing data because incorrect clock configuration
         long lastExecutionTime = getLastExecutionTime(peer);
+        if (logger.isDebugEnabled() && lastExecutionTime > 0) {
+            logger.debug(String.format("[%s] [%s] Found last synchronization execution at {%s}. Will apply time offset of {-%s ms}", peer.getCurrency(), peer,
+                    DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM)
+                            .format(new Date(lastExecutionTime * 1000)),
+                    pluginSettings.getSynchroTimeOffset()));
+        }
 
-        // Execute actions
         final long fromTime = lastExecutionTime > 0 ? lastExecutionTime - pluginSettings.getSynchroTimeOffset() : 0;
+
+        if (logger.isInfoEnabled()) {
+            if (fromTime == 0) {
+                logger.info(String.format("[%s] [%s] Synchronization {ALL}...", peer.getCurrency(), peer));
+            }
+            else {
+                logger.info(String.format("[%s] [%s] Synchronization delta since {%s}...",
+                        peer.getCurrency(),
+                        peer,
+                        DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM)
+                                .format(new Date(fromTime * 1000))));
+            }
+        }
+
+        // Execute actions
         List<SynchroAction> executedActions = actions.stream()
                 .filter(a -> a.getEndPointApi().name().equals(peer.getApi()))
                 .map(a -> {
@@ -211,10 +245,10 @@ public class SynchroService extends AbstractService {
                 .collect(Collectors.toList());
 
         if (logger.isDebugEnabled()) {
-            logger.debug(String.format("[%s] [%s] User data imported in %s ms: %s", peer.getCurrency(), peer, System.currentTimeMillis() - now, result.toString()));
+            logger.debug(String.format("[%s] [%s] Synchronized in %s ms: %s", peer.getCurrency(), peer, System.currentTimeMillis() - startExecutionTime, result.toString()));
         }
 
-        saveExecution(peer, result);
+        saveExecution(peer, result, startExecutionTime);
 
         // Start listen changes on this peer
         if (enableSynchroWebsocket) {
@@ -226,33 +260,34 @@ public class SynchroService extends AbstractService {
 
     /* -- protected methods -- */
 
-    protected List<Peer> getConfigPingPeers(List<String> currencyIds, final EndpointApi api) {
-        String[] endpoints = pluginSettings.getSynchroPingEndpoints();
+    protected List<Peer> getConfigIncludesPeers(final String currencyId, final EndpointApi api) {
+        Preconditions.checkNotNull(currencyId);
+        String[] endpoints = pluginSettings.getSynchroIncludesEndpoints();
         if (ArrayUtils.isEmpty(endpoints)) return null;
 
         List<Peer> peers = Lists.newArrayList();
         for (String endpoint: endpoints) {
             try {
                 String[] endpointPart = endpoint.split(":");
+                if (endpointPart.length > 2) {
+                    logger.warn(String.format("Error in config: Unable to parse P2P endpoint [%s]: %s", endpoint));
+                }
+                String epCurrencyId = (endpointPart.length == 2) ? endpointPart[0] : null /*optional*/;
 
-                if (endpointPart.length == 2) {
-                    String currency = endpointPart[0];
-                    NetworkPeering.Endpoint ep = Endpoints.parse(endpointPart[1]);
-                    if (ep.api == api && currencyIds.contains(currency)) {
-                        Peer peer = Peer.newBuilder()
-                                .setEndpoint(ep)
-                                .setCurrency(currency)
-                                .build();
+                NetworkPeering.Endpoint ep = (endpointPart.length == 2) ? Endpoints.parse(endpointPart[1]) : Endpoints.parse(endpoint);
+                if (ep.api == api && (epCurrencyId == null || currencyId.equals(epCurrencyId))) {
+                    Peer peer = Peer.newBuilder()
+                            .setEndpoint(ep)
+                            .setCurrency(currencyId)
+                            .build();
 
-                        String peerId = cryptoService.hash(peer.computeKey());
-                        peer.setId(peerId);
+                    String hash = cryptoService.hash(peer.computeKey());
+                    peer.setHash(hash);
+                    peer.setId(hash);
 
-                        peers.add(peer);
-                    }
-                }
-                else {
-                    logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint));
+                    peers.add(peer);
                 }
+
             } catch (IOException e) {
                 logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage()));
             }
@@ -260,24 +295,28 @@ public class SynchroService extends AbstractService {
         return peers;
     }
 
-    protected List<Peer> getPeersFromApi(final EndpointApi api) {
+    protected List<Peer> getPeersFromApi(final String currencyId, final EndpointApi api) {
         Preconditions.checkNotNull(api);
+        Preconditions.checkArgument(StringUtils.isNotBlank(currencyId));
 
         try {
-            List<String> currencyIds = currencyDao.getCurrencyIds();
-            if (CollectionUtils.isEmpty(currencyIds)) return null;
 
             // Get default peer, defined in config option
-            List<Peer> peers = getConfigPingPeers(currencyIds, api);
+            List<Peer> peers = getConfigIncludesPeers(currencyId, api);
             if (peers == null) {
                 peers = Lists.newArrayList();
             }
 
-            peers.addAll(currencyIds.stream()
-                    .map(currencyId -> peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()))
-                    .filter(Objects::nonNull)
-                    .flatMap(List::stream)
-                    .collect(Collectors.toList()));
+            // Add discovered peers
+            if (pluginSettings.enableSynchroDiscovery()) {
+                List<Peer> indexedPeers = peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name());
+                if (CollectionUtils.isNotEmpty(indexedPeers)) {
+                    peers.addAll(indexedPeers
+                            .stream()
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList()));
+                }
+            }
 
             return peers;
         }
@@ -335,7 +374,7 @@ public class SynchroService extends AbstractService {
         }
     }
 
-    protected void saveExecution(Peer peer, SynchroResult result) {
+    protected void saveExecution(Peer peer, SynchroResult result, long startExecutionTime) {
         Preconditions.checkNotNull(peer);
         Preconditions.checkNotNull(peer.getId());
         Preconditions.checkNotNull(result);
@@ -344,11 +383,12 @@ public class SynchroService extends AbstractService {
             SynchroExecution execution = new SynchroExecution();
             execution.setCurrency(peer.getCurrency());
             execution.setPeer(peer.getId());
+            execution.setApi(peer.getApi());
+            execution.setExecutionTime(System.currentTimeMillis() - startExecutionTime);
             execution.setResult(result);
 
-            // Last execution time (in seconds)
-            long executionTime = System.currentTimeMillis()/1000;
-            execution.setTime(executionTime);
+            // Start execution time (in seconds)
+            execution.setTime(startExecutionTime/1000);
 
             synchroExecutionDao.save(execution);
         }
@@ -358,9 +398,11 @@ public class SynchroService extends AbstractService {
     }
 
     protected void closeWsClientEndpoints() {
-        // Closing all opened WS
-        wsClientEndpoints.forEach(IOUtils::closeQuietly);
-        wsClientEndpoints.clear();
+        synchronized(wsClientEndpoints) {
+            // Closing all opened WS
+            wsClientEndpoints.forEach(IOUtils::closeQuietly);
+            wsClientEndpoints.clear();
+        }
     }
 
     protected void startListenChangesOnPeer(final Peer peer,
@@ -414,7 +456,9 @@ public class SynchroService extends AbstractService {
         });
 
         // Add to list
-        wsClientEndpoints.add(wsClientEndPoint);
+        synchronized(wsClientEndpoints) {
+            wsClientEndpoints.add(wsClientEndPoint);
+        }
     }
 
     protected boolean isAliveAndValid(Peer peer) {
diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java
index cafc2972..4abb50fd 100644
--- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java
+++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java
@@ -131,7 +131,7 @@ public class WebSocketServer {
         }
 
         if (started) {
-            logger.info(String.format("Websocket server started {%s:%s} on path [%s]", host, port, WS_PATH));
+            logger.info(String.format("Websocket server started {%s:%s%s}", host, port, WS_PATH));
         }
         else {
             String error = String.format("Failed to startScheduling Websocket server. Could not bind address {%s:%s}", host, port);
-- 
GitLab