Commit f7ffeadb authored by Cédric Moreau

[enh] Upgrading to LevelDB instead of Loki or SQLite for several parts

parent a5881dbe
......@@ -2,7 +2,7 @@
GT_TARGET_BLOCK=210000 # This is a fixed block# which determines the sha1 hashes
GT_IINDEX_CS=dfd2dfc3d4d0ced4c101badb4d4a1ab85de8cbde
GT_MINDEX_CS=9d8f665f5fcf1f21082278c4787bb3df085ff109
GT_MINDEX_CS=d867b887663cdfad8ac42dacc6081d638eea0976
GT_CINDEX_CS=b141361fb40f4c13f03f4640151c7674e190a4dd
GT_SINDEX_CS=7c6801027e39b9fea9be973d8773ac77d2c9a1f9
......
......@@ -18,7 +18,7 @@ import {
IndexEntry,
Indexer,
MindexEntry,
SimpleTxEntryForWallet,
SimpleSindexEntryForWallet,
SimpleUdEntryForWallet
} from "../indexer"
import {ConfDTO} from "../dto/ConfDTO"
......@@ -37,6 +37,8 @@ import {NewLogger} from "../logger"
import {DBTx} from "../db/DBTx"
import {Underscore} from "../common-libs/underscore"
import {OtherConstants} from "../other_constants"
import {MonitorExecutionTime} from "../debug/MonitorExecutionTime"
import {WoTBInstance} from "../wot"
export class DuniterBlockchain {
......@@ -204,12 +206,6 @@ export class DuniterBlockchain {
logger.info('Block #' + block.number + ' added to the blockchain in %s ms', (Date.now() - start));
// Periodically, we trim the blockchain
if (block.number % CommonConstants.BLOCKS_COLLECT_THRESHOLD === 0) {
// Database trimming
await dal.loki.flushAndTrimData()
}
return BlockDTO.fromJSONObject(added)
}
catch(err) {
......@@ -279,8 +275,6 @@ export class DuniterBlockchain {
// Saves the block (DAL)
await dal.saveBlock(dbb);
await dal.loki.commitData()
return dbb
}
......@@ -314,12 +308,14 @@ export class DuniterBlockchain {
}
}
static async createNewcomers(iindex:IindexEntry[], dal:FileDAL, logger:any) {
@MonitorExecutionTime()
static async createNewcomers(iindex:IindexEntry[], dal:FileDAL, logger:any, instance?: WoTBInstance) {
const wotb = instance || dal.wotb
for (const i of iindex) {
if (i.op == CommonConstants.IDX_CREATE) {
const entry = i as FullIindexEntry
// Reserves a wotb ID
entry.wotb_id = dal.wotb.addNode();
entry.wotb_id = wotb.addNode();
logger.trace('%s was affected wotb_id %s', entry.uid, entry.wotb_id);
// Remove from the sandbox any other identity with the same pubkey/uid, since it has now been reserved.
await dal.removeUnWrittenWithPubkey(entry.pub)
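A minimal call sketch (not part of the diff) of the new optional parameter: callers such as the sync code can inject a dedicated WoT instance, while omitting it keeps the previous behaviour of using dal.wotb. "memWotb" is a hypothetical in-memory WoTBInstance.

await DuniterBlockchain.createNewcomers(local_iindex, dal, logger)          // uses dal.wotb, as before
await DuniterBlockchain.createNewcomers(local_iindex, dal, logger, memWotb) // uses the injected instance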
......@@ -328,12 +324,13 @@ export class DuniterBlockchain {
}
}
static async updateMembers(block:BlockDTO, dal:FileDAL) {
static async updateMembers(block:BlockDTO, dal:FileDAL, instance?: WoTBInstance) {
const wotb = instance || dal.wotb
// Joiners (come back)
for (const inlineMS of block.joiners) {
let ms = MembershipDTO.fromInline(inlineMS)
const idty = await dal.getWrittenIdtyByPubkeyForWotbID(ms.issuer);
dal.wotb.setEnabled(true, idty.wotb_id);
wotb.setEnabled(true, idty.wotb_id);
await dal.dividendDAL.setMember(true, ms.issuer)
}
// Revoked
......@@ -344,12 +341,12 @@ export class DuniterBlockchain {
// Excluded
for (const excluded of block.excluded) {
const idty = await dal.getWrittenIdtyByPubkeyForWotbID(excluded);
dal.wotb.setEnabled(false, idty.wotb_id);
wotb.setEnabled(false, idty.wotb_id);
await dal.dividendDAL.setMember(false, excluded)
}
}
static async updateWallets(sindex:SimpleTxEntryForWallet[], dividends:SimpleUdEntryForWallet[], aDal:any, reverse = false) {
static async updateWallets(sindex:SimpleSindexEntryForWallet[], dividends:SimpleUdEntryForWallet[], aDal:any, reverse = false, at?: number) {
const differentConditions = Underscore.uniq(sindex.map((entry) => entry.conditions).concat(dividends.map(d => d.conditions)))
for (const conditions of differentConditions) {
const udsOfKey: BasedAmount[] = dividends.filter(d => d.conditions === conditions).map(d => ({ amount: d.amount, base: d.base }))
......@@ -364,9 +361,14 @@ export class DuniterBlockchain {
variation *= -1
}
if (OtherConstants.TRACE_BALANCES) {
NewLogger().trace('Balance of %s: %s (%s %s %s)', wallet.conditions, wallet.balance + variation, wallet.balance, variation < 0 ? '-' : '+', Math.abs(variation))
if (!OtherConstants.TRACE_PARTICULAR_BALANCE || wallet.conditions.match(new RegExp(OtherConstants.TRACE_PARTICULAR_BALANCE))) {
NewLogger().trace('Balance of %s: %s (%s %s %s) at #%s', wallet.conditions, wallet.balance + variation, wallet.balance, variation < 0 ? '-' : '+', Math.abs(variation), at)
}
}
wallet.balance += variation
if (OtherConstants.TRACE_PARTICULAR_BALANCE && wallet.conditions.match(new RegExp(OtherConstants.TRACE_PARTICULAR_BALANCE))) {
NewLogger().trace('>>>>>>>>> WALLET = ', (wallet.balance > 0 ? '+' : '') + wallet.balance)
}
await aDal.saveWallet(wallet)
}
}
......
......@@ -12,6 +12,7 @@
// GNU Affero General Public License for more details.
import {BlockDTO} from "../dto/BlockDTO"
import {Underscore} from "../common-libs/underscore"
export interface SwitchBlock {
......@@ -90,7 +91,7 @@ export class Switcher<T extends SwitchBlock> {
*/
private async findPotentialSuites(numberStart:number, timeStart:number) {
const suites:T[][] = []
const potentials:T[] = await this.dao.getPotentials(numberStart, timeStart, numberStart + this.forkWindowSize)
const potentials:T[] = Underscore.sortBy(await this.dao.getPotentials(numberStart, timeStart, numberStart + this.forkWindowSize), element => -element.number)
const knownForkBlocks:{ [k:string]: boolean } = {}
for (const candidate of potentials) {
knownForkBlocks[BlockDTO.fromJSONObject(candidate).blockstamp] = true
......@@ -240,4 +241,4 @@ export class Switcher<T extends SwitchBlock> {
}
return false
}
}
\ No newline at end of file
}
export function arrayPruneAll<T>(array: T[], value: T) {
if (!array || array.length === 0) {
return
}
let index
do {
index = array.indexOf(value)
if (index !== -1) {
array.splice(index, 1)
}
} while (index !== -1)
}
export function arrayPruneAllCopy<T>(original: T[], value: T) {
const array = original.slice()
let index
do {
index = array.indexOf(value)
if (index !== -1) {
array.splice(index, 1)
}
} while (index !== -1)
return array
}
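A quick usage sketch contrasting the two helpers: arrayPruneAll mutates the given array in place, while arrayPruneAllCopy works on a copy and returns it.

const xs = [1, 2, 3, 2, 2]
arrayPruneAll(xs, 2)                     // xs is now [1, 3] (mutated in place)

const ys = [1, 2, 3, 2]
const pruned = arrayPruneAllCopy(ys, 2)  // pruned is [1, 3], ys is left untouched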
......@@ -83,6 +83,11 @@ export const duniterDocument2str = (type:DuniterDocument) => {
}
}
export const ErrorConstants = {
EXIT_CODE_MINDEX_WRITING_ERROR: 500
}
export const CommonConstants = {
FORMATS: {
......@@ -309,7 +314,6 @@ export const CommonConstants = {
MAX_AGE_OF_PEER_IN_BLOCKS: 200, // blocks
INITIAL_DOWNLOAD_SLOTS: 1, // 1 peer
BLOCKS_COLLECT_THRESHOLD: 30, // Number of blocks to wait before trimming the loki data
DEFAULT_NON_WOT_PEERS_LIMIT: 100, // Number of non-wot peers accepted in our peer document pool
REJECT_WAIT_FOR_AVAILABLE_NODES_IN_SYNC_AFTER: 20000, // Reject after 20 seconds without any change
......
export enum DataErrors {
SYNC_FAST_MEM_ERROR_DURING_INJECTION,
CANNOT_GET_VALIDATION_BLOCK_FROM_REMOTE,
REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE,
NO_NODE_FOUND_TO_DOWNLOAD_CHUNK,
WRONG_CURRENCY_DETECTED,
......@@ -9,8 +11,8 @@ export enum DataErrors {
WS2P_SYNC_PERIMETER_IS_LIMITED,
PEER_REJECTED,
TOO_OLD_PEER,
LOKI_DIVIDEND_GET_WRITTEN_ON_SHOULD_NOT_BE_USED,
LOKI_DIVIDEND_REMOVE_BLOCK_SHOULD_NOT_BE_USED,
DIVIDEND_GET_WRITTEN_ON_SHOULD_NOT_BE_USED_DIVIDEND_DAO,
DIVIDEND_REMOVE_BLOCK_SHOULD_NOT_BE_USED_BY_DIVIDEND_DAO,
NEGATIVE_BALANCE,
BLOCK_WASNT_COMMITTED,
CANNOT_ARCHIVE_CHUNK_WRONG_SIZE,
......
export function pint(value: string | number): number {
if (typeof value === 'string') {
return parseInt(value, 10)
}
return value
}
......@@ -13,7 +13,6 @@
import {BlockDTO} from "../dto/BlockDTO"
import {DuniterBlockchain} from "../blockchain/DuniterBlockchain"
import {QuickSynchronizer} from "./QuickSync"
import {DBHead} from "../db/DBHead"
import {FileDAL} from "../dal/fileDAL"
import {DBBlock} from "../db/DBBlock"
......@@ -28,7 +27,6 @@ export class BlockchainContext {
private conf:any
private dal:FileDAL
private logger:any
private quickSynchronizer:QuickSynchronizer
/**
* The virtual next HEAD. Computed each time a new block is added, because a lot of HEAD variables are deterministic
......@@ -107,10 +105,9 @@ export class BlockchainContext {
return local_vHEAD.issuerDiff;
}
setConfDAL(newConf: any, newDAL: any, theQuickSynchronizer: QuickSynchronizer): void {
setConfDAL(newConf: any, newDAL: any): void {
this.dal = newDAL;
this.conf = newConf;
this.quickSynchronizer = theQuickSynchronizer
this.logger = require('../logger').NewLogger(this.dal.profile);
}
......@@ -181,8 +178,4 @@ export class BlockchainContext {
throw 'Key ' + target + ' does not have enough links (' + count + '/' + this.conf.sigQty + ')';
}
}
quickApplyBlocks(blocks:BlockDTO[], to: number): Promise<any> {
return this.quickSynchronizer.quickApplyBlocks(blocks, to)
}
}
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {DuniterBlockchain} from "../blockchain/DuniterBlockchain";
import {BlockDTO} from "../dto/BlockDTO";
import {AccountsGarbagingDAL, FullSindexEntry, Indexer} from "../indexer";
import {CurrencyConfDTO} from "../dto/ConfDTO";
import {FileDAL} from "../dal/fileDAL"
import {DBBlock} from "../db/DBBlock"
import {Underscore} from "../common-libs/underscore"
import {CommonConstants} from "../common-libs/constants"
import {cliprogram} from "../common-libs/programOptions"
const constants = require('../constants')
let sync_bindex: any [] = [];
let sync_iindex: any[] = [];
let sync_mindex: any[] = [];
let sync_cindex: any[] = [];
let sync_sindex: any[] = [];
let sync_bindexSize = 0;
let sync_expires: number[] = [];
let sync_nextExpiring = 0;
let sync_currConf: CurrencyConfDTO;
const sync_memoryWallets: any = {}
const sync_memoryDAL:AccountsGarbagingDAL = {
getWallet: (conditions: string) => Promise.resolve(sync_memoryWallets[conditions] || { conditions, balance: 0 }),
saveWallet: async (wallet: any) => {
// Make a copy
sync_memoryWallets[wallet.conditions] = {
conditions: wallet.conditions,
balance: wallet.balance
}
},
sindexDAL: {
getAvailableForConditions: (conditions:string) => Promise.resolve([])
}
}
export class QuickSynchronizer {
constructor(private conf: any, private dal:FileDAL, private logger: any) {
}
async quickApplyBlocks(blocks:BlockDTO[], to: number): Promise<void> {
sync_memoryDAL.sindexDAL = {
getAvailableForConditions: (conditions:string) => this.dal.sindexDAL.getAvailableForConditions(conditions)
}
await this.dal.blockDAL.insertBatch(blocks.map((b:any) => {
const block = DBBlock.fromBlockDTO(b)
block.fork = false
return block
}))
// We only keep approx. 2 months of blocks in memory, so memory consumption stays approximately constant during the sync
await this.dal.blockDAL.trimBlocks(blocks[blocks.length - 1].number - CommonConstants.BLOCKS_IN_MEMORY_MAX)
for (const block of blocks) {
// VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
if (block.number == 0) {
await DuniterBlockchain.saveParametersForRoot(block, this.conf, this.dal)
}
// The new kind of object stored
const dto = BlockDTO.fromJSONObject(block)
if (block.number == 0) {
sync_currConf = BlockDTO.getConf(block);
}
if (block.number <= to - this.conf.forksize || cliprogram.noSources) { // If the nosources option is set, this blockchain can't be fully validated, so we skip the checks
const index:any = Indexer.localIndex(dto, sync_currConf);
const local_iindex = Indexer.iindex(index);
const local_cindex = Indexer.cindex(index);
const local_sindex = cliprogram.noSources ? [] : Indexer.sindex(index);
const local_mindex = Indexer.mindex(index);
const HEAD = await Indexer.quickCompleteGlobalScope(block, sync_currConf, sync_bindex, local_iindex, local_mindex, local_cindex, this.dal)
sync_bindex.push(HEAD);
// Remember expiration dates
for (const entry of index) {
if (entry.expires_on) {
sync_expires.push(entry.expires_on)
}
if (entry.revokes_on) {
sync_expires.push(entry.revokes_on)
}
}
await DuniterBlockchain.createNewcomers(local_iindex, this.dal, this.logger)
if ((block.dividend && !cliprogram.noSources)
|| block.joiners.length
|| block.actives.length
|| block.revoked.length
|| block.excluded.length
|| block.certifications.length
|| (block.transactions.length && !cliprogram.noSources)
|| block.medianTime >= sync_nextExpiring) {
const nextExpiringChanged = block.medianTime >= sync_nextExpiring
for (let i = 0; i < sync_expires.length; i++) {
let expire = sync_expires[i];
if (block.medianTime >= expire) {
sync_expires.splice(i, 1);
i--;
}
}
sync_nextExpiring = sync_expires.reduce((max, value) => max ? Math.min(max, value) : value, 9007199254740991); // Far far away date
// Fills in correctly the SINDEX
if (!cliprogram.noSources) {
await Promise.all(Underscore.where(sync_sindex.concat(local_sindex), {op: 'UPDATE'}).map(async entry => {
if (!entry.conditions) {
const src = (await this.dal.getSource(entry.identifier, entry.pos, entry.srcType === 'D')) as FullSindexEntry
entry.conditions = src.conditions;
}
}))
}
// Flush the INDEX (not bindex, which is particular)
await this.dal.flushIndexes({
mindex: sync_mindex,
iindex: sync_iindex,
sindex: sync_sindex,
cindex: sync_cindex,
})
sync_iindex = local_iindex
sync_cindex = local_cindex
sync_mindex = local_mindex
sync_sindex = local_sindex
// Dividends and account garbaging
const dividends = cliprogram.noSources ? [] : await Indexer.ruleIndexGenDividend(HEAD, local_iindex, this.dal)
if (!cliprogram.noSources) {
sync_sindex = sync_sindex.concat(await Indexer.ruleIndexGarbageSmallAccounts(HEAD, sync_sindex, dividends, sync_memoryDAL));
}
if (nextExpiringChanged) {
sync_cindex = sync_cindex.concat(await Indexer.ruleIndexGenCertificationExpiry(HEAD, this.dal));
sync_mindex = sync_mindex.concat(await Indexer.ruleIndexGenMembershipExpiry(HEAD, this.dal));
sync_iindex = sync_iindex.concat(await Indexer.ruleIndexGenExclusionByMembership(HEAD, sync_mindex, this.dal));
sync_iindex = sync_iindex.concat(await Indexer.ruleIndexGenExclusionByCertificatons(HEAD, sync_cindex, local_iindex, this.conf, this.dal));
sync_mindex = sync_mindex.concat(await Indexer.ruleIndexGenImplicitRevocation(HEAD, this.dal));
}
if (!cliprogram.noSources) {
// Update balances with UD + local garbagings
await DuniterBlockchain.updateWallets(sync_sindex, dividends, sync_memoryDAL)
}
// Flush the INDEX again (needs to be done *before* the update of wotb links because of block#0)
await this.dal.flushIndexes({
mindex: sync_mindex,
iindex: sync_iindex,
sindex: sync_sindex,
cindex: sync_cindex,
})
// --> Update links
await this.dal.updateWotbLinks(local_cindex.concat(sync_cindex));
sync_iindex = [];
sync_mindex = [];
sync_cindex = [];
sync_sindex = [];
// Create/Update nodes in wotb
await DuniterBlockchain.updateMembers(block, this.dal)
} else {
// Concat the results to the pending data
sync_iindex = sync_iindex.concat(local_iindex);
sync_cindex = sync_cindex.concat(local_cindex);
sync_mindex = sync_mindex.concat(local_mindex);
}
// Trim the bindex
sync_bindexSize = this.conf.forksize + [
block.issuersCount,
block.issuersFrame,
this.conf.medianTimeBlocks,
this.conf.dtDiffEval,
blocks.length
].reduce((max, value) => {
return Math.max(max, value);
}, 0);
if (sync_bindexSize && sync_bindex.length >= 2 * sync_bindexSize) {
// We trim it, not necessary to store it all (we already store the full blocks)
sync_bindex.splice(0, sync_bindexSize);
// Process trimming & archiving continuously to avoid a super long ending of the sync
await this.dal.trimIndexes(sync_bindex[0].number);
}
} else {
// Save the INDEX
await this.dal.bindexDAL.insertBatch(sync_bindex);
await this.dal.flushIndexes({
mindex: sync_mindex,
iindex: sync_iindex,
sindex: sync_sindex,
cindex: sync_cindex,
})
// Save the intermediary table of wallets
const conditions = Underscore.keys(sync_memoryWallets)
const nonEmptyKeys = Underscore.filter(conditions, (k: any) => sync_memoryWallets[k] && sync_memoryWallets[k].balance > 0)
const walletsToRecord = nonEmptyKeys.map((k: any) => sync_memoryWallets[k])
await this.dal.walletDAL.insertBatch(walletsToRecord)
for (const cond of conditions) {
delete sync_memoryWallets[cond]
}
if (block.number === 0) {
await DuniterBlockchain.saveParametersForRoot(block, this.conf, this.dal)
}
// Last block: cautious mode to trigger all the INDEX expiry mechanisms
const { index, HEAD } = await DuniterBlockchain.checkBlock(dto, constants.WITH_SIGNATURES_AND_POW, this.conf, this.dal)
await DuniterBlockchain.pushTheBlock(dto, index, HEAD, this.conf, this.dal, this.logger)
// Clean temporary variables
sync_bindex = [];
sync_iindex = [];
sync_mindex = [];
sync_cindex = [];
sync_sindex = [];
sync_bindexSize = 0;
sync_expires = [];
sync_nextExpiring = 0;
// sync_currConf = {};
}
}
}
}
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import * as levelup from 'levelup'
import {LevelUp} from 'levelup'
import {AbstractLevelDOWN} from 'abstract-leveldown'
import * as leveldown from 'leveldown'
import * as memdown from 'memdown'
export const LevelDBDriver = {
newMemoryInstance: (): LevelUp => {
const impl: any = memdown.default()
return levelup.default(impl)
},
newFileInstance: (path: string): LevelUp => {
return levelup.default(leveldown.default(path))
}
}
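A minimal usage sketch of the driver (not part of this commit), assuming the standard promise-based levelup API (put/get/close) and a hypothetical import path:

import {LevelDBDriver} from "./LevelDBDriver" // hypothetical path, for illustration only

async function demo() {
  // In-memory instance: handy for tests, nothing is written to disk
  const mem = LevelDBDriver.newMemoryInstance()
  await mem.put('iindex:abc', JSON.stringify({ pub: 'abc', member: true }))
  console.log(JSON.parse(String(await mem.get('iindex:abc'))))

  // File-backed instance: persists under the given directory
  const db = LevelDBDriver.newFileInstance('/tmp/duniter-leveldb-demo')
  await db.put('key', 'value')
  await db.close()
}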
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {FileSystem} from "../../system/directory"
import {DataErrors} from "../../common-libs/errors"
import {CFSCore} from "../fileDALs/CFSCore"
import {getNanosecondsTime} from "../../../ProcessCpuProfiler"
import {NewLogger} from "../../logger"
export interface Iterator<T> {
next(value?: any): IteratorResult<T>
return?(value?: any): IteratorResult<T>
throw?(e?: any): IteratorResult<T>
}
export interface IteratorResult<T> {
done: boolean
value: T
}
export interface DBCommit {
indexFile:string,
changes: string[]
collections: {
[coll:string]: string
}
}
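For illustration, a hypothetical value matching the DBCommit shape (all file names invented):

const exampleCommit: DBCommit = {
  indexFile: "index.json.1587400000000",
  changes: ["changes.json.1587400000001"],
  collections: {
    blocks: "blocks.json.1587400000002",
    txs: "txs.json.1587400000003"
  }
}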
export class LokiFsAdapter {
private static COMMIT_FILE = "commit.json"
private cfs:CFSCore
protected mode = "reference"
protected dbref = null
protected dirtyPartitions: string[] = [];
constructor(dbDir:string, fs:FileSystem) {
this.cfs = new CFSCore(dbDir, fs)
}
/**
* Main method to manually pilot the full DB saving to disk.
* @param loki
* @returns {Promise}
*/
async dbDump(loki:any) {
return new Promise(res => loki.saveDatabaseInternal(res))
}
async listPendingChanges(): Promise<string[]> {
if (!(await this.cfs.exists(LokiFsAdapter.COMMIT_FILE))) {
return []
}
const commitObj = await this.cfs.readJSON(LokiFsAdapter.COMMIT_FILE)