diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java index a77d22cabfe3b9248f0b59831069d35bba2289ad..b3f4b3961886e66e295357332660178b1fdf4741 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java @@ -27,6 +27,7 @@ public enum EndpointApi { BASIC_MERKLED_API, BMAS, BMATOR, + WS2P, ES_CORE_API, ES_USER_API, ES_SUBSCRIPTION_API, diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/NetworkPeering.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/NetworkPeering.java index 06d93e5d477262634661717a858e6d5c532f0ee3..c85cb067414687a4620f08c2af71333e310e5e12 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/NetworkPeering.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/NetworkPeering.java @@ -127,6 +127,7 @@ public class NetworkPeering implements Serializable { public String ipv4; public String ipv6; public Integer port; + public String id; public EndpointApi getApi() { return api; @@ -168,9 +169,18 @@ public class NetworkPeering implements Serializable { this.port = port; } + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + @Override public String toString() { String s = "api=" + api.name() + "\n" + + (id != null ? ("id=" + id + "\n") : "" ) + "dns=" + dns + "\n" + "ipv4=" + ipv4 + "\n" + "ipv6=" + ipv6 + "\n" + diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java index 05e0ba736fe95af12a46f403ff973f781a6fec51..a70dcb518b5668efbdaafd81af2e43bc97c8c4a9 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/EndpointDeserializer.java @@ -46,15 +46,18 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi public static final String EP_END_REGEXP = "(?:[ ]+([a-z0-9-_]+[.][a-z0-9-_.]*))?(?:[ ]+([0-9.]+))?(?:[ ]+([0-9a-f:]+))?(?:[ ]+([0-9]+))$"; public static final String BMA_API_REGEXP = "^BASIC_MERKLED_API" + EP_END_REGEXP; public static final String BMAS_API_REGEXP = "^BMAS" + EP_END_REGEXP; + public static final String WS2P_API_REGEXP = "^WS2P[ ]+([a-z0-9]+)[ ]+" + EP_END_REGEXP; public static final String OTHER_API_REGEXP = "^([A-Z_-]+)" + EP_END_REGEXP; private Pattern bmaPattern; private Pattern bmasPattern; + private Pattern ws2pPattern; private Pattern otherApiPattern; public EndpointDeserializer() { bmaPattern = Pattern.compile(BMA_API_REGEXP); bmasPattern = Pattern.compile(BMAS_API_REGEXP); + ws2pPattern = Pattern.compile(WS2P_API_REGEXP); otherApiPattern = Pattern.compile(OTHER_API_REGEXP); } @@ -81,6 +84,15 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi return endpoint; } + // WS2P API + mather = ws2pPattern.matcher(ept); + if (mather.matches()) { + endpoint.api = EndpointApi.WS2P; + endpoint.id = mather.group(1); + parseDefaultFormatEndPoint(mather, endpoint, 2); + return endpoint; + } + // Other API mather = otherApiPattern.matcher(ept); if (mather.matches()) { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java index 3ec81df5e02a78bc658ec6fb5fe9a26d01884fcf..152cbc1bc5c9f1642ca9344df7265428ee4f32b2 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java @@ -22,10 +22,7 @@ package org.duniter.core.client.model.bma.jackson; * #L% */ -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationConfig; import com.fasterxml.jackson.databind.module.SimpleModule; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.NetworkPeering; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index cbd00a33319abdc05006dfe7a3f10444bea17c85..c42507cc09caf155d945f8533b314a19561ece1a 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -47,6 +47,7 @@ public class Peer implements LocalEntity<String>, Serializable { private String ipv4; private String ipv6; private Integer port; + private String epId; private Boolean useSsl; private String pubkey; private String hash; @@ -101,6 +102,11 @@ public class Peer implements LocalEntity<String>, Serializable { return this; } + public Builder setEpId(String epId) { + this.epId = epId; + return this; + } + public Builder setHost(String host) { Preconditions.checkNotNull(host); if (InetAddressUtils.isIPv4Address(host)) { @@ -135,6 +141,9 @@ public class Peer implements LocalEntity<String>, Serializable { if (source.port != null) { setPort(source.port); } + if (StringUtils.isNotBlank(source.id)) { + setEpId(source.id); + } return this; } @@ -144,6 +153,9 @@ public class Peer implements LocalEntity<String>, Serializable { (port == 443 || this.api == EndpointApi.BMAS.name()); String api = this.api != null ? this.api : EndpointApi.BASIC_MERKLED_API.name(); Peer ep = new Peer(api, dns, ipv4, ipv6, port, useSsl); + if (StringUtils.isNotBlank(this.epId)) { + ep.setEpId(this.epId); + } if (StringUtils.isNotBlank(this.currency)) { ep.setCurrency(this.currency); } @@ -165,6 +177,7 @@ public class Peer implements LocalEntity<String>, Serializable { public static final String PROPERTY_DNS = "dns"; public static final String PROPERTY_IPV4 = "ipv4"; public static final String PROPERTY_IPV6 = "ipv6"; + public static final String PROPERTY_EP_ID = "epId"; public static final String PROPERTY_STATS = "stats"; private String id; @@ -173,6 +186,7 @@ public class Peer implements LocalEntity<String>, Serializable { private String dns; private String ipv4; private String ipv6; + private String epId; private String url; private String host; @@ -308,6 +322,14 @@ public class Peer implements LocalEntity<String>, Serializable { init(); } + public String getEpId() { + return epId; + } + + public void setEpId(String epId) { + this.epId = epId; + } + public boolean isUseSsl() { return useSsl; } @@ -354,6 +376,9 @@ public class Peer implements LocalEntity<String>, Serializable { if (api != null) { joiner.add(api); } + if (epId != null) { + joiner.add(epId); + } if (dns != null) { joiner.add(dns); } diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/DateUtils.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/DateUtils.java index a463c6dcc7253385ed4cb636cf94fe323bc868b6..a8b2d19f310c51a38f3e1a1a150dd00718e5d92f 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/DateUtils.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/DateUtils.java @@ -45,7 +45,6 @@ public class DateUtils { Calendar cal = new GregorianCalendar(); cal.setTimeInMillis(System.currentTimeMillis()); cal.add(Calendar.HOUR, 1); - cal.add(Calendar.HOUR_OF_DAY, 1); cal.set(Calendar.MINUTE, 0); cal.set(Calendar.SECOND, 0); cal.set(Calendar.MILLISECOND, 0); diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/PrimitiveIterators.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/PrimitiveIterators.java new file mode 100644 index 0000000000000000000000000000000000000000..5229e47dc237f90b1086edbb1fdad49d84ef3a18 --- /dev/null +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/PrimitiveIterators.java @@ -0,0 +1,55 @@ +package org.duniter.core.util; + +import java.util.Iterator; +import java.util.PrimitiveIterator; + +public class PrimitiveIterators { + + public interface OfLong extends PrimitiveIterator.OfLong { + long current(); + } + + private static OfLong nullLongSequence = new OfLong() { + + @Override + public long nextLong() { + return 0; + } + + @Override + public boolean hasNext() { + return false; + } + + public long current() { + return 0; + } + }; + + public static class LongSequence implements OfLong { + long value = 0; + + @Override + public long nextLong() { + return value++; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public long current() { + return value; + } + } + + public static OfLong newLongSequence() { + return new LongSequence(); + } + + public static OfLong nullLongSequence() { + return nullLongSequence; + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 0045a4061fbdb7795458fece2116a20fce41b089..9b6a91de2838daaaa4add0c2e97e35c868c86837 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -30,7 +30,7 @@ import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.CurrencyService; import org.duniter.elasticsearch.service.DocStatService; import org.duniter.elasticsearch.service.PeerService; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 737d591546ffec1ba19edeb661bc7eb78e94a0a4..813ca522949e467c09e6957d971c8c8867fd7310 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -222,6 +222,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsBoolean("duniter.synchro.enable", true); } + public boolean enableSynchroWebsocket() { + return settings.getAsBoolean("duniter.synchro.ws.enable", true); + } + public int getSynchroTimeOffset() { return settings.getAsInt("duniter.synchro.timeOffset", 60*60/*=1hour*/); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index b7c95949d84af86f449114569729d2169abf41c8..966cd1e8d491f44f0f886382b632c3e59dbae6cf 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -326,6 +326,12 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .field("type", "string") .endObject() + // epId + .startObject(Peer.PROPERTY_EP_ID) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + // stats .startObject(Peer.PROPERTY_STATS) .field("type", "nested") diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java index fe615af0f59e70e55f0245cf640a6dd0f83817e6..746ab651b9a3e232a15fb5c9f4ea513a7a2e1145 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java @@ -65,9 +65,8 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu Preconditions.checkNotNull(execution.getTime()); Preconditions.checkArgument(execution.getTime() > 0); - // Serialize into JSON - // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { + // Serialize into JSON String json = getObjectMapper().writeValueAsString(execution); // Preparing indexBlocksFromNode @@ -77,7 +76,7 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu // Execute indexBlocksFromNode indexRequest .setRefresh(true) - .execute(); + .execute().actionGet(); } catch(JsonProcessingException e) { throw new TechnicalException(e); @@ -153,7 +152,8 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu // result .startObject(SynchroExecution.PROPERTY_RESULT) .field("type", "nested") - //.field("dynamic", "false") + .field("dynamic", "false") + .startObject("properties") // inserts .startObject(SynchroResult.PROPERTY_INSERTS) @@ -170,6 +170,11 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu .field("type", "long") .endObject() + // deletes + .startObject(SynchroResult.PROPERTY_INVALID_SIGNATURES) + .field("type", "long") + .endObject() + .endObject() .endObject() diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java index 5dfc72e72673ec8e7fb16f42827ca807141d221d..de746186a549c820869f9a123ea86cffb7104af5 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java @@ -36,6 +36,7 @@ public class SynchroResult implements Serializable { public static final String PROPERTY_INSERTS = "inserts"; public static final String PROPERTY_UPDATES = "updates"; public static final String PROPERTY_DELETES = "deletes"; + public static final String PROPERTY_INVALID_SIGNATURES = "invalidSignatures"; private long insertTotal = 0; private long updateTotal = 0; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 531e1777fb8da01306407adaa7f6ee89f3c3c5f1..1856f4deab1feec4d8771714b499c79f81e297d9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -52,7 +52,11 @@ public class PeerService extends AbstractService { private PeerDao peerDao; private ThreadPool threadPool; - private List<String> includeEndpointApis = Lists.newArrayList(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + // Define endpoint API to include + private List<String> includeEndpointApis = Lists.newArrayList( + EndpointApi.BASIC_MERKLED_API.name(), + EndpointApi.BMAS.name(), + EndpointApi.WS2P.name()); @Inject public PeerService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool, @@ -77,6 +81,12 @@ public class PeerService extends AbstractService { return this; } + public PeerService addIncludeEndpointApi(EndpointApi api) { + Preconditions.checkNotNull(api); + addIncludeEndpointApi(api.name()); + return this; + } + public PeerService indexPeers(Peer peer) { try { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index f5a01631464356c9ba76b1186b03e5ba9dced936..744cd2b59324842f3d9c41c7b4d940f2ef403419 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -35,7 +35,7 @@ import org.duniter.core.service.MailService; import org.duniter.elasticsearch.PluginInit; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.changes.ChangeService; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java index 57b68fc061c9a6ea791dded300feb88d8767f8b9..b01a30f13c29d2d6bf6f1e31be2a0bb6d9af1e5e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java @@ -38,18 +38,10 @@ package org.duniter.elasticsearch.service.changes; limitations under the License. */ -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.duniter.core.exception.TechnicalException; -import org.duniter.elasticsearch.exception.InvalidFormatException; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.joda.time.DateTime; -import java.io.IOException; - public class ChangeEvent { private final String id; private final String index; @@ -111,53 +103,9 @@ public class ChangeEvent { return source; } - public String toJson() { - try { - XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); - builder.startObject() - .field("_index", getIndex()) - .field("_type", getType()) - .field("_id", getId()) - .field("_timestamp", getTimestamp()) - .field("_version", getVersion()) - .field("_operation", getOperation().toString()); - if (getSource() != null) { - builder.rawField("_source", getSource()); - } - builder.endObject(); - - return builder.string(); - } catch (IOException e) { - throw new TechnicalException("Error while generating JSON from change event", e); - } - } - - public static ChangeEvent fromJson(String json) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - JsonNode actualObj = objectMapper.readTree(json); - String index = actualObj.get("_index").asText(); - String type = actualObj.get("_type").asText(); - String id = actualObj.get("_id").asText(); - DateTime timestamp = new DateTime(actualObj.get("_timestamp").asLong()); - ChangeEvent.Operation operation = ChangeEvent.Operation.valueOf(actualObj.get("_operation").asText()); - long version = actualObj.get("_version").asLong(); - - JsonNode sourceNode = actualObj.get("_source"); - BytesReference source = null; - if (sourceNode != null) { - // TODO : fill bytes reference from source - //source = - } - - ChangeEvent event = new ChangeEvent(index, type, id, timestamp, operation, version, source); - return event; - } catch (IOException e) { - throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); - } + @JsonIgnore + public boolean hasSource() { + return source != null; } - public ChangeEvent clone(ChangeEvent event, boolean withSource) { - return new ChangeEvent(this, withSource); - } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvents.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvents.java new file mode 100644 index 0000000000000000000000000000000000000000..d7b5bb09fa43367d9b9d90faf5171cfa4dc6e389 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvents.java @@ -0,0 +1,135 @@ +package org.duniter.elasticsearch.service.changes; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +/* + Copyright 2015 ForgeRock AS + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.duniter.core.exception.TechnicalException; +import org.duniter.elasticsearch.exception.InvalidFormatException; +import org.duniter.elasticsearch.util.bytes.BytesJsonNode; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.joda.time.DateTime; + +import java.io.IOException; + +public class ChangeEvents { + + private ChangeEvents() { + // helper class + } + + public static ChangeEvent fromJson(String json) { + return fromJson(new ObjectMapper(), json); + } + + public static ChangeEvent fromJson(ObjectMapper objectMapper, String json) { + try { + JsonNode actualObj = objectMapper.readTree(json); + String index = actualObj.get("_index").asText(); + String type = actualObj.get("_type").asText(); + String id = actualObj.get("_id").asText(); + DateTime timestamp = new DateTime(actualObj.get("_timestamp").asLong()); + ChangeEvent.Operation operation = ChangeEvent.Operation.valueOf(actualObj.get("_operation").asText()); + long version = actualObj.get("_version").asLong(); + + JsonNode sourceNode = actualObj.get("_source"); + BytesReference source = null; + if (sourceNode != null) { + source = new BytesJsonNode(sourceNode); + } + + return new ChangeEvent(index, type, id, timestamp, operation, version, source); + } catch (IOException e) { + throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); + } + } + + public static String toJson(ChangeEvent event) { + try { + XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); + builder.startObject() + .field("_index", event.getIndex()) + .field("_type", event.getType()) + .field("_id", event.getId()) + .field("_timestamp", event.getTimestamp()) + .field("_version", event.getVersion()) + .field("_operation", event.getOperation().toString()); + if (event.hasSource()) { + builder.rawField("_source", event.getSource()); + } + builder.endObject(); + + return builder.string(); + } catch (IOException e) { + throw new TechnicalException("Error while generating JSON from change event", e); + } + } + + public static JsonNode readTree(BytesReference source) throws IOException { + if (source == null) return null; + + if (source instanceof BytesJsonNode) { + // Avoid new deserialization + return ((BytesJsonNode) source).toJsonNode(); + } + + return new ObjectMapper().readTree(source.streamInput()); + } + + public static JsonNode readTree(ObjectMapper objectMapper, BytesReference source) throws IOException { + if (source == null) return null; + + if (source instanceof BytesJsonNode) { + // Avoid new deserialization + return ((BytesJsonNode) source).toJsonNode(); + } + + return objectMapper.readTree(source.streamInput()); + } + + public static <T> T readValue(BytesReference source, Class<T> clazz) throws IOException { + if (source == null) return null; + + return new ObjectMapper().readValue(source.streamInput(), clazz); + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java index 8c77d5c37e11fc4b0b6c61b211d6d78e4e7339cc..5f6be682b39efde3799e8ecc82e4f5178967411b 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java @@ -132,7 +132,7 @@ public class ChangeService { return; } - ChangeEvent change=new ChangeEvent( + ChangeEvent change = new ChangeEvent( indexName, index.type(), index.id(), diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java index e806a37ce7d22cf7efbf4b6ff432f4f1c97b7af3..046391cf2dacd743d7e58d1e720a1e9feabfcccf 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java @@ -99,14 +99,17 @@ public class ChangeSource { return types; } - public void addIndex(String index){ + public ChangeSource addIndex(String index){ this.indices.add(index); + return this; } - public void addType(String type){ + public ChangeSource addType(String type){ this.types.add(type); + return this; } - public void addId(String id){ + public ChangeSource addId(String id){ this.ids.add(id); + return this; } public String toString() { @@ -159,4 +162,17 @@ public class ChangeSource { public boolean isEmpty() { return indices == null && types == null && ids == null; } + + public void merge(ChangeSource s) { + if (s == null) return; + if (CollectionUtils.isNotEmpty(s.getIndices())) { + indices.addAll(s.getIndices()); + } + if (CollectionUtils.isNotEmpty(s.getTypes())) { + types.addAll(s.getTypes()); + } + if (CollectionUtils.isNotEmpty(s.getIds())) { + ids.addAll(s.getIds()); + } + } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java similarity index 50% rename from duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java rename to duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java index 7b67d18fdcd813c3c485e29beb1c3aa8d913a5e9..6a6bb264101f9735ca7b6c1f4f18496316ea00ed 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.synchro; +package org.duniter.elasticsearch.synchro; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; @@ -14,8 +14,11 @@ import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; +import org.duniter.core.util.PrimitiveIterators; import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; @@ -23,21 +26,21 @@ import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.model.SearchResponse; import org.duniter.elasticsearch.model.SearchScrollResponse; import org.duniter.elasticsearch.model.SynchroResult; +import org.duniter.elasticsearch.service.AbstractService; import org.duniter.elasticsearch.service.ServiceLocator; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeEvents; +import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.duniter.elasticsearch.user.PluginSettings; -import org.duniter.elasticsearch.user.model.UserEvent; -import org.duniter.elasticsearch.user.service.AbstractService; -import org.duniter.elasticsearch.user.service.UserEventService; -import org.duniter.elasticsearch.user.service.UserService; +import org.duniter.elasticsearch.util.bytes.BytesJsonNode; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -48,8 +51,8 @@ public abstract class AbstractSynchroAction extends AbstractService implements S private static final String SCROLL_PARAM_VALUE = "1m"; - public interface InsertListener { - void onInsert(String id, JsonNode source); + public interface SourceConsumer { + void accept(String id, JsonNode source) throws Exception; } private String fromIndex; @@ -58,11 +61,17 @@ public abstract class AbstractSynchroAction extends AbstractService implements S private String toType; private String issuerFieldName = Record.PROPERTY_ISSUER; private String versionFieldName = Record.PROPERTY_TIME; + private ChangeSource changeSource; private HttpService httpService; private boolean enableUpdate = false; - private List<InsertListener> insertListeners; + private boolean enableSignatureValidation = true; + private List<SourceConsumer> insertionListeners; + private List<SourceConsumer> updateListeners; + private List<SourceConsumer> validationListeners; + + private boolean trace = false; public AbstractSynchroAction(String index, String type, Duniter4jClient client, @@ -83,6 +92,10 @@ public abstract class AbstractSynchroAction extends AbstractService implements S this.fromType = fromType; this.toIndex = toIndex; this.toType = toType; + this.changeSource = new ChangeSource() + .addIndex(fromIndex) + .addType(fromType); + this.trace = logger.isTraceEnabled(); threadPool.scheduleOnStarted(() -> httpService = ServiceLocator.instance().getHttpService()); } @@ -92,6 +105,11 @@ public abstract class AbstractSynchroAction extends AbstractService implements S return EndpointApi.ES_USER_API; } + @Override + public ChangeSource getChangeSource() { + return changeSource; + } + @Override public void handleSynchronize(Peer peer, long fromTime, @@ -119,18 +137,107 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } } - public void addInsertListener(InsertListener listener) { - if (insertListeners == null) { - insertListeners = Lists.newArrayList(); + @Override + public void handleChange(Peer peer, ChangeEvent changeEvent) { + + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(changeEvent); + Preconditions.checkNotNull(changeEvent.getOperation()); + + String id = changeEvent.getId(); + String logPrefix = String.format("[%s] [%s] [%s/%s/%s] [WS]", peer.getCurrency(), peer, toIndex, toType, id); + + boolean skip = changeEvent.getOperation() == ChangeEvent.Operation.DELETE || + !enableUpdate && changeEvent.getOperation() == ChangeEvent.Operation.INDEX || + !changeEvent.hasSource(); + if (skip) { + if (trace) { + logger.trace(String.format("%s Ignoring change event of type [%s]", logPrefix, changeEvent.getOperation().name())); + } + return; + } + try { + if (trace) { + logger.trace(String.format("%s [WS] Processing new change event...", logPrefix)); + } + + JsonNode source = ChangeEvents.readTree(changeEvent.getSource()); + + // Save doc + save(changeEvent.getId(), source, logPrefix); + } + catch(Exception e1) { + // Log the first error + if (logger.isDebugEnabled()) { + logger.error(e1.getMessage(), e1); + } + else { + logger.error(e1.getMessage()); + } } - insertListeners.add(listener); + } + + public void addInsertionListener(SourceConsumer listener) { + if (insertionListeners == null) { + insertionListeners = Lists.newArrayList(); + } + insertionListeners.add(listener); + } + + public void addUpdateListener(SourceConsumer listener) { + if (updateListeners == null) { + updateListeners = Lists.newArrayList(); + } + updateListeners.add(listener); + } + + public void addValidationListener(SourceConsumer listener) { + if (validationListeners == null) { + validationListeners = Lists.newArrayList(); + } + validationListeners.add(listener); } /* -- protected methods -- */ - protected void notifyInsertListeners(final String id, final JsonNode source) { - if (insertListeners == null) return; - insertListeners.forEach(l -> l.onInsert(id, source)); + protected void notifyInsertion(final String id, final JsonNode source) throws Exception { + if (CollectionUtils.isNotEmpty(insertionListeners)) { + for (SourceConsumer listener: insertionListeners) { + listener.accept(id, source); + } + } + } + + protected void notifyUpdate(final String id, final JsonNode source) throws Exception { + if (CollectionUtils.isNotEmpty(updateListeners)) { + for (SourceConsumer listener: updateListeners) { + listener.accept(id, source); + } + } + } + + protected void notifyValidation(final String id, + final JsonNode source, + final Iterator<Long> invalidSignatureHits, + final String logPrefix) throws Exception { + if (enableSignatureValidation) { + try { + readAndVerifyIssuerSignature(source, issuerFieldName); + } catch (InvalidSignatureException e) { + // FIXME: some user/profile document failed ! - see issue #11 + // Il semble que le format JSON ne soit pas le même que celui qui a été signé + invalidSignatureHits.next(); + if (trace) { + logger.warn(String.format("%s %s.\n%s", logPrefix, e.getMessage(), source.toString())); + } + } + } + + if (CollectionUtils.isNotEmpty(validationListeners)) { + for (SourceConsumer listener : validationListeners) { + listener.accept(id, source); + } + } } protected QueryBuilder createQuery(long fromTime) { @@ -158,7 +265,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S pluginSettings.getIndexBulkSize()); httpPost.setEntity(new StringEntity(content, "UTF-8")); - if (logger.isTraceEnabled()) { + if (trace) { logger.trace(String.format("[%s] [%s] [%s/%s] Sending POST scroll request: %s", peer.getCurrency(), peer, fromIndex, fromType, content)); } @@ -188,7 +295,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } catch (HttpUnauthorizeException e) { throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to access (%s).", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); } catch (TechnicalException e) { - throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to synchronize: %s", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); + throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to scroll request: %s", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); } catch (Exception e) { throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to parse response: ", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); } @@ -230,23 +337,24 @@ public abstract class AbstractSynchroAction extends AbstractService implements S stop = true; } else { - counter += fetchAndIndex(peer, response, objectMapper, result); + counter += fetchAndSave(peer, response, objectMapper, result); stop = counter >= total; } } } - private long fetchAndIndex(final Peer peer, - SearchScrollResponse response, - final ObjectMapper objectMapper, - SynchroResult result) { - boolean debug = logger.isTraceEnabled(); + private long fetchAndSave(final Peer peer, + SearchScrollResponse response, + final ObjectMapper objectMapper, + SynchroResult result) { + long counter = 0; - long insertHits = 0; - long updateHits = 0; - long invalidSignatureHits = 0; + + PrimitiveIterators.OfLong insertHits = PrimitiveIterators.newLongSequence(); + PrimitiveIterators.OfLong updateHits = PrimitiveIterators.newLongSequence(); + PrimitiveIterators.OfLong invalidSignatureHits = PrimitiveIterators.newLongSequence(); BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.setRefresh(true); @@ -256,99 +364,22 @@ public abstract class AbstractSynchroAction extends AbstractService implements S String id = hit.getId(); JsonNode source = hit.getSource(); + String logPrefix = String.format("[%s] [%s] [%s/%s/%s]", peer.getCurrency(), peer, toIndex, toType, id); + if (source == null) { - logger.error(String.format("[%s] [%s] [%s/%s/%s] No source found. Skipping.", peer.getCurrency(), peer, - toIndex, toType, id)); + logger.error(String.format("%s No source found. Skipping.", logPrefix)); } else { counter++; - try { - String issuer = source.get(issuerFieldName).asText(); - if (StringUtils.isBlank(issuer)) { - throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", issuerFieldName)); - } - Long version = source.get(versionFieldName).asLong(); - if (version == null) { - throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); - } - - Map<String, Object> existingFields = client.getFieldsById(toIndex, toType, id, versionFieldName, issuerFieldName); - boolean exists = existingFields != null; - - // Insert (new doc) - if (!exists) { - - if (debug) { - logger.trace(String.format("[%s] [%s] [%s/%s] insert _id=%s\n%s", peer.getCurrency(), peer, toIndex, toType, id, source.toString())); - } - - // FIXME: some user/profile document failed ! - see issue #11 - // Il semble que le format JSON ne soit pas le même que celui qui a été signé - try { - readAndVerifyIssuerSignature(source, issuerFieldName); - } catch (InvalidSignatureException e) { - invalidSignatureHits++; - // FIXME: should enable this log (after issue #11 resolution) - //logger.warn(String.format("[%s] [%s/%s/%s] %s.\n%s", peer, toIndex, toType, id, e.getMessage(), source.toString())); - } - - bulkRequest.add(client.prepareIndex(toIndex, toType, id) - .setSource(objectMapper.writeValueAsBytes(source)) - ); - - // Notify insert listeners - notifyInsertListeners(id, source); - - insertHits++; - } - - // Existing doc: do update (if enable) - else if (enableUpdate){ - - // Check same issuer - String existingIssuer = (String) existingFields.get(issuerFieldName); - if (!Objects.equals(issuer, existingIssuer)) { - throw new InvalidFormatException(String.format("Invalid document: not same [%s].", issuerFieldName)); - } - - // Check version - Number existingVersion = ((Number) existingFields.get(versionFieldName)); - boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); - - if (doUpdate) { - if (debug) { - logger.trace(String.format("[%s] [%s] [%s/%s] update _id=%s\n%s", peer.getCurrency(), peer, toIndex, toType, id, source.toString())); - } - - // FIXME: some user/profile document failed ! - see issue #11 - // Il semble que le format JSON ne soit pas le même que celui qui a été signé - try { - readAndVerifyIssuerSignature(source, issuerFieldName); - } catch (InvalidSignatureException e) { - invalidSignatureHits++; - // FIXME: should enable this log (after issue #11 resolution) - //logger.warn(String.format("[%s] [%s/%s/%s] %s.\n%s", peer, toIndex, toType, id, e.getMessage(), source.toString())); - } - - bulkRequest.add(client.prepareIndex(toIndex, toType, id) - .setSource(objectMapper.writeValueAsBytes(source))); - - updateHits++; - } - } - - } catch (DuniterElasticsearchException e) { - if (logger.isDebugEnabled()) { - logger.warn(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.\n%s", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage(), source.toString())); - } else { - logger.warn(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage())); - } - // Skipping document (continue) - } catch (Exception e) { - logger.error(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage()), e); - // Skipping document (continue) - } + // Save (create or update) + save(id, source, + objectMapper, + bulkRequest, + insertHits, + updateHits, + invalidSignatureHits, + logPrefix); } } @@ -358,14 +389,14 @@ public abstract class AbstractSynchroAction extends AbstractService implements S BulkResponse bulkResponse = bulkRequest.get(); Set<String> missingDocIds = new LinkedHashSet<>(); - // If failures, continue but save missing blocks + // If failures, continue but saveInBulk missing blocks if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item for (BulkItemResponse itemResponse : bulkResponse) { boolean skip = !itemResponse.isFailed() || missingDocIds.contains(itemResponse.getId()); if (!skip) { - if (debug) { + if (trace) { logger.debug(String.format("[%s] [%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); } missingDocIds.add(itemResponse.getId()); @@ -375,13 +406,118 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } // update result stats - result.addInserts(toIndex, toType, insertHits); - result.addUpdates(toIndex, toType, updateHits); - result.addInvalidSignatures(toIndex, toType, invalidSignatureHits); + result.addInserts(toIndex, toType, insertHits.current()); + result.addUpdates(toIndex, toType, updateHits.current()); + result.addInvalidSignatures(toIndex, toType, invalidSignatureHits.current()); return counter; } + protected void save(String id, JsonNode source, String logPrefix) { + Iterator<Long> nullSeq = PrimitiveIterators.nullLongSequence(); + save(id, source, getObjectMapper(), null, nullSeq, nullSeq, nullSeq, logPrefix); + } + + protected void save(final String id, + final JsonNode source, + final ObjectMapper objectMapper, + final BulkRequestBuilder bulkRequest, + final Iterator<Long> insertHits, + final Iterator<Long> updateHits, + final Iterator<Long> invalidSignatureHits, + final String logPrefix) { + + try { + String issuer = source.get(issuerFieldName).asText(); + if (StringUtils.isBlank(issuer)) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", issuerFieldName)); + } + long version = source.get(versionFieldName).asLong(-1); + if (version == -1) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); + } + + Map<String, Object> existingFields = client.getFieldsById(toIndex, toType, id, versionFieldName, issuerFieldName); + boolean exists = existingFields != null; + + // Insert (new doc) + if (!exists) { + + if (trace) { + logger.trace(String.format("%s insert found\n%s", logPrefix, source.toString())); + } + + // Validate doc + notifyValidation(id, source, invalidSignatureHits, logPrefix); + + // Execute insertion + IndexRequestBuilder request = client.prepareIndex(toIndex, toType, id) + .setSource(objectMapper.writeValueAsBytes(source)); + if (bulkRequest != null) { + bulkRequest.add(request); + } + else { + client.safeExecuteRequest(request, false); + } + + // Notify insert listeners + notifyInsertion(id, source); + + insertHits.next(); + } + + // Existing doc: do update (if enable) + else if (enableUpdate){ + + // Check same issuer + String existingIssuer = (String) existingFields.get(issuerFieldName); + if (!Objects.equals(issuer, existingIssuer)) { + throw new InvalidFormatException(String.format("Invalid document: not same [%s].", issuerFieldName)); + } + + // Check version + Number existingVersion = ((Number) existingFields.get(versionFieldName)); + boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); + + if (doUpdate) { + if (trace) { + logger.trace(String.format("%s found update\n%s", logPrefix, source.toString())); + } + + // Validate source + notifyValidation(id, source, invalidSignatureHits, logPrefix); + + // Execute update + UpdateRequestBuilder request = client.prepareUpdate(toIndex, toType, id) + .setRefresh(true) + .setSource(objectMapper.writeValueAsBytes(source)); + if (bulkRequest != null) { + bulkRequest.add(request); + } + else { + client.safeExecuteRequest(request, false); + } + + // Notify insert listeners + notifyUpdate(id, source); + + updateHits.next(); + } + } + + } catch (DuniterElasticsearchException e) { + // Skipping document: log, then continue + if (logger.isDebugEnabled()) { + logger.warn(String.format("%s %s. Skipping.\n%s", logPrefix, e.getMessage(), source.toString())); + } else { + logger.warn(String.format("%s %s. Skipping.", logPrefix, e.getMessage())); + } + } catch (Exception e) { + // Skipping document: log, then continue + logger.error(String.format("%s %s. Skipping.", logPrefix, e.getMessage()), e); + } + } + protected void setIssuerFieldName(String issuerFieldName) { this.issuerFieldName = issuerFieldName; } @@ -398,23 +534,9 @@ public abstract class AbstractSynchroAction extends AbstractService implements S this.enableUpdate = enableUpdate; } - private void synchronizeUserEventsByReference(Peer peer, UserEvent.Reference reference, long fromTime, SynchroResult result) { - - BoolQueryBuilder query = QueryBuilders.boolQuery() - .filter(QueryBuilders.rangeQuery(Record.PROPERTY_TIME).gte(fromTime)); - - // Query = filter on reference - BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); - if (StringUtils.isNotBlank(reference.getIndex())) { - boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex())); - } - if (StringUtils.isNotBlank(reference.getType())) { - boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType())); - } - - query.should(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); - - //safeSynchronizeIndex(peer, UserService.INDEX, UserEventService.EVENT_TYPE, query, result); + protected void setEnableSignatureValidation(boolean enableSignatureValidation) { + this.enableSignatureValidation = enableSignatureValidation; } + } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroAction.java similarity index 56% rename from duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java rename to duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroAction.java index c030ad16c1358bd751f86bcbe97b36defea42961..cbd0b9aaa9eef6728f0d49c86863135225cc194b 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroAction.java @@ -1,14 +1,20 @@ -package org.duniter.elasticsearch.service.synchro; +package org.duniter.elasticsearch.synchro; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; import org.duniter.elasticsearch.model.SynchroResult; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeSource; public interface SynchroAction { EndpointApi getEndPointApi(); + ChangeSource getChangeSource(); + void handleSynchronize(Peer peer, long fromTime, SynchroResult result); + + void handleChange(Peer peer, ChangeEvent changeEvent); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java similarity index 62% rename from duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java rename to duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java index 7037621bd0444b16e50ffc75995c613f43195122..2ab2181ef7f92f0d9066041f4ff1ee2ac9fa0732 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.synchro; +package org.duniter.elasticsearch.synchro; /* * #%L @@ -22,6 +22,8 @@ package org.duniter.elasticsearch.service.synchro; * #L% */ +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.io.IOUtils; @@ -43,6 +45,7 @@ import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeEvents; import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; @@ -104,35 +107,53 @@ public class SynchroService extends AbstractService { * @return */ public SynchroService startScheduling() { - long delayBeforeNextHour = DateUtils.delayBeforeNextHour(); + // Launch once, at startup (after a delay of 10s) + threadPool.schedule(() -> { + boolean launchAtStartup; + try { + // wait for some peers + launchAtStartup = waitPeersReady(); + } catch (InterruptedException e) { + return; // stop + } - // Five minute before the hour (to make sure to be ready when computing doc stat - see DocStatService) - delayBeforeNextHour -= 5 * 60 * 1000; + // If can be launched now: do it + if (launchAtStartup) { + synchronize(); + } - // If not already scheduling to early (in the next 5 min) then launch it - if (delayBeforeNextHour > 5 * 60 * 1000) { + // Schedule next execution, to 5 min before each hour + // (to make sure to be ready when computing doc stat - see DocStatService) + long nextExecutionDelay = DateUtils.nextHour().getTime() - System.currentTimeMillis() - 5 * 60 * 1000; - // Launch with a delay of 10 sec - threadPool.schedule(this::synchronize, 10 * 1000, TimeUnit.MILLISECONDS); - } + // If next execution is too close, skip it + if (launchAtStartup && nextExecutionDelay < 5 * 60 * 1000) { + // add an hour + nextExecutionDelay += 60 * 60 * 1000; + } - // Schedule every hour - threadPool.scheduleAtFixedRate( - this::synchronize, - delayBeforeNextHour, - 60 * 60 * 1000 /* every hour */, - TimeUnit.MILLISECONDS); + // Schedule every hour + threadPool.scheduleAtFixedRate( + this::synchronize, + nextExecutionDelay, + 60 * 60 * 1000 /* every hour */, + TimeUnit.MILLISECONDS); + }, + 10 * 1000 /*wait 10 s */ , + TimeUnit.MILLISECONDS); return this; } - /* -- protected methods -- */ - - protected void synchronize() { + public void synchronize() { logger.info("Starting synchronization..."); + final boolean enableSynchroWebsocket = pluginSettings.enableSynchroWebsocket(); + // Closing all opened WS - closeWsClientEndpoints(); + if (enableSynchroWebsocket) { + closeWsClientEndpoints(); + } if (CollectionUtils.isNotEmpty(peerApiFilters)) { @@ -141,7 +162,7 @@ public class SynchroService extends AbstractService { // Get peers List<Peer> peers = getPeersFromApi(peerApiFilter); if (CollectionUtils.isNotEmpty(peers)) { - peers.forEach(this::synchronize); + peers.forEach(p -> synchronizePeer(p, enableSynchroWebsocket)); logger.info("Synchronization [OK]"); } else { logger.info(String.format("Synchronization [OK] - no endpoint found for API [%s]", peerApiFilter.name())); @@ -150,6 +171,47 @@ public class SynchroService extends AbstractService { } } + public SynchroResult synchronizePeer(final Peer peer, boolean listenChanges) { + long now = System.currentTimeMillis(); + SynchroResult result = new SynchroResult(); + + // Get the last execution time (or 0 is never synchronized) + // If not the first synchro, add a delay to last execution time + // to avoid missing data because incorrect clock configuration + long lastExecutionTime = getLastExecutionTime(peer); + + // Execute actions + final long fromTime = lastExecutionTime > 0 ? lastExecutionTime - pluginSettings.getSynchroTimeOffset() : 0; + List<SynchroAction> executedActions = actions.stream() + .filter(a -> a.getEndPointApi().name().equals(peer.getApi())) + .map(a -> { + try { + a.handleSynchronize(peer, fromTime, result); + } catch(Exception e) { + logger.error(String.format("[%s] [%s] Failed to execute synchro action: %s", peer.getCurrency(), peer, e.getMessage()), e); + } + return a; + }) + .collect(Collectors.toList()); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s] User data imported in %s ms: %s", peer.getCurrency(), peer, System.currentTimeMillis() - now, result.toString())); + } + + saveExecution(peer, result); + + // Start listen changes on this peer + if (listenChanges) { + startListenChangesOnPeer(peer, executedActions); + } + + return result; + } + + /* -- protected methods -- */ + + + protected List<Peer> getPeersFromApi(final EndpointApi api) { Preconditions.checkNotNull(api); @@ -169,38 +231,34 @@ public class SynchroService extends AbstractService { } } - protected void synchronize(final Peer peer) { - long now = System.currentTimeMillis(); - SynchroResult result = new SynchroResult(); + protected boolean hasSomePeers() { - long fromTime = getLastExecutionTime(peer); + List<String> currencyIds = currencyDao.getCurrencyIds(); + if (CollectionUtils.isEmpty(currencyIds)) return false; - // If not the first synchro, add a delay to last execution time - // to avoid missing data because incorrect clock configuration - if (fromTime > 0) { - fromTime -= Math.abs(pluginSettings.getSynchroTimeOffset()); + for (String currencyId: currencyIds) { + Long lastUpTime = peerDao.getMaxLastUpTime(currencyId); + if (lastUpTime != null) return true; } - ChangeSource changeSourceToListen = new ChangeSource(); - - // insert - for (SynchroAction action: actions) { - - action.handleSynchronize(peer, fromTime, result); - } - //synchronize(peer, fromTime, result, changeSourceToListen); + return false; + } - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s] User data imported in %s ms: %s", peer.getCurrency(), peer, System.currentTimeMillis() - now, result.toString())); + protected boolean waitPeersReady() throws InterruptedException{ + int tryCounter = 0; + while (!isReady() && !hasSomePeers()) { + // Wait 10s + Thread.sleep(10 * 1000); + tryCounter++; + if (tryCounter == 6 /*1 min wait*/) { + logger.warn("Could not start data synchronisation. No Peer found."); + return false; // stop here + } } - - saveExecution(peer, result); - - // Listens changes on this peer - //startListenChanges(peer); - + return true; } + protected long getLastExecutionTime(Peer peer) { Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer.getId()); @@ -243,10 +301,27 @@ public class SynchroService extends AbstractService { wsClientEndpoints.clear(); } - protected void listenChanges(final Peer peer, ChangeSource changeSource) { + protected void startListenChangesOnPeer(final Peer peer, + final List<SynchroAction> actions) { // Listens changes on this peer Preconditions.checkNotNull(peer); - Preconditions.checkNotNull(changeSource); + Preconditions.checkNotNull(actions); + + // Compute a change source for ALL indices/types + final ChangeSource changeSource = new ChangeSource(); + actions.stream() + .map(SynchroAction::getChangeSource) + .filter(Objects::nonNull) + .forEach(changeSource::merge); + + // Prepare a map of actions by index/type + final ArrayListMultimap<String, SynchroAction> actionsBySource = ArrayListMultimap.create(actions.size(), 2); + actions.stream() + .forEach(a -> { + if (a.getChangeSource() != null) { + actionsBySource.put(a.getChangeSource().toString(), a); + } + }); // Get (or create) the websocket endpoint WebsocketClientEndpoint wsClientEndPoint = httpService.getWebsocketClientEndpoint(peer, WS_CHANGES_URL, false); @@ -257,8 +332,15 @@ public class SynchroService extends AbstractService { // add listener wsClientEndPoint.registerListener( message -> { try { - ChangeEvent changeEvent = getObjectMapper().readValue(message, ChangeEvent.class); - importChangeEvent(peer, changeEvent); + ChangeEvent changeEvent = ChangeEvents.fromJson(getObjectMapper(), message); + String source = changeEvent.getIndex() + "/" + changeEvent.getType(); + List<SynchroAction> sourceActions = actionsBySource.get(source); + + // Call each mapped actions + if (CollectionUtils.isNotEmpty(sourceActions)) { + sourceActions.forEach(a -> a.handleChange(peer, changeEvent)); + } + } catch (Exception e) { if (logger.isDebugEnabled()) { logger.warn(String.format("[%s] Unable to process changes received by [/ws/_changes]: %s", peer, e.getMessage()), e); @@ -273,16 +355,4 @@ public class SynchroService extends AbstractService { wsClientEndpoints.add(wsClientEndPoint); } - protected void importChangeEvent(final Peer peer, ChangeEvent changeEvent) { - Preconditions.checkNotNull(changeEvent); - Preconditions.checkNotNull(changeEvent.getOperation()); - - // Skip delete operation (imported by history/delete) - if (changeEvent.getOperation() == ChangeEvent.Operation.DELETE) return; - - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] Received change event", peer, changeEvent.getIndex(), changeEvent.getType())); - } - changeEvent.getSource(); - } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/util/bytes/BytesJsonNode.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/util/bytes/BytesJsonNode.java new file mode 100644 index 0000000000000000000000000000000000000000..e59cbe7d8069771d77fbb7675968612b6069fc7a --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/util/bytes/BytesJsonNode.java @@ -0,0 +1,130 @@ +package org.duniter.elasticsearch.util.bytes; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.GatheringByteChannel; +import java.util.Objects; + +public class BytesJsonNode implements BytesReference { + + private JsonNode node; + private BytesArray delegate; + private ObjectMapper objectMapper; + + public BytesJsonNode(JsonNode node) { + this(node, new ObjectMapper()); + } + + public BytesJsonNode(JsonNode node, ObjectMapper objectMapper) { + this.node = node; + this.objectMapper = objectMapper; + } + + public byte get(int index) { + return getOrInitDelegate().get(index); + } + + public int length() { + return getOrInitDelegate().length(); + } + + public BytesReference slice(int from, int length) { + return getOrInitDelegate().slice(from, length); + } + + public StreamInput streamInput() { + return getOrInitDelegate().streamInput(); + } + + public void writeTo(OutputStream os) throws IOException { + objectMapper.writeValue(os, node); + } + + public void writeTo(GatheringByteChannel channel) throws IOException { + getOrInitDelegate().writeTo(channel); + } + + public byte[] toBytes() { + try { + return objectMapper.writeValueAsBytes(node); + } + catch(JsonProcessingException e) { + throw new ElasticsearchException(e); + } + } + + public BytesArray toBytesArray() { + return getOrInitDelegate(); + } + + public BytesArray copyBytesArray() { + return getOrInitDelegate().copyBytesArray(); + } + + public ChannelBuffer toChannelBuffer() { + return getOrInitDelegate().toChannelBuffer(); + } + + public boolean hasArray() { + return true; + } + + public byte[] array() { + return toBytes(); + } + + public int arrayOffset() { + return getOrInitDelegate().arrayOffset(); + } + + public String toUtf8() { + return getOrInitDelegate().toUtf8(); + } + + public BytesRef toBytesRef() { + return getOrInitDelegate().toBytesRef(); + } + + public BytesRef copyBytesRef() { + return getOrInitDelegate().copyBytesRef(); + } + + public int hashCode() { + return getOrInitDelegate().hashCode(); + } + + public JsonNode toJsonNode() { + return node; + } + + public JsonNode copyJsonNode() { + return node.deepCopy(); + } + + public boolean equals(Object obj) { + return Objects.equals(this, obj); + } + + + protected BytesArray getOrInitDelegate() { + if (delegate == null) { + try { + this.delegate = new BytesArray(objectMapper.writeValueAsBytes(node)); + } + catch(JsonProcessingException e) { + throw new ElasticsearchException(e); + } + } + return delegate; + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java index 3bcc36a24ef1f0b0a5c52ee01f799d54d58c5771..32b22b1185e9a9dbea71e62420f039444086c6e2 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java @@ -42,6 +42,7 @@ import com.google.common.collect.Maps; import org.apache.commons.collections4.MapUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeEvents; import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.service.changes.ChangeSource; import org.elasticsearch.common.inject.Inject; @@ -91,7 +92,7 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ @Override public void onChange(ChangeEvent changeEvent) { - session.getAsyncRemote().sendText(changeEvent.toJson()); + session.getAsyncRemote().sendText(ChangeEvents.toJson(changeEvent)); } @Override diff --git a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml index d4c2f71eb580dab82eaaf85d3fb96baae231aa5a..0274bb7d93c9471b17676e4841c8f7b1315245b0 100644 --- a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml @@ -123,7 +123,7 @@ duniter.string.analyzer: french # duniter.blockchain.enable: false # -# Duniter node to synchronize +# Duniter node to synchronizePeer # duniter.host: g1-test.duniter.org duniter.port: 10900 @@ -147,7 +147,7 @@ duniter.security.enable: false # # ---------------------------------- Duniter4j P2P sync ------------------------- # -# Should synchronize data using P2P +# Should synchronizePeer data using P2P # duniter.data.sync.enable: false #duniter.data.sync.host: data.duniter.fr diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java index 67c4904db50cd551db9d111f1cdfaccfbc35e6d9..b4f5c004e26b3d4f50c06283ecdc71167758c350 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java @@ -56,7 +56,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { threadPool.scheduleOnClusterReady(() -> { createIndices(); - // Waiting cluster back to GREEN or YELLOW state, before synchronize + // Waiting cluster back to GREEN or YELLOW state, before synchronizePeer threadPool.scheduleOnClusterReady(this::doAfterStart); }); } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java index 6b6efbaa26939aff0d16fa5136deef3f46b3074a..f7a9405a179df1ebcbcf7491bd55f6e7e805fab8 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java @@ -122,12 +122,12 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return delegate.reloadAllIndices(); } - public boolean enableP2PSync() { - return delegate.enableP2PSync(); + public boolean enableSynchro() { + return delegate.enableSynchro(); } - public int getP2PSyncTimeOffset() { - return delegate.getP2PSyncTimeOffset(); + public int getSynchroTimeOffset() { + return delegate.getSynchroTimeOffset(); } public boolean getMailEnable() { diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java index cd506568a87c56bd11b793817ff02261e7dd952d..179cf749e3662cac0d051d6390aa98c172e3409f 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java @@ -2,12 +2,12 @@ package org.duniter.elasticsearch.subscription.synchro; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroSubscriptionExecutionIndexAction extends AbstractSynchroAction { @@ -18,7 +18,8 @@ public class SynchroSubscriptionExecutionIndexAction extends AbstractSynchroActi CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, client, pluginSettings, cryptoService, threadPool); + super(SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, client, pluginSettings.getDelegate(), + cryptoService, threadPool); synchroService.register(this); } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java index ca7b0684f9f707c82c25e07beeed9e457ef8fd65..8311745d10192528bdae978aa808c8f27d801476 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java @@ -2,12 +2,12 @@ package org.duniter.elasticsearch.subscription.synchro; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; import org.duniter.elasticsearch.subscription.dao.record.SubscriptionRecordDao; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroSubscriptionRecordAction extends AbstractSynchroAction { @@ -18,7 +18,7 @@ public class SynchroSubscriptionRecordAction extends AbstractSynchroAction { ThreadPool threadPool, CryptoService cryptoService, SynchroService synchroService) { - super(SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, client, pluginSettings, cryptoService, threadPool); + super(SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index 90bfec498b07369fb3b044bad4a95883d23b5d19..3e880d3503677fae1bfded1dbcd754777b3c784c 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -98,12 +98,12 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return delegate.enableDocStats(); } - public boolean enableP2PSync() { - return settings.getAsBoolean("duniter.p2p.enable", true); + public boolean enableSynchro() { + return delegate.enableSynchro(); } - public int getP2PSyncTimeOffset() { - return settings.getAsInt("duniter.p2p.timeOffsetInSec", 60*60 /*1 hour*/ ); + public int getSynchroTimeOffset() { + return settings.getAsInt("duniter.synchro.timeOffsetInSec", 60*60 /*1 hour*/ ); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java index 68658ded892e6e2ff0f4f00e7ebf6912264b942b..63e7419283072a49d74451288847685b77f5af70 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java @@ -22,6 +22,9 @@ package org.duniter.elasticsearch.user.synchro; * #L% */ +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.elasticsearch.service.PeerService; +import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.synchro.group.SynchroGroupRecordAction; import org.duniter.elasticsearch.user.synchro.history.SynchroHistoryIndexAction; import org.duniter.elasticsearch.user.synchro.invitation.SynchroInvitationCertificationIndexAction; @@ -31,13 +34,28 @@ import org.duniter.elasticsearch.user.synchro.page.SynchroPageCommentAction; import org.duniter.elasticsearch.user.synchro.page.SynchroPageRecordAction; import org.duniter.elasticsearch.user.synchro.user.SynchroUserProfileAction; import org.duniter.elasticsearch.user.synchro.user.SynchroUserSettingsAction; +import org.duniter.elasticsearch.user.websocket.WebsocketUserEventEndPoint; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; public class SynchroModule extends AbstractModule implements Module { + public static class Init { + + @Inject + public Init(PeerService peerService, PluginSettings pluginSettings) { + if (pluginSettings.enableSynchro()) { + // Make sure PeerService will index ES_USER_API peers + peerService.addIncludeEndpointApi(EndpointApi.ES_USER_API); + } + } + } + @Override protected void configure() { + bind(Init.class).asEagerSingleton(); + // History bind(SynchroHistoryIndexAction.class).asEagerSingleton(); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java index c21d4fdfd6ffe0e3b0d4541b0c537327eb242da1..d870b6b9774dcfdcb312296a33c725149b8b7ed4 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.group; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.GroupService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroGroupRecordAction extends AbstractSynchroAction { @@ -17,7 +17,7 @@ public class SynchroGroupRecordAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(GroupService.INDEX, GroupService.RECORD_TYPE, client, pluginSettings, cryptoService, threadPool); + super(GroupService.INDEX, GroupService.RECORD_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java index ce37ed07c3ce42049f89d68391e60762ff4c5abd..edfd55fee056ae0183dd69a68633cd13ee51e961 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java @@ -4,11 +4,11 @@ import com.fasterxml.jackson.databind.JsonNode; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.exception.NotFoundException; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.HistoryService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroHistoryIndexAction extends AbstractSynchroAction { @@ -21,15 +21,18 @@ public class SynchroHistoryIndexAction extends AbstractSynchroAction { ThreadPool threadPool, SynchroService synchroService, HistoryService service) { - super(service.INDEX, service.DELETE_TYPE, client, pluginSettings, cryptoService, threadPool); + super(service.INDEX, service.DELETE_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); this.service = service; - addInsertListener(this::onInsertDeletion); + addValidationListener(this::onValidate); + addInsertionListener(this::onInsert); synchroService.register(this); } - protected void onInsertDeletion(String historyId, JsonNode source) { + /* -- protected method -- */ + + protected void onValidate(String deleteId, JsonNode source) { try { // Check if valid document service.checkIsValidDeletion(source); @@ -37,6 +40,16 @@ public class SynchroHistoryIndexAction extends AbstractSynchroAction { // Delete the document service.applyDocDelete(source); + } catch(NotFoundException e) { + // doc not exists: continue + } + } + + protected void onInsert(String deleteId, JsonNode source) { + try { + // Delete the document + service.applyDocDelete(source); + } catch(NotFoundException e) { // doc not exists: continue logger.debug("Doc to delete could not be found. Skipping deletion"); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java index 5f456e104fa6860027f32797dd64f828208a371e..c64034bb001433dd1b7244af975f59c55b9ae3c6 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.invitation; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.UserInvitationService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroInvitationCertificationIndexAction extends AbstractSynchroAction { @@ -18,11 +18,13 @@ public class SynchroInvitationCertificationIndexAction extends AbstractSynchroAc ThreadPool threadPool, SynchroService synchroService, UserInvitationService service) { - super(UserInvitationService.INDEX, UserInvitationService.CERTIFICATION_TYPE, client, pluginSettings, cryptoService, threadPool); + super(UserInvitationService.INDEX, UserInvitationService.CERTIFICATION_TYPE, client, + pluginSettings.getDelegate(), cryptoService, threadPool); - addInsertListener(service::notifyUser); + addInsertionListener(service::notifyUser); synchroService.register(this); } + } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java index b9b2e0a66b24a2188fe3cc504f02df865098220c..75ffb7f815829cdca807c0363f1dc835adf8b28d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.message; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.MessageService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroMessageInboxIndexAction extends AbstractSynchroAction { @@ -18,9 +18,9 @@ public class SynchroMessageInboxIndexAction extends AbstractSynchroAction { ThreadPool threadPool, SynchroService synchroService, MessageService service) { - super(MessageService.INDEX, MessageService.INBOX_TYPE, client, pluginSettings, cryptoService, threadPool); + super(MessageService.INDEX, MessageService.INBOX_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); - addInsertListener(service::notifyUser); + addInsertionListener(service::notifyUser); synchroService.register(this); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java index b3c319704d8fcff1fbaa2079f4ed21b57ee8b30e..17c20aba5a235c32c2924cd25515898129d7e7d0 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.message; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.MessageService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroMessageOutboxIndexAction extends AbstractSynchroAction { @@ -17,7 +17,7 @@ public class SynchroMessageOutboxIndexAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(MessageService.INDEX, MessageService.OUTBOX_TYPE, client, pluginSettings, cryptoService, threadPool); + super(MessageService.INDEX, MessageService.OUTBOX_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(false); // no update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java index 04dbbfc7799bab05e120aab016a8456de12e490b..b774cb35b4e0c21ed7e644c3dbe675601ec624ae 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java @@ -2,12 +2,12 @@ package org.duniter.elasticsearch.user.synchro.page; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.dao.page.RegistryCommentDao; import org.duniter.elasticsearch.user.dao.page.RegistryIndexDao; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroPageCommentAction extends AbstractSynchroAction { @@ -18,7 +18,7 @@ public class SynchroPageCommentAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(RegistryIndexDao.INDEX, RegistryCommentDao.TYPE, client, pluginSettings, cryptoService, threadPool); + super(RegistryIndexDao.INDEX, RegistryCommentDao.TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java index ff4038e1cb9d7e02049fab49e429b59af6b671c6..de3982fcf7f6225ac7ea868fcb925dd28ceb1a0e 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java @@ -2,12 +2,12 @@ package org.duniter.elasticsearch.user.synchro.page; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.dao.page.RegistryIndexDao; import org.duniter.elasticsearch.user.dao.page.RegistryRecordDao; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroPageRecordAction extends AbstractSynchroAction { @@ -18,7 +18,7 @@ public class SynchroPageRecordAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(RegistryIndexDao.INDEX, RegistryRecordDao.TYPE, client, pluginSettings, cryptoService, threadPool); + super(RegistryIndexDao.INDEX, RegistryRecordDao.TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java index dfb4d354c5f70c304663ffd472abcf39060b8983..e3c0b90240c18c5547a344b672592c43c584f307 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.user; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.UserService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroUserProfileAction extends AbstractSynchroAction { @@ -17,7 +17,7 @@ public class SynchroUserProfileAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(UserService.INDEX, UserService.PROFILE_TYPE, client, pluginSettings, cryptoService, threadPool); + super(UserService.INDEX, UserService.PROFILE_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java index cb07474bd59881b4d88bf2f373b6af70a604f25f..a1b7baa32c22a6d175407b6f717508971f92827d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java @@ -2,11 +2,11 @@ package org.duniter.elasticsearch.user.synchro.user; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; -import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.service.UserService; +import org.duniter.elasticsearch.synchro.AbstractSynchroAction; import org.elasticsearch.common.inject.Inject; public class SynchroUserSettingsAction extends AbstractSynchroAction { @@ -17,7 +17,7 @@ public class SynchroUserSettingsAction extends AbstractSynchroAction { CryptoService cryptoService, ThreadPool threadPool, SynchroService synchroService) { - super(UserService.INDEX, UserService.SETTINGS_TYPE, client, pluginSettings, cryptoService, threadPool); + super(UserService.INDEX, UserService.SETTINGS_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool); setEnableUpdate(true); // with update diff --git a/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/service/SynchroServiceTest.java b/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/service/SynchroServiceTest.java index 3340a5f4b2baa1d6bd99af66fab3310e9360285f..9df5014b86c7e4710ba4389f3f3a9a46ed3690a5 100644 --- a/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/service/SynchroServiceTest.java +++ b/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/service/SynchroServiceTest.java @@ -3,6 +3,7 @@ package org.duniter.elasticsearch.user.service; import org.duniter.core.client.model.local.Peer; import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.ServiceLocator; +import org.duniter.elasticsearch.synchro.SynchroService; import org.duniter.elasticsearch.user.TestResource; import org.junit.*; import org.slf4j.Logger; @@ -36,15 +37,9 @@ public class SynchroServiceTest { } @Test - public void synchronizeUser() throws Exception { - - SynchroResult result = new SynchroResult(); - long fromTime = 0L; - - service.synchronizeUser(peer, fromTime, result); - + public void synchronizePeer() throws Exception { + SynchroResult result = service.synchronizePeer(peer, false); Assert.assertTrue(result.getInserts() > 0); - } @Test