diff --git a/README.md b/README.md index 0cc3c4b3bac94885b11517272c2e3d0359575ce5..7e9cf26c3a301539bfc20ae461eece36ff1111db 100644 --- a/README.md +++ b/README.md @@ -41,15 +41,37 @@ sudo apt-get install openjdk-8-jre ### Install ElasticSearch plugins +```bash /bin/plugin install mapper-attachments /bin/plugin install https://github.com/duniter/duniter4j/releases/download/0.2.0/duniter4j-elasticsearch-0.2.0.zip +``` + +### Install libsodium + +[The Sodium crypto library (libsodium)](https://download.libsodium.org/doc/installation/) is a modern, easy-to-use software library for encryption, decryption, signatures, password hashing and more. + +- Get libsodium +``` + wget -kL https://github.com/jedisct1/libsodium/releases/download/1.0.11/libsodium-1.0.11.tar.gz + tar -xvf libsodium-1.0.11.tar.gz +``` +- Installation: +``` + cd libsodium-1.0.11 + sudo apt-get install build-essential + sudo ./configure + sudo make && make check + sudo make install +``` ## Install from standalone bundle - Install Java (see on top) + - Install Libsodium (see on top) + - Download lastest release of file duniter4j-elasticsearch-X.Y-standalone.zip - Unzip diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/MessageRecord.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/MessageRecord.java index b58b05ad9e8943104d416b4b846ba410f6185cfa..5ab690534818f7078d90e7cf259fa042d75f0e02 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/MessageRecord.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/MessageRecord.java @@ -27,22 +27,13 @@ package org.duniter.core.client.model.elasticsearch; */ public class MessageRecord extends Record { - public static final String PROPERTY_TIME="time"; public static final String PROPERTY_CONTENT="content"; public static final String PROPERTY_RECIPIENT="recipient"; - private Integer time; + private String content; private String recipient; - public Integer getTime() { - return time; - } - - public void setTime(Integer time) { - this.time = time; - } - public String getContent() { return content; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/Record.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/Record.java index ccdf7c8d95a97920547a78bf5d577201717fbb6d..ad56aeb6b8553b5d502adef4f431b9dfd8428a06 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/Record.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/Record.java @@ -30,10 +30,12 @@ public class Record { public static final String PROPERTY_ISSUER="issuer"; public static final String PROPERTY_HASH="hash"; public static final String PROPERTY_SIGNATURE="signature"; + public static final String PROPERTY_TIME="time"; private String issuer; private String hash; private String signature; + private Integer time; public String getIssuer() { return issuer; @@ -58,4 +60,14 @@ public class Record { public void setSignature(String signature) { this.signature = signature; } + + + public Integer getTime() { + return time; + } + + public void setTime(Integer time) { + this.time = time; + } + } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java index 633508934cb12c60fc459732b45238d6f1b0f45a..b2390f4087da0e050af5749f7d4fb3798854b315 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java @@ -22,11 +22,14 @@ package org.duniter.core.client.service; * #L% */ +import org.apache.http.client.utils.URIBuilder; import org.duniter.core.beans.Service; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.exception.PeerConnectionException; import org.apache.http.client.methods.HttpUriRequest; +import java.net.URI; + /** * Created by blavenie on 29/12/15. */ @@ -38,6 +41,8 @@ public interface HttpService extends Service { <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass) ; + <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass); + <T> T executeRequest(String absolutePath, Class<? extends T> resultClass) ; <T> T executeRequest(Peer peer, String absolutePath, Class<? extends T> resultClass); @@ -45,4 +50,6 @@ public interface HttpService extends Service { String getPath(Peer peer, String absolutePath); String getPath(String absolutePath); + + URIBuilder getURIBuilder(URI baseUri, String... path); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index ac6e2f870ad5658a62ff92098be699e95962d5aa..102e51961a2fa6b4547dde168198729de4ef70e5 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -22,7 +22,10 @@ package org.duniter.core.client.service; * #L% */ +import com.google.common.base.Joiner; import com.google.gson.Gson; +import org.apache.http.client.utils.URIBuilder; +import org.apache.http.entity.mime.content.InputStreamBody; import org.duniter.core.beans.InitializingBean; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.model.bma.Error; @@ -42,6 +45,7 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.duniter.core.util.StringUtils; import org.nuiton.i18n.I18n; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +53,8 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.net.ConnectException; import java.net.SocketTimeoutException; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; /** @@ -124,6 +130,10 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean return executeRequest(httpClient, request, resultClass); } + public <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass) { + return executeRequest(httpClient, request, resultClass, errorClass); + } + public <T> T executeRequest(String absolutePath, Class<? extends T> resultClass) { HttpGet httpGet = new HttpGet(getPath(absolutePath)); return executeRequest(httpClient, httpGet, resultClass); @@ -144,6 +154,24 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean return new StringBuilder().append(defaultPeer.getUrl()).append(absolutePath).toString(); } + public URIBuilder getURIBuilder(URI baseUri, String... path) { + String pathToAppend = Joiner.on('/').skipNulls().join(path); + + int customQueryStartIndex = pathToAppend.indexOf('?'); + String customQuery = null; + if (customQueryStartIndex != -1) { + customQuery = pathToAppend.substring(customQueryStartIndex+1); + pathToAppend = pathToAppend.substring(0, customQueryStartIndex); + } + + URIBuilder builder = new URIBuilder(baseUri); + + builder.setPath(baseUri.getPath() + pathToAppend); + if (StringUtils.isNotBlank(customQuery)) { + builder.setCustomQuery(customQuery); + } + return builder; + } /* -- Internal methods -- */ @@ -165,8 +193,12 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean return RequestConfig.custom().setSocketTimeout(baseTimeOut).setConnectTimeout(baseTimeOut).build(); } - @SuppressWarnings("unchecked") protected <T> T executeRequest(HttpClient httpClient, HttpUriRequest request, Class<? extends T> resultClass) { + return executeRequest(httpClient, request, resultClass, Error.class); + } + + @SuppressWarnings("unchecked") + protected <T> T executeRequest(HttpClient httpClient, HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass) { T result = null; if (log.isDebugEnabled()) { @@ -195,8 +227,13 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean throw new HttpNotFoundException(I18n.t("duniter4j.client.notFound", request.toString())); case HttpStatus.SC_BAD_REQUEST: try { - Error error = (Error)parseResponse(response, Error.class); - throw new HttpBadRequestException(error); + Object errorResponse = parseResponse(response, errorClass); + if (errorResponse instanceof Error) { + throw new HttpBadRequestException((Error)errorResponse); + } + else { + throw new HttpBadRequestException(errorResponse.toString()); + } } catch(IOException e) { throw new HttpBadRequestException(I18n.t("duniter4j.client.status", response.getStatusLine().toString())); @@ -232,19 +269,24 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean protected Object parseResponse(HttpResponse response, Class<?> ResultClass) throws IOException { Object result = null; - boolean stringOutput = ResultClass != null && ResultClass.equals(String.class); + boolean isStreamContent = ResultClass == null || ResultClass.equals(InputStream.class); + boolean isStringContent = !isStreamContent && ResultClass != null && ResultClass.equals(String.class); - // If trace enable, log the response before parsing - Exception error = null; - if (stringOutput) { - InputStream content = null; + InputStream content = response.getEntity().getContent(); + + // If should return an inputstream + if (isStreamContent) { + result = content; // must be close by caller + } + + // If should return String + else if (isStringContent) { try { - content = response.getEntity().getContent(); String stringContent = getContentAsString(content); + // Add a debug before returning the result if (log.isDebugEnabled()) { log.debug("Parsing response:\n" + stringContent); } - return stringContent; } finally { @@ -254,11 +296,9 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean } } - // trace not enable + // deserialize using gson else { - InputStream content = null; try { - content = response.getEntity().getContent(); Reader reader = new InputStreamReader(content, StandardCharsets.UTF_8); if (ResultClass != null) { result = gson.fromJson(reader, ResultClass); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java index 5f4b669631058ac10953a4ec58dffa722579fa51..8e9b039c11acc6e5b3791cb6282b8272b3d848a1 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java @@ -27,6 +27,7 @@ import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanFactory; import org.duniter.core.client.service.bma.*; import org.duniter.core.client.service.elasticsearch.CurrencyRegistryRemoteService; +import org.duniter.core.client.service.elasticsearch.MarketRemoteService; import org.duniter.core.client.service.local.CurrencyService; import org.duniter.core.client.service.local.PeerService; import org.duniter.core.service.CryptoService; @@ -139,6 +140,10 @@ public class ServiceLocator implements Closeable { return getBean(CurrencyRegistryRemoteService.class); } + public MarketRemoteService getMarketRemoteService() { + return getBean(MarketRemoteService.class); + } + public <S extends Bean> S getBean(Class<S> clazz) { if (beanFactory == null) { initBeanFactory(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BaseRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BaseRemoteServiceImpl.java index 753a6398639d87549b3a0cc6a9413c97c3bd03b1..d0d2e07375440ac61c5572d571c8011621d5dc8e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BaseRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BaseRemoteServiceImpl.java @@ -22,6 +22,7 @@ package org.duniter.core.client.service.bma; * #L% */ +import org.apache.http.client.utils.URIBuilder; import org.duniter.core.beans.InitializingBean; import org.duniter.core.beans.Service; import org.duniter.core.client.model.local.Peer; @@ -29,8 +30,12 @@ import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.local.PeerService; import org.duniter.core.client.service.ServiceLocator; import org.apache.http.client.methods.HttpUriRequest; +import org.duniter.core.exception.TechnicalException; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; /** * Created by eis on 05/02/15. @@ -73,4 +78,17 @@ public abstract class BaseRemoteServiceImpl implements Service, InitializingBean public String getPath(Peer peer, String aPath) { return httpService.getPath(peer, aPath); } + + public URIBuilder getURIBuilder(URI baseUri, String... path) { + return httpService.getURIBuilder(baseUri, path); + } + + public URIBuilder getURIBuilder(URL baseUrl, String... path) { + try { + return httpService.getURIBuilder(baseUrl.toURI(), path); + } + catch(URISyntaxException e) { + throw new TechnicalException(e); + } + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/elasticsearch/CurrencyRegistryRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/elasticsearch/CurrencyRegistryRemoteServiceImpl.java index b11d7f7bfe071adb8f0abf769ec81bfe36406af4..f9ce3c7e43d137a38b02e0d977b1498e9713b922 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/elasticsearch/CurrencyRegistryRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/elasticsearch/CurrencyRegistryRemoteServiceImpl.java @@ -154,7 +154,7 @@ public class CurrencyRegistryRemoteServiceImpl extends BaseRemoteServiceImpl imp log.debug("Registering a new currency..."); } - URIBuilder builder = getURIBuilder(URL_ADD_CURRENCY); + URIBuilder builder = getURIBuilder(config.getNodeElasticSearchUrl(), URL_ADD_CURRENCY); builder.addParameter("pubkey", pubkey); builder.addParameter("currency", jsonCurrency); builder.addParameter("sig", signature); @@ -176,28 +176,5 @@ public class CurrencyRegistryRemoteServiceImpl extends BaseRemoteServiceImpl imp /* -- protected methods -- */ - protected URIBuilder getURIBuilder(String... path) { - String pathToAppend = Joiner.on('/').skipNulls().join(path); - int customQueryStartIndex = pathToAppend.indexOf('?'); - String customQuery = null; - if (customQueryStartIndex != -1) { - customQuery = pathToAppend.substring(customQueryStartIndex+1); - pathToAppend = pathToAppend.substring(0, customQueryStartIndex); - } - - try { - URI baseUri = config.getNodeElasticSearchUrl().toURI(); - URIBuilder builder = new URIBuilder(baseUri); - - builder.setPath(baseUri.getPath() + pathToAppend); - if (StringUtils.isNotBlank(customQuery)) { - builder.setCustomQuery(customQuery); - } - return builder; - } - catch(URISyntaxException e) { - throw new TechnicalException(e); - } - } } diff --git a/duniter4j-core-client/src/test/resources/META-INF/services/org.duniter.core.beans.Bean b/duniter4j-core-client/src/test/resources/META-INF/services/org.duniter.core.beans.Bean index e4fdb2bbd7e5e975380f1b3256c23970d322ba73..4b0ec8d58e681c647043ae4cd21bac45e20cea27 100644 --- a/duniter4j-core-client/src/test/resources/META-INF/services/org.duniter.core.beans.Bean +++ b/duniter4j-core-client/src/test/resources/META-INF/services/org.duniter.core.beans.Bean @@ -3,6 +3,7 @@ org.duniter.core.client.service.bma.NetworkRemoteServiceImpl org.duniter.core.client.service.bma.WotRemoteServiceImpl org.duniter.core.client.service.bma.TransactionRemoteServiceImpl org.duniter.core.client.service.elasticsearch.CurrencyRegistryRemoteServiceImpl +org.duniter.core.client.service.elasticsearch.MarketRemoteServiceImpl org.duniter.core.service.Ed25519CryptoServiceImpl org.duniter.core.client.service.HttpServiceImpl org.duniter.core.client.service.DataContext diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 5ca82c34b7ec294186826b07fcf50f3dd4edcd26..e72c2ead59255f24e5049c1cae3ffc63ef62cc0a 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -229,6 +229,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsBoolean("duniter.security.enable", true); } + public boolean enableNetworkSync() { + return settings.getAsBoolean("duniter.data.sync.enable", false); + } + /* protected methods */ protected void initI18n() throws IOException { diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/RestModule.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/RestModule.java index 8057d6abe5cc61f53fee29f072e178d824ace6c1..6c68949b37aca1b2e3e7ea5009e5043103286375 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/RestModule.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/RestModule.java @@ -25,7 +25,8 @@ package org.duniter.elasticsearch.action; import org.duniter.elasticsearch.action.currency.RestCurrencyIndexAction; import org.duniter.elasticsearch.action.history.RestHistoryDeleteIndexAction; import org.duniter.elasticsearch.action.market.*; -import org.duniter.elasticsearch.action.message.RestMessageIndexAction; +import org.duniter.elasticsearch.action.message.RestMessageInboxIndexAction; +import org.duniter.elasticsearch.action.message.RestMessageOutboxIndexAction; import org.duniter.elasticsearch.action.registry.*; import org.duniter.elasticsearch.action.security.RestSecurityAuthAction; import org.duniter.elasticsearch.action.security.RestSecurityController; @@ -75,7 +76,7 @@ public class RestModule extends AbstractModule implements Module { bind(RestHistoryDeleteIndexAction.class).asEagerSingleton(); // Message - bind(RestMessageIndexAction.class).asEagerSingleton(); - + bind(RestMessageInboxIndexAction.class).asEagerSingleton(); + bind(RestMessageOutboxIndexAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageIndexAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageInboxIndexAction.java similarity index 81% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageIndexAction.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageInboxIndexAction.java index 90e944a3ce1d5ce61bef3756b63f6abd031574f5..d6c5bdfd75b95bebc24b2f2d10b39c214ff61f26 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageIndexAction.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageInboxIndexAction.java @@ -30,12 +30,12 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; -public class RestMessageIndexAction extends AbstractRestPostIndexAction { +public class RestMessageInboxIndexAction extends AbstractRestPostIndexAction { @Inject - public RestMessageIndexAction(Settings settings, RestController controller, Client client, - RestSecurityController securityController, - final MessageService service) { + public RestMessageInboxIndexAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + final MessageService service) { super(settings, controller, client, securityController, MessageService.INDEX, MessageService.RECORD_TYPE, diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageOutboxIndexAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageOutboxIndexAction.java new file mode 100644 index 0000000000000000000000000000000000000000..70e08b0ab147e44d2aa98a12b89d8c18c35c395c --- /dev/null +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/message/RestMessageOutboxIndexAction.java @@ -0,0 +1,44 @@ +package org.duniter.elasticsearch.action.message; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.action.AbstractRestPostIndexAction; +import org.duniter.elasticsearch.action.security.RestSecurityController; +import org.duniter.elasticsearch.service.MessageService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestController; + +public class RestMessageOutboxIndexAction extends AbstractRestPostIndexAction { + + @Inject + public RestMessageOutboxIndexAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + final MessageService service) { + super(settings, controller, client, securityController, + MessageService.INDEX, + MessageService.OUTBOX_TYPE, + json -> service.indexRecordFromJson(json)); + } +} \ No newline at end of file diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityFilter.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityFilter.java index 87fdac972fdd502180b6374ad9fbfd5b1e3e81cb..189a999b477498efecd28760eecf15a419915c1f 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityFilter.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/security/RestSecurityFilter.java @@ -40,6 +40,7 @@ public class RestSecurityFilter extends RestFilter { public RestSecurityFilter(PluginSettings pluginSettings, RestController controller, RestSecurityController securityController) { super(); if (pluginSettings.enableSecurity()) { + log.info("Enable security on duniter4j index access"); controller.registerFilter(this); } this.securityController = securityController; diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java index 842a505ff8cdbb5ebce3acbcd21192d0b8ff3a32..75b4450ce9d18af03dc7b42f7288b01ad98706d5 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java @@ -22,15 +22,11 @@ package org.duniter.elasticsearch.node; * #L% */ -import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.gson.GsonUtils; import org.duniter.core.client.model.local.Peer; -import org.duniter.core.client.service.bma.BlockchainRemoteService; -import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.*; +import org.duniter.elasticsearch.service.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -139,5 +135,10 @@ public class DuniterNode extends AbstractLifecycleComponent<DuniterNode> { .listenAndIndexNewBlock(peer); } + + if (pluginSettings.enableNetworkSync()) { + // Synchronize + injector.getInstance(SynchroService.class).synchronize(); + } } } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index e417f1e0eac554dd15265284de0d6ff82001b79f..f7cc988290f8642466b068a0b6c38b3971257b1b 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -153,25 +153,6 @@ public abstract class AbstractService implements Bean { .execute().actionGet(); } - protected XContentBuilder createDefaultAnalyzer() { - try { - XContentBuilder analyzer = XContentFactory.jsonBuilder().startObject().startObject("analyzer") - .startObject("custom_french_analyzer") - .field("tokenizer", "letter") - .field("filter", "asciifolding", "lowercase", "french_stem", "elision", "stop") - .endObject() - .startObject("tag_analyzer") - .field("tokenizer", "keyword") - .field("filter", "asciifolding", "lowercase") - .endObject() - .endObject().endObject(); - - return analyzer; - } catch(IOException e) { - throw new TechnicalException("Error while preparing default index analyzer: " + e.getMessage(), e); - } - } - protected JsonNode readAndVerifyIssuerSignature(String recordJson) throws ElasticsearchException { try { @@ -199,7 +180,7 @@ public abstract class AbstractService implements Bean { .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); if (!cryptoService.verify(recordNoSign, signature, issuer)) { - throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); + throw new InvalidSignatureException("Invalid signature of JSON string"); } // TODO: check issuer is in the WOT ? diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index a847e4780a166cdd8a215d619e529a2eb0c7043c..645dbfd82be874cde2b6336c9370dec379764396 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -667,9 +667,9 @@ public class BlockchainService extends AbstractService { protected List<BlockchainBlock> toBlocks(SearchResponse response, boolean withHighlight) { // Read query result - SearchHit[] searchHits = response.getHits().getHits(); - List<BlockchainBlock> result = Lists.newArrayListWithCapacity(searchHits.length); - for (SearchHit searchHit : searchHits) { + List<BlockchainBlock> result = Lists.newArrayList(); + // TODO : test this lambda expression + response.getHits().forEach(searchHit -> { BlockchainBlock block; if (searchHit.source() != null) { String jsonString = new String(searchHit.source()); @@ -697,7 +697,7 @@ public class BlockchainService extends AbstractService { block.setHash(blockNameHighLight); } } - } + }); return result; } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java index 3dd902f50ab960bfad652d015a4fb6acf950ab70..535d9e01bc42a0e8a79b7d7d1c8a66607f51d514 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MessageService.java @@ -45,6 +45,7 @@ public class MessageService extends AbstractService { public static final String INDEX = "message"; public static final String RECORD_TYPE = "record"; + public static final String OUTBOX_TYPE = "outbox"; @Inject @@ -97,6 +98,7 @@ public class MessageService extends AbstractService { .build(); createIndexRequestBuilder.setSettings(indexSettings); createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType()); + createIndexRequestBuilder.addMapping(OUTBOX_TYPE, createOutboxType()); createIndexRequestBuilder.execute().actionGet(); return this; @@ -119,11 +121,26 @@ public class MessageService extends AbstractService { return response.getId(); } + public String indexOuboxFromJson(String recordJson) { + + JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); + String issuer = getIssuer(actualObj); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Indexing a message from issuer [%s]", issuer.substring(0, 8))); + } + + IndexResponse response = client.prepareIndex(INDEX, OUTBOX_TYPE) + .setSource(recordJson) + .setRefresh(false) + .execute().actionGet(); + + return response.getId(); + } + /* -- Internal methods -- */ public XContentBuilder createRecordType() { - String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer(); - try { XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE) .startObject("properties") @@ -167,4 +184,7 @@ public class MessageService extends AbstractService { } } + public XContentBuilder createOutboxType() { + return createRecordType(); // same as outbox + } } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index c9b604add94af77402129a88ce6fba3dd6662a8e..c2604584460c86c9c6180616adb622f166ae1dba 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -35,6 +35,7 @@ import org.duniter.core.client.service.local.CurrencyService; import org.duniter.core.client.service.local.PeerService; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.service.synchro.SynchroService; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -43,14 +44,19 @@ public class ServiceModule extends AbstractModule implements Module { @Override protected void configure() { bind(ServiceLocator.class).asEagerSingleton(); - // ES service + // ES common service bind(PluginSettings.class).asEagerSingleton(); + + // ES indexation services bind(RegistryService.class).asEagerSingleton(); bind(MarketService.class).asEagerSingleton(); bind(BlockchainService.class).asEagerSingleton(); bind(MessageService.class).asEagerSingleton(); bind(HistoryService.class).asEagerSingleton(); + // ES Synchro services + bind(SynchroService.class).asEagerSingleton(); + // Duniter Client API beans bindWithLocator(BlockchainRemoteService.class); bindWithLocator(NetworkRemoteService.class); diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java new file mode 100644 index 0000000000000000000000000000000000000000..f0efeaa845e61d8a0512dd00ebbb4fccaaef88aa --- /dev/null +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java @@ -0,0 +1,267 @@ +package org.duniter.elasticsearch.service.synchro; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Joiner; +import org.apache.commons.io.IOUtils; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.ByteArrayEntity; +import org.duniter.core.client.model.elasticsearch.Record; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.HttpService; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.exception.InvalidFormatException; +import org.duniter.elasticsearch.service.*; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; + +/** + * Created by blavenie on 27/10/16. + */ +public class SynchroService extends AbstractService { + + protected HttpService httpService; + + @Inject + public SynchroService(Client client, PluginSettings settings, CryptoService cryptoService, + ThreadPool threadPool, final ServiceLocator serviceLocator) { + super("duniter.network.p2p", client, settings,cryptoService); + threadPool.scheduleOnStarted(() -> { + httpService = serviceLocator.getHttpService(); + }); + } + + public void synchronize() { + logger.info("Synchronizing data..."); + + // TODO : get peers from currency ? + // check ESA (ES API) in peer document, and select only this peer + + //Peer peer = new Peer("data.duniter.fr", 80); + Peer peer = new Peer("data.le-sou.org", 80); + //Peer peer = new Peer("192.168.0.28", 9203); + + synchronize(peer); + } + + public void synchronize(Peer peer) { + + long sinceTime = 0; // ToDO: get time from somewhere ? + + logger.info(String.format("[%s] Synchronizing data since %s...", peer.toString(), sinceTime)); + + importMarketChanges(peer, sinceTime); + importRegistryChanges(peer, sinceTime); + importUserChanges(peer, sinceTime); + + logger.info(String.format("[%s] Synchronizing data since %s [OK]", peer.toString(), sinceTime)); + } + + public void importMarketChanges(Peer peer, long sinceTime) { + importChanges(peer, MarketService.INDEX, MarketService.RECORD_TYPE, sinceTime); + importChanges(peer, MarketService.INDEX, MarketService.RECORD_COMMENT_TYPE, sinceTime); + } + + public void importRegistryChanges(Peer peer, long sinceTime) { + importChanges(peer, RegistryService.INDEX, RegistryService.RECORD_TYPE, sinceTime); + importChanges(peer, RegistryService.INDEX, RegistryService.RECORD_COMMENT_TYPE, sinceTime); + } + + public void importUserChanges(Peer peer, long sinceTime) { + importChanges(peer, UserService.INDEX, UserService.PROFILE_TYPE, sinceTime); + importChanges(peer, UserService.INDEX, UserService.SETTINGS_TYPE, sinceTime); + } + + public void importChanges(Peer peer, String index, String type, long sinceTime) { + importChanges(peer, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, sinceTime); + } + + public void importChanges(Peer peer, String index, String type, String issuerFieldName, String versionFieldName, long sinceTime) { + + // Create the search query + BytesStreamOutput bos; + try { + bos = new BytesStreamOutput(); + XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, bos); + builder.startObject() + .startObject("query") + // bool.should + .startObject("bool") + .startObject("should") + // time > sinceDate + .startObject("range") + .startObject("time") + .field("gte", sinceTime) + .endObject() + .endObject() + .endObject() + // currency + /*.startObject("filter") + .startObject("term") + .field("currency", "sou") // todo + .endObject() + .endObject()*/ + .endObject() + // end: query + .endObject() + .field("from", 0) // todo + .field("size", 100) // todo + .endObject(); + builder.flush(); + + } catch(IOException e) { + throw new TechnicalException("Error while preparing default index analyzer: " + e.getMessage(), e); + } + + // Execute query + String path = "/" + Joiner.on('/').join(new String[]{index, type, "_search"}); + HttpPost httpPost = new HttpPost(httpService.getPath(peer, "/" + path)); + httpPost.setEntity(new ByteArrayEntity(bos.bytes().array())); + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s/%s] Sending POST request: %s", peer, index, type, new String(bos.bytes().array()))); + } + InputStream response = httpService.executeRequest(httpPost, InputStream.class, String.class); + + + // Parse response + try { + JsonNode node = objectMapper.readTree(response); + + node = node.get("hits"); + int total = node == null ? 0 : node.get("total").asInt(0); + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s/%s] total to update: %s", peer, index, type, total)); + } + + boolean debug = logger.isTraceEnabled(); + + if (total > 0) { + + int batchSize = pluginSettings.getIndexBulkSize(); + + BulkRequestBuilder bulkRequest = client.prepareBulk(); + bulkRequest.setRefresh(true); + + for (Iterator<JsonNode> hits = node.get("hits").iterator(); hits.hasNext();){ + JsonNode hit = hits.next(); + String hitIndex = hit.get("_index").asText(); + String hitType = hit.get("_type").asText(); + String id = hit.get("_id").asText(); + JsonNode source = hit.get("_source"); + + try { + String issuer = source.get(issuerFieldName).asText(); + if (StringUtils.isBlank(issuer)) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", issuerFieldName)); + } + Long version = source.get(versionFieldName).asLong(); + if (version == null) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); + } + + GetResponse existingDoc = client.prepareGet(index, type, id) + .setFields(versionFieldName, issuerFieldName) + .execute().actionGet(); + + boolean doInsert = !existingDoc.isExists(); + + // Insert (new doc) + if (doInsert) { + String json = source.toString(); + //readAndVerifyIssuerSignature(json, source); + if (debug) { + logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, hitIndex, hitType, id, json)); + } + bulkRequest.add(client.prepareIndex(hitIndex, hitType, id) + .setSource(json.getBytes()) + ); + } + + // Existing doc + else { + + // Check same issuer + String existingIssuer = (String)existingDoc.getFields().get(issuerFieldName).getValue(); + if (!Objects.equals(issuer, existingIssuer)) { + throw new InvalidFormatException(String.format("Invalid document: not same [%s].", issuerFieldName)); + } + + // Check version + Long existingVersion = ((Number)existingDoc.getFields().get(versionFieldName).getValue()).longValue(); + boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); + + if (doUpdate) { + String json = source.toString(); + //readAndVerifyIssuerSignature(json, source); + if (debug) { + logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, hitIndex, hitType, id, json)); + } + bulkRequest.add(client.prepareIndex(hitIndex, hitType, id) + .setSource(json.getBytes())); + } + } + + } catch (InvalidFormatException e) { + if (debug) { + logger.debug(String.format("[%s] [%s/%s] %s. Skipping.", peer, index, type, e.getMessage())); + } + // Skipping document (continue) + } + catch (Exception e) { + logger.warn(String.format("[%s] [%s/%s] %s. Skipping.", peer, index, type, e.getMessage()), e); + // Skipping document (continue) + } + } + + if (bulkRequest.numberOfActions() > 0) { + + // Flush the bulk if not empty + BulkResponse bulkResponse = bulkRequest.get(); + Set<String> missingDocIds = new LinkedHashSet<>(); + + // If failures, continue but save missing blocks + if (bulkResponse.hasFailures()) { + // process failures by iterating through each bulk response item + for (BulkItemResponse itemResponse : bulkResponse) { + boolean skip = !itemResponse.isFailed() + || missingDocIds.contains(itemResponse.getId()); + if (!skip) { + if (debug) { + logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, index, type, itemResponse.getId(), itemResponse.getFailureMessage())); + } + missingDocIds.add(itemResponse.getId()); + } + } + } + } + } + + } catch(IOException e) { + throw new TechnicalException("Unable to parse search response", e); + } + finally { + IOUtils.closeQuietly(response); + } + } + + /* -- protected methods -- */ + + +} diff --git a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml index 605f3d2eb39673153641233ac493bb7a99a9e3d0..d4ea3c6555b102393cc9bd1748c1317bd79d6d05 100644 --- a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml @@ -119,8 +119,18 @@ duniter.string.analyzer: french #duniter.indices.reload: true # Should synchronize node blockchain ? -duniter.blockchain.sync.enable: true -#duniter.blockchain.sync.enable: false +#duniter.blockchain.sync.enable: true +duniter.blockchain.sync.enable: false + +#TODO : enable +duniter.security.enable: false + +# Should synchronize data using P2P +duniter.data.sync.enable: true +#TODO duniter.network.timeout: + +# TODO : implement this option (check if same cluster) +#duniter.data.sync.checkClusterName=false #duniter.dev.enable: true diff --git a/duniter4j-elasticsearch/src/test/es-home/config/logging.yml b/duniter4j-elasticsearch/src/test/es-home/config/logging.yml index 3642d94ceb55d815596504176c306f9d527dd874..63c95362ee33c175973e0c3d2d82dd076b5d400e 100644 --- a/duniter4j-elasticsearch/src/test/es-home/config/logging.yml +++ b/duniter4j-elasticsearch/src/test/es-home/config/logging.yml @@ -20,6 +20,9 @@ logger: org.duniter.elasticsearch: DEBUG + duniter : DEBUG + duniter.network.p2p: TRACE + security: DEBUG org.nuiton.i18n: WARN