From 405bc2679e001b574bfd4abd4d5155f476f02ba9 Mon Sep 17 00:00:00 2001
From: blavenie <benoit.lavenier@e-is.pro>
Date: Sat, 12 Nov 2016 21:10:27 +0100
Subject: [PATCH] - Blockchain indexation : Auto-reconnect when websocket
 closed abnormally - fix #6 - start implemntation of websocket server, to
 listenning changes

---
 .../duniter/core/client/model/ModelUtils.java |   2 +-
 .../core/client/model/bma/Constants.java      |  11 +
 .../core/client/service/HttpServiceImpl.java  |   8 +-
 .../bma/BlockchainRemoteServiceImpl.java      |  60 +++--
 .../exception/HttpConnectException.java       |  56 +++++
 .../websocket/WebsocketClientEndpoint.java    |  43 +++-
 duniter4j-elasticsearch/pom.xml               |  19 ++
 .../main/assembly/config/elasticsearch.yml    |  53 +++--
 .../org/duniter/elasticsearch/Plugin.java     |  16 +-
 .../duniter/elasticsearch/PluginSettings.java |  10 +-
 .../currency/RestCurrencyIndexAction.java     |   8 +-
 .../security/RestSecurityController.java      |   9 +-
 .../elasticsearch/node/DuniterNode.java       |  11 +-
 .../service/BlockchainService.java            |   3 +-
 .../elasticsearch/service/MarketService.java  |   2 +
 .../elasticsearch/service/MessageService.java |  14 +-
 .../service/RegistryService.java              |  32 ++-
 .../service/synchro/SynchroService.java       |  17 +-
 .../elasticsearch/websocket/ChangeEvent.java  |  66 ++++++
 .../websocket/ChangeRegister.java             | 218 ++++++++++++++++++
 .../elasticsearch/websocket/ChangeSource.java |  58 +++++
 .../websocket/ChangesModule.java              |  31 +++
 .../websocket/WebSocketServerEndPoint.java    |  64 +++++
 .../src/test/es-home/config/elasticsearch.yml |  19 +-
 pom.xml                                       |  16 ++
 25 files changed, 755 insertions(+), 91 deletions(-)
 create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/Constants.java
 create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpConnectException.java
 create mode 100644 duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeEvent.java
 create mode 100644 duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeRegister.java
 create mode 100644 duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeSource.java
 create mode 100644 duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangesModule.java
 create mode 100644 duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServerEndPoint.java

diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java
index 55ee7953..b46fa597 100644
--- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java
@@ -143,6 +143,6 @@ public class ModelUtils {
         if (pubkey == null || pubkey.length() < 6) {
             return pubkey;
         }
-        return pubkey.substring(0, 6);
+        return pubkey.substring(0, 8);
     }
 }
diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/Constants.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/Constants.java
new file mode 100644
index 00000000..0bfd272a
--- /dev/null
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/Constants.java
@@ -0,0 +1,11 @@
+package org.duniter.core.client.model.bma;
+
+/**
+ * Created by blavenie on 11/11/16.
+ */
+public interface Constants {
+
+    interface Regex {
+        String CURRENCY_NAME = "[a-zA-Z_-]";
+    }
+}
diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java
index 102e5196..dd92996d 100644
--- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java
@@ -25,16 +25,14 @@ package org.duniter.core.client.service;
 import com.google.common.base.Joiner;
 import com.google.gson.Gson;
 import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.conn.HttpHostConnectException;
 import org.apache.http.entity.mime.content.InputStreamBody;
 import org.duniter.core.beans.InitializingBean;
 import org.duniter.core.client.config.Configuration;
 import org.duniter.core.client.model.bma.Error;
 import org.duniter.core.client.model.bma.gson.GsonUtils;
 import org.duniter.core.client.model.local.Peer;
-import org.duniter.core.client.service.exception.HttpBadRequestException;
-import org.duniter.core.client.service.exception.HttpNotFoundException;
-import org.duniter.core.client.service.exception.JsonSyntaxException;
-import org.duniter.core.client.service.exception.PeerConnectionException;
+import org.duniter.core.client.service.exception.*;
 import org.duniter.core.exception.TechnicalException;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -243,7 +241,7 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
             }
         }
         catch (ConnectException e) {
-            throw new TechnicalException(I18n.t("duniter4j.client.core.connect", request.toString()), e);
+            throw new HttpConnectException(I18n.t("duniter4j.client.core.connect", request.toString()), e);
         }
         catch (SocketTimeoutException e) {
             throw new TechnicalException(I18n.t("duniter4j.client.core.timeout"), e);
diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java
index 490b3548..f277e5c7 100644
--- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java
@@ -272,26 +272,35 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
     public long getLastUD(Peer peer) {
         // get block number with UD
         String blocksWithUdResponse = executeRequest(peer, URL_BLOCK_WITH_UD, String.class);
-        Integer blockNumber = getLastBlockNumberFromJson(blocksWithUdResponse);
+
+        int[] blocksWithUD = getBlockNumbersFromJson(blocksWithUdResponse);
 
         // If no result (this could happen when no UD has been send
-        if (blockNumber == null) {
-            // get the first UD from currency parameter
-            BlockchainParameters parameter = getParameters(peer);
-            return parameter.getUd0();
-        }
+        if (blocksWithUD != null && blocksWithUD.length > 0) {
 
-        // Get the UD from the last block with UD
-        String path = String.format(URL_BLOCK, blockNumber);
-        String json = executeRequest(peer, path, String.class);
-        Long lastUD = getDividendFromBlockJson(json);
+            int index = blocksWithUD.length - 1;
+            while (index >= 0) {
 
-        // Check not null (should never append)
-        if (lastUD == null) {
-            throw new TechnicalException("Unable to get last UD from server");
+                try {
+                    // Get the UD from the last block with UD
+                    String path = String.format(URL_BLOCK, blocksWithUD[index]);
+                    String json = executeRequest(peer, path, String.class);
+                    Long lastUD = getDividendFromBlockJson(json);
+
+                    // Check not null (should never append)
+                    if (lastUD == null) {
+                        throw new TechnicalException("Unable to get last UD from server");
+                    }
+                    return lastUD.longValue();
+                } catch (HttpNotFoundException e) {
+                    index--; // Can occur something (observed in Duniter 0.50.0)
+                }
+            }
         }
-        return lastUD.longValue();
 
+        // get the first UD from currency parameter
+        BlockchainParameters parameter = getParameters(peer);
+        return parameter.getUd0();
     }
 
     /**
@@ -557,7 +566,7 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
             // Get the websocket, or open new one if not exists
             WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI);
             if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) {
-                wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI);
+                wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI,  true/*autoReconnect*/);
                 blockWsEndPoints.put(wsBlockURI, wsClientEndPoint);
             }
 
@@ -728,16 +737,31 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
     }
 
     private Integer getLastBlockNumberFromJson(final String json) {
+        int[] numbers = getBlockNumbersFromJson(json);
+        if (numbers == null || numbers.length == 0) {
+            return null;
+        }
+        return numbers[numbers.length-1];
+    }
+
+    private int[] getBlockNumbersFromJson(final String json) {
 
-        int startIndex = json.lastIndexOf(',');
+        String arrayPrefix = "\"blocks\": [";
+        int startIndex = json.indexOf(arrayPrefix);
         int endIndex = json.lastIndexOf(']');
         if (startIndex == -1 || endIndex == -1) {
             return null;
         }
 
-        String blockNumberStr = json.substring(startIndex+1,endIndex).trim();
+        String[] blockNumbers = json.substring(startIndex+arrayPrefix.length()+1,endIndex).trim().split(",");
+
         try {
-            return Integer.parseInt(blockNumberStr);
+            int[] result = new int[blockNumbers.length];
+            int index = 0;
+            for (String blockNumber: blockNumbers) {
+                result[index++] = Integer.parseInt(blockNumber.trim());
+            }
+            return result;
         } catch(NumberFormatException e) {
             if (log.isDebugEnabled()) {
                 log.debug("Could not parse JSON (block numbers)");
diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpConnectException.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpConnectException.java
new file mode 100644
index 00000000..973c4b07
--- /dev/null
+++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpConnectException.java
@@ -0,0 +1,56 @@
+package org.duniter.core.client.service.exception;
+
+/*
+ * #%L
+ * UCoin Java :: Core Client API
+ * %%
+ * Copyright (C) 2014 - 2016 EIS
+ * %%
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of the 
+ * License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public 
+ * License along with this program.  If not, see
+ * <http://www.gnu.org/licenses/gpl-3.0.html>.
+ * #L%
+ */
+
+import org.duniter.core.client.model.bma.Error;
+import org.duniter.core.exception.TechnicalException;
+
+/**
+ * Created by eis on 11/02/15.
+ */
+public class HttpConnectException extends TechnicalException{
+
+    private static final long serialVersionUID = -5260280401104018980L;
+
+    public HttpConnectException() {
+        super();
+    }
+
+    public HttpConnectException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public HttpConnectException(String message) {
+        super(message);
+    }
+
+    public HttpConnectException(Error error) {
+        super(error.getMessage());
+        setCode(error.getUcode());
+    }
+
+    public HttpConnectException(Throwable cause) {
+        super(cause);
+    }
+
+}
diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
index 83e8f941..9e1c68e4 100644
--- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
+++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java
@@ -46,17 +46,19 @@ public class WebsocketClientEndpoint implements Closeable {
     private Session userSession = null;
     private List<MessageHandler> messageHandlers = Lists.newArrayList();
     private final URI endpointURI;
+    private final boolean autoReconnect;
 
     public WebsocketClientEndpoint(URI endpointURI) {
+        this(endpointURI, true);
+    }
+
+    public WebsocketClientEndpoint(URI endpointURI, boolean autoReconnect) {
         this.endpointURI = endpointURI;
-        try {
-            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-            container.connectToServer(this, endpointURI);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        this.autoReconnect = autoReconnect;
+        connect(true);
     }
 
+
     @Override
     public void close() throws IOException {
         if (log.isDebugEnabled()) {
@@ -91,6 +93,11 @@ public class WebsocketClientEndpoint implements Closeable {
             log.debug(String.format("Closing WebSocket... [%s]", endpointURI));
         }
         this.userSession = null;
+
+        // abnormal close : try to reconnect
+        if (reason.getCloseCode() == CloseReason.CloseCodes.CLOSED_ABNORMALLY)  {
+            connect(false);
+        }
     }
 
     /**
@@ -144,6 +151,7 @@ public class WebsocketClientEndpoint implements Closeable {
     public boolean isClosed() {
         return (userSession == null);
     }
+
     /**
      * Message handler.
      *
@@ -153,4 +161,27 @@ public class WebsocketClientEndpoint implements Closeable {
 
         public void handleMessage(String message);
     }
+
+    /* -- Internal method */
+
+    private void connect(boolean throwErrorIfFailed) {
+        while(isClosed()) {
+            try {
+                WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+                container.connectToServer(this, endpointURI);
+                return; // stop
+            } catch (Exception e) {
+                if (throwErrorIfFailed) throw new RuntimeException(e);
+                log.warn(String.format("[%s] Unable to connect. Retrying in 10s...", endpointURI.toString()));
+            }
+
+            // wait 20s, then try again
+            try {
+                Thread.sleep(10 * 1000);
+            }
+            catch(Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/duniter4j-elasticsearch/pom.xml b/duniter4j-elasticsearch/pom.xml
index 5171309a..0a13bb9e 100644
--- a/duniter4j-elasticsearch/pom.xml
+++ b/duniter4j-elasticsearch/pom.xml
@@ -85,6 +85,20 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+        <groupId>com.github.spullara.mustache.java</groupId>
+        <artifactId>compiler</artifactId>
+        <version>0.9.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.glassfish.tyrus</groupId>
+      <artifactId>tyrus-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.glassfish.tyrus</groupId>
+      <artifactId>tyrus-container-grizzly-server</artifactId>
+    </dependency>
 
     <!-- Unit test -->
     <dependency>
@@ -325,6 +339,11 @@
                 <artifactId>tyrus-client</artifactId>
                 <version>${tyrus.version}</version>
               </dependency>
+              <dependency>
+                <groupId>org.glassfish.tyrus</groupId>
+                <artifactId>tyrus-server</artifactId>
+                <version>${tyrus.version}</version>
+              </dependency>
               <dependency>
                 <groupId>org.glassfish.tyrus</groupId>
                 <artifactId>tyrus-container-grizzly-client</artifactId>
diff --git a/duniter4j-elasticsearch/src/main/assembly/config/elasticsearch.yml b/duniter4j-elasticsearch/src/main/assembly/config/elasticsearch.yml
index d3e9c8cc..2e090a72 100644
--- a/duniter4j-elasticsearch/src/main/assembly/config/elasticsearch.yml
+++ b/duniter4j-elasticsearch/src/main/assembly/config/elasticsearch.yml
@@ -101,27 +101,40 @@ http.cors.enabled: true
 
 security.manager.enabled: false
 
-
-#duniter.disable: true
-#duniter.host: cgeek.fr
-#duniter.port: 9330
-
-duniter.host: 192.168.0.5
-duniter.port: 9201
-
+#
+# ---------------------------------- Duniter4j ---------------------------------
+#
+# Disbale duniter4j plugin
+#
+# duniter.enabled: false
+#
+# Reset and reload all Duniter4j data at startup - DO SET to true in production
+#
+# duniter.indices.reload: true
+#
+# Default string analyzer
+#
 duniter.string.analyzer: french
-
-#duniter.indices.reload: true
-
-# Should synchronize node blockchain ?
+#
+# Enabling node blockchain synchronization
+#
 duniter.blockchain.sync.enable: true
-
-#duniter.dev.enable: true
-
-#script.groovy.sandbox.enabled: true
-
+#
+# Duniter node to synchronize
+#
+duniter.host: cgeek.fr
+duniter.port: 9330
+#
+# ---------------------------------- Duniter4j security -------------------------
+#
+# Enable security, to disable HTTP access to the default ES admin API
+#
+duniter.security.enable: true
+#
 # Security token prefix (default: 'duniter-')
-#duniter.auth.token.prefix: duniter-
-
+#
+# duniter.auth.token.prefix: duniter-
+#
 # Token validity duration, in seconds (default: 600)
-duniter.auth.tokenValidityDuration: 3600  # = 1hour
\ No newline at end of file
+#
+# duniter.auth.tokenValidityDuration: 3600  # = 1hour
\ No newline at end of file
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java
index c660a140..bb8715b4 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java
@@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Module;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.settings.Settings;
 
 import java.util.Collection;
 
@@ -40,12 +41,10 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
 
     private ESLogger log = ESLoggerFactory.getLogger(Plugin.class.getName());
 
-    private org.elasticsearch.common.settings.Settings settings;
-    private boolean disable;
+    private boolean enable;
 
-    @Inject public Plugin(org.elasticsearch.common.settings.Settings settings) {
-        this.settings = settings;
-        this.disable = settings.getAsBoolean("duniter.disable", false);
+    @Inject public Plugin(Settings settings) {
+        this.enable = settings.getAsBoolean("duniter.enabled", true);
     }
 
     @Override
@@ -61,20 +60,23 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
     @Override
     public Collection<Module> nodeModules() {
         Collection<Module> modules = Lists.newArrayList();
-        if (disable) {
+        if (!enable) {
             log.warn(description() + " has been disabled.");
             return modules;
         }
         modules.add(new SecurityModule());
         modules.add(new RestModule());
         modules.add(new ServiceModule());
+        // TODO : must be tested inside full release assembly
+        //modules.add(new ChangesModule());
+
         return modules;
     }
 
     @Override
     public Collection<Class<? extends LifecycleComponent>> nodeServices() {
         Collection<Class<? extends LifecycleComponent>> components = Lists.newArrayList();
-        if (disable) {
+        if (!enable) {
             return components;
         }
         components.add(PluginSettings.class);
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java
index e72c2ead..37a994cc 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java
@@ -229,10 +229,18 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
         return settings.getAsBoolean("duniter.security.enable", true);
     }
 
-    public boolean enableNetworkSync()  {
+    public boolean enableDataSync()  {
         return settings.getAsBoolean("duniter.data.sync.enable", false);
     }
 
+    public String getDataSyncHost()  {
+        return settings.get("duniter.data.sync.host", "data.duniter.fr");
+    }
+
+    public int getDataSyncPort()  {
+        return settings.getAsInt("duniter.data.sync.port", 80);
+    }
+
     /* protected methods */
 
     protected void initI18n() throws IOException {
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/currency/RestCurrencyIndexAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/currency/RestCurrencyIndexAction.java
index 71b5f4c5..dea0ce5a 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/currency/RestCurrencyIndexAction.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/currency/RestCurrencyIndexAction.java
@@ -29,12 +29,18 @@ import org.elasticsearch.rest.*;
 
 import static org.elasticsearch.rest.RestStatus.OK;
 
+/**
+ * A action to post a request to process a new currency/peer.
+ *
+ * TODO :
+ *  - add security, to allow only request from admin (check signature against settings keyring)
+ */
 public class RestCurrencyIndexAction extends BaseRestHandler {
 
     @Inject
     public RestCurrencyIndexAction(Settings settings, RestController controller, Client client) {
         super(settings, controller, client);
-        controller.registerHandler(RestRequest.Method.POST, "/blockchain", this);
+        controller.registerHandler(RestRequest.Method.POST, "/currency", this);
     }
 
     @Override
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityController.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityController.java
index a7729e5b..870cefc5 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityController.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityController.java
@@ -29,20 +29,21 @@ public class RestSecurityController extends AbstractLifecycleComponent<RestSecur
     }
 
     public RestSecurityController allowIndexType(RestRequest.Method method, String index, String type) {
-        return allow(method, String.format("/%s/%s", index, type));
+        return allow(method, String.format("/%s/%s(/.*)?", index, type));
     }
 
-    public RestSecurityController allow(RestRequest.Method method, String path) {
+    public RestSecurityController allow(RestRequest.Method method, String regexPath) {
         Set<String> allowRules = allowRulesByMethod.get(method);
         if (allowRules == null) {
             allowRules = new TreeSet<>();
             allowRulesByMethod.put(method, allowRules);
         }
-        allowRules.add(path);
+        allowRules.add(regexPath);
         return this;
     }
 
     public boolean isAllow(RestRequest request) {
+        if (!this.enable) return true;
         RestRequest.Method method = request.method();
         if (log.isTraceEnabled()) {
             log.trace(String.format("Checking rules for %s request [%s]...", method, request.path()));
@@ -52,7 +53,7 @@ public class RestSecurityController extends AbstractLifecycleComponent<RestSecur
         String path = request.path();
         if (allowRules != null) {
             for (String allowRule : allowRules) {
-                if (path.startsWith(allowRule)) {
+                if (path.matches(allowRule)) {
                     if (log.isTraceEnabled()) {
                         log.trace(String.format("Find matching rule [%s] for %s request [%s]: allow", allowRule, method, path));
                     }
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java
index 75b4450c..63949933 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java
@@ -22,8 +22,10 @@ package org.duniter.elasticsearch.node;
  * #L%
  */
 
+import org.duniter.core.client.model.elasticsearch.Currency;
 import org.duniter.core.client.model.local.Peer;
 import org.duniter.elasticsearch.PluginSettings;
+import org.duniter.elasticsearch.action.security.RestSecurityController;
 import org.duniter.elasticsearch.service.*;
 import org.duniter.elasticsearch.service.synchro.SynchroService;
 import org.duniter.elasticsearch.threadpool.ThreadPool;
@@ -34,6 +36,7 @@ import org.elasticsearch.common.inject.Injector;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.rest.RestRequest;
 
 /**
  * Created by blavenie on 17/06/16.
@@ -127,16 +130,20 @@ public class DuniterNode extends AbstractLifecycleComponent<DuniterNode> {
             Peer peer = pluginSettings.checkAndGetPeer();
 
             // Index (or refresh) node's currency
-            injector.getInstance(RegistryService.class).indexCurrencyFromPeer(peer);
+            Currency currency = injector.getInstance(RegistryService.class).indexCurrencyFromPeer(peer, true);
 
             // Index blocks (and listen if new block appear)
             injector.getInstance(BlockchainService.class)
                     .indexLastBlocks(peer)
                     .listenAndIndexNewBlock(peer);
 
+            // Add access to currency index
+            injector.getInstance(RestSecurityController.class).allowIndexType(RestRequest.Method.GET,
+                    currency.getCurrencyName(),
+                    BlockchainService.BLOCK_TYPE);
         }
 
-        if (pluginSettings.enableNetworkSync()) {
+        if (pluginSettings.enableDataSync()) {
             // Synchronize
             injector.getInstance(SynchroService.class).synchronize();
         }
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java
index 645dbfd8..0786e89e 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java
@@ -668,7 +668,6 @@ public class BlockchainService extends AbstractService {
     protected List<BlockchainBlock> toBlocks(SearchResponse response, boolean withHighlight) {
         // Read query result
         List<BlockchainBlock> result = Lists.newArrayList();
-        // TODO : test this lambda expression
         response.getHits().forEach(searchHit -> {
             BlockchainBlock block;
             if (searchHit.source() != null) {
@@ -685,7 +684,7 @@ public class BlockchainService extends AbstractService {
             else {
                 block = new BlockchainBlock();
                 SearchHitField field = searchHit.getFields().get("hash");
-                block.setHash((String) field.getValue());
+                block.setHash(field.getValue());
             }
             result.add(block);
 
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java
index d82ac728..7b1a5a11 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java
@@ -249,11 +249,13 @@ public class MarketService extends AbstractService {
                     // price Unit
                     .startObject("unit")
                     .field("type", "string")
+                    .field("index", "not_analyzed")
                     .endObject()
 
                     // currency
                     .startObject("currency")
                     .field("type", "string")
+                    .field("index", "not_analyzed")
                     .endObject()
 
                     // issuer
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java
index 535d9e01..e6674b87 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java
@@ -141,8 +141,16 @@ public class MessageService extends AbstractService {
     /* -- Internal methods -- */
 
     public XContentBuilder createRecordType() {
+        return createMapping(RECORD_TYPE);
+    }
+
+    public XContentBuilder createOutboxType() {
+        return createMapping(OUTBOX_TYPE);
+    }
+
+    public XContentBuilder createMapping(String typeName) {
         try {
-            XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE)
+            XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(typeName)
                     .startObject("properties")
 
                     // issuer
@@ -183,8 +191,4 @@ public class MessageService extends AbstractService {
             throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe);
         }
     }
-
-    public XContentBuilder createOutboxType() {
-        return createRecordType(); // same as outbox
-    }
 }
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java
index d25b7541..8d8da251 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java
@@ -24,7 +24,6 @@ package org.duniter.elasticsearch.service;
 
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
@@ -36,12 +35,13 @@ import org.duniter.core.client.model.bma.gson.GsonUtils;
 import org.duniter.core.client.model.elasticsearch.Currency;
 import org.duniter.core.client.model.local.Peer;
 import org.duniter.core.client.service.bma.BlockchainRemoteService;
-import org.duniter.core.client.service.bma.WotRemoteService;
+import org.duniter.core.client.service.exception.HttpConnectException;
 import org.duniter.core.exception.TechnicalException;
 import org.duniter.core.service.CryptoService;
 import org.duniter.core.util.ObjectUtils;
 import org.duniter.core.util.StringUtils;
 import org.duniter.elasticsearch.PluginSettings;
+import org.duniter.elasticsearch.action.security.RestSecurityController;
 import org.duniter.elasticsearch.exception.AccessDeniedException;
 import org.duniter.elasticsearch.exception.DuplicateIndexIdException;
 import org.duniter.elasticsearch.exception.InvalidSignatureException;
@@ -84,7 +84,6 @@ public class RegistryService extends AbstractService {
     @Inject
     public RegistryService(Client client,
                            PluginSettings settings,
-                           WotRemoteService wotRemoteService,
                            CryptoService cryptoService,
                            BlockchainRemoteService blockchainRemoteService) {
         super("gchange." + INDEX, client, settings, cryptoService);
@@ -185,6 +184,32 @@ public class RegistryService extends AbstractService {
         bulkFromFile(bulkFile, INDEX, RECORD_TYPE);
     }
 
+    /**
+     * Retrieve the blockchain data, from peer
+     *
+     * @param peer
+     * @param autoReconnect
+     * @return the created blockchain
+     */
+    public Currency indexCurrencyFromPeer(Peer peer, boolean autoReconnect) {
+        if (!autoReconnect) return indexCurrencyFromPeer(peer);
+
+        while(true) {
+            try {
+                return indexCurrencyFromPeer(peer);
+            } catch (HttpConnectException e) {
+                // log then retry
+                logger.warn(String.format("[%s] Unable to connect. Retrying in 10s...", peer.toString()));
+            }
+
+            try {
+                Thread.sleep(10 * 1000); // wait 20s
+            } catch(Exception e) {
+                throw new TechnicalException(e);
+            }
+        }
+    }
+
     /**
      * Retrieve the blockchain data, from peer
      *
@@ -197,6 +222,7 @@ public class RegistryService extends AbstractService {
         BlockchainBlock currentBlock = blockchainRemoteService.getCurrentBlock(peer);
         long lastUD = blockchainRemoteService.getLastUD(peer);
 
+
         Currency result = new Currency();
         result.setCurrencyName(parameters.getCurrency());
         result.setFirstBlockSignature(firstBlock.getSignature());
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java
index f0efeaa8..68596eda 100644
--- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java
@@ -51,25 +51,22 @@ public class SynchroService extends AbstractService {
     public void synchronize() {
         logger.info("Synchronizing data...");
 
-        // TODO : get peers from currency ?
-        // check ESA (ES API) in peer document, and select only this peer
-
-        //Peer peer = new Peer("data.duniter.fr", 80);
-        Peer peer = new Peer("data.le-sou.org", 80);
-        //Peer peer = new Peer("192.168.0.28", 9203);
+        // TODO : get peers from currency - use peering BMA API, and select peers with ESA (ES API)
+        Peer peer = new Peer(pluginSettings.getDataSyncHost(), pluginSettings.getDataSyncPort());
 
         synchronize(peer);
     }
 
     public void synchronize(Peer peer) {
 
-        long sinceTime = 0; // ToDO: get time from somewhere ?
+        long sinceTime = 0; // ToDO: get last sync time from somewhere ? (e.g. a specific index)
 
         logger.info(String.format("[%s] Synchronizing data since %s...", peer.toString(), sinceTime));
 
         importMarketChanges(peer, sinceTime);
         importRegistryChanges(peer, sinceTime);
         importUserChanges(peer, sinceTime);
+        importMessageChanges(peer, sinceTime);
 
         logger.info(String.format("[%s] Synchronizing data since %s [OK]", peer.toString(), sinceTime));
     }
@@ -89,6 +86,10 @@ public class SynchroService extends AbstractService {
         importChanges(peer, UserService.INDEX, UserService.SETTINGS_TYPE,  sinceTime);
     }
 
+    public void importMessageChanges(Peer peer, long sinceTime) {
+        importChanges(peer, MessageService.INDEX, MessageService.RECORD_TYPE,  sinceTime);
+    }
+
     public void importChanges(Peer peer, String index, String type, long sinceTime) {
         importChanges(peer, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, sinceTime);
     }
@@ -115,7 +116,7 @@ public class SynchroService extends AbstractService {
                             // currency
                             /*.startObject("filter")
                                 .startObject("term")
-                                    .field("currency", "sou") // todo
+                                    .field("currency", "sou") // todo, filter on configured currency only
                                 .endObject()
                             .endObject()*/
                         .endObject()
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeEvent.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeEvent.java
new file mode 100644
index 00000000..19f1e73b
--- /dev/null
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeEvent.java
@@ -0,0 +1,66 @@
+package org.duniter.elasticsearch.websocket;
+
+/*
+    Copyright 2015 ForgeRock AS
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+*/
+
+import org.elasticsearch.common.bytes.BytesReference;
+import org.joda.time.DateTime;
+
+public class ChangeEvent {
+    private final String id;
+    private final String type;
+    private final DateTime timestamp;
+    private final Operation operation;
+    private final long version;
+    private final BytesReference source;
+
+    public enum Operation {
+        INDEX,CREATE,DELETE
+    }
+
+    public ChangeEvent(String id, String type, DateTime timestamp, Operation operation, long version, BytesReference source) {
+        this.id = id;
+        this.type = type;
+        this.timestamp = timestamp;
+        this.operation = operation;
+        this.version = version;
+        this.source = source;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public Operation getOperation() {
+        return operation;
+    }
+
+    public DateTime getTimestamp() {
+        return timestamp;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public long getVersion() {
+        return version;
+    }
+
+    public BytesReference getSource() {
+        return source;
+    }
+}
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeRegister.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeRegister.java
new file mode 100644
index 00000000..18ddd628
--- /dev/null
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeRegister.java
@@ -0,0 +1,218 @@
+package org.duniter.elasticsearch.websocket;
+
+/*
+    Copyright 2015 ForgeRock AS
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+*/
+
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.indexing.IndexingOperationListener;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.indices.IndicesLifecycle;
+import org.elasticsearch.indices.IndicesService;
+import org.glassfish.tyrus.server.Server;
+import org.joda.time.DateTime;
+
+import javax.websocket.DeploymentException;
+import java.io.IOException;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class ChangeRegister {
+
+    private static final String SETTING_PRIMARY_SHARD_ONLY = "changes.primaryShardOnly";
+    private static final String SETTING_PORT = "changes.port";
+    private static final String SETTING_LISTEN_SOURCE = "changes.listenSource";
+
+    private final ESLogger log = Loggers.getLogger(ChangeRegister.class);
+
+    private static final Map<String, WebSocketServerEndPoint> LISTENERS = new HashMap<String, WebSocketServerEndPoint>();
+
+    @Inject
+    public ChangeRegister(final Settings settings, IndicesService indicesService) {
+        final boolean allShards = !settings.getAsBoolean(SETTING_PRIMARY_SHARD_ONLY, Boolean.FALSE);
+        final int port = settings.getAsInt(SETTING_PORT, 9400);
+        final String[] sourcesStr = settings.getAsArray(SETTING_LISTEN_SOURCE, new String[]{"*"});
+        final Set<ChangeSource> sources = new HashSet<>();
+        for(String sourceStr : sourcesStr) {
+            sources.add(new ChangeSource(sourceStr));
+        }
+
+        final Server server = new Server("localhost", port, "/ws", null, WebSocketServerEndPoint.class) ;
+
+        try {
+            log.info("Starting WebSocketServerEndPoint server");
+            AccessController.doPrivileged(new PrivilegedAction() {
+                @Override
+                public Object run() {
+                    try {
+                        // Tyrus tries to load the server code using reflection. In Elasticsearch 2.x Java
+                        // security manager is used which breaks the reflection code as it can't find the class.
+                        // This is a workaround for that
+                        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+                        server.start();
+                        return null;
+                    } catch (DeploymentException e) {
+                        throw new RuntimeException("Failed to start server", e);
+                    }
+                }
+            });
+            log.info("WebSocketServerEndPoint server started");
+        } catch (Exception e) {
+            log.error("Failed to start WebSocketServerEndPoint server",e);
+            throw new RuntimeException(e);
+        }
+
+        indicesService.indicesLifecycle().addListener(new IndicesLifecycle.Listener() {
+            @Override
+            public void afterIndexShardStarted(IndexShard indexShard) {
+                final String indexName = indexShard.routingEntry().getIndex();
+                if (allShards || indexShard.routingEntry().primary()) {
+
+                    indexShard.indexingService().addListener(new IndexingOperationListener() {
+                        @Override
+                        public void postCreate(Engine.Create create) {
+                            ChangeEvent change=new ChangeEvent(
+                                    create.id(),
+                                    create.type(),
+                                    new DateTime(),
+                                    ChangeEvent.Operation.CREATE,
+                                    create.version(),
+                                    create.source()
+                            );
+
+                            addChange(change);
+                        }
+
+                        @Override
+                        public void postDelete(Engine.Delete delete) {
+                            ChangeEvent change=new ChangeEvent(
+                                    delete.id(),
+                                    delete.type(),
+                                    new DateTime(),
+                                    ChangeEvent.Operation.DELETE,
+                                    delete.version(),
+                                    null
+                            );
+
+                            addChange(change);
+                        }
+
+                        @Override
+                        public void postIndex(Engine.Index index) {
+
+                            ChangeEvent change=new ChangeEvent(
+                                    index.id(),
+                                    index.type(),
+                                    new DateTime(),
+                                    ChangeEvent.Operation.INDEX,
+                                    index.version(),
+                                    index.source()
+                            );
+
+                            addChange(change);
+                        }
+
+                        private boolean filter(String index, String type, String id, ChangeSource source) {
+                            if (source.getIndices() != null && !source.getIndices().contains(index)) {
+                                return false;
+                            }
+
+                            if (source.getTypes() != null && !source.getTypes().contains(type)) {
+                                return false;
+                            }
+
+                            if (source.getIds() != null && !source.getIds().contains(id)) {
+                                return false;
+                            }
+
+                            return true;
+                        }
+
+                        private boolean filter(String index, ChangeEvent change) {
+                            for (ChangeSource source : sources) {
+                                if (filter(index, change.getType(), change.getId(), source)) {
+                                    return true;
+                                }
+                            }
+
+                            return false;
+                        }
+
+                        private void addChange(ChangeEvent change) {
+
+                            if (!filter(indexName, change)) {
+                                return;
+                            }
+
+
+                            String message;
+                            try {
+                                XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput());
+                                builder.startObject()
+                                        .field("_index", indexName)
+                                        .field("_type", change.getType())
+                                        .field("_id", change.getId())
+                                        .field("_timestamp", change.getTimestamp())
+                                        .field("_version", change.getVersion())
+                                        .field("_operation", change.getOperation().toString());
+                                if (change.getSource() != null) {
+                                    builder.rawField("_source", change.getSource());
+                                }
+                                builder.endObject();
+
+
+
+                                message = builder.string();
+                            } catch (IOException e) {
+                                log.error("Failed to write JSON", e);
+                                return;
+                            }
+
+                            for (WebSocketServerEndPoint listener : LISTENERS.values()) {
+                                try {
+                                    listener.sendMessage(message);
+                                } catch (Exception e) {
+                                    log.error("Failed to send message", e);
+                                }
+
+                            }
+
+                        }
+                    });
+                }
+            }
+
+        });
+    }
+
+    public static void registerListener(WebSocketServerEndPoint webSocket) {
+        LISTENERS.put(webSocket.getId(), webSocket);
+    }
+
+    public static void unregisterListener(WebSocketServerEndPoint webSocket) {
+        LISTENERS.remove(webSocket.getId());
+    }
+}
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeSource.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeSource.java
new file mode 100644
index 00000000..bf3b959d
--- /dev/null
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangeSource.java
@@ -0,0 +1,58 @@
+package org.duniter.elasticsearch.websocket;
+
+/*
+    Copyright 2015 ForgeRock AS
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+*/
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+public class ChangeSource {
+    private final Set<String> indices;
+    private final Set<String> types;
+    private final Set<String> ids;
+
+    public ChangeSource(String source) {
+        String[] parts = source.split("/");
+
+        indices = parts[0].equals("*") ? null : ImmutableSet.copyOf(parts[0].split(","));
+
+        if (parts.length > 1) {
+            types = parts[1].equals("*") ? null : ImmutableSet.copyOf(parts[1].split(","));
+        } else {
+            types = null;
+        }
+
+        if (parts.length > 2) {
+            ids = parts[2].equals("*") ? null : ImmutableSet.copyOf(parts[2].split(","));
+        } else {
+            ids = null;
+        }
+    }
+
+    public Set<String> getIds() {
+        return ids;
+    }
+
+    public Set<String> getIndices() {
+        return indices;
+    }
+
+    public Set<String> getTypes() {
+        return types;
+    }
+
+}
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangesModule.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangesModule.java
new file mode 100644
index 00000000..9bc266eb
--- /dev/null
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/ChangesModule.java
@@ -0,0 +1,31 @@
+package org.duniter.elasticsearch.websocket;
+
+/*
+    Copyright 2015 ForgeRock AS
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+*/
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+
+public class ChangesModule extends AbstractModule {
+    private final ESLogger log = Loggers.getLogger(ChangesModule.class);
+    
+    @Override
+    protected void configure() {
+        log.debug("Binding Changes Module");
+        bind(ChangeRegister.class).asEagerSingleton();
+    }
+}
diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServerEndPoint.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServerEndPoint.java
new file mode 100644
index 00000000..c7e8ea3d
--- /dev/null
+++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServerEndPoint.java
@@ -0,0 +1,64 @@
+package org.duniter.elasticsearch.websocket;
+
+/*
+    Copyright 2015 ForgeRock AS
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+*/
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.Loggers;
+
+import javax.websocket.*;
+import javax.websocket.server.ServerEndpoint;
+
+@ServerEndpoint(value = "/_changes")
+public class WebSocketServerEndPoint {
+
+    private final ESLogger log = Loggers.getLogger(WebSocketServerEndPoint.class);
+    private Session session;
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("Connected ... " + session.getId());
+        this.session = session;
+        ChangeRegister.registerListener(this);
+    }
+
+    public void sendMessage(String message) {
+        session.getAsyncRemote().sendText(message);
+    }
+
+    public String getId() {
+        return session == null ? null : session.getId();
+    }
+
+    @OnMessage
+    public void onMessage(String message) {
+        log.info("Received message: "+message);
+    }
+
+    @OnClose
+    public void onClose(CloseReason reason) {
+        log.info("Closing websocket: "+reason);
+        ChangeRegister.unregisterListener(this);
+        this.session = null;
+    }
+
+    @OnError
+    public void onError(Throwable t) {
+        log.error("Error on websocket "+(session == null ? null : session.getId()), t);
+    }
+
+
+}
diff --git a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml
index d4ea3c65..8f8cfb69 100644
--- a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml
+++ b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml
@@ -74,7 +74,7 @@ http.cors.enabled: true
 # The default list of hosts is ["127.0.0.1", "[::1]"]
 #
 #discovery.zen.ping.unicast.hosts: ["192.168.0.5", "192.168.0.28"]
-discovery.zen.ping.unicast.hosts: ["127.0.0.1", ""]
+discovery.zen.ping.unicast.hosts: ["127.0.0.1"]
 #
 # Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
 #
@@ -105,28 +105,31 @@ action.destructive_requires_name: true
 security.manager.enabled: false
 
 
-#duniter.disable: true
+#duniter.enabled: true
 #duniter.host: cgeek.fr
 #duniter.port: 9330
 
-duniter.host: 192.168.0.5
-duniter.port: 9602
+duniter.host: 192.168.0.28
+duniter.port: 9604
 #duniter.host: 192.168.0.28
 #duniter.port: 9202
 
+changes.listenSource: currency,*/block,*/record
+
 duniter.string.analyzer: french
 
 #duniter.indices.reload: true
 
 # Should synchronize node blockchain ?
 #duniter.blockchain.sync.enable: true
-duniter.blockchain.sync.enable: false
+duniter.blockchain.sync.enable: true
 
-#TODO : enable
-duniter.security.enable: false
+duniter.security.enable: true
 
 # Should synchronize data using P2P
-duniter.data.sync.enable: true
+duniter.data.sync.enable: false
+duniter.data.sync.host: data.le-sou.org
+duniter.data.sync.port: 80
 #TODO duniter.network.timeout:
 
 # TODO : implement this option (check if same cluster)
diff --git a/pom.xml b/pom.xml
index 52234280..6e0ae132 100644
--- a/pom.xml
+++ b/pom.xml
@@ -362,6 +362,22 @@
         <artifactId>tyrus-container-grizzly-client</artifactId>
         <version>${tyrus.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.glassfish.tyrus</groupId>
+        <artifactId>tyrus-server</artifactId>
+        <version>${tyrus.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.glassfish.tyrus</groupId>
+        <artifactId>tyrus-container-grizzly-server</artifactId>
+        <version>${tyrus.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.github.spullara.mustache.java</groupId>
+        <artifactId>mustache.java</artifactId>
+        <version>0.9.4</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
-- 
GitLab