diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 89dcb53517991810f3b4c3645b866e0470266d41..55d28e88bf4830f2fed7baf141ae14faa9b6c313 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -25,6 +25,7 @@ package org.duniter.core.client.dao; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; +import java.util.Collection; import java.util.List; import java.util.Set; @@ -37,6 +38,8 @@ public interface PeerDao extends EntityDao<String, Peer> { List<Peer> getPeersByCurrencyIdAndApi(String currencyId, String endpointApi); + List<Peer> getPeersByCurrencyIdAndApiAndPubkeys(String currencyId, String endpointApi, String[] pubkeys); + boolean isExists(String currencyId, String peerId); Long getMaxLastUpTime(String currencyId); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index bf9e4682cec6874609dba0a3ddd72a406ec9da04..f9a3f1407fc274d8b49da029c1b6956aeea77636 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -22,6 +22,8 @@ package org.duniter.core.client.dao.mem; * #L% */ +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; @@ -79,12 +81,34 @@ public class MemoryPeerDaoImpl implements PeerDao { Preconditions.checkNotNull(currencyId); Preconditions.checkNotNull(endpointApi); return peersByCurrencyId.values().stream() - .filter(peer -> currencyId.equals(peer.getCurrency()) && + .filter(peer -> + // Filter on currency + currencyId.equals(peer.getCurrency()) && + // Filter on API peer.getApi() != null && endpointApi.equals(peer.getApi())) .collect(Collectors.toList()); } + @Override + public List<Peer> getPeersByCurrencyIdAndApiAndPubkeys(String currencyId, String endpointApi, String[] pubkeys) { + Preconditions.checkNotNull(currencyId); + Preconditions.checkNotNull(endpointApi); + List pubkeysAsList = ImmutableList.copyOf(pubkeys); + + return peersByCurrencyId.values().stream() + .filter(peer -> + // Filter on currency + currencyId.equals(peer.getCurrency()) && + // Filter on API + peer.getApi() != null && + endpointApi.equals(peer.getApi()) && + // Filter on pubkeys + peer.getPubkey() != null && + pubkeysAsList.contains(peer.getPubkey())) + .collect(Collectors.toList()); + } + @Override public boolean isExists(final String currencyId, final String peerId) { Preconditions.checkNotNull(currencyId); diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java b/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java index 915fa3d9430d1d5e912d1a03031e7a85811eae4f..f6fd0ace6401d1761bd4ddf2a27fba344502b161 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java @@ -35,7 +35,6 @@ import javax.mail.*; import javax.mail.internet.*; import java.io.Closeable; import java.io.UnsupportedEncodingException; -import java.net.InetAddress; import java.util.Arrays; import java.util.Properties; import java.util.stream.Collectors; diff --git a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml index 995bbfa7299f1d90d9eca760556458f3dc890565..74413f8f0ab9d391d5c2ef36d6f268c4d16ba412 100644 --- a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml @@ -175,6 +175,10 @@ duniter.p2p.includes.endpoints: [ "ES_SUBSCRIPTION_API g1.data.duniter.fr 443" ] # +# Pass a list of pubkeys to always synchronize (default: <empty>) +# +# duniter.p2p.includes.pubkeys: [""] +# # ---------------------------------- Duniter4j Mail module ----------------------- # # Enable mail module ? diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 9752b2915a1c481c30cb99d257f91f9e0649204b..f21eee21fc2574230bf956ef0deb16cfa60aa6c4 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -174,13 +174,19 @@ duniter.security.enable: true # # Enable discovery on network peers, to automatically synchronize this peers (default: true) # -# duniter.p2p.discovery.enable: false +duniter.p2p.discovery.enable: false # # Pass a list of hosts to always synchronize (default: <empty>) # -duniter.p2p.includes.endpoints: [ - "ES_USER_API g1.data.duniter.fr 443", - "ES_SUBSCRIPTION_API g1.data.duniter.fr 443" +#duniter.p2p.includes.endpoints: [ +# "ES_USER_API g1.data.duniter.fr 443", +# "ES_SUBSCRIPTION_API g1.data.duniter.fr 443" +#] +# +# Pass a list of pubkeys to always synchronize (default: <empty>) +# +duniter.p2p.includes.pubkeys: [ + "38MEAZN68Pz1DTvT3tqgxx4yQP6snJCQhPqEFxbDk4aE" ] # # ---------------------------------- Duniter4j Mail module ----------------------- diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 0e91df4e90ee54976551c09400ac606b3501d00b..caf1240db1574517eeaa70cb1028d713b48be99a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -234,6 +234,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsArray("duniter.p2p.includes.endpoints"); } + public String[] getSynchroIncludesPubkeys() { + return settings.getAsArray("duniter.p2p.includes.pubkeys"); + } + public boolean enableSynchroDiscovery() { return settings.getAsBoolean("duniter.p2p.discovery.enable", true); } @@ -286,8 +290,12 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsBoolean("duniter.security.enable", true); } - public int getDocumentMaxTimeDelta() { - return settings.getAsInt("duniter.documentMaxTimeDelta", 7200); // in seconds = 2h + public int getDocumentTimeMaxPastDelta() { + return settings.getAsInt("duniter.document.time.maxPastDelta", 7200); // in seconds = 2h + } + + public int getDocumentTimeMaxFutureDelta() { + return settings.getAsInt("duniter.document.time.maxFutureDelta", 600); // in seconds = 10min } public String getWebSocketHost() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 47b7efa5a19065af4b0fc671c0dc8053d550cde5..0911dbc757d4a7fb383200cf9411efa2679ec391 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -148,6 +148,11 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { @Override public List<Peer> getPeersByCurrencyIdAndApi(String currencyId, String endpointApi) { + return getPeersByCurrencyIdAndApiAndPubkeys(currencyId, endpointApi, null); + } + + @Override + public List<Peer> getPeersByCurrencyIdAndApiAndPubkeys(String currencyId, String endpointApi, String[] pubkeys) { Preconditions.checkNotNull(currencyId); Preconditions.checkNotNull(endpointApi); @@ -158,12 +163,15 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Query = filter on lastUpTime NestedQueryBuilder statusQuery = QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); + .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Peer.PROPERTY_API, endpointApi)); + if (CollectionUtils.isNotEmpty(pubkeys)) { + boolQuery.filter(QueryBuilders.termsQuery(Peer.PROPERTY_PUBKEY, pubkeys)); + } - QueryBuilder apiQuery = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Peer.PROPERTY_API, endpointApi)); - - request.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(apiQuery).must(statusQuery))); + request.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(boolQuery).must(statusQuery))); SearchResponse response = request.execute().actionGet(); return toList(response, Peer.class); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/exception/InvalidTimeException.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/exception/InvalidTimeException.java new file mode 100644 index 0000000000000000000000000000000000000000..388e3dac1a7b1c043502fb14f67816ba07cdc2a6 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/exception/InvalidTimeException.java @@ -0,0 +1,47 @@ +package org.duniter.elasticsearch.exception; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.elasticsearch.rest.RestStatus; + +/** + * Created by blavenie on 01/03/16. + */ +public class InvalidTimeException extends DuniterElasticsearchException { + public InvalidTimeException(Throwable cause) { + super(cause); + } + + public InvalidTimeException(String msg, Object... args) { + super(msg, args); + } + + public InvalidTimeException(String msg, Throwable cause, Object... args) { + super(msg, args, cause); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java index 79e118514e01d6f3e960c5ba58f21abc074a9140..d36211c9d9cc051abc6a9c24c79396ea04f2f714 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java @@ -42,10 +42,12 @@ public class SynchroResult implements Serializable { private long updateTotal = 0; private long deleteTotal = 0; private long invalidSignatureTotal = 0; + private long invalidTimeTotal = 0; private Map<String, Long> insertHits = new HashMap<>(); private Map<String, Long> updateHits = new HashMap<>(); private Map<String, Long> deleteHits = new HashMap<>(); private Map<String, Long> invalidSignatureHits = new HashMap<>(); + private Map<String, Long> invalidTimeHits = new HashMap<>(); public void addInserts(String index, String type, long nbHits) { insertHits.put(index + "/" + type, getInserts(index, type) + nbHits); @@ -67,6 +69,12 @@ public class SynchroResult implements Serializable { invalidSignatureTotal += nbHits; } + + public void addInvalidTimes(String index, String type, long nbHits) { + invalidTimeHits.put(index + "/" + type, getDeletes(index, type) + nbHits); + invalidTimeTotal += nbHits; + } + @JsonIgnore public long getInserts(String index, String type) { return insertHits.getOrDefault(index + "/" + type, 0l); @@ -99,11 +107,14 @@ public class SynchroResult implements Serializable { return deleteTotal; } - public long getInvalidSignatures() { return invalidSignatureTotal; } + public long getInvalidTimes() { + return invalidTimeTotal; + } + @JsonIgnore public long getTotal() { return insertTotal + updateTotal + deleteTotal; @@ -121,6 +132,9 @@ public class SynchroResult implements Serializable { public void setInvalidSignatures(long invalidSignatures) { this.invalidSignatureTotal = invalidSignatures; } + public void setInvalidTimes(long invalidTimes) { + this.invalidTimeTotal = invalidTimes; + } public String toString() { return new StringBuilder() diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index baa417975b2d32028be91685d50da64ada1111d8..cfc6306a451a73581545acfc95fff89694d7488a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -37,6 +37,7 @@ import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; +import org.duniter.elasticsearch.exception.InvalidTimeException; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -57,7 +58,8 @@ public abstract class AbstractService implements Bean { protected CryptoService cryptoService; protected final int retryCount; protected final int retryWaitDuration; - protected final int documentMaxTimeDelta; + protected final int documentTimeMaxPastDelta; + protected final int documentTimeMaxFutureDelta; protected boolean ready = false; public AbstractService(String loggerName, Duniter4jClient client, PluginSettings pluginSettings) { @@ -80,7 +82,8 @@ public abstract class AbstractService implements Bean { this.cryptoService = cryptoService; this.retryCount = pluginSettings.getNodeRetryCount(); this.retryWaitDuration = pluginSettings.getNodeRetryWaitDuration(); - this.documentMaxTimeDelta = pluginSettings.getDocumentMaxTimeDelta(); + this.documentTimeMaxPastDelta = pluginSettings.getDocumentTimeMaxPastDelta(); + this.documentTimeMaxFutureDelta = pluginSettings.getDocumentTimeMaxFutureDelta(); } /* -- protected methods --*/ @@ -161,17 +164,18 @@ public abstract class AbstractService implements Bean { } protected void verifyTimeForUpdate(String index, String type, String id, JsonNode actualObj, String timeFieldName) { + verifyTimeForUpdate(index, type, id, actualObj, false, timeFieldName); + } + + protected void verifyTimeForUpdate(String index, String type, String id, JsonNode actualObj, boolean allowOldDocuments, String timeFieldName) { // Check time has been increase - fix #27 int actualTime = getMandatoryField(actualObj, timeFieldName).asInt(); int existingTime = client.getTypedFieldById(index, type, id, timeFieldName); if (actualTime <= existingTime) { - throw new InvalidFormatException(String.format("Invalid '%s' value: can not be less or equal to the previous value.", timeFieldName, timeFieldName)); + throw new InvalidTimeException(String.format("Invalid '%s' value: can not be less or equal to the previous value.", timeFieldName, timeFieldName)); } - // Check time has been increase - fix #27 - if (Math.abs(System.currentTimeMillis()/1000 - actualTime) > documentMaxTimeDelta) { - throw new InvalidFormatException(String.format("Invalid '%s' value: too far from the UTC server time. Check your device's clock.", timeFieldName)); - } + verifyTime(actualTime, allowOldDocuments, timeFieldName); } protected void verifyTimeForInsert(JsonNode actualObj) { @@ -179,12 +183,28 @@ public abstract class AbstractService implements Bean { } protected void verifyTimeForInsert(JsonNode actualObj, String timeFieldName) { - // Check time has been increase - fix #27 + verifyTime(actualObj, false, timeFieldName); + } + + protected void verifyTime(JsonNode actualObj, boolean allowOldDocuments, String timeFieldName) { int actualTime = getMandatoryField(actualObj, timeFieldName).asInt(); + verifyTime(actualTime, allowOldDocuments, timeFieldName); + } + protected void verifyTime(int actualTime, + boolean allowOldDocuments, + String timeFieldName) { // Check time has been increase - fix #27 - if (Math.abs(System.currentTimeMillis()/1000 - actualTime) > documentMaxTimeDelta) { - throw new InvalidFormatException(String.format("Invalid '%s' value: too far from the UTC server time. Check your device's clock.", timeFieldName)); + long deltaTime = System.currentTimeMillis()/1000 - actualTime; + + // Past time + if (!allowOldDocuments && (deltaTime > 0 && Math.abs(deltaTime) > documentTimeMaxPastDelta)) { + throw new InvalidTimeException(String.format("Invalid '%s' value: too far (in the past) from the UTC server time. Check your device's clock.", timeFieldName)); + } + + // Future time + if (deltaTime < 0 && Math.abs(deltaTime) > documentTimeMaxFutureDelta) { + throw new InvalidTimeException(String.format("Invalid '%s' value: too far (in the future) from the UTC server time. Check your device's clock.", timeFieldName)); } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java index 2c44418cc48be96324bcdc600c64db67a1bb73b1..252f37c678d1ca027909f89fe0047862d48d0e84 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/AbstractSynchroAction.java @@ -16,7 +16,6 @@ import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; -import org.duniter.core.util.PrimitiveIterators; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; @@ -61,12 +60,14 @@ public abstract class AbstractSynchroAction extends AbstractService implements S private String toType; private String issuerFieldName = Record.PROPERTY_ISSUER; private String versionFieldName = Record.PROPERTY_TIME; + private String timeFieldName = versionFieldName; private ChangeSource changeSource; private HttpService httpService; private boolean enableUpdate = false; private boolean enableSignatureValidation = true; + private boolean enableTimeValidation = true; private List<SourceConsumer> insertionListeners; private List<SourceConsumer> updateListeners; private List<SourceConsumer> validationListeners; @@ -225,8 +226,11 @@ public abstract class AbstractSynchroAction extends AbstractService implements S protected void notifyValidation(final String id, final JsonNode source, + final boolean allowOldDocuments, final SynchroActionResult actionResult, final String logPrefix) throws Exception { + + // Validate signature if (enableSignatureValidation) { try { readAndVerifyIssuerSignature(source, issuerFieldName); @@ -240,6 +244,18 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } } + // Validate time + if (enableTimeValidation) { + try { + verifyTime(source, allowOldDocuments, timeFieldName); + } catch (InvalidSignatureException e) { + actionResult.addInvalidTime(); + if (trace) { + logger.warn(String.format("%s %s.", logPrefix, e.getMessage())); + } + } + } + if (CollectionUtils.isNotEmpty(validationListeners)) { for (SourceConsumer listener : validationListeners) { listener.accept(id, source, actionResult); @@ -320,9 +336,9 @@ public abstract class AbstractSynchroAction extends AbstractService implements S ObjectMapper objectMapper = getObjectMapper(); // DEV ONLY: skip - if (!"user".equalsIgnoreCase(fromIndex) || !"profile".equalsIgnoreCase(fromType)) { - return; - } + //if (!"user".equalsIgnoreCase(fromIndex) || !"profile".equalsIgnoreCase(fromType)) { + // return; + //} long counter = 0; boolean stop = false; @@ -386,6 +402,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S save(id, source, objectMapper, bulkRequest, + true, // allow old documents actionResult, logPrefix); } @@ -418,18 +435,20 @@ public abstract class AbstractSynchroAction extends AbstractService implements S result.addUpdates(toIndex, toType, actionResult.getUpdates()); result.addDeletes(toIndex, toType, actionResult.getDeletes()); result.addInvalidSignatures(toIndex, toType, actionResult.getInvalidSignatures()); + result.addInvalidTimes(toIndex, toType, actionResult.getInvalidTimes()); return counter; } protected void save(String id, JsonNode source, String logPrefix) { - save(id, source, getObjectMapper(), null, NULL_ACTION_RESULT, logPrefix); + save(id, source, getObjectMapper(), null, false, NULL_ACTION_RESULT, logPrefix); } protected void save(final String id, final JsonNode source, final ObjectMapper objectMapper, final BulkRequestBuilder bulkRequest, + final boolean allowOldDocuments, final SynchroActionResult actionResult, final String logPrefix) { @@ -454,7 +473,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } // Validate doc - notifyValidation(id, source, actionResult, logPrefix); + notifyValidation(id, source, allowOldDocuments, actionResult, logPrefix); // Execute insertion IndexRequestBuilder request = client.prepareIndex(toIndex, toType, id) @@ -491,15 +510,16 @@ public abstract class AbstractSynchroAction extends AbstractService implements S } // Validate source - notifyValidation(id, source, actionResult, logPrefix); + notifyValidation(id, source, allowOldDocuments, actionResult, logPrefix); // Execute update - UpdateRequestBuilder request = client.prepareUpdate(toIndex, toType, id) - .setDoc(objectMapper.writeValueAsBytes(source)); + UpdateRequestBuilder request = client.prepareUpdate(toIndex, toType, id); if (bulkRequest != null) { + request.setDoc(objectMapper.writeValueAsBytes(source)); bulkRequest.add(request); } else { + request.setSource(objectMapper.writeValueAsBytes(source)); request.setRefresh(true); client.safeExecuteRequest(request, false); } @@ -532,6 +552,10 @@ public abstract class AbstractSynchroAction extends AbstractService implements S this.versionFieldName = versionFieldName; } + protected void setTimeFieldName(String timeFieldName) { + this.timeFieldName = timeFieldName; + } + protected ObjectMapper getObjectMapper() { return JacksonUtils.getThreadObjectMapper(); } @@ -544,5 +568,8 @@ public abstract class AbstractSynchroAction extends AbstractService implements S this.enableSignatureValidation = enableSignatureValidation; } + protected void setEnableTimeValidation(boolean enableTimeValidation) { + this.enableTimeValidation = enableTimeValidation; + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroActionResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroActionResult.java index f0caeec80178c36671f6bba66cc8cd9608ed7657..36e0fc880c0440f6ce900dd75de0fcdb7185acc3 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroActionResult.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroActionResult.java @@ -6,9 +6,11 @@ public interface SynchroActionResult { void addUpdate(); void addDelete(); void addInvalidSignature(); + void addInvalidTime(); long getInserts(); long getUpdates(); long getDeletes(); long getInvalidSignatures(); + long getInvalidTimes(); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java index a5b172e3b6e39963c5273d0c573f70a9f801e0c0..2639e01daa960b0d2ebbc5c2465b8d9eeb387571 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java @@ -22,9 +22,7 @@ package org.duniter.elasticsearch.synchro; * #L% */ -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.*; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.ArrayUtils; import org.duniter.core.client.dao.CurrencyDao; @@ -57,10 +55,7 @@ import org.elasticsearch.common.inject.Inject; import java.io.IOException; import java.text.DateFormat; -import java.util.Date; -import java.util.List; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -183,7 +178,7 @@ public class SynchroService extends AbstractService { logger.info(String.format("[%s] [%s] Starting synchronization... {discovery: %s}", currencyId, peerApiFilter.name(), pluginSettings.enableSynchroDiscovery())); // Get peers for currencies and API - List<Peer> peers = getPeersFromApi(currencyId, peerApiFilter); + Collection<Peer> peers = getPeersFromApi(currencyId, peerApiFilter); if (CollectionUtils.isNotEmpty(peers)) { peers.forEach(p -> synchronizePeer(p, enableSynchroWebsocket)); logger.info(String.format("[%s] [%s] Synchronization [OK]", currencyId, peerApiFilter.name())); @@ -295,30 +290,45 @@ public class SynchroService extends AbstractService { return peers; } - protected List<Peer> getPeersFromApi(final String currencyId, final EndpointApi api) { + protected Collection<Peer> getPeersFromApi(final String currencyId, final EndpointApi api) { Preconditions.checkNotNull(api); Preconditions.checkArgument(StringUtils.isNotBlank(currencyId)); try { - // Get default peer, defined in config option - List<Peer> peers = getConfigIncludesPeers(currencyId, api); - if (peers == null) { - peers = Lists.newArrayList(); + // Use map by URL, to avoid duplicated peer + Map<String, Peer> peersByUrls = Maps.newHashMap(); + + // Get peers from config + List<Peer> configPeers = getConfigIncludesPeers(currencyId, api); + if (CollectionUtils.isNotEmpty(configPeers)) { + configPeers.forEach(p -> peersByUrls.put(p.getUrl(), p)); + } + + // Get peers by pubkeys, from config + String[] includePubkeys = pluginSettings.getSynchroIncludesPubkeys(); + if (ArrayUtils.isNotEmpty(includePubkeys)) { + + // Get from DAO, by API and pubkeys + List<Peer> pubkeysPeers = peerDao.getPeersByCurrencyIdAndApiAndPubkeys(currencyId, api.name(), includePubkeys); + if (CollectionUtils.isNotEmpty(pubkeysPeers)) { + pubkeysPeers.stream() + .filter(Objects::nonNull) + .forEach(p -> peersByUrls.put(p.getUrl(), p)); + } } // Add discovered peers if (pluginSettings.enableSynchroDiscovery()) { - List<Peer> indexedPeers = peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()); - if (CollectionUtils.isNotEmpty(indexedPeers)) { - peers.addAll(indexedPeers - .stream() + List<Peer> discoveredPeers = peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()); + if (CollectionUtils.isNotEmpty(discoveredPeers)) { + discoveredPeers.stream() .filter(Objects::nonNull) - .collect(Collectors.toList())); + .forEach(p -> peersByUrls.put(p.getUrl(), p)); } } - return peers; + return peersByUrls.values(); } catch (Exception e) { logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/NullSynchroActionResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/NullSynchroActionResult.java index 961808010865c6dadef0247350f4ab5f06b0dd12..e4b9b1bfbad97b6ba843609cc501bbfe24737a11 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/NullSynchroActionResult.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/NullSynchroActionResult.java @@ -20,6 +20,11 @@ public class NullSynchroActionResult implements SynchroActionResult { public void addInvalidSignature() { } + @Override + public void addInvalidTime() { + + } + @Override public long getInserts() { return 0; @@ -39,4 +44,9 @@ public class NullSynchroActionResult implements SynchroActionResult { public long getInvalidSignatures() { return 0; } + + @Override + public long getInvalidTimes() { + return 0; + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/SynchroActionResultImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/SynchroActionResultImpl.java index d63364fc0430c6640bcbd4e11b55951e2877390e..855d409eac26139b1bcd276237016bb3a3c2db8a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/SynchroActionResultImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/synchro/impl/SynchroActionResultImpl.java @@ -9,6 +9,7 @@ public class SynchroActionResultImpl implements SynchroActionResult { private final PrimitiveIterators.OfLong updateHits = PrimitiveIterators.newLongSequence(); private final PrimitiveIterators.OfLong deleteHits = PrimitiveIterators.newLongSequence(); private final PrimitiveIterators.OfLong invalidSignatureHits = PrimitiveIterators.newLongSequence(); + private final PrimitiveIterators.OfLong invalidTimesHits = PrimitiveIterators.newLongSequence(); @Override public void addInsert() { @@ -29,6 +30,10 @@ public class SynchroActionResultImpl implements SynchroActionResult { public void addInvalidSignature() { invalidSignatureHits.nextLong(); } + @Override + public void addInvalidTime() { + invalidTimesHits.nextLong(); + } @Override public long getInserts() { @@ -46,4 +51,8 @@ public class SynchroActionResultImpl implements SynchroActionResult { public long getInvalidSignatures() { return invalidSignatureHits.current(); } + @Override + public long getInvalidTimes() { + return invalidTimesHits.current(); + } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java index 2a486d506a7cc21dc1774add7a751c7978798946..5af0224d844c433f1f997b111763c74784100208 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java @@ -34,6 +34,7 @@ import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordGetAc import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordIndexAction; import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordMarkAsReadAction; import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordSearchAction; +import org.duniter.elasticsearch.user.rest.mixed.RestMixedSearchAction; import org.duniter.elasticsearch.user.rest.page.*; import org.duniter.elasticsearch.user.rest.user.*; import org.elasticsearch.common.inject.AbstractModule; @@ -76,6 +77,9 @@ public class RestModule extends AbstractModule implements Module { bind(RestRegistryCategoryAction.class).asEagerSingleton(); bind(RestRegistryImageAction.class).asEagerSingleton(); + // Mixed search + bind(RestMixedSearchAction.class).asEagerSingleton(); + // Backward compatibility { // message/record diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/mixed/RestMixedSearchAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/mixed/RestMixedSearchAction.java new file mode 100644 index 0000000000000000000000000000000000000000..5d7a7421388669a261a3f143ce1c44a9ad74d0c6 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/mixed/RestMixedSearchAction.java @@ -0,0 +1,58 @@ +package org.duniter.elasticsearch.user.rest.mixed; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.user.dao.page.RegistryIndexDao; +import org.duniter.elasticsearch.user.dao.page.RegistryRecordDao; +import org.duniter.elasticsearch.user.service.GroupService; +import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.rest.RestRequest; + +/** + * Created by blavenie on 13/12/16. + */ +public class RestMixedSearchAction { + + @Inject + public RestMixedSearchAction(RestSecurityController securityController) { + + String[] paths = { + // Allow search on profile + page + group + String.format("/%s,%s,%s/%s,%s/_search", + UserService.INDEX, RegistryIndexDao.INDEX, GroupService.INDEX, + UserService.PROFILE_TYPE, RegistryRecordDao.TYPE), + + // Allow search on profile + page + String.format("/%s,%s/%s,%s/_search", + UserService.INDEX, RegistryIndexDao.INDEX, + UserService.PROFILE_TYPE, RegistryRecordDao.TYPE) + }; + + for(String path: paths) { + securityController.allow(RestRequest.Method.GET, path); + securityController.allow(RestRequest.Method.POST, path); + } + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java index 3ed1e7faf1889fe80a341ca907a37526beafeed2..2feed095309cf68d09b140306eebc4016b495546 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java @@ -244,24 +244,52 @@ public class GroupService extends AbstractService { .field("index", "not_analyzed") .endObject() - // avatar - .startObject("avatar") - .field("type", "attachment") - .startObject("fields") // fields - .startObject("content") // content - .field("index", "no") - .endObject() - .startObject("title") // title - .field("type", "string") - .field("store", "no") - .endObject() - .startObject("author") // author - .field("store", "no") - .endObject() - .startObject("content_type") // content_type - .field("store", "yes") - .endObject() - .endObject() + // thumbnail + .startObject("thumbnail") + .field("type", "attachment") + .startObject("fields") // src + .startObject("content") // title + .field("index", "no") + .endObject() + .startObject("title") // title + .field("type", "string") + .field("store", "no") + .endObject() + .startObject("author") // title + .field("store", "no") + .endObject() + .startObject("content_type") // title + .field("store", "yes") + .endObject() + .endObject() + .endObject() + + // pictures + .startObject("pictures") + .field("type", "nested") + .field("dynamic", "false") + .startObject("properties") + .startObject("file") // file + .field("type", "attachment") + .startObject("fields") + .startObject("content") // content + .field("index", "no") + .endObject() + .startObject("title") // title + .field("type", "string") + .field("store", "yes") + .field("analyzer", stringAnalyzer) + .endObject() + .startObject("author") // author + .field("type", "string") + .field("store", "no") + .endObject() + .startObject("content_type") // content_type + .field("store", "yes") + .endObject() + .endObject() + .endObject() + .endObject() .endObject() // social networks diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java index 97b08985c53e05e96b6584c86e3779fa1b4e3e9e..b89fef091b1dc3ed01a1ef6d28d3627a7fba2d5d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java @@ -30,7 +30,6 @@ import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.exception.InvalidSignatureException; -import org.duniter.elasticsearch.synchro.SynchroActionResult; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.Message; import org.duniter.elasticsearch.user.model.UserEvent; diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index a7dadf25c878c2dd8b9b2620ca770568ee930525..1b138cbfdb303ddb6d6cddbc43ccc28f81e182c0 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -149,8 +149,8 @@ public class UserEventService extends AbstractService implements ChangeService.C // Generate json String eventJson = toJson(completeUserEvent); - if (logger.isDebugEnabled()) { - logger.debug(String.format("Indexing a event to recipient [%s]", event.getRecipient().substring(0, 8))); + if (logger.isTraceEnabled()) { + logger.trace(String.format("Indexing a event to recipient [%s]", event.getRecipient().substring(0, 8))); } // do indexation @@ -167,8 +167,8 @@ public class UserEventService extends AbstractService implements ChangeService.C if (checkSignature) { JsonNode jsonNode = readAndVerifyIssuerSignature(eventJson); String recipient = getMandatoryField(jsonNode, UserEvent.PROPERTY_ISSUER).asText(); - if (logger.isDebugEnabled()) { - logger.debug(String.format("Indexing a event to recipient [%s]", recipient.substring(0, 8))); + if (logger.isTraceEnabled()) { + logger.trace(String.format("Indexing a event to recipient [%s]", recipient.substring(0, 8))); } } if (logger.isTraceEnabled()) { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java index fbe8679d953ce202625b85fec8aa199be8750171..b1b698a276cd23a4d0c9f36535d6f38f3130c42d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java @@ -37,6 +37,7 @@ import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.service.AbstractService; +import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; @@ -147,7 +148,7 @@ public class UserService extends AbstractService { * Update an user profile * @param profileJson */ - public ListenableActionFuture<UpdateResponse> updateProfileFromJson(String id, String profileJson) { + public ListenableActionFuture<? extends ActionWriteResponse> updateProfileFromJson(String id, String profileJson) { JsonNode actualObj = readAndVerifyIssuerSignature(profileJson); String issuer = getIssuer(actualObj); @@ -163,8 +164,13 @@ public class UserService extends AbstractService { logger.debug(String.format("Updating a user profile from issuer [%s]", issuer.substring(0, 8))); } - return client.prepareUpdate(INDEX, PROFILE_TYPE, issuer) - .setDoc(profileJson) + // First delete + client.prepareDelete(INDEX, PROFILE_TYPE, issuer) + .execute().actionGet(); + + // Then re-create + return client.prepareIndex(INDEX, PROFILE_TYPE, issuer) + .setSource(profileJson) .execute(); }