Commit cb85678a authored by Cédric Moreau

[enh] Old blocks now get archived on the filesystem

parent b0cfb6f8
@@ -86,3 +86,5 @@ test/fast/dal/*-loki.d.ts
test/fast/dal/*-loki.js*
test/dal/loki.d.ts
test/dal/loki.js*
test/dal/blockchain-archive.d.ts
test/dal/blockchain-archive.js*
@@ -242,6 +242,7 @@ export class DuniterBlockchain {
const MAX_BINDEX_SIZE = conf.forksize + bindexSize
const currentSize = indexes.HEAD.number - TAIL.number + 1
if (currentSize > MAX_BINDEX_SIZE) {
await dal.archiveBlocks()
await dal.trimIndexes(indexes.HEAD.number - MAX_BINDEX_SIZE);
}
......
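Note on ordering: dal.archiveBlocks() is called before dal.trimIndexes(), so every block about to leave the in-memory BINDEX window is persisted to the filesystem before it gets pruned. A minimal numeric sketch of the trigger condition (the forksize and bindexSize values are illustrative assumptions, not part of this diff):

    const forksize = 100      // assumed node setting (conf.forksize)
    const bindexSize = 2880   // adaptive index size, illustrative value
    const MAX_BINDEX_SIZE = forksize + bindexSize  // 2980
    // Once HEAD.number - TAIL.number + 1 exceeds 2980, old blocks are archived
    // to disk first, then the in-memory indexes are trimmed down to the window.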
@@ -299,7 +299,10 @@ export const CommonConstants = {
SPECIAL_BLOCK
},
- BLOCK_MAX_TX_CHAINING_DEPTH: 5
+ BLOCK_MAX_TX_CHAINING_DEPTH: 5,
+ CONST_BLOCKS_CHUNK: 250,
+ BLOCKS_IN_MEMORY_MAX: 288 * 60 // 60 days of blocks
}
function exact (regexpContent:string) {
......
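For scale: assuming the Ğ1 target of one block every five minutes, a day holds 24 × 60 / 5 = 288 blocks, so the memory window is 288 × 60 = 17 280 blocks, i.e. roughly 60 days. A quick check of that arithmetic:

    const BLOCKS_PER_DAY = (24 * 60) / 5            // one block every 5 minutes (assumption)
    const BLOCKS_IN_MEMORY_MAX = BLOCKS_PER_DAY * 60
    console.log(BLOCKS_IN_MEMORY_MAX)               // 17280 blocks kept in memory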
export enum DataErrors {
CANNOT_ARCHIVE_CHUNK_WRONG_SIZE,
CORRUPTED_DATABASE,
BLOCKCHAIN_NOT_INITIALIZED_YET,
CANNOT_DETERMINATE_MEMBERSHIP_AGE,
......
@@ -102,6 +102,12 @@ export class QuickSynchronizer {
sync_memoryDAL.sindexDAL = { getAvailableForConditions: (conditions:string) => this.dal.sindexDAL.getAvailableForConditions(conditions) }
await this.dal.blockDAL.insertBatch(blocks.map((b:any) => {
const block = DBBlock.fromBlockDTO(b)
block.fork = false
return block
}))
for (const block of blocks) {
// VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
@@ -224,8 +230,9 @@ export class QuickSynchronizer {
// We trim it, as it is not necessary to store it all (we already store the full blocks)
sync_bindex.splice(0, sync_bindexSize);
- // Process triming continuously to avoid super long ending of sync
+ // Process trimming & archiving continuously to avoid a super long ending of sync
await this.dal.trimIndexes(sync_bindex[0].number);
+ await this.dal.archiveBlocks()
}
} else {
......
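Two sync-path changes in this hunk: downloaded blocks are flagged fork = false before batch insertion (an initial sync only ever receives main-chain blocks), and archiving now runs continuously alongside index trimming rather than once at the end. A sketch of the per-block conversion (the raw payload and the import paths are illustrative):

    import {BlockDTO} from "../dto/BlockDTO"
    import {DBBlock} from "../db/DBBlock"

    const raw: any = { number: 42, hash: 'H42' }   // network payload, illustrative
    const block = DBBlock.fromBlockDTO(BlockDTO.fromJSONObject(raw))
    block.fork = false                             // never a fork during initial sync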
@@ -11,6 +11,8 @@
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import * as fs from 'fs'
import * as path from 'path'
import {SQLiteDriver} from "./drivers/SQLiteDriver"
import {ConfDAL} from "./fileDALs/ConfDAL"
import {StatDAL} from "./fileDALs/StatDAL"
@@ -21,6 +23,7 @@ import {DBIdentity, IdentityDAL} from "./sqliteDAL/IdentityDAL"
import {
CindexEntry,
FullCindexEntry,
FullIindexEntry,
FullMindexEntry,
FullSindexEntry,
IindexEntry,
@@ -66,9 +69,10 @@ import {LokiPeer} from "./indexDAL/loki/LokiPeer"
import {DBTx} from "../db/DBTx"
import {DBWallet} from "../db/DBWallet"
import {Tristamp} from "../common/Tristamp"
+ import {CFSBlockchainArchive} from "./indexDAL/CFSBlockchainArchive"
+ import {CFSCore} from "./fileDALs/CFSCore"
+ import {BlockchainArchiveDAO} from "./indexDAL/abstract/BlockchainArchiveDAO"
- const fs = require('fs')
- const path = require('path')
const readline = require('readline')
const _ = require('underscore');
const indexer = require('../indexer').Indexer
@@ -95,6 +99,7 @@ export class FileDAL {
powDAL:PowDAL
confDAL:ConfDAL
statDAL:StatDAL
blockchainArchiveDAL:BlockchainArchiveDAO<DBBlock>
// SQLite DALs
metaDAL:MetaDAL
@@ -128,6 +133,7 @@ export class FileDAL {
this.powDAL = new PowDAL(this.rootPath, params.fs)
this.confDAL = new ConfDAL(this.rootPath, params.fs)
this.metaDAL = new (require('./sqliteDAL/MetaDAL').MetaDAL)(this.sqliteDriver);
this.blockchainArchiveDAL = new CFSBlockchainArchive(new CFSCore(path.join(this.rootPath, '/archives'), params.fs), CommonConstants.CONST_BLOCKS_CHUNK)
this.blockDAL = new LokiBlockchain(this.loki.getLokiInstance())
this.txsDAL = new LokiTransactions(this.loki.getLokiInstance())
this.statDAL = new StatDAL(this.rootPath, params.fs)
@@ -158,7 +164,8 @@ export class FileDAL {
'mindexDAL': this.mindexDAL,
'iindexDAL': this.iindexDAL,
'sindexDAL': this.sindexDAL,
- 'cindexDAL': this.cindexDAL
+ 'cindexDAL': this.cindexDAL,
+ 'blockchainArchiveDAL': this.blockchainArchiveDAL,
}
}
@@ -175,6 +182,7 @@ export class FileDAL {
this.iindexDAL,
this.sindexDAL,
this.cindexDAL,
this.blockchainArchiveDAL,
]
for (const indexDAL of dals) {
indexDAL.triggerInit()
@@ -197,6 +205,33 @@ export class FileDAL {
return this.metaDAL.getVersion()
}
/**
* Transfers full chunks of blocks from the in-memory DB to the filesystem archives when the in-memory DB overflows.
* @returns {Promise<void>}
*/
async archiveBlocks() {
const lastArchived = await this.blockchainArchiveDAL.getLastSavedBlock()
const current = await this.blockDAL.getCurrent()
const lastNumber = lastArchived ? lastArchived.number : -1
const currentNumber = current ? current.number : -1
const difference = currentNumber - lastNumber
if (difference > CommonConstants.BLOCKS_IN_MEMORY_MAX) {
const CHUNK_SIZE = this.blockchainArchiveDAL.chunkSize
const nbBlocksOverflow = difference - CommonConstants.BLOCKS_IN_MEMORY_MAX
const chunks = (nbBlocksOverflow - (nbBlocksOverflow % CHUNK_SIZE)) / CHUNK_SIZE
for (let i = 0; i < chunks; i++) {
const start = lastNumber + (i*CHUNK_SIZE) + 1
const end = lastNumber + (i*CHUNK_SIZE) + CHUNK_SIZE
const memBlocks = await this.blockDAL.getNonForkChunk(start, end)
if (memBlocks.length !== CHUNK_SIZE) {
throw Error(DataErrors[DataErrors.CANNOT_ARCHIVE_CHUNK_WRONG_SIZE])
}
await this.blockchainArchiveDAL.archive(memBlocks)
await this.blockDAL.trimBlocks(end)
}
}
}
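To make the chunk arithmetic above concrete, here is a standalone restatement with a worked example (all numbers illustrative):

    function chunksToArchive(lastArchived: number, current: number, maxInMemory: number, chunkSize: number): number {
      const difference = current - lastArchived
      if (difference <= maxInMemory) return 0
      const overflow = difference - maxInMemory
      return (overflow - (overflow % chunkSize)) / chunkSize  // only full chunks leave memory
    }
    // chunksToArchive(-1, 17810, 17280, 250) === 2: blocks 0..249 and 250..499 are written
    // to disk and trimmed; the remaining 31 overflowing blocks stay until a full chunk forms.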
writeFileOfBlock(block:DBBlock) {
return this.blockDAL.saveBlock(block)
}
@@ -241,14 +276,17 @@ export class FileDAL {
return (await this.blockDAL.getBlock(number)) as DBBlock
}
// Duniter-UI dependency
async getBlock(number: number): Promise<DBBlock|null> {
return this.getFullBlockOf(number)
}
async getFullBlockOf(number: number): Promise<DBBlock|null> {
- // TODO
- return this.blockDAL.getBlock(number)
+ return (await this.blockDAL.getBlock(number)) || (await this.blockchainArchiveDAL.getBlockByNumber(number))
}
async getBlockstampOf(number: number): Promise<string|null> {
- // TODO
- const block = await this.blockDAL.getBlock(number)
+ const block = await this.getTristampOf(number)
if (block) {
return [block.number, block.hash].join('-')
}
@@ -256,32 +294,27 @@ export class FileDAL {
}
async getTristampOf(number: number): Promise<Tristamp|null> {
- // TODO
- return this.blockDAL.getBlock(number)
+ return (await this.blockDAL.getBlock(number)) || (await this.blockchainArchiveDAL.getBlockByNumber(number))
}
async existsAbsoluteBlockInForkWindow(number:number, hash:string): Promise<boolean> {
- // TODO
- return !!(await this.blockDAL.getAbsoluteBlock(number, hash))
+ return !!(await this.getAbsoluteBlockByNumberAndHash(number, hash))
}
async getAbsoluteBlockInForkWindow(number:number, hash:string): Promise<DBBlock|null> {
- // TODO
- return this.blockDAL.getAbsoluteBlock(number, hash)
+ return this.getAbsoluteBlockByNumberAndHash(number, hash)
}
async getAbsoluteValidBlockInForkWindow(number:number, hash:string): Promise<DBBlock|null> {
- // TODO: blocks that are not forks
- const block = await this.blockDAL.getAbsoluteBlock(number, hash)
+ const block = await this.getAbsoluteBlockByNumberAndHash(number, hash)
if (block && !block.fork) {
return block
}
return null
}
- getAbsoluteBlockByNumberAndHash(number:number, hash:string): Promise<DBBlock|null> {
- // TODO: first, look at fork window, and then fallback on archives
- return this.blockDAL.getAbsoluteBlock(number, hash)
+ async getAbsoluteBlockByNumberAndHash(number:number, hash:string): Promise<DBBlock|null> {
+ return (await this.blockDAL.getAbsoluteBlock(number, hash)) || (await this.blockchainArchiveDAL.getBlock(number, hash))
}
async existsNonChainableLink(from:string, vHEAD_1:DBHead, sigStock:number) {
@@ -565,8 +598,8 @@ export class FileDAL {
}
// Duniter-UI dependency
- async getWrittenIdtyByPubkey(pub:string) {
- return !!(await this.iindexDAL.getFromPubkey(pub))
+ async getWrittenIdtyByPubkey(pub:string): Promise<FullIindexEntry | null> {
+ return await this.iindexDAL.getFromPubkey(pub)
}
async getWrittenIdtyByPubkeyForExistence(uid:string) {
......
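All block reads in FileDAL now share a two-tier lookup: the in-memory Loki collection first (recent blocks plus the fork window), then the filesystem archive; only the number-and-hash variant also checks the hash of the archived copy. A condensed sketch of the pattern (import paths assume app/lib/dal):

    import {FileDAL} from "./fileDAL"
    import {DBBlock} from "../db/DBBlock"

    async function readBlock(dal: FileDAL, number: number): Promise<DBBlock | null> {
      // Memory first, then the JSON archives on disk.
      return (await dal.blockDAL.getBlock(number))
          || (await dal.blockchainArchiveDAL.getBlockByNumber(number))
    }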
import {BlockchainArchiveDAO, BlockLike} from "./abstract/BlockchainArchiveDAO"
import {CFSCore} from "../fileDALs/CFSCore"
export class CFSBlockchainArchive<T extends BlockLike> implements BlockchainArchiveDAO<T> {
constructor(private cfs:CFSCore, private _chunkSize:number) {
}
async archive(records: T[]): Promise<number> {
if (!this.checkBlocksRepresentChunks(records)) {
return 0
}
if (!this.checkBlocksAreWellChained(records)) {
return 0
}
const chunks = this.splitIntoChunks(records)
for (const c of chunks) {
const fileName = this.getFileName(c[0].number)
await this.cfs.writeJSON(fileName, c)
}
return chunks.length
}
private checkBlocksRepresentChunks(records: BlockLike[]): boolean {
return records[0].number % this._chunkSize === 0 && (records[records.length - 1].number + 1) % this._chunkSize === 0
}
private checkBlocksAreWellChained(records: T[]): boolean {
let previous:BlockLike = {
number: records[0].number - 1,
hash: records[0].previousHash,
previousHash: ''
}
for (const b of records) {
if (b.previousHash !== previous.hash || b.number !== previous.number + 1) {
return false
}
previous = b
}
return true
}
private splitIntoChunks(records: T[]): T[][] {
const nbChunks = records.length / this._chunkSize
const chunks: T[][] = []
for (let i = 0; i < nbChunks; i++) {
chunks.push(records.slice(i * this._chunkSize, (i + 1) * this._chunkSize))
}
return chunks
}
async getBlock(number: number, hash: string): Promise<T|null> {
const block = await this.getBlockByNumber(number)
if (!block) {
return null
}
return block.hash === hash ? block : null
}
async getBlockByNumber(number: number): Promise<T|null> {
if (number < 0) {
return null
}
const content = await this.getChunk(number)
if (!content) {
// The block's chunk is not archived
return null
}
return content[this.getPositionInChunk(number)]
}
async getChunk(number:number): Promise<(T[])|null> {
const file = this.getFileName(number)
return this.cfs.readJSON(file)
}
async getLastSavedBlock(): Promise<T | null> {
const list = await this.cfs.list('/')
const max = list
.map(f => f.replace(`chunk_`, ''))
.map(f => f.replace(`-${this._chunkSize}.json`, ''))
.map(f => parseInt(f))
.reduce((v, max) => {
return Math.max(v, max)
}, 0)
const content = await this.getChunk(max * this._chunkSize)
if (!content) {
return null
}
return this.getBlock(content[content.length - 1].number, content[content.length - 1].hash)
}
private getFileName(number:number) {
const rest = number % this._chunkSize
const chunk = (number - rest) / this._chunkSize
return CFSBlockchainArchive.getChunkName(chunk, this._chunkSize)
}
private static getChunkName(chunkNumber:number, chunkSize:number) {
return `chunk_${chunkNumber}-${chunkSize}.json`
}
private getPositionInChunk(number:number) {
return number % this._chunkSize
}
async init(): Promise<void> {
return this.cfs.makeTree('/')
}
triggerInit(): void {
// TODO: remove triggerInit from all the DAOs, it is a wrong implementation
}
cleanCache(): void {
// TODO: is it really useful?
}
get chunkSize(): number {
return this._chunkSize
}
}
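Chunk files are named chunk_<chunkNumber>-<chunkSize>.json, so with the production size of 250, block #612 sits at position 112 of chunk_2-250.json. A usage sketch (the directory and the hash literal are illustrative):

    import {CFSBlockchainArchive} from "./CFSBlockchainArchive"
    import {BlockLike} from "./abstract/BlockchainArchiveDAO"
    import {CFSCore} from "../fileDALs/CFSCore"
    import {RealFS} from "../../system/directory"

    async function demo(blocks0to249: BlockLike[]) {
      const archive = new CFSBlockchainArchive<BlockLike>(new CFSCore('/tmp/duniter-archives', RealFS()), 250)
      await archive.init()
      const written = await archive.archive(blocks0to249)   // one aligned, well-chained chunk => 1
      const block = await archive.getBlock(112, 'HASH_112') // served from chunk_0-250.json, hash-checked
    }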
import {Initiable} from "../../sqliteDAL/Initiable"
import {DBBlock} from "../../../db/DBBlock"
export interface BlockLike {
number:number
hash:string
previousHash:string
}
export interface BlockchainArchiveDAO<T extends BlockLike> extends Initiable {
/**
* Trigger the initialization of the DAO. Called when the underlying DB is ready.
*/
triggerInit(): void
/**
* Retrieves a block from the archives.
* @param {number} number Block number.
* @param {string} hash Block hash.
* @returns {Promise<T|null>}
*/
getBlock(number:number, hash:string): Promise<T|null>
/**
* Retrieves a block from the archives, without checking the hash.
* @param {number} number Block number.
* @returns {Promise<T|null>}
*/
getBlockByNumber(number:number): Promise<T|null>
/**
* Archives a sequence of blocks.
*
* Returns 0 and archives nothing if the blocks do not chain together or do not align on complete chunks.
* @param {DBBlock[]} records The blocks to archive.
* @returns {Promise<number>} The number of chunk files written.
*/
archive(records:T[]): Promise<number>
/**
* Retrieves the last archived block (the one with the highest number).
* @returns {Promise<BlockLike | null>}
*/
getLastSavedBlock(): Promise<T|null>
readonly chunkSize:number
}
@@ -36,4 +36,8 @@ export interface BlockchainDAO extends GenericDAO<DBBlock> {
removeForkBlock(number:number): Promise<void>
removeForkBlockAboveOrEqual(number:number): Promise<void>
trimBlocks(number:number): Promise<void>
getNonForkChunk(start:number, end:number): Promise<DBBlock[]>
}
@@ -87,6 +87,15 @@ export class LokiBlockchain extends LokiIndex<DBBlock> implements BlockchainDAO
.remove()
}
async trimBlocks(number:number): Promise<void> {
await this.collection
.chain()
.find({
number: { $lte: number }
})
.remove()
}
async getAbsoluteBlock(number: number, hash: string): Promise<DBBlock | null> {
return this.collection
.chain()
@@ -226,4 +235,14 @@ export class LokiBlockchain extends LokiIndex<DBBlock> implements BlockchainDAO
}
}
async getNonForkChunk(start: number, end: number): Promise<DBBlock[]> {
return this.collection
.chain()
.find({
fork: false,
number: { $between: [start, end ]}
})
.simplesort('number')
.data()
}
}
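LokiJS's $between operator is inclusive on both bounds, so getNonForkChunk(start, end) returns at most end - start + 1 blocks; any fork block or gap in the range shortens the result, which is exactly what the CHUNK_SIZE equality check in FileDAL.archiveBlocks() catches. A self-contained sketch (collection contents are illustrative):

    const loki = require('lokijs')
    const db = new loki('example.db')
    const blocks = db.addCollection('blocks')
    blocks.insert([
      { number: 0, fork: false },
      { number: 1, fork: true },  // fork blocks are excluded from archivable chunks
      { number: 2, fork: false },
    ])
    const chunk = blocks.chain()
      .find({ fork: false, number: { $between: [0, 2] } })  // inclusive bounds
      .simplesort('number')
      .data()
    // chunk.length === 2, not 3: the caller sees the hole and refuses to archive.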
@@ -29,13 +29,14 @@ import {dos2unix} from "../../../lib/common-libs/dos2unix"
import {hashf} from "../../../lib/common"
import {ConfDTO} from "../../../lib/dto/ConfDTO"
import {PeeringService} from "../../../service/PeeringService"
import {CommonConstants} from "../../../lib/common-libs/constants"
const _ = require('underscore');
const moment = require('moment');
const multimeter = require('multimeter');
const makeQuerablePromise = require('querablep');
- const CONST_BLOCKS_CHUNK = 250;
+ const CONST_BLOCKS_CHUNK = CommonConstants.CONST_BLOCKS_CHUNK
const EVAL_REMAINING_INTERVAL = 1000;
const INITIAL_DOWNLOAD_SLOTS = 1;
@@ -759,7 +760,6 @@ class P2PDownloader {
this.chunks[realIndex] = blocks;
// We pre-save blocks only for non-cautious sync
if (this.nocautious) {
- await this.dal.blockDAL.insertBatch(blocks.map((b:any) => BlockDTO.fromJSONObject(b)))
this.writtenChunks++
watcher.savedPercent(Math.round(this.writtenChunks / this.numberOfChunksToDownload * 100));
}
@@ -955,7 +955,7 @@ class P2PDownloader {
// Store the file to avoid re-downloading
if (this.localNumber <= 0 && chunk.length === CONST_BLOCKS_CHUNK) {
await this.dal.confDAL.coreFS.makeTree(this.currency);
- await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk });
+ await this.dal.confDAL.coreFS.writeJSON(fileName, { blocks: chunk.map((b:any) => DBBlock.fromBlockDTO(b)) });
}
return chunk;
}
......
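Since the pre-saved chunk files now contain DBBlock objects rather than raw DTOs, a later pass can feed them straight into the database. A sketch of the intended read-back path (the function name and import path are assumptions; the apply step mirrors QuickSynchronizer above):

    import {FileDAL} from "../../../lib/dal/fileDAL"

    async function applyPresavedChunk(dal: FileDAL, fileName: string) {
      const file = await dal.confDAL.coreFS.readJSON(fileName)  // { blocks: DBBlock[] }
      await dal.blockDAL.insertBatch(file.blocks)               // no DTO conversion at apply time
    }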
@@ -374,14 +374,14 @@ export class Server extends stream.Duplex implements HookableServer {
await this.resetDataHook()
await this.resetConfigHook()
const files = ['stats', 'cores', 'current', Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE, 'export.zip', 'import.zip', 'conf'];
- const dirs = ['loki', 'blocks', 'blockchain', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
+ const dirs = ['archives', 'loki', 'blocks', 'blockchain', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
return this.resetFiles(files, dirs, done);
}
async resetData(done:any = null) {
await this.resetDataHook()
const files = ['stats', 'cores', 'current', Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE];
- const dirs = ['loki', 'blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
+ const dirs = ['archives', 'loki', 'blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb'];
await this.resetFiles(files, dirs, done);
}
......
// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {getNanosecondsTime} from "../../app/ProcessCpuProfiler"
import * as os from "os"
import * as path from "path"
import * as assert from "assert"
import {BlockchainArchiveDAO, BlockLike} from "../../app/lib/dal/indexDAL/abstract/BlockchainArchiveDAO"
import {CFSBlockchainArchive} from "../../app/lib/dal/indexDAL/CFSBlockchainArchive"
import {CFSCore} from "../../app/lib/dal/fileDALs/CFSCore"
import {RealFS} from "../../app/lib/system/directory"
describe("Blockchain Archive data layer", () => {
let archives:BlockchainArchiveDAO<BlockLike>
let dbPath = path.join(os.tmpdir(), 'duniter' + getNanosecondsTime())
before(async () => {
archives = new CFSBlockchainArchive(new CFSCore(dbPath, RealFS()), 2)
archives.triggerInit()
await archives.init()
})
it('should be able to read last saved block when archives are empty', async () => {
assert.equal(null, await archives.getLastSavedBlock())
})
it('should be able to archive 6 blocks', async () => {
const chunksCreated = await archives.archive([
{ number: 0, hash: 'H0', previousHash: '' },
{ number: 1, hash: 'H1', previousHash: 'H0' },
{ number: 2, hash: 'H2', previousHash: 'H1' },
{ number: 3, hash: 'H3', previousHash: 'H2' },
{ number: 4, hash: 'H4', previousHash: 'H3' },
{ number: 5, hash: 'H5', previousHash: 'H4' },
])
assert.equal(chunksCreated, 3)
})
it('should be able to read archived blocks', async () => {
assert.notEqual(null, await archives.getBlock(0, 'H0'))
assert.notEqual(null, await archives.getBlock(1, 'H1'))
assert.notEqual(null, await archives.getBlock(2, 'H2'))
assert.notEqual(null, await archives.getBlock(3, 'H3'))
assert.notEqual(null, await archives.getBlock(4, 'H4'))
assert.notEqual(null, await archives.getBlock(5, 'H5'))
})
it('should be able to read last saved block when archives are full', async () => {
assert.notEqual(null, await archives.getLastSavedBlock())
assert.equal(5, ((await archives.getLastSavedBlock()) as BlockLike).number)
})
it('should not be able to read blocks with a wrong hash or outside the archives', async () => {
assert.equal(null, await archives.getBlock(0, 'H5'))
assert.equal(null, await archives.getBlock(8, 'H8'))
})
it('should refuse to store unchained blocks', async () => {
const chunksCreated1 = await archives.archive([
{ number: 6, hash: 'H6', previousHash: 'H5' },
{ number: 7, hash: 'H7', previousHash: 'H61' },
])
assert.equal(chunksCreated1, 0)
const chunksCreated2 = await archives.archive([
{ number: 6, hash: 'H6', previousHash: 'H5' },
{ number: 8, hash: 'H7', previousHash: 'H6' },
])
assert.equal(chunksCreated2, 0)
})
it('should refuse to store blocks that are not chunks', async () => {
const chunksCreated = await archives.archive([
{ number: 6, hash: 'H6', previousHash: 'H5' },
])
assert.equal(chunksCreated, 0)
})
})