From a5ce21e6966e1a0a08df8591a76cadf3fe5ba343 Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Fri, 15 Sep 2017 22:08:20 +0200 Subject: [PATCH] - PeerService : get ES endpoint --- .../duniter/core/client/dao/CurrencyDao.java | 2 + .../org/duniter/core/client/dao/PeerDao.java | 3 + .../client/dao/mem/MemoryCurrencyDaoImpl.java | 6 ++ .../client/dao/mem/MemoryPeerDaoImpl.java | 15 ++++ .../duniter/core/client/model/local/Peer.java | 8 ++ .../core/client/model/local/Peers.java | 7 +- .../service/local/NetworkServiceImpl.java | 19 ++--- .../src/test/misc/test_scroll.sh | 33 +++++++- .../org/duniter/elasticsearch/PluginInit.java | 3 +- .../elasticsearch/beans/ESBeanFactory.java | 2 + .../client/Duniter4jClientImpl.java | 4 + .../elasticsearch/dao/AbstractDao.java | 38 +++++++++ .../elasticsearch/dao/CurrencyExtendDao.java | 2 + .../duniter/elasticsearch/dao/DaoModule.java | 3 + .../dao/impl/CurrencyDaoImpl.java | 12 +++ .../elasticsearch/dao/impl/PeerDaoImpl.java | 49 +++++++---- .../elasticsearch/model/SearchResponse.java | 28 ++++--- .../service/AbstractSynchroService.java | 84 +++++++++++++++---- .../elasticsearch/service/DocStatService.java | 2 +- .../elasticsearch/service/PeerService.java | 16 +++- .../elasticsearch/service/ServiceLocator.java | 9 +- .../websocket/WebSocketServer.java | 6 +- .../duniter/elasticsearch/TestResource.java | 2 +- .../subscription/PluginInit.java | 6 +- .../service/SubscriptionService.java | 2 +- .../subscription/service/SynchroService.java | 35 ++++++-- .../subscription/TestResource.java | 2 +- .../elasticsearch/user/PluginInit.java | 2 +- .../user/service/SynchroService.java | 51 ++++++++--- .../elasticsearch/user/TestResource.java | 2 +- 30 files changed, 363 insertions(+), 90 deletions(-) diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/CurrencyDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/CurrencyDao.java index cef9f90a..e33b0676 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/CurrencyDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/CurrencyDao.java @@ -40,6 +40,8 @@ public interface CurrencyDao extends Bean, EntityDao<String, Currency> { void remove(final Currency currency); + List<String> getCurrencyIds(); + List<Currency> getCurrencies(long accountId); /** diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 2e6b031e..5fbdfaa5 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -33,9 +33,12 @@ public interface PeerDao extends EntityDao<String, Peer> { List<Peer> getPeersByCurrencyId(String currencyId); + List<Peer> getPeersByCurrencyIdAndApi(String currencyId, String endpointApi); + boolean isExists(String currencyId, String peerId); Long getMaxLastUpTime(String currencyId); void updatePeersAsDown(String currencyId, long maxUpTime); + } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryCurrencyDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryCurrencyDaoImpl.java index 20699952..82ed699d 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryCurrencyDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryCurrencyDaoImpl.java @@ -22,6 +22,7 @@ package org.duniter.core.client.dao.mem; * #L% */ +import com.google.common.collect.ImmutableList; import org.duniter.core.client.dao.CurrencyDao; import java.util.*; @@ -59,6 +60,11 @@ public class MemoryCurrencyDaoImpl implements CurrencyDao { currencies.remove(currency.getId()); } + @Override + public List<String> getCurrencyIds() { + return ImmutableList.copyOf(currencies.keySet()); + } + @Override public List<org.duniter.core.client.model.local.Currency> getCurrencies(long accountId) { List<org.duniter.core.client.model.local.Currency> result = new ArrayList<>(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index 2ed9eed2..2a55c192 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -24,6 +24,7 @@ package org.duniter.core.client.dao.mem; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.local.Peer; +import org.duniter.core.util.Preconditions; import java.util.*; import java.util.stream.Collectors; @@ -66,19 +67,33 @@ public class MemoryPeerDaoImpl implements PeerDao { @Override public List<Peer> getPeersByCurrencyId(final String currencyId) { + Preconditions.checkNotNull(currencyId); return peersByCurrencyId.values().stream() .filter(peer -> currencyId.equals(peer.getCurrency())) .collect(Collectors.toList()); } + @Override + public List<Peer> getPeersByCurrencyIdAndApi(final String currencyId, final String endpointApi) { + Preconditions.checkNotNull(currencyId); + Preconditions.checkNotNull(endpointApi); + return peersByCurrencyId.values().stream() + .filter(peer -> currencyId.equals(peer.getCurrency()) && + peer.getApi() != null && + endpointApi.equals(peer.getApi())) + .collect(Collectors.toList()); + } + @Override public boolean isExists(final String currencyId, final String peerId) { + Preconditions.checkNotNull(currencyId); return peersByCurrencyId.values().stream() .anyMatch(peer -> currencyId.equals(peer.getCurrency()) && peerId.equals(peer.getId())); } @Override public Long getMaxLastUpTime(String currencyId) { + Preconditions.checkNotNull(currencyId); OptionalLong max = getPeersByCurrencyId(currencyId).stream() .mapToLong(peer -> peer.getStats() != null ? peer.getStats().getLastUpTime() : -1) .max(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index c7c2be00..cbd00a33 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -158,6 +158,13 @@ public class Peer implements LocalEntity<String>, Serializable { } + + public static final String PROPERTY_PUBKEY = "pubkey"; + public static final String PROPERTY_CURRENCY = "currency"; + public static final String PROPERTY_API = "api"; + public static final String PROPERTY_DNS = "dns"; + public static final String PROPERTY_IPV4 = "ipv4"; + public static final String PROPERTY_IPV6 = "ipv6"; public static final String PROPERTY_STATS = "stats"; private String id; @@ -373,6 +380,7 @@ public class Peer implements LocalEntity<String>, Serializable { public static class Stats { public static final String PROPERTY_STATUS = "status"; public static final String PROPERTY_LAST_UP_TIME = "lastUpTime"; + public static final String PROPERTY_UID = "uid"; private String version; private PeerStatus status = PeerStatus.UP; // default diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java index 8d0723c2..404ad6d6 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java @@ -20,8 +20,13 @@ public final class Peers { } public static String buid(Peer.Stats stats) { - return stats.getStatus() == Peer.PeerStatus.UP + return stats.getStatus() == Peer.PeerStatus.UP && stats.getBlockNumber() != null ? stats.getBlockNumber() + "-" + stats.getBlockHash() : null; } + + public static boolean hasBmaEndpoint(Peer peer) { + return hasEndPointAPI(peer, EndpointApi.BASIC_MERKLED_API) || + hasEndPointAPI(peer, EndpointApi.BMAS); + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java index 1bf7c5ca..ef16e97d 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java @@ -26,7 +26,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.duniter.core.client.config.Configuration; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.*; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peers; @@ -188,7 +187,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network public CompletableFuture<Peer> asyncRefreshPeer(final Peer peer, final Map<String, String> memberUids, final ExecutorService pool) { return CompletableFuture.supplyAsync(() -> getVersion(peer), pool) - .thenApply(this::getCurrentBlock) + .thenApply(p -> Peers.hasBmaEndpoint(p) ? getCurrentBlock(p) : p) .exceptionally(throwable -> { peer.getStats().setStatus(Peer.PeerStatus.DOWN); if(!(throwable instanceof HttpConnectException)) { @@ -204,17 +203,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network else if (log.isTraceEnabled()) log.debug(String.format("[%s] is DOWN", peer)); return peer; }) - .thenApply(apeer -> { - String uid = StringUtils.isNotBlank(peer.getPubkey()) ? memberUids.get(peer.getPubkey()) : null; - peer.getStats().setUid(uid); - if (peer.getStats().isReacheable()) { + .thenApply(p -> { + String uid = StringUtils.isNotBlank(p.getPubkey()) ? memberUids.get(p.getPubkey()) : null; + p.getStats().setUid(uid); + if (p.getStats().isReacheable() && Peers.hasBmaEndpoint(p)) { // Hardship if (StringUtils.isNotBlank(uid)) { - getHardship(peer); + getHardship(p); } } - return apeer; + return p; }) .exceptionally(throwable -> { peer.getStats().setHardshipLevel(0); @@ -225,13 +224,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network public List<Peer> fillPeerStatsConsensus(final List<Peer> peers) { final Map<String,Long> peerCountByBuid = peers.stream() - .filter(peer -> peer.getStats().getStatus() == Peer.PeerStatus.UP) + .filter(peer -> peer.getStats().isReacheable() && Peers.hasBmaEndpoint(peer)) .map(Peers::buid) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); // Compute main consensus buid Optional<Map.Entry<String, Long>> maxPeerCountEntry = peerCountByBuid.entrySet().stream() - .sorted(Comparator.comparing(Map.Entry::getValue, (l1, l2) -> l2.compareTo(l1))) + .sorted(Comparator.comparing(Map.Entry::getValue, Comparator.reverseOrder())) .findFirst(); final String mainBuid = maxPeerCountEntry.isPresent() ? maxPeerCountEntry.get().getKey() : null;; diff --git a/duniter4j-es-assembly/src/test/misc/test_scroll.sh b/duniter4j-es-assembly/src/test/misc/test_scroll.sh index be6a65f8..f98fa551 100755 --- a/duniter4j-es-assembly/src/test/misc/test_scroll.sh +++ b/duniter4j-es-assembly/src/test/misc/test_scroll.sh @@ -21,4 +21,35 @@ curl -XPOST 'http://localhost:9200/history/delete/_search?scroll=1m' -curl -XPOST 'http://localhost:9200/history/delete/_search/scroll?scroll=1m' -d 'cXVlcnlUaGVuRmV0Y2g7Mjs3MToxNlZjRUplMVMyaW1sZERvdVU2dHZnOzcyOjE2VmNFSmUxUzJpbWxkRG91VTZ0dmc7MDs=' \ No newline at end of file +#curl -XPOST 'http://localhost:9200/history/delete/_search/scroll?scroll=1m' -d 'cXVlcnlUaGVuRmV0Y2g7Mjs3MToxNlZjRUplMVMyaW1sZERvdVU2dHZnOzcyOjE2VmNFSmUxUzJpbWxkRG91VTZ0dmc7MDs=' + +curl -XPOST 'http://localhost:9200/g1-test/peer/_search' -d '{ + "constant_score" : { + "filter" : { + "bool" : { + "must" : [ { + "bool" : { + "filter" : { + "term" : { + "api" : "ES_USER_API" + } + } + } + }, { + "nested" : { + "query" : { + "bool" : { + "filter" : { + "term" : { + "stats.status" : "UP" + } + } + } + }, + "path" : "stats" + } + } ] + } + } + } +}'' \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 578f3045..a40d22f6 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -31,7 +31,6 @@ import org.duniter.elasticsearch.service.CurrencyService; import org.duniter.elasticsearch.service.DocStatService; import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; @@ -257,7 +256,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .registerIndex(CurrencyService.INDEX, CurrencyService.RECORD_TYPE); // Wait end of currency index creation, then index blocks - threadPool.scheduleOnClusterReady(docStatService::start); + threadPool.scheduleOnClusterReady(docStatService::startScheduling); } // Allow scroll search diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java index 5d369923..7bc9af4c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java @@ -25,6 +25,7 @@ package org.duniter.elasticsearch.beans; import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanCreationException; import org.duniter.core.beans.BeanFactory; +import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; /** @@ -34,6 +35,7 @@ public class ESBeanFactory extends BeanFactory { private Injector injector = null; + @Inject public void setInjector(Injector injector) { this.injector = injector; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java index d94dbd84..24703ac9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java @@ -25,10 +25,12 @@ package org.duniter.elasticsearch.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.collect.Lists; import org.apache.commons.collections4.MapUtils; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.local.LocalEntity; +import org.duniter.core.client.model.local.Peer; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; @@ -97,6 +99,7 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.SearchHits; import org.elasticsearch.threadpool.ThreadPool; import java.io.*; @@ -396,6 +399,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { } } + @Override public void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) { bulkFromClasspathFile(classpathFile, indexName, indexType, null); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java index 055c72f4..aaba866f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java @@ -24,14 +24,22 @@ package org.duniter.elasticsearch.dao; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.local.LocalEntity; +import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.search.SearchHit; + +import java.io.IOException; +import java.util.List; /** * Created by Benoit on 08/04/2015. @@ -72,4 +80,34 @@ public abstract class AbstractDao implements Bean { protected ObjectMapper getObjectMapper() { return JacksonUtils.getThreadObjectMapper(); } + + protected <C extends LocalEntity<String>> List<C> toList(SearchResponse response, Class<? extends C> clazz) { + ObjectMapper objectMapper = getObjectMapper(); + + if (response.getHits() == null || response.getHits().getTotalHits() == 0) return null; + + List<C> result = Lists.newArrayList(); + for (SearchHit hit: response.getHits().getHits()) { + + try { + C value = objectMapper.readValue(hit.getSourceRef().streamInput(), clazz); + value.setId(hit.getId()); + result.add(value); + } + catch(IOException e) { + logger.warn(String.format("Unable to deserialize source [%s/%s/%s] into [%s]: %s", hit.getIndex(), hit.getType(), hit.getId(), clazz.getName(), e.getMessage())); + } + } + return result; + } + + protected List<String> toListIds(SearchResponse response) { + if (response.getHits() == null || response.getHits().getTotalHits() == 0) return null; + + List<String> result = Lists.newArrayList(); + for (SearchHit hit: response.getHits().getHits()) { + result.add(hit.getId()); + } + return result; + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java index ad226bf3..de4aae0a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java @@ -30,4 +30,6 @@ import org.duniter.core.client.dao.CurrencyDao; public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExtendDao> { String INDEX = "currency"; String RECORD_TYPE = "record"; + + } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java index 984cd0f9..d9e609fd 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java @@ -38,6 +38,8 @@ public class DaoModule extends AbstractModule implements Module { @Override protected void configure() { + requestInjection(ServiceLocator.getESBeanFactory()); + // Common instance bind(Duniter4jClient.class).to(Duniter4jClientImpl.class).asEagerSingleton(); bind(DocStatDao.class).to(DocStatDaoImpl.class).asEagerSingleton(); @@ -50,6 +52,7 @@ public class DaoModule extends AbstractModule implements Module { bindWithLocator(PeerDao.class); bindWithLocator(CurrencyDao.class); + } /* protected methods */ diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index df9e7cd0..be4033f9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -32,6 +32,8 @@ import org.duniter.elasticsearch.dao.AbstractIndexTypeDao; import org.duniter.elasticsearch.dao.CurrencyExtendDao; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -126,6 +128,16 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp throw new TechnicalException("Not implemented yet"); } + @Override + public List<String> getCurrencyIds() { + SearchRequestBuilder request = client.prepareSearch(INDEX) + .setTypes(RECORD_TYPE) + .setSize(pluginSettings.getIndexBulkSize()) + .setFetchSource(false); + + return toListIds(request.execute().actionGet()); + } + @Override public long getLastUD(String currencyId) { org.duniter.core.client.model.local.Currency currency = getById(currencyId); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 530fc899..b7c95949 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -39,6 +39,8 @@ import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.NestedQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -141,6 +143,29 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { throw new TechnicalException("no implemented: loading all peers may be unsafe for memory..."); } + @Override + public List<Peer> getPeersByCurrencyIdAndApi(String currencyId, String endpointApi) { + Preconditions.checkNotNull(currencyId); + Preconditions.checkNotNull(endpointApi); + + SearchRequestBuilder request = client.prepareSearch(currencyId) + .setTypes(TYPE) + .setSize(1000); + + // Query = filter on lastUpTime + NestedQueryBuilder statusQuery = QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); + + + QueryBuilder apiQuery = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Peer.PROPERTY_API, endpointApi)); + + request.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(apiQuery).must(statusQuery))); + + SearchResponse response = request.execute().actionGet(); + return toList(response, Peer.class); + } + @Override public boolean isExists(String currencyId, String peerId) { return client.isDocumentExists(currencyId, TYPE, peerId); @@ -270,39 +295,34 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .startObject("properties") // currency - .startObject("currency") + .startObject(Peer.PROPERTY_CURRENCY) .field("type", "string") .endObject() // pubkey - .startObject("pubkey") + .startObject(Peer.PROPERTY_PUBKEY) .field("type", "string") .field("index", "not_analyzed") .endObject() // api - .startObject("api") + .startObject(Peer.PROPERTY_API) .field("type", "string") .field("index", "not_analyzed") .endObject() - // uid - .startObject("uid") - .field("type", "string") - .endObject() - // dns - .startObject("dns") + .startObject(Peer.PROPERTY_DNS) .field("type", "string") .endObject() // ipv4 - .startObject("ipv4") + .startObject(Peer.PROPERTY_IPV4) .field("type", "string") .endObject() // ipv6 - .startObject("ipv6") + .startObject(Peer.PROPERTY_IPV6) .field("type", "string") .endObject() @@ -329,7 +349,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .endObject() // stats.blockHash - .startObject("version") + .startObject("blockHash") .field("type", "string") .field("index", "not_analyzed") .endObject() @@ -339,7 +359,6 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .field("type", "string") .endObject() - // stats.medianTime .startObject("medianTime") .field("type", "integer") @@ -356,7 +375,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .endObject() // stats.uid - .startObject("uid") + .startObject(Peer.Stats.PROPERTY_UID) .field("type", "string") .endObject() @@ -366,7 +385,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .field("index", "not_analyzed") .endObject() - // stats.uid + // stats.forkConsensus .startObject("forkConsensus") .field("type", "boolean") .field("index", "not_analyzed") diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SearchResponse.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SearchResponse.java index d959155a..b6cb909f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SearchResponse.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SearchResponse.java @@ -24,8 +24,16 @@ package org.duniter.elasticsearch.model; import com.fasterxml.jackson.databind.JsonNode; - +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.StreamInput; +import org.jboss.netty.buffer.ChannelBuffer; + +import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; +import java.nio.channels.GatheringByteChannel; import java.util.Iterator; /** @@ -39,35 +47,35 @@ public class SearchResponse implements Serializable { this.node = response; } - public Hits getHits() { - return new Hits(node.get("hits")); + public SearchHits getHits() { + return new SearchHits(node.get("hits")); } - public class Hits implements Iterator<Hit>{ + public class SearchHits implements Iterator<SearchHit>{ protected JsonNode node; private Iterator<JsonNode> hits; - Hits(JsonNode node) { + SearchHits(JsonNode node) { this.node = node; this.hits = node == null ? null : node.get("hits").iterator(); } - public int getTotal() { + public int getTotalHits() { return node == null ? 0 : node.get("total").asInt(0); } public boolean hasNext() { return hits != null && hits.hasNext(); } - public Hit next() { - return hits == null ? null : new Hit(hits.next()); + public SearchHit next() { + return hits == null ? null : new SearchHit(hits.next()); } } - public class Hit { + public class SearchHit { private JsonNode node; - Hit(JsonNode node) { + SearchHit(JsonNode node) { this.node = node; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java index a8048d47..5e936c8c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java @@ -27,6 +27,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.entity.StringEntity; +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.local.Peer; @@ -34,20 +36,23 @@ import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.DateUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.CurrencyExtendDao; import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; +import org.duniter.elasticsearch.model.SearchResponse; import org.duniter.elasticsearch.model.SearchScrollResponse; import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -56,39 +61,88 @@ import org.elasticsearch.index.query.QueryBuilders; import java.io.IOException; import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * Created by blavenie on 27/10/16. */ -public abstract class AbstractSynchroService extends AbstractService { +public abstract class AbstractSynchroService<T extends AbstractService> extends AbstractService { private static final String SCROLL_PARAM_VALUE = "1m"; protected HttpService httpService; + protected final ThreadPool threadPool; + protected final PeerDao peerDao; + protected final CurrencyDao currencyDao; - @Inject public AbstractSynchroService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool, final ServiceLocator serviceLocator) { + ThreadPool threadPool, + CurrencyDao currencyDao, + PeerDao peerDao, + final ServiceLocator serviceLocator) { super("duniter.network.p2p", client, settings,cryptoService); + this.threadPool = threadPool; + this.currencyDao = currencyDao; + this.peerDao = peerDao; threadPool.scheduleOnStarted(() -> { httpService = serviceLocator.getHttpService(); setIsReady(true); }); } + /** + * Start scheduling doc stats update + * @return + */ + public T startScheduling() { + long delayBeforeNextHour = DateUtils.delayBeforeNextHour(); + + // Five minute before the hour (to make sure to be ready when computing doc stat - see DocStatService) + delayBeforeNextHour -= 5 * 60 * 1000; + + // If not already scheduling to early (in the next 5 min) then launch it + if (delayBeforeNextHour > 5 * 60 * 1000) { + + // Launch with a delay of 10 sec + threadPool.schedule(this::synchronize, 10 * 1000, TimeUnit.MILLISECONDS); + } + + // Schedule every hour + threadPool.scheduleAtFixedRate( + this::synchronize, + delayBeforeNextHour, + 60 * 60 * 1000 /* every hour */, + TimeUnit.MILLISECONDS); + + return (T)this; + } + /* -- protected methods -- */ - protected Peer getPeerFromAPI(EndpointApi api) { - // TODO : get peers from currency - use peering BMA API, and select peers with ESA (ES API) - Peer peer = Peer.newBuilder() - .setHost(pluginSettings.getDataSyncHost()) - .setPort(pluginSettings.getDataSyncPort()) - .setApi(api.name()) - .build(); - return peer; + + protected abstract void synchronize(); + + protected List<Peer> getPeersFromApi(final EndpointApi api) { + Preconditions.checkNotNull(api); + + try { + List<String> currencyIds = currencyDao.getCurrencyIds(); + if (CollectionUtils.isEmpty(currencyIds)) return null; + + return currencyIds.stream() + .map(currencyId -> peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name())) + .filter(Objects::nonNull) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + catch (Exception e) { + logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); + return null; + } } protected void safeSynchronizeIndex(Peer peer, String index, String type, long fromTime, SynchroResult result) { @@ -187,7 +241,7 @@ public abstract class AbstractSynchroService extends AbstractService { response = executeAndParseRequest(peer, fromIndex, fromType, request); if (response != null) { scrollId = response.getScrollId(); - total = response.getHits().getTotal(); + total = response.getHits().getTotalHits(); if (total > 0 && logger.isDebugEnabled()) { logger.debug(String.format("[%s] [%s/%s] %s docs to check...", peer, toIndex, toType, total)); } @@ -286,8 +340,8 @@ public abstract class AbstractSynchroService extends AbstractService { BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.setRefresh(true); - for (Iterator<SearchScrollResponse.Hit> hits = response.getHits(); hits.hasNext();){ - SearchScrollResponse.Hit hit = hits.next(); + for (Iterator<SearchResponse.SearchHit> hits = response.getHits(); hits.hasNext();){ + SearchResponse.SearchHit hit = hits.next(); String id = hit.getId(); JsonNode source = hit.getSource(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/DocStatService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/DocStatService.java index 8f1cd917..84ee223f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/DocStatService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/DocStatService.java @@ -131,7 +131,7 @@ public class DocStatService extends AbstractService { * Start scheduling doc stats update * @return */ - public DocStatService start() { + public DocStatService startScheduling() { long delayBeforeNextHour = DateUtils.delayBeforeNextHour(); threadPool.scheduleAtFixedRate( diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 070190a7..531e1777 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -24,6 +24,7 @@ package org.duniter.elasticsearch.service; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; @@ -31,6 +32,7 @@ import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.local.NetworkService; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.Preconditions; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -50,6 +52,8 @@ public class PeerService extends AbstractService { private PeerDao peerDao; private ThreadPool threadPool; + private List<String> includeEndpointApis = Lists.newArrayList(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + @Inject public PeerService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool, CryptoService cryptoService, PeerDao peerDao, @@ -65,6 +69,14 @@ public class PeerService extends AbstractService { }); } + public PeerService addIncludeEndpointApi(String api) { + Preconditions.checkNotNull(api); + if (!includeEndpointApis.contains(api)) { + includeEndpointApis.add(api); + } + return this; + } + public PeerService indexPeers(Peer peer) { try { @@ -95,7 +107,7 @@ public class PeerService extends AbstractService { org.duniter.core.client.service.local.NetworkService.Filter filterDef = new org.duniter.core.client.service.local.NetworkService.Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; - filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.filterEndpoints = ImmutableList.copyOf(includeEndpointApis); // Default sort org.duniter.core.client.service.local.NetworkService.Sort sortDef = new org.duniter.core.client.service.local.NetworkService.Sort(); @@ -124,7 +136,7 @@ public class PeerService extends AbstractService { NetworkService.Filter filterDef = new NetworkService.Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; - filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.filterEndpoints = ImmutableList.copyOf(includeEndpointApis); filterDef.currency = currencyName; // Default sort diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java index cdc9b9c3..71596fca 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java @@ -59,12 +59,11 @@ public class ServiceLocator private static ESBeanFactory beanFactory = null; @Inject - public ServiceLocator(Injector injector) { + public ServiceLocator() { super(getOrCreateBeanFactory()); if (logger.isDebugEnabled()) { logger.debug("Starting Duniter4j ServiceLocator..."); } - beanFactory.setInjector(injector); org.duniter.core.client.service.ServiceLocator.setInstance(this); } @@ -80,9 +79,13 @@ public class ServiceLocator org.duniter.core.client.service.ServiceLocator.setInstance(null); } + public static ESBeanFactory getESBeanFactory() { + return getOrCreateBeanFactory(); + } + /* -- Internal methods -- */ - protected static ESBeanFactory getOrCreateBeanFactory() { + public static ESBeanFactory getOrCreateBeanFactory() { if (beanFactory != null) { return beanFactory; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java index 77eb1314..cafc2972 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java @@ -70,7 +70,7 @@ public class WebSocketServer { if (pluginSettings.getWebSocketEnable()) { // When node started threadPool.scheduleOnStarted(() -> { - // start WS server + // startScheduling WS server startServer(pluginSettings.getWebSocketHost(), pluginSettings.getWebSocketPort(), getEndPoints()); @@ -124,7 +124,7 @@ public class WebSocketServer { port++; } else { - throw new TechnicalException("Failed to start Websocket server", e); + throw new TechnicalException("Failed to startScheduling Websocket server", e); } } @@ -134,7 +134,7 @@ public class WebSocketServer { logger.info(String.format("Websocket server started {%s:%s} on path [%s]", host, port, WS_PATH)); } else { - String error = String.format("Failed to start Websocket server. Could not bind address {%s:%s}", host, port); + String error = String.format("Failed to startScheduling Websocket server. Could not bind address {%s:%s}", host, port); logger.error(error); throw new TechnicalException(error); } diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/TestResource.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/TestResource.java index 82860796..aedbb8d8 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/TestResource.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/TestResource.java @@ -75,7 +75,7 @@ public class TestResource extends org.duniter.core.test.TestResource { FileUtils.copyDirectory(new File("src/test/es-home"), esHomeDir); FileUtils.copyDirectory(new File("target/classes"), new File(esHomeDir, "plugins/duniter4j-es-core")); - Elasticsearch.main(new String[]{"start"}); + Elasticsearch.main(new String[]{"startScheduling"}); } /** diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java index fab3a306..dd465fec 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java @@ -115,10 +115,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(SubscriptionService.class).startScheduling(); } - // Synchronize data + // Start synchro service if (pluginSettings.enableDataSync()) { - // Synchronize - injector.getInstance(SynchroService.class).synchronize(); + injector.getInstance(SynchroService.class) + .startScheduling(); } } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java index 7e3b0b57..4172e5bb 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java @@ -137,7 +137,7 @@ public class SubscriptionService extends AbstractService { if (logger.isDebugEnabled()) { threadPool.scheduleAtFixedRate( () -> logger.debug("Scheduled fake task successfully executed - scheduled every [1 min]"), - 20 * 1000 /* start in 20s */, + 20 * 1000 /* startScheduling in 20s */, 60 * 1000 /* every 1 min */, TimeUnit.MILLISECONDS); } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java index 6428e007..6050a38a 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java @@ -22,12 +22,16 @@ package org.duniter.elasticsearch.subscription.service; * #L% */ +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractSynchroService; +import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao; @@ -37,24 +41,41 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.elasticsearch.common.inject.Inject; +import java.util.List; + /** * Created by blavenie on 27/10/16. */ public class SynchroService extends AbstractSynchroService { + private static final EndpointApi ENDPOINT_API = EndpointApi.ES_SUBSCRIPTION_API; + @Inject public SynchroService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool, final ServiceLocator serviceLocator) { - super(client, settings.getDelegate(), cryptoService, threadPool, serviceLocator); + ThreadPool threadPool, + PeerService peerService, + CurrencyDao currencyDao, + PeerDao peerDao, + final ServiceLocator serviceLocator) { + super(client, settings.getDelegate(), cryptoService, threadPool, currencyDao, peerDao, serviceLocator); + + // Configure peer service to allow API + peerService.addIncludeEndpointApi(ENDPOINT_API.name()); } - public void synchronize() { + @Override + protected void synchronize() { logger.info("Starting subscription data synchronization..."); - Peer peer = getPeerFromAPI(EndpointApi.ES_SUBSCRIPTION_API); - synchronize(peer); - - logger.info("Subscription data synchronization [OK]"); + // Get peers + List<Peer> peers = getPeersFromApi(ENDPOINT_API); + if (CollectionUtils.isNotEmpty(peers)) { + peers.forEach(this::synchronize); + logger.info("User subscription synchronization [OK]"); + } + else { + logger.info(String.format("User subscription synchronization [OK] - no endpoint found for API [%s]", ENDPOINT_API.name())); + } } /* -- protected methods -- */ diff --git a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/TestResource.java b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/TestResource.java index 04804775..c39db30d 100644 --- a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/TestResource.java +++ b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/TestResource.java @@ -88,7 +88,7 @@ public class TestResource extends org.duniter.core.test.TestResource { FileUtils.copyDirectory(new File("../duniter4j-es-user/target/classes"), new File(esHomeDir, "plugins/duniter4j-es-user")); if (startESNode) { - Elasticsearch.main(new String[]{"start"}); + Elasticsearch.main(new String[]{"startScheduling"}); } /*while(true) { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 3dcfa2d3..ff8446cd 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -168,7 +168,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { protected void doAfterStart() { // Synchronize if (pluginSettings.enableDataSync()) { - injector.getInstance(SynchroService.class).synchronize(); + injector.getInstance(SynchroService.class).startScheduling(); } // Notify admin diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java index e1aeaee0..1aa05be8 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java @@ -22,42 +22,69 @@ package org.duniter.elasticsearch.user.service; * #L% */ +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.elasticsearch.Protocol; import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.DateUtils; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractSynchroService; +import org.duniter.elasticsearch.service.DocStatService; +import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.nuiton.i18n.I18n; + +import java.util.List; +import java.util.concurrent.TimeUnit; /** * Created by blavenie on 27/10/16. */ -public class SynchroService extends AbstractSynchroService { - - @Inject - public SynchroService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool, final ServiceLocator serviceLocator) { - super(client, settings.getDelegate(), cryptoService, threadPool, serviceLocator); +public class SynchroService extends AbstractSynchroService<SynchroService> { + + private static final EndpointApi ENDPOINT_API = EndpointApi.ES_USER_API; + + @Inject + public SynchroService(Duniter4jClient client, PluginSettings settings, + CryptoService cryptoService, + PeerService peerService, + ThreadPool threadPool, + CurrencyDao currencyDao, + PeerDao peerDao, + final ServiceLocator serviceLocator) { + super(client, settings.getDelegate(), cryptoService, threadPool, currencyDao, peerDao, serviceLocator); + + // Configure peer service to allow API + peerService.addIncludeEndpointApi(ENDPOINT_API.name()); } - public void synchronize() { + /* -- protected methods -- */ + + @Override + protected void synchronize() { logger.info("Starting user data synchronization..."); - Peer peer = getPeerFromAPI(EndpointApi.ES_USER_API); - synchronize(peer); + // Get peers + List<Peer> peers = getPeersFromApi(ENDPOINT_API); + if (CollectionUtils.isNotEmpty(peers)) { + peers.forEach(this::synchronize); + logger.info("User data synchronization [OK]"); + } + else { + logger.info(String.format("User data synchronization [OK] - no endpoint found for API [%s]", ENDPOINT_API.name())); + } - logger.info("User data synchronization [OK]"); } - /* -- protected methods -- */ - protected void synchronize(Peer peer) { long now = System.currentTimeMillis(); diff --git a/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/TestResource.java b/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/TestResource.java index 22822a60..ad99137d 100644 --- a/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/TestResource.java +++ b/duniter4j-es-user/src/test/java/org/duniter/elasticsearch/user/TestResource.java @@ -60,7 +60,7 @@ public class TestResource extends org.duniter.core.test.TestResource { FileUtils.copyDirectory(new File("src/test/es-home"), esHomeDir); FileUtils.copyDirectory(new File("target/classes"), new File(esHomeDir, "plugins/duniter4j-es-user")); - Elasticsearch.main(new String[]{"start"}); + Elasticsearch.main(new String[]{"startScheduling"}); // Init a configuration testConfiguration = new TestConfiguration(getConfigFileName()); -- GitLab