Skip to content
Snippets Groups Projects
Commit d7a5d91d authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] Allow to reindex a block range - Workaround for Cesium issue #656

parent aaa4b379
No related branches found
No related tags found
No related merge requests found
...@@ -126,7 +126,8 @@ duniter.blockchain.enable: true ...@@ -126,7 +126,8 @@ duniter.blockchain.enable: true
# Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' # Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread'
# #
# duniter.blockchain.reload: true # duniter.blockchain.reload: true
# duniter.blockchain.reload.from: 50999 # duniter.blockchain.reload.from: 18900
# duniter.blockchain.reload.to: 19000
# #
# Duniter node address # Duniter node address
# #
......
...@@ -125,8 +125,9 @@ duniter.blockchain.enable: true ...@@ -125,8 +125,9 @@ duniter.blockchain.enable: true
# #
# Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' # Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread'
# #
# duniter.blockchain.reload: true duniter.blockchain.reload: true
# duniter.blockchain.reload.from: 50999 duniter.blockchain.reload.from: 18900
duniter.blockchain.reload.to: 19000
# #
# Duniter node address # Duniter node address
# #
......
...@@ -207,12 +207,27 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -207,12 +207,27 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
// If partial reload (from a block) // If partial reload (from a block)
if (pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() > 0) { if (pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() > 0) {
if (logger.isWarnEnabled()) { // Delete blocs range [from,to]
logger.warn(String.format("/!\\ Re-indexing blockchain from block #%s...", pluginSettings.reloadBlockchainIndicesFrom())); if (pluginSettings.reloadBlockchainIndicesTo() > pluginSettings.reloadBlockchainIndicesFrom()) {
if (logger.isWarnEnabled()) {
logger.warn(String.format("/!\\ Re-indexing blockchain range [%s-%s]...",
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo()));
}
injector.getInstance(BlockchainService.class)
.deleteRange(currencyName,
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo());
} }
else {
if (logger.isWarnEnabled()) {
logger.warn(String.format("/!\\ Re-indexing blockchain from block #%s...", pluginSettings.reloadBlockchainIndicesFrom()));
}
injector.getInstance(BlockchainService.class) injector.getInstance(BlockchainService.class)
.deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom()); .deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom());
}
} }
else { else {
if (logger.isInfoEnabled()) { if (logger.isInfoEnabled()) {
...@@ -224,6 +239,16 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { ...@@ -224,6 +239,16 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
// Wait end of currency index creation, then index blocks // Wait end of currency index creation, then index blocks
threadPool.scheduleOnClusterReady(() -> { threadPool.scheduleOnClusterReady(() -> {
// Reindex range
if (pluginSettings.reloadBlockchainIndices()
&& pluginSettings.reloadBlockchainIndicesFrom() > 0
&& pluginSettings.reloadBlockchainIndicesTo() > pluginSettings.reloadBlockchainIndicesFrom()) {
injector.getInstance(BlockchainService.class)
.indexBlocksRange(peer,
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo());
}
try { try {
// Index blocks (and listen if new block appear) // Index blocks (and listen if new block appear)
injector.getInstance(BlockchainService.class) injector.getInstance(BlockchainService.class)
......
...@@ -201,6 +201,9 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { ...@@ -201,6 +201,9 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
public int reloadBlockchainIndicesFrom() { public int reloadBlockchainIndicesFrom() {
return settings.getAsInt("duniter.blockchain.reload.from", 0); return settings.getAsInt("duniter.blockchain.reload.from", 0);
} }
public int reloadBlockchainIndicesTo() {
return settings.getAsInt("duniter.blockchain.reload.to", -1);
}
public File getTempDirectory() { public File getTempDirectory() {
return Configuration.instance().getTempDirectory(); return Configuration.instance().getTempDirectory();
......
...@@ -198,8 +198,8 @@ public class BlockchainService extends AbstractService { ...@@ -198,8 +198,8 @@ public class BlockchainService extends AbstractService {
if (startNumber <= peerCurrentBlockNumber) { if (startNumber <= peerCurrentBlockNumber) {
Collection<String> missingBlocks = bulkIndex Collection<String> missingBlocks = bulkIndex
? indexBlocksUsingBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel) ? indexBlocksUsingBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel, true)
: indexBlocksNoBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel); : indexBlocksNoBulk(peer, currencyName, startNumber, peerCurrentBlockNumber, progressionModel, true);
// If some blocks are missing, try to get it using other peers // If some blocks are missing, try to get it using other peers
if (CollectionUtils.isNotEmpty(missingBlocks)) { if (CollectionUtils.isNotEmpty(missingBlocks)) {
...@@ -212,7 +212,7 @@ public class BlockchainService extends AbstractService { ...@@ -212,7 +212,7 @@ public class BlockchainService extends AbstractService {
progressionModel.setStatus(ProgressionModel.Status.SUCCESS); progressionModel.setStatus(ProgressionModel.Status.SUCCESS);
} }
else { else {
logger.warn(String.format("[%s] [%s] Could not indexed allOfToList blocks. Missing %s blocks.", currencyName, peer, missingBlocks.size())); logger.warn(String.format("[%s] [%s] Could not indexed some blocks. Missing %s blocks.", currencyName, peer, missingBlocks.size()));
progressionModel.setStatus(ProgressionModel.Status.FAILED); progressionModel.setStatus(ProgressionModel.Status.FAILED);
} }
} }
...@@ -224,7 +224,83 @@ public class BlockchainService extends AbstractService { ...@@ -224,7 +224,83 @@ public class BlockchainService extends AbstractService {
} }
} }
} catch(Exception e) { } catch(Exception e) {
logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); logger.error("Error during indexLastBlocks: " + e.getMessage(), e);
progressionModel.setStatus(ProgressionModel.Status.FAILED);
}
return this;
}
public BlockchainService indexBlocksRange(Peer peer, int firstNumber, int lastNumber) {
indexBlocksRange(peer, nullProgressionModel, firstNumber, lastNumber);
return this;
}
public BlockchainService indexBlocksRange(Peer peer, ProgressionModel progressionModel, int firstNumber, int lastNumber) {
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(progressionModel);
Preconditions.checkArgument(firstNumber < lastNumber);
boolean bulkIndex = pluginSettings.isIndexBulkEnable();
progressionModel.setStatus(ProgressionModel.Status.RUNNING);
progressionModel.setTotal(100);
long timeStart = System.currentTimeMillis();
try {
// Get the blockchain name from node
BlockchainParameters parameter = blockchainRemoteService.getParameters(peer);
if (parameter == null) {
progressionModel.setStatus(ProgressionModel.Status.FAILED);
logger.error(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError",peer));
return this;
}
String currencyName = parameter.getCurrency();
progressionModel.setTask(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.task", currencyName, peer, firstNumber, lastNumber));
logger.info(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.task", currencyName, peer, firstNumber, lastNumber));
// Then index allOfToList blocks
BlockchainBlock peerCurrentBlock = blockchainRemoteService.getCurrentBlock(peer);
if (peerCurrentBlock != null) {
final int peerCurrentBlockNumber = peerCurrentBlock.getNumber();
boolean isLastCurrent = lastNumber >= peerCurrentBlockNumber;
if (lastNumber > peerCurrentBlockNumber) {
lastNumber = peerCurrentBlockNumber;
}
if (firstNumber <= peerCurrentBlockNumber) {
Collection<String> missingBlocks = bulkIndex
? indexBlocksUsingBulk(peer, currencyName, firstNumber, lastNumber, progressionModel, isLastCurrent)
: indexBlocksNoBulk(peer, currencyName, firstNumber, lastNumber, progressionModel, isLastCurrent);
// If some blocks are missing, try to get it using other peers
if (CollectionUtils.isNotEmpty(missingBlocks)) {
progressionModel.setTask(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task", currencyName));
missingBlocks = indexMissingBlocksFromOtherPeers(peer, peerCurrentBlock, missingBlocks, 1);
}
if (CollectionUtils.isEmpty(missingBlocks)) {
logger.info(I18n.t("duniter4j.blockIndexerService.indexBlocksRange.succeed", currencyName, peer, firstNumber, lastNumber, (System.currentTimeMillis() - timeStart)));
progressionModel.setStatus(ProgressionModel.Status.SUCCESS);
}
else {
logger.warn(String.format("[%s] [%s] Could not indexed some blocks from range [%s-%s]. Missing %s blocks.", currencyName, peer, firstNumber, lastNumber, missingBlocks.size()));
progressionModel.setStatus(ProgressionModel.Status.FAILED);
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug(String.format("[%s] [%s] Invalid block range [%s-%s]. Current block number is #%s", currencyName, peer, firstNumber, lastNumber, peerCurrentBlockNumber));
}
progressionModel.setStatus(ProgressionModel.Status.SUCCESS);
}
}
} catch(Exception e) {
logger.error("Error during indexBlocksRange: " + e.getMessage(), e);
progressionModel.setStatus(ProgressionModel.Status.FAILED); progressionModel.setStatus(ProgressionModel.Status.FAILED);
} }
...@@ -403,9 +479,24 @@ public class BlockchainService extends AbstractService { ...@@ -403,9 +479,24 @@ public class BlockchainService extends AbstractService {
} }
public void deleteRange(final String currencyName, final int fromBlock, int toBlock) {
int maxBlock = blockDao.getMaxBlockNumber(currencyName);
boolean isLastBlock = toBlock >= maxBlock;
blockDao.deleteRange(currencyName, fromBlock, (isLastBlock ? maxBlock : toBlock));
// Delete current also, if last block
if (isLastBlock) {
blockDao.deleteById(currencyName, CURRENT_BLOCK_ID);
}
}
/* -- Internal methods -- */ /* -- Internal methods -- */
private Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { private Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel, boolean isLastCurrent) {
Set<String> missingBlockNumbers = new LinkedHashSet<>(); Set<String> missingBlockNumbers = new LinkedHashSet<>();
for (int curNumber = firstNumber; curNumber <= lastNumber; curNumber++) { for (int curNumber = firstNumber; curNumber <= lastNumber; curNumber++) {
...@@ -429,7 +520,7 @@ public class BlockchainService extends AbstractService { ...@@ -429,7 +520,7 @@ public class BlockchainService extends AbstractService {
blockDao.create(currencyName, getBlockId(curNumber), blockAsJson.getBytes(), true /*wait*/); blockDao.create(currencyName, getBlockId(curNumber), blockAsJson.getBytes(), true /*wait*/);
// If last block // If last block
if (curNumber == lastNumber - 1) { if (isLastCurrent && curNumber == lastNumber - 1) {
// update the current block // update the current block
indexCurrentBlockFromJson(currencyName, blockAsJson, true /*wait*/); indexCurrentBlockFromJson(currencyName, blockAsJson, true /*wait*/);
} }
...@@ -443,7 +534,8 @@ public class BlockchainService extends AbstractService { ...@@ -443,7 +534,8 @@ public class BlockchainService extends AbstractService {
return missingBlockNumbers; return missingBlockNumbers;
} }
private Collection<String> indexBlocksUsingBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { private Collection<String> indexBlocksUsingBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel,
boolean isLastCurrentNumber) {
Set<String> missingBlockNumbers = new LinkedHashSet<>(); Set<String> missingBlockNumbers = new LinkedHashSet<>();
boolean debug = logger.isDebugEnabled(); boolean debug = logger.isDebugEnabled();
...@@ -504,7 +596,7 @@ public class BlockchainService extends AbstractService { ...@@ -504,7 +596,7 @@ public class BlockchainService extends AbstractService {
} }
// If last block : also update the current block // If last block : also update the current block
if (itemNumber == lastNumber) { if (isLastCurrentNumber && itemNumber == lastNumber) {
currentBlockJson = blockAsJson; currentBlockJson = blockAsJson;
} }
} }
...@@ -599,7 +691,7 @@ public class BlockchainService extends AbstractService { ...@@ -599,7 +691,7 @@ public class BlockchainService extends AbstractService {
// Remove current blocks range // Remove current blocks range
newMissingBlocks.remove(blockNumberStr); newMissingBlocks.remove(blockNumberStr);
Collection<String> bulkMissingBlocks = indexBlocksUsingBulk(childPeer, currencyName, firstNumber, lastNumber, new ProgressionModelImpl()); Collection<String> bulkMissingBlocks = indexBlocksUsingBulk(childPeer, currencyName, firstNumber, lastNumber, new ProgressionModelImpl(), true);
// Re add if new missing blocks // Re add if new missing blocks
if (CollectionUtils.isNotEmpty(bulkMissingBlocks)) { if (CollectionUtils.isNotEmpty(bulkMissingBlocks)) {
...@@ -732,7 +824,7 @@ public class BlockchainService extends AbstractService { ...@@ -732,7 +824,7 @@ public class BlockchainService extends AbstractService {
blockDao.deleteRange(currencyName, forkOriginNumber/*from*/, number+forkResyncWindow/*to*/); blockDao.deleteRange(currencyName, forkOriginNumber/*from*/, number+forkResyncWindow/*to*/);
// Re-indexing blocks // Re-indexing blocks
indexBlocksUsingBulk(peer, currencyName, forkOriginNumber/*from*/, number, nullProgressionModel); indexBlocksUsingBulk(peer, currencyName, forkOriginNumber/*from*/, number, nullProgressionModel, true);
} }
return true; // sync OK return true; // sync OK
......
...@@ -4,6 +4,9 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an ...@@ -4,6 +4,9 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an
duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s
duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync
duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s] duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s]
duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError=
duniter4j.blockIndexerService.indexBlocksRange.succeed=
duniter4j.blockIndexerService.indexBlocksRange.task=
duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation. duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation.
duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers
duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)...
......
...@@ -4,6 +4,11 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an ...@@ -4,6 +4,11 @@ duniter4j.blockIndexerService.detectFork.invalidBlockchain=[%s] [%s] Peer has an
duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s duniter4j.blockIndexerService.detectFork.remoteBlockNotFound=[%s] [%s] Unable to get block \#%s from peer\: %s
duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync duniter4j.blockIndexerService.detectFork.resync=[%s] [%s] Rollback index from block \#%s, and resync
duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s] duniter4j.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s - hash [%s]
duniter4j.blockIndexerService.indexBlocksRange.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping blocks range indexation.
duniter4j.blockIndexerService.indexBlocksRange.remoteParametersError=[%s] Error when calling [/blockchain/parameters]\: %s
duniter4j.blockIndexerService.indexBlocksRange.stopped=[%s] [%s] Indexing blocks from range - stopped
duniter4j.blockIndexerService.indexBlocksRange.succeed=[%s] [%s] Blocks [%s-%s] indexed [%s ms]
duniter4j.blockIndexerService.indexBlocksRange.task=[%s] [%s] Indexing block [%s-%s]...
duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation. duniter4j.blockIndexerService.indexLastBlocks.invalidBlockchain=[%s] [%s] Peer has another blockchain (no common blocks \!). Skipping last blocks indexation.
duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers duniter4j.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers
duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... duniter4j.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)...
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment