From d7a5d91df5dd5e4d7b16a348a8a9fc575b37aefa Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Tue, 16 Jan 2018 20:35:44 +0100 Subject: [PATCH] [fix] Allow to reindex a block range - Workaround for Cesium issue #656 --- .../main/assembly/config/elasticsearch.yml | 3 +- .../src/test/es-home/config/elasticsearch.yml | 5 +- .../org/duniter/elasticsearch/PluginInit.java | 33 +++++- .../duniter/elasticsearch/PluginSettings.java | 3 + .../service/BlockchainService.java | 112 ++++++++++++++++-- .../i18n/duniter4j-es-core_en_GB.properties | 3 + .../i18n/duniter4j-es-core_fr_FR.properties | 5 + 7 files changed, 147 insertions(+), 17 deletions(-) diff --git a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml index a7f7c95c..46095a0f 100644 --- a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml @@ -126,7 +126,8 @@ duniter.blockchain.enable: true # Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' # # duniter.blockchain.reload: true -# duniter.blockchain.reload.from: 50999 +# duniter.blockchain.reload.from: 18900 +# duniter.blockchain.reload.to: 19000 # # Duniter node address # diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 0759654b..6f057657 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -125,8 +125,9 @@ duniter.blockchain.enable: true # # Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' # -# duniter.blockchain.reload: true -# duniter.blockchain.reload.from: 50999 +duniter.blockchain.reload: true +duniter.blockchain.reload.from: 18900 +duniter.blockchain.reload.to: 19000 # # Duniter node address # diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index d774f743..1c279625 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -207,12 +207,27 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // If partial reload (from a block) if (pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() > 0) { - if (logger.isWarnEnabled()) { - logger.warn(String.format("/!\\ Re-indexing blockchain from block #%s...", pluginSettings.reloadBlockchainIndicesFrom())); + // Delete blocs range [from,to] + if (pluginSettings.reloadBlockchainIndicesTo() > pluginSettings.reloadBlockchainIndicesFrom()) { + if (logger.isWarnEnabled()) { + logger.warn(String.format("/!\\ Re-indexing blockchain range [%s-%s]...", + pluginSettings.reloadBlockchainIndicesFrom(), + pluginSettings.reloadBlockchainIndicesTo())); + } + + injector.getInstance(BlockchainService.class) + .deleteRange(currencyName, + pluginSettings.reloadBlockchainIndicesFrom(), + pluginSettings.reloadBlockchainIndicesTo()); } + else { + if (logger.isWarnEnabled()) { + logger.warn(String.format("/!\\ Re-indexing blockchain from block #%s...", pluginSettings.reloadBlockchainIndicesFrom())); + } - injector.getInstance(BlockchainService.class) - .deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom()); + injector.getInstance(BlockchainService.class) + .deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom()); + } } else { if (logger.isInfoEnabled()) { @@ -224,6 +239,16 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Wait end of currency index creation, then index blocks threadPool.scheduleOnClusterReady(() -> { + // Reindex range + if (pluginSettings.reloadBlockchainIndices() + && pluginSettings.reloadBlockchainIndicesFrom() > 0 + && pluginSettings.reloadBlockchainIndicesTo() > pluginSettings.reloadBlockchainIndicesFrom()) { + injector.getInstance(BlockchainService.class) + .indexBlocksRange(peer, + pluginSettings.reloadBlockchainIndicesFrom(), + pluginSettings.reloadBlockchainIndicesTo()); + } + try { // Index blocks (and listen if new block appear) injector.getInstance(BlockchainService.class) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index afd7e592..a0f394a8 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -201,6 +201,9 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { public int reloadBlockchainIndicesFrom() { return settings.getAsInt("duniter.blockchain.reload.from", 0); } + public int reloadBlockchainIndicesTo() { + return settings.getAsInt("duniter.blockchain.reload.to", -1); + } public File getTempDirectory() { return Configuration.instance().getTempDirectory(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index d9d54939..59f7baeb 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -198,8 +198,8 @@ public class BlockchainService extends AbstractService { if (startNumber <= peerCurrentBlockNumber) { Collection<String> missingBlocks = bulkIndex - ? indexBlocksUsingBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel) - : indexBlocksNoBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel); + ? indexBlocksUsingBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel, true) + : indexBlocksNoBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel, true); // If some blocks are missing, try to get it using other peers if (CollectionUtils.isNotEmpty(missingBlocks)) { @@ -212,7 +212,7 @@ public class BlockchainService extends AbstractService { progressionModel.setStatus(ProgressionModel.Status.SUCCESS); } else { - logger.warn(String.format("[%s] [%s] Could not indexed allOfToList blocks. Missing %s blocks.", currencyName, peer, missingBlocks.size())); + logger.warn(String.format("[%s] [%s] Could not indexed some blocks. Missing %s blocks.", currencyName, peer, missingBlocks.size())); progressionModel.setStatus(ProgressionModel.Status.FAILED); } } @@ -224,7 +224,83 @@ public class BlockchainService extends AbstractService { } } } catch(Exception e) { - logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); + logger.error("Error during indexLastBlocks: " + e.getMessage(), e); + progressionModel.setStatus(ProgressionModel.Status.FAILED); + } + + return this; + } + + public BlockchainService indexBlocksRange(Peer peer, int firstNumber, int lastNumber) { + indexBlocksRange(peer, nullProgressionModel, firstNumber, lastNumber); + return this; + } + + public BlockchainService indexBlocksRange(Peer peer, ProgressionModel progressionModel, int firstNumber, int lastNumber) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(progressionModel); + Preconditions.checkArgument(firstNumber < lastNumber); + + boolean bulkIndex = pluginSettings.isIndexBulkEnable(); + + progressionModel.setStatus(ProgressionModel.Status.RUNNING); + progressionModel.setTotal(100); + long timeStart = System.currentTimeMillis(); + + try { + // Get the blockchain name from node + BlockchainParameters parameter = blockchainRemoteService.getParameters(peer); + if (parameter == null) { + progressionModel.setStatus(ProgressionModel.Status.FAILED); + logger.error(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError",peer)); + return this; + } + String currencyName = parameter.getCurrency(); + + progressionModel.setTask(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.task", currencyName, peer, firstNumber, lastNumber)); + logger.info(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.task", currencyName, peer, firstNumber, lastNumber)); + + // Then index allOfToList blocks + BlockchainBlock peerCurrentBlock = blockchainRemoteService.getCurrentBlock(peer); + + if (peerCurrentBlock != null) { + final int peerCurrentBlockNumber = peerCurrentBlock.getNumber(); + + + boolean isLastCurrent = lastNumber >= peerCurrentBlockNumber; + if (lastNumber > peerCurrentBlockNumber) { + lastNumber = peerCurrentBlockNumber; + } + + if (firstNumber <= peerCurrentBlockNumber) { + Collection<String> missingBlocks = bulkIndex + ? indexBlocksUsingBulk(peer, currencyName, firstNumber, lastNumber, progressionModel, isLastCurrent) + : indexBlocksNoBulk(peer, currencyName, firstNumber, lastNumber, progressionModel, isLastCurrent); + + // If some blocks are missing, try to get it using other peers + if (CollectionUtils.isNotEmpty(missingBlocks)) { + progressionModel.setTask(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task", currencyName)); + missingBlocks = indexMissingBlocksFromOtherPeers(peer, peerCurrentBlock, missingBlocks, 1); + } + + if (CollectionUtils.isEmpty(missingBlocks)) { + logger.info(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.succeed", currencyName, peer, firstNumber, lastNumber, (System.currentTimeMillis() - timeStart))); + progressionModel.setStatus(ProgressionModel.Status.SUCCESS); + } + else { + logger.warn(String.format("[%s] [%s] Could not indexed some blocks from range [%s-%s]. Missing %s blocks.", currencyName, peer, firstNumber, lastNumber, missingBlocks.size())); + progressionModel.setStatus(ProgressionModel.Status.FAILED); + } + } + else { + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s] Invalid block range [%s-%s]. Current block number is #%s", currencyName, peer, firstNumber, lastNumber, peerCurrentBlockNumber)); + } + progressionModel.setStatus(ProgressionModel.Status.SUCCESS); + } + } + } catch(Exception e) { + logger.error("Error during indexBlocksRange: " + e.getMessage(), e); progressionModel.setStatus(ProgressionModel.Status.FAILED); } @@ -403,9 +479,24 @@ public class BlockchainService extends AbstractService { } + + public void deleteRange(final String currencyName, final int fromBlock, int toBlock) { + int maxBlock = blockDao.getMaxBlockNumber(currencyName); + + boolean isLastBlock = toBlock >= maxBlock; + + blockDao.deleteRange(currencyName, fromBlock, (isLastBlock ? maxBlock : toBlock)); + + // Delete current also, if last block + if (isLastBlock) { + blockDao.deleteById(currencyName, CURRENT_BLOCK_ID); + } + + } + /* -- Internal methods -- */ - private Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { + private Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel, boolean isLastCurrent) { Set<String> missingBlockNumbers = new LinkedHashSet<>(); for (int curNumber = firstNumber; curNumber <= lastNumber; curNumber++) { @@ -429,7 +520,7 @@ public class BlockchainService extends AbstractService { blockDao.create(currencyName, getBlockId(curNumber), blockAsJson.getBytes(), true /*wait*/); // If last block - if (curNumber == lastNumber - 1) { + if (isLastCurrent && curNumber == lastNumber - 1) { // update the current block indexCurrentBlockFromJson(currencyName, blockAsJson, true /*wait*/); } @@ -443,7 +534,8 @@ public class BlockchainService extends AbstractService { return missingBlockNumbers; } - private Collection<String> indexBlocksUsingBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { + private Collection<String> indexBlocksUsingBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel, + boolean isLastCurrentNumber) { Set<String> missingBlockNumbers = new LinkedHashSet<>(); boolean debug = logger.isDebugEnabled(); @@ -504,7 +596,7 @@ public class BlockchainService extends AbstractService { } // If last block : also update the current block - if (itemNumber == lastNumber) { + if (isLastCurrentNumber && itemNumber == lastNumber) { currentBlockJson = blockAsJson; } } @@ -599,7 +691,7 @@ public class BlockchainService extends AbstractService { // Remove current blocks range newMissingBlocks.remove(blockNumberStr); - Collection<String> bulkMissingBlocks = indexBlocksUsingBulk(childPeer, currencyName, firstNumber, lastNumber, new ProgressionModelImpl()); + Collection<String> bulkMissingBlocks = indexBlocksUsingBulk(childPeer, currencyName, firstNumber, lastNumber, new ProgressionModelImpl(), true); // Re add if new missing blocks if (CollectionUtils.isNotEmpty(bulkMissingBlocks)) { @@ -732,7 +824,7 @@ public class BlockchainService extends AbstractService { blockDao.deleteRange(currencyName, forkOriginNumber/*from*/, number+forkResyncWindow/*to*/); // Re-indexing blocks - indexBlocksUsingBulk(peer, currencyName, forkOriginNumber/*from*/, number, nullProgressionModel); + indexBlocksUsingBulk(peer, currencyName, forkOriginNumber/*from*/, number, nullProgressionModel, true); } return true; // sync OK diff --git a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties index c904ed6f..4ada9bb9 100644 --- a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties +++ b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties @@ -4,6 +4,9 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s] +duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError= +duniter4j.blockIndexerService.indexBlocksRange.succeed= +duniter4j.blockIndexerService.indexBlocksRange.task= duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation. duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... diff --git a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties index 7d302de6..a2854149 100644 --- a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties +++ b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties @@ -4,6 +4,11 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s] +duniter4j.blockIndexerService.indexBlocksRange.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping blocks range indexation. +duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError=[%s] Error when calling [/blockchain/parameters]\: %s +duniter4j.blockIndexerService.indexBlocksRange.stopped=[%s] [%s] Indexing blocks from range - stopped +duniter4j.blockIndexerService.indexBlocksRange.succeed=[%s] [%s] Blocks [%s-%s] indexed [%s ms] +duniter4j.blockIndexerService.indexBlocksRange.task=[%s] [%s] Indexing block [%s-%s]... duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation. duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... -- GitLab