From bb771c6e6c77f4ab036220adfc59dd375efe7f68 Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Tue, 12 Sep 2017 11:12:43 +0200 Subject: [PATCH] Lib upgrade : guava 22 Upgrade to java 1.8 [core] Add config option to reload as part of the BC [core] Fix unit test config file [core] Peer: mark peers as DOWN is not alive since X time --- .../org/duniter/core/client/dao/PeerDao.java | 4 + .../client/dao/mem/MemoryPeerDaoImpl.java | 29 ++- .../duniter/core/client/model/local/Peer.java | 17 ++ .../service/local/NetworkServiceImpl.java | 58 ++--- .../src/test/es-home/config/elasticsearch.yml | 5 +- .../plugin-descriptor.properties | 2 +- .../org/duniter/elasticsearch/PluginInit.java | 36 +++- .../duniter/elasticsearch/PluginSettings.java | 14 +- .../duniter/elasticsearch/dao/BlockDao.java | 2 + .../duniter/elasticsearch/dao/PeerDao.java | 1 + .../elasticsearch/dao/impl/BlockDaoImpl.java | 69 +++--- .../elasticsearch/dao/impl/PeerDaoImpl.java | 200 +++++++++++++++++- .../service/AbstractService.java | 19 ++ .../service/BlockchainService.java | 16 +- .../service/CurrencyService.java | 14 +- .../elasticsearch/service/PeerService.java | 69 +++--- .../src/test/es-home/config/elasticsearch.yml | 2 +- .../service/PeerServiceTest.java | 113 ++++++++++ .../plugin-descriptor.properties | 2 +- .../plugin-descriptor.properties | 2 +- .../elasticsearch/user/PluginInit.java | 7 +- .../user/service/SynchroService.java | 1 + .../user/service/UserEventService.java | 25 ++- pom.xml | 2 +- 24 files changed, 582 insertions(+), 127 deletions(-) create mode 100644 duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 3b3d5989..199d817a 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -34,4 +34,8 @@ public interface PeerDao extends EntityDao<String, Peer> { List<Peer> getPeersByCurrencyId(String currencyId); boolean isExists(String currencyId, String peerId); + + Long getMaxLastUpTime(String currencyId); + + void updatePeersAsDown(String currencyId, long lastUpTimeTimeout); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index 24ce18f6..544c4e69 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -25,10 +25,7 @@ package org.duniter.core.client.dao.mem; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.local.Peer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.stream.Collectors; /** @@ -79,4 +76,28 @@ public class MemoryPeerDaoImpl implements PeerDao { return peersByCurrencyId.values().stream() .anyMatch(peer -> currencyId.equals(peer.getCurrency()) && peerId.equals(peer.getId())); } + + @Override + public Long getMaxLastUpTime(String currencyId) { + OptionalLong max = getPeersByCurrencyId(currencyId).stream() + .mapToLong(peer -> peer.getStats() != null ? peer.getStats().getLastUpTime() : -1) + .max(); + + if (!max.isPresent()) { + return null; + } + return max.getAsLong(); + } + + @Override + public void updatePeersAsDown(String currencyId, long lastUpTimeTimeout) { + long maxLastUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + + getPeersByCurrencyId(currencyId).stream() + .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= maxLastUpTime) + .forEach(peer -> { + peer.getStats().setStatus(Peer.PeerStatus.DOWN); + }); + + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index 534d14e2..e4d69e63 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -160,6 +160,7 @@ public class Peer implements LocalEntity<String>, Serializable { } + public static final String PROPERTY_STATS = "stats"; private String id; @@ -262,6 +263,10 @@ public class Peer implements LocalEntity<String>, Serializable { return api; } + public void setApi(String api) { + this.api = api; + } + public String getDns() { return dns; } @@ -366,6 +371,9 @@ public class Peer implements LocalEntity<String>, Serializable { } public static class Stats { + public static final String PROPERTY_STATUS = "status"; + public static final String PROPERTY_LAST_UP_TIME = "lastUpTime"; + private String version; private PeerStatus status = PeerStatus.UP; // default private Integer blockNumber; @@ -377,6 +385,7 @@ public class Peer implements LocalEntity<String>, Serializable { private boolean isForkConsensus = false; private Double consensusPct = 0d; private String uid; + private Long lastUpTime; public Stats() { @@ -474,5 +483,13 @@ public class Peer implements LocalEntity<String>, Serializable { public void setUid(String uid) { this.uid = uid; } + + public Long getLastUpTime() { + return lastUpTime; + } + + public void setLastUpTime(Long lastUpTime) { + this.lastUpTime = lastUpTime; + } } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java index 5323374f..9c3f66bb 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java @@ -166,9 +166,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network Preconditions.checkNotNull(mainPeer); log.debug("Loading network peers..."); - final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool(); - CompletableFuture<List<Peer>> peersFuture = CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool); CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool); @@ -178,11 +176,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Map<String, String> memberUids = memberUidsFuture.join(); return peersFuture.join().stream() .map(peer -> { + // For if same as main peer, if (mainPeer.getUrl().equals(peer.getUrl())) { + // Update properties mainPeer.setPubkey(peer.getPubkey()); mainPeer.setHash(peer.getHash()); mainPeer.setCurrency(peer.getCurrency()); - return asyncRefreshPeer(mainPeer, memberUids, pool); + // reuse instance + peer = mainPeer; } return asyncRefreshPeer(peer, memberUids, pool); @@ -212,8 +213,15 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network .thenApply(apeer -> { String uid = StringUtils.isNotBlank(peer.getPubkey()) ? memberUids.get(peer.getPubkey()) : null; peer.getStats().setUid(uid); - if (peer.getStats().isReacheable() && StringUtils.isNotBlank(uid)) { - getHardship(peer); + if (peer.getStats().isReacheable()) { + + // Last UP time + peer.getStats().setLastUpTime(System.currentTimeMillis()/1000 /*unix timestamp*/ ); + + // Hardship + if (StringUtils.isNotBlank(uid)) { + getHardship(peer); + } } return apeer; }) @@ -308,7 +316,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network List<Peer> updatedPeers = getPeers(mainPeer, filter, sort, pool); knownPeers.clear(); - updatedPeers.stream().forEach(peer -> { + updatedPeers.forEach(peer -> { String buid = buid(peer.getStats()); if (!knownBlocks.contains(buid)) { knownBlocks.add(buid); @@ -361,22 +369,22 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network } return include; })) - .thenApply(addedPeers -> { - result.addAll(addedPeers); - fillPeerStatsConsensus(result); - result.sort(peerComparator); - - result.stream().forEach(peer -> { - String buid = buid(peer.getStats()); - if (!knownBlocks.contains(buid)) { - knownBlocks.add(buid); - } - knownPeers.put(peer.toString(), peer); - }); - - listener.onChanged(result); - return result; + .thenApply(addedPeers -> { + result.addAll(addedPeers); + fillPeerStatsConsensus(result); + result.sort(peerComparator); + + result.forEach(peer -> { + String buid = buid(peer.getStats()); + if (!knownBlocks.contains(buid)) { + knownBlocks.add(buid); + } + knownPeers.put(peer.toString(), peer); }); + + listener.onChanged(result); + return result; + }); } catch(Exception e) { log.error("Error while refreshing a peer: " + e.getMessage(), e); @@ -399,7 +407,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network BlockchainBlock block = readValue(json, BlockchainBlock.class); String blockBuid = buid(block); boolean isNewBlock = (blockBuid != null && !knownBlocks.contains(blockBuid)); - // If new block + wait 5s for network propagation + // If new block + wait 3s for network propagation if (!isNewBlock) return; } catch(IOException e) { log.error("Could not parse peer received by WS: " + e.getMessage(), e); @@ -431,11 +439,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network // If less than 100 node, get it in ONE call if (leaves.size() < 100) { - // TODO uncomment on prod - //List<Peer> peers = networkService.getPeers(peer); - //return ImmutableList.of(peers.get(0), peers.get(1), peers.get(2), peers.get(3)); - - //return networkService.getPeers(peer); + return networkRemoteService.getPeers(peer); } // Get it by multiple call on /network/peering?leaf= diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index f7b808ae..696d6b28 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -113,7 +113,7 @@ security.manager.enabled: false # # Delete then create all indices at startup - DO NOT set to true in production # -# duniter.indices.reload: true +#duniter.indices.reload: true # # Default string analyzer # @@ -125,7 +125,8 @@ duniter.blockchain.enable: true # # Force blockchain reload - WARNING: all user events will be resetted to 'unread' # -#duniter.blockchain.reload: true +duniter.blockchain.reload: true +duniter.blockchain.reload.from: 50999 # # Duniter node address # diff --git a/duniter4j-es-core/src/main/filtered-resources/plugin-descriptor.properties b/duniter4j-es-core/src/main/filtered-resources/plugin-descriptor.properties index df06d1c9..3a44810a 100644 --- a/duniter4j-es-core/src/main/filtered-resources/plugin-descriptor.properties +++ b/duniter4j-es-core/src/main/filtered-resources/plugin-descriptor.properties @@ -4,6 +4,6 @@ version=${project.version} site=false jvm=true classname=org.duniter.elasticsearch.Plugin -java.version=1.7 +java.version=1.8 elasticsearch.version=2.4.5 isolated=false diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 3ea14f07..554a7928 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -84,7 +84,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { protected void createIndices() { // Reload All indices - if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) { + if (pluginSettings.reloadAllIndices()) { if (logger.isWarnEnabled()) { logger.warn("Reloading indices..."); } @@ -98,8 +98,22 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } } + else if (pluginSettings.enableBlockchain() && pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() <= 0) { + if (logger.isWarnEnabled()) { + logger.warn("/!\\ Reloading blockchain indices..."); + } + injector.getInstance(CurrencyService.class) + .deleteIndex() + .createIndexIfNotExists(); + + if (logger.isInfoEnabled()) { + logger.info("Reloading blockchain indices [OK]"); + } + } + else { + if (logger.isInfoEnabled()) { logger.info("Checking indices..."); } @@ -132,9 +146,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } final String currencyName = currency.getCurrencyName(); - if (logger.isInfoEnabled()) { - logger.info(String.format("[%s] Indexing blockchain...", currencyName)); - } + // Add access security rules, for the currency indices injector.getInstance(RestSecurityController.class) @@ -171,6 +183,22 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { currencyName, MovementDao.TYPE); + // If partial reload (from a block) + if (pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() > 0) { + if (logger.isWarnEnabled()) { + logger.warn(String.format("/!\\ Re-indexing blockchain from block #%s...", pluginSettings.reloadBlockchainIndicesFrom())); + } + + injector.getInstance(BlockchainService.class) + .deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom()); + } + else { + if (logger.isInfoEnabled()) { + logger.info(String.format("[%s] Indexing blockchain...", currencyName)); + } + } + + // Wait end of currency index creation, then index blocks threadPool.scheduleOnClusterHealthStatus(() -> { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 7955ebbe..5483ad8e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -194,12 +194,24 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsBoolean("duniter.blockchain.reload", false); } + public int reloadBlockchainIndicesFrom() { + return settings.getAsInt("duniter.blockchain.reload.from", 0); + } + + public boolean reloadPeerIndices() { + return settings.getAsBoolean("duniter.peer.reload", false); + } + public File getTempDirectory() { return Configuration.instance().getTempDirectory(); } public int getNetworkTimeout() { - return settings.getAsInt("duniter.network.timeout", 5000 /*5s*/); + return settings.getAsInt("duniter.network.timeout", 300000 /*300s*/); + } + + public int getPeerDownTimeout() { + return settings.getAsInt("duniter.peer.down.timeout", 10*60*1000 /*10min*/); } public int getNetworkMaxConnections() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java index 16d29387..9e4dca4c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java @@ -68,4 +68,6 @@ public interface BlockDao extends Bean, TypeDao<BlockDao> { void deleteRange(final String currencyName, final int fromNumber, final int toNumber); List<BlockchainBlock> getBlocksByIds(String currencyName, Collection<String> ids); + + void deleteById(final String currencyName, String id); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java index d1eee33b..304e4b2e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java @@ -31,4 +31,5 @@ public interface PeerDao extends org.duniter.core.client.dao.PeerDao, TypeDao<Pe String TYPE = "peer"; + } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index df263021..867a5757 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -244,6 +244,45 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { : (int)result.getValue(); } + + public BlockchainBlock getBlockById(String currencyName, String id) { + return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class); + } + + /** + * Delete blocks from a start number (using bulk) + * @param currencyName + * @param fromNumber + */ + public void deleteRange(final String currencyName, final int fromNumber, final int toNumber) { + + int bulkSize = pluginSettings.getIndexBulkSize(); + + BulkRequestBuilder bulkRequest = client.prepareBulk(); + for (int number=fromNumber; number<=toNumber; number++) { + + bulkRequest.add( + client.prepareDelete(currencyName, TYPE, String.valueOf(number)) + ); + + // Flush the bulk if not empty + if ((fromNumber - number % bulkSize) == 0) { + client.flushDeleteBulk(currencyName, TYPE, bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // last flush + client.flushDeleteBulk(currencyName, TYPE, bulkRequest); + } + + @Override + public void deleteById(String currencyName, String number) { + client.prepareDelete(currencyName, TYPE, number).execute().actionGet(); + } + + + @Override public XContentBuilder createTypeMapping() { try { @@ -333,36 +372,6 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { } } - public BlockchainBlock getBlockById(String currencyName, String id) { - return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class); - } - - /** - * Delete blocks from a start number (using bulk) - * @param currencyName - * @param fromNumber - */ - public void deleteRange(final String currencyName, final int fromNumber, final int toNumber) { - - int bulkSize = pluginSettings.getIndexBulkSize(); - - BulkRequestBuilder bulkRequest = client.prepareBulk(); - for (int number=fromNumber; number<=toNumber; number++) { - - bulkRequest.add( - client.prepareDelete(currencyName, TYPE, String.valueOf(number)) - ); - - // Flush the bulk if not empty - if ((fromNumber - number % bulkSize) == 0) { - client.flushDeleteBulk(currencyName, TYPE, bulkRequest); - bulkRequest = client.prepareBulk(); - } - } - - // last flush - client.flushDeleteBulk(currencyName, TYPE, bulkRequest); - } /* -- Internal methods -- */ diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 61475a4b..57a6f8b9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -29,11 +29,22 @@ import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.PeerDao; -import org.duniter.elasticsearch.dao.TypeDao; +import org.duniter.elasticsearch.model.Movement; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; +import org.elasticsearch.search.aggregations.metrics.max.Max; import java.io.IOException; import java.util.List; @@ -135,6 +146,119 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { return client.isDocumentExists(currencyId, TYPE, peerId); } + @Override + public Long getMaxLastUpTime(String currencyName) { + + // Prepare request + SearchRequestBuilder searchRequest = client + .prepareSearch(currencyName) + .setTypes(TYPE) + .setFetchSource(false) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + + // Get max(number) + searchRequest.addAggregation(AggregationBuilders.nested(Peer.PROPERTY_STATS) + .path(Peer.PROPERTY_STATS) + .subAggregation( + AggregationBuilders.max(Peer.Stats.PROPERTY_LAST_UP_TIME) + .field(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME) + .missing(0) + )); + + // Execute query + SearchResponse searchResponse = searchRequest.execute().actionGet(); + + // Read query result + SingleBucketAggregation stats = searchResponse.getAggregations().get(Peer.PROPERTY_STATS); + if (stats == null) return null; + + Max result = stats.getAggregations().get(Peer.Stats.PROPERTY_LAST_UP_TIME); + if (result == null) { + return null; + } + + return (result.getValue() == Double.NEGATIVE_INFINITY) + ? null + : (long)result.getValue(); + } + + @Override + public void updatePeersAsDown(String currencyName, long lastUpTimeTimeout) { + + long minUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + + SearchRequestBuilder searchRequest = client.prepareSearch(currencyName) + .setFetchSource(false) + .setTypes(TYPE); + + // Query = filter on lastUpTime + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + // where lastUpTime < minUpTime + .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(minUpTime)) + // AND status = UP + .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name())); + searchRequest.setQuery(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.constantScoreQuery(boolQuery))); + + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + // Execute query, while there is some data + try { + + int counter = 0; + boolean loop = true; + int bulkSize = pluginSettings.getIndexBulkSize(); + searchRequest.setSize(bulkSize); + SearchResponse response = searchRequest.execute().actionGet(); + + // Execute query, while there is some data + do { + + // Read response + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + + // Add deletion to bulk + bulkRequest.add( + client.prepareUpdate(currencyName, TYPE, searchHit.getId()) + .setDoc(String.format("{\"%s\": {\"%s\": \"%s\"}}", Peer.PROPERTY_STATS, Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.DOWN.name()).getBytes()) + ); + counter++; + + // Flush the bulk if not empty + if ((bulkRequest.numberOfActions() % bulkSize) == 0) { + client.flushBulk(bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // Prepare next iteration + if (counter == 0 || counter >= response.getHits().getTotalHits()) { + loop = false; + } + // Prepare next iteration + else { + searchRequest.setFrom(counter); + response = searchRequest.execute().actionGet(); + } + } while(loop); + + // last flush + if ((bulkRequest.numberOfActions() % bulkSize) != 0) { + client.flushBulk(bulkRequest); + } + + if (counter > 0) { + logger.info(String.format("Mark %s peers as DOWN", counter)); + } + + } catch (SearchPhaseExecutionException e) { + // Failed or no item on index + logger.error(String.format("Error while update peer status to DOWN: %s.", e.getMessage()), e); + } + + + } + @Override public XContentBuilder createTypeMapping() { try { @@ -180,6 +304,80 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { .field("type", "string") .endObject() + // stats + .startObject(Peer.PROPERTY_STATS) + .field("type", "nested") + //.field("dynamic", "false") + .startObject("properties") + + // stats.version + .startObject("version") + .field("type", "string") + .endObject() + + // stats.status + .startObject(Peer.Stats.PROPERTY_STATUS) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // stats.blockNumber + .startObject("blockNumber") + .field("type", "integer") + .endObject() + + // stats.blockHash + .startObject("version") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // stats.error + .startObject("error") + .field("type", "string") + .endObject() + + + // stats.medianTime + .startObject("medianTime") + .field("type", "integer") + .endObject() + + // stats.hardshipLevel + .startObject("hardshipLevel") + .field("type", "integer") + .endObject() + + // stats.consensusPct + .startObject("consensusPct") + .field("type", "integer") + .endObject() + + // stats.uid + .startObject("uid") + .field("type", "string") + .endObject() + + // stats.mainConsensus + .startObject("mainConsensus") + .field("type", "boolean") + .field("index", "not_analyzed") + .endObject() + + // stats.uid + .startObject("forkConsensus") + .field("type", "boolean") + .field("index", "not_analyzed") + .endObject() + + // stats.lastUP + .startObject(Peer.Stats.PROPERTY_LAST_UP_TIME) + .field("type", "integer") + .endObject() + + .endObject() + .endObject() + .endObject() .endObject().endObject(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 7df37465..19a1d760 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -35,6 +35,7 @@ import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.elasticsearch.ElasticsearchException; @@ -58,6 +59,7 @@ public abstract class AbstractService implements Bean { protected CryptoService cryptoService; protected final int retryCount; protected final int retryWaitDuration; + protected boolean ready = false; public AbstractService(String loggerName, Duniter4jClient client, PluginSettings pluginSettings) { this(loggerName, client, pluginSettings, null); @@ -84,6 +86,23 @@ public abstract class AbstractService implements Bean { /* -- protected methods --*/ + protected void setIsReady(boolean ready) { + this.ready = ready; + } + protected boolean isReady() { + return this.ready; + } + + protected void waitReady() { + try { + while (!ready) { + Thread.sleep(500); + } + } catch (InterruptedException e){ + // Silent + } + } + protected <T> T executeWithRetry(RetryFunction<T> retryFunction) throws TechnicalException{ int retry = 0; while (retry < retryCount) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 5be04843..6cc334e4 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -369,7 +369,7 @@ public class BlockchainService extends AbstractService { * @param json block as JSON * @pram wait need to wait until block processed ? */ - public void indexCurrentBlockFromJson(String currencyName, String json, boolean wait) { + public void indexCurrentBlockFromJson(final String currencyName, final String json, final boolean wait) { Preconditions.checkNotNull(json); Preconditions.checkArgument(json.length() > 0); Preconditions.checkArgument(StringUtils.isNotBlank(currencyName)); @@ -383,14 +383,24 @@ public class BlockchainService extends AbstractService { } } - public BlockchainBlock getBlockById(String currencyName, int number) { + public BlockchainBlock getBlockById(final String currencyName, final int number) { return blockDao.getBlockById(currencyName, String.valueOf(number)); } - public BlockchainBlock getCurrentBlock(String currencyName) { + public BlockchainBlock getCurrentBlock(final String currencyName) { return blockDao.getBlockById(currencyName, CURRENT_BLOCK_ID); } + public void deleteFrom(final String currencyName, final int fromBlock) { + int maxBlock = blockDao.getMaxBlockNumber(currencyName); + + blockDao.deleteRange(currencyName, fromBlock, maxBlock); + + // Delete current also + blockDao.deleteById(currencyName, CURRENT_BLOCK_ID); + + } + /* -- Internal methods -- */ protected Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 9511aee0..27956d90 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -40,6 +40,7 @@ import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.*; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; @@ -65,12 +66,17 @@ public class CurrencyService extends AbstractService { PluginSettings settings, CryptoService cryptoService, CurrencyDao currencyDao, - BlockchainRemoteService blockchainRemoteService, - Injector injector) { + ThreadPool threadPool, + Injector injector, + final ServiceLocator serviceLocator) { super("duniter." + INDEX, client, settings, cryptoService); - this.blockchainRemoteService = blockchainRemoteService; this.currencyDao = (CurrencyExtendDao)currencyDao; this.injector = injector; + + threadPool.scheduleOnStarted(() -> { + this.blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); + setIsReady(true); + }); } public CurrencyService createIndexIfNotExists() { @@ -123,6 +129,8 @@ public class CurrencyService extends AbstractService { */ public Currency indexCurrencyFromPeer(Peer peer) { + waitReady(); + BlockchainParameters parameters = blockchainRemoteService.getParameters(peer); BlockchainBlock firstBlock = blockchainRemoteService.getBlock(peer, 0l); BlockchainBlock currentBlock = blockchainRemoteService.getCurrentBlock(peer); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 4199e3fe..0dc761b5 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -77,45 +77,34 @@ public class PeerService extends AbstractService { threadPool.scheduleOnStarted(() -> { this.blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); this.networkService = serviceLocator.getNetworkService(); + setIsReady(true); }); } - public PeerService indexAllPeers(Peer peer) { - indexAllPeers(peer, nullProgressionModel); - return this; - } - - public PeerService indexAllPeers(Peer peer, ProgressionModel progressionModel) { + public PeerService indexPeers(Peer peer) { try { // Get the blockchain name from node BlockchainParameters parameter = blockchainRemoteService.getParameters(peer); if (parameter == null) { - progressionModel.setStatus(ProgressionModel.Status.FAILED); logger.error(I18n.t("duniter4j.es.networkService.indexPeers.remoteParametersError", peer)); return this; } String currencyName = parameter.getCurrency(); - indexPeers(currencyName, peer, progressionModel); + indexPeers(currencyName, peer); } catch(Exception e) { logger.error("Error during indexAllPeers: " + e.getMessage(), e); - progressionModel.setStatus(ProgressionModel.Status.FAILED); } return this; } - - public PeerService indexPeers(String currencyName, Peer firstPeer, ProgressionModel progressionModel) { - progressionModel.setStatus(ProgressionModel.Status.RUNNING); - progressionModel.setTotal(100); + public PeerService indexPeers(String currencyName, Peer firstPeer) { long timeStart = System.currentTimeMillis(); try { - - progressionModel.setTask(I18n.t("duniter4j.es.networkService.indexPeers.task", currencyName, firstPeer)); logger.info(I18n.t("duniter4j.es.networkService.indexPeers.task", currencyName, firstPeer)); // Default filter @@ -128,28 +117,11 @@ public class PeerService extends AbstractService { org.duniter.core.client.service.local.NetworkService.Sort sortDef = new org.duniter.core.client.service.local.NetworkService.Sort(); sortDef.sortType = null; - try { - networkService.asyncGetPeers(firstPeer, filterDef.filterEndpoints, threadPool.scheduler()) - .thenCompose(CompletableFutures::allOfToList) - .thenApply(networkService::fillPeerStatsConsensus) - .thenApply(peers -> peers.stream() - // filter on currency - .filter(peer -> ObjectUtils.equals(firstPeer.getCurrency(), peer.getCurrency())) - // filter, then sort - .filter(networkService.peerFilter(filterDef)) - .map(peer -> savePeer(peer)) - .collect(Collectors.toList())) - .thenApply(peers -> { - logger.info(I18n.t("duniter4j.es.networkService.indexPeers.succeed", currencyName, firstPeer, peers.size(), (System.currentTimeMillis() - timeStart))); - progressionModel.setStatus(ProgressionModel.Status.SUCCESS); - return peers; - }); - } catch (InterruptedException | ExecutionException e) { - throw new TechnicalException("Error while loading peers: " + e.getMessage(), e); - } + List<Peer> peers = networkService.getPeers(firstPeer, filterDef, sortDef, threadPool.scheduler()); + savePeers(currencyName, peers); + logger.info(I18n.t("duniter4j.es.networkService.indexPeers.succeed", currencyName, firstPeer, peers.size(), (System.currentTimeMillis() - timeStart))); } catch(Exception e) { logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); - progressionModel.setStatus(ProgressionModel.Status.FAILED); } return this; @@ -174,14 +146,13 @@ public class PeerService extends AbstractService { NetworkService.Sort sortDef = new NetworkService.Sort(); sortDef.sortType = null; - networkService.addPeersChangeListener(mainPeer, peers -> { - if (CollectionUtils.isNotEmpty(peers)) { - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size())); - } - peers.stream().forEach(peer -> savePeer(peer)); - } - }, filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler()); + networkService.addPeersChangeListener(mainPeer, + peers -> savePeers(currencyName, peers), + filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler()); + } + + public Long getMaxLastUpTime(String currencyName) { + return peerDao.getMaxLastUpTime(currencyName); } /** @@ -225,6 +196,18 @@ public class PeerService extends AbstractService { /* -- protected methods -- */ + protected void savePeers(String currencyName, List<Peer> peers) { + if (CollectionUtils.isNotEmpty(peers)) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size())); + } + peers.forEach(this::savePeer); + } + + // Mark old peers as DOWN + peerDao.updatePeersAsDown(currencyName, pluginSettings.getPeerDownTimeout()); + } + protected List<Peer> toPeers(SearchResponse response, boolean withHighlight) { // Read query result List<Peer> result = Lists.newArrayList(); diff --git a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml index 9d3087fc..877e82a2 100644 --- a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml @@ -125,7 +125,7 @@ duniter.blockchain.enable: false # # Duniter node to synchronize # -duniter.host: gtest.duniter.org +duniter.host: g1-test.duniter.org duniter.port: 10900 # # ---------------------------------- Duniter4j security ------------------------- diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java new file mode 100644 index 00000000..f932e97e --- /dev/null +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java @@ -0,0 +1,113 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import org.duniter.core.client.config.Configuration; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.client.model.bma.NetworkPeering; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.bma.BlockchainRemoteService; +import org.duniter.core.client.service.bma.NetworkRemoteService; +import org.duniter.core.client.service.local.NetworkService; +import org.duniter.elasticsearch.TestResource; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class PeerServiceTest { + + private static final Logger log = LoggerFactory.getLogger(PeerServiceTest.class); + + @ClassRule + public static final TestResource resource = TestResource.create(); + + private CurrencyService currencyService; + private PeerService service; + private NetworkRemoteService remoteService; + private Configuration config; + private Peer peer; + private ObjectMapper objectMapper; + + @Before + public void setUp() throws Exception { + currencyService = ServiceLocator.instance().getBean(CurrencyService.class); + service = ServiceLocator.instance().getBean(PeerService.class); + remoteService = ServiceLocator.instance().getNetworkRemoteService(); + config = Configuration.instance(); + peer = new Peer.Builder() + .setHost(config.getNodeHost()) + .setPort(config.getNodePort()).build(); + objectMapper = JacksonUtils.newObjectMapper(); + + // Waiting services started + while(!service.isReady() || !currencyService.isReady()) { + Thread.sleep(1000); + } + + // Init the currency + currencyService.createIndexIfNotExists() + .indexCurrencyFromPeer(peer); + + Thread.sleep(5000); + } + + @Test + public void savePeers() throws Exception { + + // First Peer + Peer peer1 = new Peer.Builder() + .setHost(config.getNodeHost()) + .setPort(config.getNodePort()) + .setPubkey(resource.getFixtures().getUserPublicKey()) + .setCurrency(resource.getFixtures().getCurrency()) + .build(); + peer1.getStats().setLastUpTime(120000L); + + // Second peer + Peer peer2 = new Peer.Builder() + .setHost(config.getNodeHost()) + .setPort(peer1.getPort() + 1) + .setPubkey(resource.getFixtures().getUserPublicKey()) + .setCurrency(resource.getFixtures().getCurrency()) + .build(); + peer2.getStats().setLastUpTime(peer1.getStats().getLastUpTime() - 150); // Set UP just before the peer 1 + + // Save peers + service.savePeers(peer1.getCurrency(), ImmutableList.of(peer1, peer2)); + + // Try to read + Long maxLastUpTime = service.getMaxLastUpTime(peer1.getCurrency()); + Assert.assertNotNull(maxLastUpTime); + Assert.assertEquals(peer1.getStats().getLastUpTime().longValue(), maxLastUpTime.longValue()); + } +} diff --git a/duniter4j-es-subscription/src/main/filtered-resources/plugin-descriptor.properties b/duniter4j-es-subscription/src/main/filtered-resources/plugin-descriptor.properties index 77759b1c..2f0b3765 100644 --- a/duniter4j-es-subscription/src/main/filtered-resources/plugin-descriptor.properties +++ b/duniter4j-es-subscription/src/main/filtered-resources/plugin-descriptor.properties @@ -4,6 +4,6 @@ version=${project.version} site=false jvm=true classname=org.duniter.elasticsearch.subscription.Plugin -java.version=1.7 +java.version=1.8 elasticsearch.version=2.4.5 isolated=false diff --git a/duniter4j-es-user/src/main/filtered-resources/plugin-descriptor.properties b/duniter4j-es-user/src/main/filtered-resources/plugin-descriptor.properties index e29af830..12990157 100644 --- a/duniter4j-es-user/src/main/filtered-resources/plugin-descriptor.properties +++ b/duniter4j-es-user/src/main/filtered-resources/plugin-descriptor.properties @@ -4,6 +4,6 @@ version=${project.version} site=false jvm=true classname=org.duniter.elasticsearch.user.Plugin -java.version=1.7 +java.version=1.8 elasticsearch.version=2.4.5 isolated=false diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 90601bc2..dd3953b1 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -23,7 +23,6 @@ package org.duniter.elasticsearch.user; */ import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.service.*; @@ -129,12 +128,14 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Clean user events on blockchain if (cleanBlockchainUserEvents) { + int blockNumber = pluginSettings.reloadBlockchainIndicesFrom(); if (logger.isInfoEnabled()) { - logger.info("Deleting user events on blockchain (blockchain will be reload)..."); + logger.info(String.format("Deleting user events on blockchain from block #%s (blockchain will be reload)...", blockNumber)); } + // Delete events that reference a block injector.getInstance(UserEventService.class) - .deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/)); + .deleteBlockEventsFrom(blockNumber); if (logger.isInfoEnabled()) { logger.info("Deleting user events on blockchain [OK]"); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java index 38cdb593..7b0a3fc1 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java @@ -83,6 +83,7 @@ public class SynchroService extends AbstractSynchroService { protected void importUserChanges(SynchroResult result, Peer peer, long sinceTime) { importChanges(result, peer, UserService.INDEX, UserService.PROFILE_TYPE, sinceTime); importChanges(result, peer, UserService.INDEX, UserService.SETTINGS_TYPE, sinceTime); + importChanges(result, peer, UserService.INDEX, UserEventService.EVENT_TYPE, sinceTime); } protected void importMessageChanges(SynchroResult result, Peer peer, long sinceTime) { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index 0cabefd7..e2514c35 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -36,6 +36,7 @@ import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.exception.InvalidSignatureException; +import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.service.changes.ChangeSource; @@ -192,10 +193,32 @@ public class UserEventService extends AbstractService implements ChangeService.C final int bulkSize = pluginSettings.getIndexBulkSize(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); + BulkRequestBuilder bulkRequest = client.prepareBulk(); addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true); } + public void deleteBlockEventsFrom(final int fromBlockNumber) { + final int bulkSize = pluginSettings.getIndexBulkSize(); + + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + // Prepare search request + SearchRequestBuilder searchRequest = client + .prepareSearch(INDEX) + .setTypes(EVENT_TYPE) + .setFetchSource(false) + .setSearchType(SearchType.QUERY_AND_FETCH); + + // Query = filter on reference + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, BlockchainService.BLOCK_TYPE)); + boolQuery.filter(QueryBuilders.rangeQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ID).gte(fromBlockNumber)); + + searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); + + client.bulkDeleteFromSearch(INDEX, EVENT_TYPE, searchRequest, bulkRequest, bulkSize, true); + } + public ActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { Map<String, Object> fields = client.getMandatoryFieldsById(INDEX, EVENT_TYPE, id, UserEvent.PROPERTY_HASH, UserEvent.PROPERTY_RECIPIENT); diff --git a/pom.xml b/pom.xml index 0acceaec..97d29663 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ <file.encoding>UTF-8</file.encoding> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.5</slf4j.version> - <guava.version>18.0</guava.version> + <guava.version>22.0</guava.version> <xml-apis.version>2.0.2</xml-apis.version> <kalium.version>0.5.0_blavenie</kalium.version> <jnr-ffi.version>2.0.5</jnr-ffi.version> -- GitLab