Skip to content
Snippets Groups Projects
Commit 42b359da authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] User event reload when blockchain reload

[fix] better log on node start
parent 80a4386a
No related branches found
No related tags found
No related merge requests found
Showing with 53 additions and 34 deletions
...@@ -85,7 +85,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -85,7 +85,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
// Reload All indices // Reload All indices
if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) { if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Reloading [core-plugin] indices..."); logger.warn("Reloading indices...");
} }
injector.getInstance(CurrencyService.class) injector.getInstance(CurrencyService.class)
...@@ -93,21 +93,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -93,21 +93,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
.createIndexIfNotExists(); .createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Reloading [core-plugin] indices. [OK]"); logger.info("Reloading indices [OK]");
} }
} }
else { else {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking if [core-plugin] indices exists..."); logger.info("Checking indices...");
} }
injector.getInstance(CurrencyService.class) injector.getInstance(CurrencyService.class)
.createIndexIfNotExists(); .createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking if [core-plugin] indices exists. [OK]"); logger.info("Checking indices [OK]");
} }
} }
} }
...@@ -120,9 +120,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -120,9 +120,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
Peer peer = pluginSettings.checkAndGetPeer(); Peer peer = pluginSettings.checkAndGetPeer();
// Index (or refresh) node's currency // Index (or refresh) node's currency
Currency currency = injector.getInstance(CurrencyService.class) final Currency currency = injector.getInstance(CurrencyService.class)
.indexCurrencyFromPeer(peer, true); .indexCurrencyFromPeer(peer, true);
if (logger.isInfoEnabled()) {
logger.info(String.format("[%s] Indexing blockchain...", currency.getCurrencyName()));
}
injector.getInstance(RestSecurityController.class) injector.getInstance(RestSecurityController.class)
// Add access to <currency>/block index // Add access to <currency>/block index
...@@ -161,6 +165,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -161,6 +165,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
injector.getInstance(PeerService.class) injector.getInstance(PeerService.class)
.listenAndIndexPeers(peer); .listenAndIndexPeers(peer);
if (logger.isInfoEnabled()) {
logger.info(String.format("[%s] Indexing blockchain [OK]", currency.getCurrencyName()));
}
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
......
...@@ -197,7 +197,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { ...@@ -197,7 +197,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
} }
public boolean reloadBlockchainIndices() { public boolean reloadBlockchainIndices() {
return settings.getAsBoolean("duniter.blockchain.indices.reload", false); return settings.getAsBoolean("duniter.blockchain.reload", false);
} }
public File getTempDirectory() { public File getTempDirectory() {
......
...@@ -96,10 +96,21 @@ public abstract class AbstractBlockchainListenerService extends AbstractService ...@@ -96,10 +96,21 @@ public abstract class AbstractBlockchainListenerService extends AbstractService
if("current".equals(change.getId())) return; if("current".equals(change.getId())) return;
switch (change.getOperation()) { switch (change.getOperation()) {
// on INDEX
case CREATE:
if (change.getSource() != null) {
synchronized (threadLock) {
processBlockIndex(change);
}
}
break;
// on INDEX // on INDEX
case INDEX: case INDEX:
synchronized (threadLock) { if (change.getSource() != null) {
processBlockIndex(change); synchronized (threadLock) {
processBlockIndex(change);
}
} }
break; break;
......
...@@ -79,24 +79,24 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -79,24 +79,24 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
if (reloadIndices) { if (reloadIndices) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Reloading all subscription indices..."); logger.info("Reloading indices...");
} }
injector.getInstance(SubscriptionIndexDao.class) injector.getInstance(SubscriptionIndexDao.class)
.deleteIndex() .deleteIndex()
.createIndexIfNotExists(); .createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Reloading all subscription indices... [OK]"); logger.info("Reloading indices [OK]");
} }
} }
else { else {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking subscription indices..."); logger.info("Checking indices...");
} }
injector.getInstance(SubscriptionIndexDao.class).createIndexIfNotExists(); injector.getInstance(SubscriptionIndexDao.class).createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking subscription indices... [OK]"); logger.info("Checking indices [OK]");
} }
} }
} }
......
...@@ -83,7 +83,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -83,7 +83,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
// Reload all indices // Reload all indices
if (pluginSettings.reloadAllIndices()) { if (pluginSettings.reloadAllIndices()) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Reloading [user-plugin] indices..."); logger.info("Reloading indices...");
} }
injector.getInstance(HistoryService.class) injector.getInstance(HistoryService.class)
.deleteIndex() .deleteIndex()
...@@ -102,14 +102,17 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -102,14 +102,17 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
.createIndexIfNotExists(); .createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Reloading [user-plugin] indices. [OK]"); logger.info("Reloading indices [OK]");
} }
} }
else { else {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking [user-plugin] indices..."); logger.info("Checking indices...");
} }
boolean cleanBlockchainUserEvents = injector.getInstance(UserService.class).isIndexExists() && pluginSettings.reloadBlockchainIndices();
injector.getInstance(HistoryService.class).createIndexIfNotExists(); injector.getInstance(HistoryService.class).createIndexIfNotExists();
injector.getInstance(UserService.class).createIndexIfNotExists(); injector.getInstance(UserService.class).createIndexIfNotExists();
injector.getInstance(MessageService.class).createIndexIfNotExists(); injector.getInstance(MessageService.class).createIndexIfNotExists();
...@@ -117,19 +120,19 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -117,19 +120,19 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
injector.getInstance(UserInvitationService.class).createIndexIfNotExists(); injector.getInstance(UserInvitationService.class).createIndexIfNotExists();
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Checking [user-plugin] indices. [OK]"); logger.info("Checking indices [OK]");
} }
// Reload blockchain indices : user/event // Clean user events on blockchain
if (pluginSettings.reloadBlockchainIndices()) { if (cleanBlockchainUserEvents) {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Deleting existing user event, referencing a block..."); logger.info("Deleting user events on blockchain (blockchain will be reload)...");
} }
// Delete events that reference a block // Delete events that reference a block
injector.getInstance(UserEventService.class) injector.getInstance(UserEventService.class)
.deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/)); .deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/));
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
logger.info("Deleting existing user event, referencing a block. [OK]"); logger.info("Deleting user events on blockchain [OK]");
} }
} }
} }
......
...@@ -68,7 +68,6 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic ...@@ -68,7 +68,6 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic
private final UserService userService; private final UserService userService;
private final UserEventService userEventService; private final UserEventService userEventService;
private final AdminService adminService; private final AdminService adminService;
private Queue<UserEvent.Reference> referencesToDelete = ConcurrentCollections.newBlockingQueue();
@Inject @Inject
public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService,
...@@ -147,20 +146,13 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic ...@@ -147,20 +146,13 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic
reference.setHash(block.getHash()); reference.setHash(block.getHash());
} }
// Add to queue this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false);
referencesToDelete.add(reference);
flushBulkRequestOrSchedule(); flushBulkRequestOrSchedule();
} }
protected void beforeFlush() { protected void beforeFlush() {
UserEvent.Reference reference = referencesToDelete.poll();
while (reference != null) {
this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false);
reference = referencesToDelete.poll();
}
} }
/* -- internal method -- */ /* -- internal method -- */
......
...@@ -187,15 +187,13 @@ public class UserEventService extends AbstractService implements ChangeService.C ...@@ -187,15 +187,13 @@ public class UserEventService extends AbstractService implements ChangeService.C
} }
public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) { public void deleteEventsByReference(final UserEvent.Reference reference) {
Preconditions.checkNotNull(reference); Preconditions.checkNotNull(reference);
final int bulkSize = pluginSettings.getIndexBulkSize(); final int bulkSize = pluginSettings.getIndexBulkSize();
return threadPool.schedule(() -> { BulkRequestBuilder bulkRequest = client.prepareBulk();
BulkRequestBuilder bulkRequest = client.prepareBulk(); addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true);
addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true);
});
} }
public ActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { public ActionFuture<UpdateResponse> markEventAsRead(String id, String signature) {
......
...@@ -81,6 +81,13 @@ public class UserService extends AbstractService { ...@@ -81,6 +81,13 @@ public class UserService extends AbstractService {
return this; return this;
} }
/**
* Create index need for blockchain mail, if need
*/
public boolean isIndexExists() {
return client.existsIndex(INDEX);
}
/** /**
* Create index for mail * Create index for mail
* @throws JsonProcessingException * @throws JsonProcessingException
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment