From 42b359da7d7a9f788d633058431db3d57e157d61 Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Tue, 2 May 2017 11:40:53 +0200 Subject: [PATCH] [fix] User event reload when blockchain reload [fix] better log on node start --- .../org/duniter/elasticsearch/PluginInit.java | 18 +++++++++++++----- .../duniter/elasticsearch/PluginSettings.java | 2 +- .../AbstractBlockchainListenerService.java | 15 +++++++++++++-- .../subscription/PluginInit.java | 8 ++++---- .../elasticsearch/user/PluginInit.java | 19 +++++++++++-------- .../service/BlockchainUserEventService.java | 10 +--------- .../user/service/UserEventService.java | 8 +++----- .../user/service/UserService.java | 7 +++++++ 8 files changed, 53 insertions(+), 34 deletions(-) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 7547e363..778b8b5d 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -85,7 +85,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Reload All indices if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) { if (logger.isWarnEnabled()) { - logger.warn("Reloading [core-plugin] indices..."); + logger.warn("Reloading indices..."); } injector.getInstance(CurrencyService.class) @@ -93,21 +93,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Reloading [core-plugin] indices. [OK]"); + logger.info("Reloading indices [OK]"); } } else { if (logger.isInfoEnabled()) { - logger.info("Checking if [core-plugin] indices exists..."); + logger.info("Checking indices..."); } injector.getInstance(CurrencyService.class) .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Checking if [core-plugin] indices exists. [OK]"); + logger.info("Checking indices [OK]"); } } } @@ -120,9 +120,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { Peer peer = pluginSettings.checkAndGetPeer(); // Index (or refresh) node's currency - Currency currency = injector.getInstance(CurrencyService.class) + final Currency currency = injector.getInstance(CurrencyService.class) .indexCurrencyFromPeer(peer, true); + if (logger.isInfoEnabled()) { + logger.info(String.format("[%s] Indexing blockchain...", currency.getCurrencyName())); + } + injector.getInstance(RestSecurityController.class) // Add access to <currency>/block index @@ -161,6 +165,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(PeerService.class) .listenAndIndexPeers(peer); + if (logger.isInfoEnabled()) { + logger.info(String.format("[%s] Indexing blockchain [OK]", currency.getCurrencyName())); + } + }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 0227fd27..35490063 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -197,7 +197,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean reloadBlockchainIndices() { - return settings.getAsBoolean("duniter.blockchain.indices.reload", false); + return settings.getAsBoolean("duniter.blockchain.reload", false); } public File getTempDirectory() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java index c565ec9c..4c9bbf18 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -96,10 +96,21 @@ public abstract class AbstractBlockchainListenerService extends AbstractService if("current".equals(change.getId())) return; switch (change.getOperation()) { + // on INDEX + case CREATE: + if (change.getSource() != null) { + synchronized (threadLock) { + processBlockIndex(change); + } + } + break; + // on INDEX case INDEX: - synchronized (threadLock) { - processBlockIndex(change); + if (change.getSource() != null) { + synchronized (threadLock) { + processBlockIndex(change); + } } break; diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java index 6dc6e5ad..9ef6343c 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java @@ -79,24 +79,24 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { if (reloadIndices) { if (logger.isInfoEnabled()) { - logger.info("Reloading all subscription indices..."); + logger.info("Reloading indices..."); } injector.getInstance(SubscriptionIndexDao.class) .deleteIndex() .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Reloading all subscription indices... [OK]"); + logger.info("Reloading indices [OK]"); } } else { if (logger.isInfoEnabled()) { - logger.info("Checking subscription indices..."); + logger.info("Checking indices..."); } injector.getInstance(SubscriptionIndexDao.class).createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Checking subscription indices... [OK]"); + logger.info("Checking indices [OK]"); } } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 2833d2c0..db47f44a 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -83,7 +83,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Reload all indices if (pluginSettings.reloadAllIndices()) { if (logger.isInfoEnabled()) { - logger.info("Reloading [user-plugin] indices..."); + logger.info("Reloading indices..."); } injector.getInstance(HistoryService.class) .deleteIndex() @@ -102,14 +102,17 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Reloading [user-plugin] indices. [OK]"); + logger.info("Reloading indices [OK]"); } } else { if (logger.isInfoEnabled()) { - logger.info("Checking [user-plugin] indices..."); + logger.info("Checking indices..."); } + + boolean cleanBlockchainUserEvents = injector.getInstance(UserService.class).isIndexExists() && pluginSettings.reloadBlockchainIndices(); + injector.getInstance(HistoryService.class).createIndexIfNotExists(); injector.getInstance(UserService.class).createIndexIfNotExists(); injector.getInstance(MessageService.class).createIndexIfNotExists(); @@ -117,19 +120,19 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(UserInvitationService.class).createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Checking [user-plugin] indices. [OK]"); + logger.info("Checking indices [OK]"); } - // Reload blockchain indices : user/event - if (pluginSettings.reloadBlockchainIndices()) { + // Clean user events on blockchain + if (cleanBlockchainUserEvents) { if (logger.isInfoEnabled()) { - logger.info("Deleting existing user event, referencing a block..."); + logger.info("Deleting user events on blockchain (blockchain will be reload)..."); } // Delete events that reference a block injector.getInstance(UserEventService.class) .deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/)); if (logger.isInfoEnabled()) { - logger.info("Deleting existing user event, referencing a block. [OK]"); + logger.info("Deleting user events on blockchain [OK]"); } } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index aef6a33a..ada0fbce 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -68,7 +68,6 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic private final UserService userService; private final UserEventService userEventService; private final AdminService adminService; - private Queue<UserEvent.Reference> referencesToDelete = ConcurrentCollections.newBlockingQueue(); @Inject public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, @@ -147,20 +146,13 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic reference.setHash(block.getHash()); } - // Add to queue - referencesToDelete.add(reference); - + this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false); flushBulkRequestOrSchedule(); } protected void beforeFlush() { - UserEvent.Reference reference = referencesToDelete.poll(); - while (reference != null) { - this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false); - reference = referencesToDelete.poll(); - } } /* -- internal method -- */ diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index 006857bf..7f9379c4 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -187,15 +187,13 @@ public class UserEventService extends AbstractService implements ChangeService.C } - public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) { + public void deleteEventsByReference(final UserEvent.Reference reference) { Preconditions.checkNotNull(reference); final int bulkSize = pluginSettings.getIndexBulkSize(); - return threadPool.schedule(() -> { - BulkRequestBuilder bulkRequest = client.prepareBulk(); - addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true); - }); + BulkRequestBuilder bulkRequest = client.prepareBulk(); + addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true); } public ActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java index f827dd28..cb75982e 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java @@ -81,6 +81,13 @@ public class UserService extends AbstractService { return this; } + /** + * Create index need for blockchain mail, if need + */ + public boolean isIndexExists() { + return client.existsIndex(INDEX); + } + /** * Create index for mail * @throws JsonProcessingException -- GitLab