Commit 0fbfe814 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1076 New block & fork resolution

parent 2dea73ad
......@@ -228,8 +228,6 @@ export class DuniterBlockchain extends MiscIndexedBlockchain {
const dbb = DBBlock.fromBlockDTO(block)
this.updateBlocksComputedVars(current, dbb)
// Saves the block (DAL)
await dal.saveBlock(dbb);
// --> Update links
await dal.updateWotbLinks(indexes.cindex);
......@@ -244,6 +242,8 @@ export class DuniterBlockchain extends MiscIndexedBlockchain {
await this.deleteTransactions(block, dal);
await dal.trimSandboxes(block);
// Saves the block (DAL)
await dal.saveBlock(dbb);
return block;
}
......@@ -370,6 +370,8 @@ export class DuniterBlockchain extends MiscIndexedBlockchain {
// Restore block's transaction as incoming transactions
await this.undoDeleteTransactions(block, dal)
return block
}
async undoMembersUpdate(blockstamp:string, dal:any) {
......
......@@ -6,68 +6,94 @@ export interface SwitchBlock {
medianTime:number
}
export interface SwitcherDao {
export interface SwitcherDao<T extends SwitchBlock> {
getCurrent(): SwitchBlock
getPotentials(numberStart:number, timeStart:number): SwitchBlock[]
getBlockchainBlock(number:number, hash:string): SwitchBlock|null
getSandboxBlock(number:number, hash:string): SwitchBlock|null
revertTo(number:number): SwitchBlock[]
addBlock(block:SwitchBlock): SwitchBlock
getCurrent(): Promise<T>
getPotentials(numberStart:number, timeStart:number): Promise<T[]>
getBlockchainBlock(number:number, hash:string): Promise<T|null>
getSandboxBlock(number:number, hash:string): Promise<T|null>
revertTo(number:number): Promise<T[]>
addBlock(block:T): Promise<T>
}
export class Switcher {
export class Switcher<T extends SwitchBlock> {
constructor(private dao:SwitcherDao, private avgGenTime:number, private forkWindowSize:number) {}
constructor(
private dao:SwitcherDao<T>,
private avgGenTime:number,
private forkWindowSize:number,
private switchOnHeadAdvance:number,
private logger:any = undefined) {}
/**
* Looks at known blocks in the sandbox and try to follow the longest resulting chain that has at least both 3 blocks of
* advance and 3 * avgGenTime of medianTime advancce.
* @returns {SwitchBlock}
*/
tryToFork() {
const current = this.dao.getCurrent()
const numberStart = current.number + 3
const timeStart = current.medianTime + 3 * this.avgGenTime
// Phase 1: find potential chains
const suites = this.findPotentialSuites(current, numberStart, timeStart)
// Phase 2: select the best chain
let longestChain:null|SwitchBlock[] = this.findLongestChain(suites)
// Phase 3: a best exist?
if (longestChain) {
const chainHEAD = longestChain[longestChain.length - 1]
// apply it if it respects the 3-3 rule
if (chainHEAD.number >= numberStart && chainHEAD.medianTime >= timeStart) {
this.switchOnChain(longestChain)
async tryToFork() {
const current = await this.dao.getCurrent()
if (current) {
const numberStart = current.number + this.switchOnHeadAdvance
const timeStart = current.medianTime + this.switchOnHeadAdvance * this.avgGenTime
// Phase 1: find potential chains
const suites = await this.findPotentialSuites(current, numberStart, timeStart)
if (suites.length) {
this.logger && this.logger.info("Fork resolution: %s potential suite(s) found...", suites.length)
}
// Phase 2: select the best chain
let longestChain:null|T[] = await this.findLongestChain(current, suites)
// Phase 3: a best exist?
if (longestChain) {
const chainHEAD = longestChain[longestChain.length - 1]
// apply it if it respects the 3-3 rule
if (chainHEAD.number >= numberStart && chainHEAD.medianTime >= timeStart) {
await this.switchOnChain(longestChain)
return await this.dao.getCurrent()
}
}
}
return this.dao.getCurrent()
return null
}
/**
* Find all the suites' HEAD that we could potentially fork on, in the current fork window.
* @param current
*/
async findPotentialSuitesHeads(current:T) {
const numberStart = current.number - this.forkWindowSize
const timeStart = current.medianTime - this.forkWindowSize * this.avgGenTime
const suites = await this.findPotentialSuites(current, numberStart, timeStart)
return suites.map(suite => suite[suite.length - 1])
}
/**
* Looks at the potential blocks that could form fork chains in the sandbox, and sort them to have a maximum of unique
* chains.
* @param {SwitchBlock} current HEAD of local blockchain.
* @param {number} numberStart The minimum number of a fork block.
* @param {number} timeStart The minimum medianTime of a fork block.
* @param numberStart The minimum number of a fork block.
* @param timeStart The minimum medianTime of a fork block.
* @returns {SwitchBlock[][]} The suites found.
*/
private findPotentialSuites(current:SwitchBlock, numberStart:number, timeStart:number) {
const suites:SwitchBlock[][] = []
const potentials:SwitchBlock[] = this.dao.getPotentials(numberStart, timeStart)
const invalids: { [hash:string]: SwitchBlock } = {}
private async findPotentialSuites(current:T, numberStart:number, timeStart:number) {
const suites:T[][] = []
const potentials:T[] = await this.dao.getPotentials(numberStart, timeStart)
const invalids: { [hash:string]: T } = {}
if (potentials.length) {
this.logger && this.logger.info("Fork resolution: %s potential block(s) found...", potentials.length)
}
for (const candidate of potentials) {
const suite:SwitchBlock[] = []
const suite:T[] = []
// Do not process the block if it is already known as invalid (has no fork point with current blockchain or misses
// some blocks) or is already contained in a valid chain.
if (!invalids[candidate.hash] && !Switcher.suitesContains(suites, candidate)) {
// Tries to build up a full chain that is linked to current chain by a fork point.
let previous:SwitchBlock|null = candidate, commonRootFound = false
while (previous && previous.number > current.number - this.forkWindowSize) {
let previous:T|null = candidate, commonRootFound = false
let previousNumber:number = previous.number - 1
let previousHash:string = previous.previousHash
while (previous && previous.number > candidate.number - this.forkWindowSize) {
suite.push(previous)
const previousNumber = previous.number - 1
const previousHash = previous.previousHash
previous = this.dao.getBlockchainBlock(previousNumber, previousHash)
previousNumber = previous.number - 1
previousHash = previous.previousHash
previous = await this.dao.getBlockchainBlock(previousNumber, previousHash)
if (previous) {
// Stop the loop: common block has been found
previous = null
......@@ -75,13 +101,19 @@ export class Switcher {
commonRootFound = true
} else {
// Have a look in sandboxes
previous = this.dao.getSandboxBlock(previousNumber, previousHash)
previous = await this.dao.getSandboxBlock(previousNumber, previousHash)
}
}
// Forget about invalid blocks
if (!commonRootFound) {
for (const b of suite) {
invalids[b.hash] = b
if (!previous) {
this.logger && this.logger.debug("Suite -> %s-%s missing block#%s-%s", candidate.number, candidate.hash.substr(0, 8), previousNumber, previousHash.substr(0, 8))
for (const b of suite) {
invalids[b.hash] = b
}
} else {
// The chain would be too long, we could not revert correctly the chain.
this.logger && this.logger.debug("Suite #%s-%s -> %s-%s out of fork window", previousNumber, previousHash.substr(0, 8), candidate.number, candidate.hash.substr(0, 8))
}
}
}
......@@ -92,37 +124,52 @@ export class Switcher {
/**
* Find the longest chain among a suite of chains. Tests the validity of each block against the current blockchain.
* The length of a chain is the number of blocks successfuly added to current blockchain.
* @param {SwitchBlock} current
* @param {SwitchBlock[][]} suites
* @returns {SwitchBlock[]}
*/
private findLongestChain(suites:SwitchBlock[][]) {
let longestChain:null|SwitchBlock[] = null
private async findLongestChain(current:T, suites:T[][]) {
if (suites.length) {
this.logger && this.logger.info("Fork resolution: HEAD = block#%s", current.number)
}
let longestChain:null|T[] = null
let j = 0
for (const s of suites) {
j++
s.reverse()
// Revert current blockchain to fork point
const reverted = this.dao.revertTo(s[0].number - 1)
const forkPoint = s[0].number - 1
const forkHead = s[s.length - 1]
this.logger && this.logger.info("Fork resolution: suite %s/%s (-> #%s-%s) revert to fork point block#%s", j, suites.length, forkHead.number, forkHead.hash.substr(0, 6), forkPoint)
const reverted = await this.dao.revertTo(s[0].number - 1)
// Try to add a maximum of blocks
let added = true, i = 0, successfulBlocks:SwitchBlock[] = []
while (added) {
let added = true, i = 0, successfulBlocks:T[] = []
while (added && i < s.length) {
try {
this.dao.addBlock(s[i])
await this.dao.addBlock(s[i])
this.logger && this.logger.info("Fork resolution: suite %s/%s added block#%s-%s", j, suites.length, s[i].number, s[i].hash)
successfulBlocks.push(s[i])
} catch (e) {
this.logger && this.logger.info("Fork resolution: suite %s/%s REFUSED block#%s: %s", j, suites.length, s[0].number + i, e && e.message)
added = false
}
i++
}
// Pop the successfuly added blocks
if (successfulBlocks.length) {
this.dao.revertTo(this.dao.getCurrent().number - successfulBlocks.length)
const addedToHeadLevel = successfulBlocks[successfulBlocks.length-1].number - current.number
this.logger && this.logger.info("Fork resolution: suite %s/%s reached HEAD + %s. Now rolling back.", j, suites.length, addedToHeadLevel)
await this.dao.revertTo(forkPoint)
}
// Push back the initial blocks that were temporarily reverted
reverted.reverse()
for (const b of reverted) {
this.dao.addBlock(b)
await this.dao.addBlock(b)
}
// Remember the chain if it is the longest among tested chains
if ((!longestChain && successfulBlocks.length > 0) || (longestChain && longestChain.length < successfulBlocks.length)) {
// Remember the chain if it is the longest (highest HEAD) among tested chains
const longestHEAD = longestChain && longestChain[longestChain.length - 1]
const successHEAD = successfulBlocks && successfulBlocks[successfulBlocks.length - 1]
if ((!longestHEAD && successHEAD) || (longestHEAD && successHEAD && longestHEAD.number < successHEAD.number)) {
longestChain = successfulBlocks
}
}
......@@ -133,10 +180,10 @@ export class Switcher {
* Switch current blockchain on another chain, by poping top blocks and replacing them by new ones.
* @param {SwitchBlock[]} chain
*/
private switchOnChain(chain:SwitchBlock[]) {
this.dao.revertTo(chain[0].number - 1)
private async switchOnChain(chain:T[]) {
await this.dao.revertTo(chain[0].number - 1)
for (const b of chain) {
this.dao.addBlock(b)
await this.dao.addBlock(b)
}
}
......@@ -144,9 +191,8 @@ export class Switcher {
* Checks if a suite of chains contains a particular block in one of its chains.
* @param {SwitchBlock[][]} suites
* @param {SwitchBlock} block
* @returns {boolean}
*/
static suitesContains(suites:SwitchBlock[][], block:SwitchBlock) {
static suitesContains<T extends SwitchBlock>(suites:T[][], block:T) {
for (const suite of suites) {
for (const b of suite) {
if (b.number === block.number && b.hash === block.hash) {
......
......@@ -138,6 +138,9 @@ export const CommonConstants = {
TOO_OLD_MEMBERSHIP: { httpCode: 400, uerr: { ucode: 2029, message: "Too old membership." }},
MAXIMUM_LEN_OF_OUTPUT: { httpCode: 400, uerr: { ucode: 2032, message: 'A transaction output has a maximum size of ' + MAXIMUM_LEN_OF_OUTPUT + ' characters' }},
MAXIMUM_LEN_OF_UNLOCK: { httpCode: 400, uerr: { ucode: 2033, message: 'A transaction unlock has a maximum size of ' + MAXIMUM_LEN_OF_UNLOCK + ' characters' }},
WRONG_CURRENCY: { httpCode: 400, uerr: { ucode: 2500, message: 'Wrong currency' }},
WRONG_POW: { httpCode: 400, uerr: { ucode: 2501, message: 'Wrong proof-of-work' }},
OUT_OF_FORK_WINDOW: { httpCode: 400, uerr: { ucode: 2501, message: 'Out of fork window' }},
WRONG_SIGNATURE_FOR_CERT: { httpCode: 400, uerr: { ucode: 3000, message: 'Wrong signature for certification' }},
},
......
......@@ -3,6 +3,7 @@ import {BlockDTO} from "../dto/BlockDTO"
import {DuniterBlockchain} from "../blockchain/DuniterBlockchain"
import {QuickSynchronizer} from "./QuickSync"
import {DBHead} from "../db/DBHead"
const _ = require('underscore');
const indexer = require('../indexer').Indexer
const constants = require('../constants');
......@@ -100,11 +101,11 @@ export class BlockchainContext {
this.logger = require('../logger').NewLogger(this.dal.profile);
}
checkBlock(block: BlockDTO, withPoWAndSignature:boolean): Promise<any> {
async checkBlock(block: BlockDTO, withPoWAndSignature:boolean): Promise<any> {
return DuniterBlockchain.checkBlock(block, withPoWAndSignature, this.conf, this.dal)
}
async addBlock(obj: BlockDTO, index: any = null, HEAD: DBHead | null = null): Promise<any> {
private async addBlock(obj: BlockDTO, index: any = null, HEAD: DBHead | null = null): Promise<any> {
const block = await this.blockchain.pushTheBlock(obj, index, HEAD, this.conf, this.dal, this.logger)
this.vHEAD_1 = this.vHEAD = this.HEADrefreshed = null
return block
......@@ -115,11 +116,10 @@ export class BlockchainContext {
return dbb.toBlockDTO()
}
async revertCurrentBlock(): Promise<any> {
async revertCurrentBlock(): Promise<BlockDTO> {
const head_1 = await this.dal.bindexDAL.head(1);
this.logger.debug('Reverting block #%s...', head_1.number);
const res = await this.blockchain.revertBlock(head_1.number, head_1.hash, this.dal)
this.logger.debug('Reverted block #%s', head_1.number);
// Invalidates the head, since it has changed.
await this.refreshHead();
return res;
......@@ -133,11 +133,15 @@ export class BlockchainContext {
throw constants.ERRORS.NO_POTENTIAL_FORK_AS_NEXT;
}
const block = forks[0];
const { index, HEAD } = await this.checkBlock(block, constants.WITH_SIGNATURES_AND_POW);
await this.addBlock(block, index, HEAD);
await this.checkAndAddBlock(block)
this.logger.debug('Applied block #%s', block.number);
}
async checkAndAddBlock(block:BlockDTO) {
const { index, HEAD } = await this.checkBlock(block, constants.WITH_SIGNATURES_AND_POW);
return await this.addBlock(block, index, HEAD);
}
current(): Promise<any> {
return this.dal.getCurrentBlockOrNull()
}
......
"use strict";
import {CommonConstants} from "./common-libs/constants"
import {OtherConstants} from "./other_constants"
const UDID2 = "udid2;c;([A-Z-]*);([A-Z-]*);(\\d{4}-\\d{2}-\\d{2});(e\\+\\d{2}\\.\\d{2}(\\+|-)\\d{3}\\.\\d{2});(\\d+)(;?)";
const PUBKEY = CommonConstants.FORMATS.PUBKEY
......@@ -160,7 +161,7 @@ module.exports = {
SAFE_FACTOR: 3,
BLOCKS_COLLECT_THRESHOLD: 30, // Blocks to collect from memory and persist
MUTE_LOGS_DURING_UNIT_TESTS: true,
MUTE_LOGS_DURING_UNIT_TESTS: OtherConstants.MUTE_LOGS_DURING_UNIT_TESTS,
SANDBOX_SIZE_TRANSACTIONS: 200,
SANDBOX_SIZE_IDENTITIES: 5000,
......
......@@ -223,6 +223,10 @@ export class FileDAL {
return this.getBlock(0)
}
getPotentialRootBlocks() {
return this.blockDAL.getPotentialRoots()
}
lastBlockOfIssuer(issuer:string) {
return this.blockDAL.lastBlockOfIssuer(issuer);
}
......@@ -239,6 +243,10 @@ export class FileDAL {
return this.blockDAL.getNextForkBlocks(current.number, current.hash)
}
getPotentialForkBlocks(numberStart:number, medianTimeStart:number) {
return this.blockDAL.getPotentialForkBlocks(numberStart, medianTimeStart)
}
async getBlockCurrent() {
const current = await this.blockDAL.getCurrent();
if (!current)
......
import {AbstractSQLite} from "./AbstractSQLite"
import {SQLiteDriver} from "../drivers/SQLiteDriver"
import {DBBlock} from "../../db/DBBlock"
const constants = require('../../constants');
const IS_FORK = true;
......@@ -115,6 +116,14 @@ export class BlockDAL extends AbstractSQLite<DBBlock> {
return this.query('SELECT * FROM block WHERE fork ORDER BY number');
}
getPotentialForkBlocks(numberStart:number, medianTimeStart:number) {
return this.query('SELECT * FROM block WHERE fork AND number >= ? AND medianTime >= ? ORDER BY number DESC', [numberStart, medianTimeStart]);
}
getPotentialRoots() {
return this.query('SELECT * FROM block WHERE fork AND number = ?', [0])
}
getDividendBlocks() {
return this.query('SELECT * FROM block WHERE dividend IS NOT NULL ORDER BY number');
}
......
......@@ -1066,7 +1066,8 @@ export class Indexer {
nbPersonalBlocksInFrame = 0;
medianOfBlocksInFrame = 1;
} else {
const blocksInFrame = _.filter(await range(1, HEAD_1.issuersFrame), (b:BlockDTO) => b.number <= HEAD_1.number);
const ranged = await range(1, HEAD_1.issuersFrame)
const blocksInFrame = _.filter(ranged, (b:BlockDTO) => b.number <= HEAD_1.number);
const issuersInFrame = blocksInFrame.map((b:BlockDTO) => b.issuer);
blocksOfIssuer = _.filter(blocksInFrame, (entry:BlockDTO) => entry.issuer == HEAD.issuer);
nbPersonalBlocksInFrame = count(blocksOfIssuer);
......@@ -1335,7 +1336,9 @@ export class Indexer {
// BR_G72
static ruleCertificationSignature(cindex: CindexEntry[]) {
for (const ENTRY of cindex) {
if (!ENTRY.sigOK) return false;
if (!ENTRY.sigOK) {
return false
}
}
return true
}
......
export const OtherConstants = {
MUTE_LOGS_DURING_UNIT_TESTS: true
}
\ No newline at end of file
......@@ -27,11 +27,15 @@ export const LOCAL_RULES_FUNCTIONS = {
return true;
},
checkProofOfWork: async (block:BlockDTO) => {
isProofOfWorkCorrect: (block:BlockDTO) => {
let remainder = block.powMin % 16;
let nb_zeros = (block.powMin - remainder) / 16;
const powRegexp = new RegExp('^0{' + nb_zeros + '}');
if (!block.hash.match(powRegexp)) {
return !!block.hash.match(powRegexp)
},
checkProofOfWork: async (block:BlockDTO) => {
if (!LOCAL_RULES_FUNCTIONS.isProofOfWorkCorrect(block)) {
throw Error('Not a proof-of-work');
}
return true;
......
......@@ -405,7 +405,7 @@ export class BlockCrawler {
async applyMainBranch(block: BlockDTO): Promise<boolean> {
const existing = await server.dal.getAbsoluteBlockByNumberAndHash(block.number, block.hash)
if (!existing) {
let addedBlock = await server.writeBlock(block, false)
let addedBlock = await server.writeBlock(block, false, true)
if (!this.lastDownloaded) {
this.lastDownloaded = await dao.remoteCurrent(node);
}
......@@ -417,9 +417,6 @@ export class BlockCrawler {
server.streamPush(addedBlock);
}
}
} else {
this.crawler.logger && this.crawler.logger.info('Downloaded already known block#%s-%s, try to fork...', block.number, block.hash)
await server.BlockchainService.tryToFork()
}
return true
}
......@@ -427,8 +424,7 @@ export class BlockCrawler {
return true
}
async isMemberPeer(thePeer: PeerDTO): Promise<boolean> {
let idty = await server.dal.getWrittenIdtyByPubkey(thePeer.pubkey);
return (idty && idty.member) || false;
return true
}
async downloadBlocks(thePeer: any, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> {
if (!count) {
......@@ -444,8 +440,7 @@ export class BlockCrawler {
return blocks;
}
})(this)
await dao.pull(server.conf, server.logger);
await dao.pull(server.conf, server.logger)
} catch (e) {
if (this.isConnectionError(e)) {
this.logger && this.logger.info("Peer %s unreachable: now considered as DOWN.", p.pubkey);
......@@ -459,6 +454,12 @@ export class BlockCrawler {
}
}
}))
await this.server.BlockchainService.pushFIFO("crawlerResolution", async () => {
await server.BlockchainService.blockResolution()
await server.BlockchainService.forkResolution()
})
this.pullingEvent(server, 'end', current.number);
}
this.logger && this.logger.info('Will pull blocks from the network in %s min %s sec', Math.floor(this.pullingActualIntervalDuration / 60), Math.floor(this.pullingActualIntervalDuration % 60));
......
......@@ -258,7 +258,7 @@ export class Synchroniser extends stream.Duplex {
return block;
}
async applyMainBranch(block: BlockDTO): Promise<boolean> {
const addedBlock = await this.BlockchainService.submitBlock(block, true, CrawlerConstants.FORK_ALLOWED);
const addedBlock = await this.BlockchainService.submitBlock(block)
this.server.streamPush(addedBlock);
this.watcher.appliedPercent(Math.floor(block.number / to * 100));
return true
......
......@@ -11,12 +11,13 @@ import {GLOBAL_RULES_HELPERS} from "../lib/rules/global_rules"
import {parsers} from "../lib/common-libs/parsers/index"
import {HttpIdentityRequirement} from "../modules/bma/lib/dtos"
import {FIFOService} from "./FIFOService"
import {CommonConstants} from "../lib/common-libs/constants"
import {LOCAL_RULES_FUNCTIONS} from "../lib/rules/local_rules"
import {Switcher, SwitcherDao} from "../lib/blockchain/Switcher"
const _ = require('underscore');
const constants = require('../lib/constants');
const CHECK_ALL_RULES = true;
export class BlockchainService extends FIFOService {
mainContext:BlockchainContext
......@@ -25,16 +26,70 @@ export class BlockchainService extends FIFOService {
logger:any
selfPubkey:string
quickSynchronizer:QuickSynchronizer
switcherDao:SwitcherDao<BlockDTO>
constructor(private server:any, fifoPromiseHandler:GlobalFifoPromise) {
super(fifoPromiseHandler)
this.mainContext = new BlockchainContext()
this.switcherDao = new (class ForkDao implements SwitcherDao<BlockDTO> {
constructor(private bcService:BlockchainService) {}
getCurrent(): Promise<BlockDTO> {
return this.bcService.current()
}
async getPotentials(numberStart: number, timeStart: number): Promise<BlockDTO[]> {
const blocks = await this.bcService.dal.getPotentialForkBlocks(numberStart, timeStart)
return blocks.map((b:any) => BlockDTO.fromJSONObject(b))
}
async getBlockchainBlock(number: number, hash: string): Promise<BlockDTO | null> {
try {
return BlockDTO.fromJSONObject(await this.bcService.dal.getBlockByNumberAndHash(number, hash))
} catch (e) {
return null
}
}
async getSandboxBlock(number: number, hash: string): Promise<BlockDTO | null> {
const block = await this.bcService.dal.getAbsoluteBlockByNumberAndHash(number, hash)
if (block && block.fork) {
return BlockDTO.fromJSONObject(block)
} else {
return null
}
}
async revertTo(number: number): Promise<BlockDTO[]> {
const blocks:BlockDTO[] = []
const current = await this.bcService.current();
for (let i = 0, count = current.number - number; i < count; i++) {
const reverted = await this.bcService.mainContext.revertCurrentBlock()
blocks.push(BlockDTO.fromJSONObject(reverted))
}
if (current.number < number) {
throw "Already below this number"
}
return blocks
}
async addBlock(block: BlockDTO): Promise<BlockDTO> {
return await this.bcService.mainContext.checkAndAddBlock(block)
}
})(this)
}
/**
* Mandatory with stream.Readable
* @private
*/
_read() {}
getContext() {
return this.mainContext
}