diff --git a/.gitignore b/.gitignore
index 3cde71307d7603712c9ccc2b8bec24e011f26591..f01d4174e65ca43ce53afdd128e609b40bc20a7c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -64,6 +64,8 @@ test/integration/forwarding.js
 test/integration/branches_switch.js
 test/integration/branches2.js
 test/integration/transactions-chaining.js
+test/unit-tools.js*
+test/unit-tools.d.ts
 test/fast/modules/crawler/block_pulling.js*
 test/fast/modules/crawler/block_pulling.d.ts
 test/fast/fork*.js*
diff --git a/app/ProcessCpuProfiler.ts b/app/ProcessCpuProfiler.ts
index 163f784307dba1c7a37a2527abc030dfd0c34c44..44ad6294fe30ae45544b93100d6ddc0de81e5988 100644
--- a/app/ProcessCpuProfiler.ts
+++ b/app/ProcessCpuProfiler.ts
@@ -11,14 +11,32 @@
 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 // GNU Affero General Public License for more details.
 
+import {NewLogger} from "./lib/logger"
+
 const SAMPLING_PERIOD = 150 // milliseconds
 const MAX_SAMPLES_DISTANCE = 20 * 1000000 // microseconds (20 seconds)
 
-function getMicrosecondsTime() {
+export function getMicrosecondsTime() {
   const [ seconds, nanoseconds ] = process.hrtime()
   return seconds * 1000000 + nanoseconds / 1000
 }
 
+export function getNanosecondsTime() {
+  const [ seconds, nanoseconds ] = process.hrtime()
+  return seconds * 1000000000 + nanoseconds
+}
+
+export function getDurationInMicroSeconds(before:number) {
+  return Math.trunc(getMicrosecondsTime() - before)
+}
+
+export async function profileFunc<T>(name:string, f: () => Promise<T>): Promise<T> {
+  const now = getMicrosecondsTime()
+  const res = await f()
+  NewLogger().trace('%s %sµs', name, getDurationInMicroSeconds(now))
+  return res
+}
+
 interface CpuUsage {
   user: number
   system:number
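These timing helpers are used throughout the rest of this patch for microsecond-level tracing. A minimal usage sketch (the `dal` object and block range are hypothetical; only `profileFunc` comes from this file):

    import {profileFunc} from "./app/ProcessCpuProfiler"

    async function loadBlocksWithTiming(dal: any) {
      // Logs e.g. "[example][getBlocks] 1234µs" at trace level, then returns the result
      return profileFunc('[example][getBlocks]', () => dal.blockDAL.getBlocks(0, 100))
    }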
diff --git a/app/lib/blockchain/DuniterBlockchain.ts b/app/lib/blockchain/DuniterBlockchain.ts
index da6a576bab227dfa64c281329176a07889d27a75..6f4305c04a59c1f472f4024e7603cebb1e9e77ea 100644
--- a/app/lib/blockchain/DuniterBlockchain.ts
+++ b/app/lib/blockchain/DuniterBlockchain.ts
@@ -270,6 +270,8 @@ export class DuniterBlockchain extends MiscIndexedBlockchain {
     // Saves the block (DAL)
     await dal.saveBlock(dbb);
 
+    await dal.loki.commitData()
+
     return dbb
   }
 
diff --git a/app/lib/common-libs/errors.ts b/app/lib/common-libs/errors.ts
index 98f2763adc6268d0b3655f3ab0e156f47845f173..3c92529727a068b7e29981c5697233f5fa4159d6 100644
--- a/app/lib/common-libs/errors.ts
+++ b/app/lib/common-libs/errors.ts
@@ -1,6 +1,6 @@
 export enum DataErrors {
-  LOCAL_BLOCK_NOT_FOUND_FOR_CRAWLER,
+  CORRUPTED_DATABASE,
   BLOCKCHAIN_NOT_INITIALIZED_YET,
   CANNOT_DETERMINATE_MEMBERSHIP_AGE,
   CANNOT_DETERMINATE_IDENTITY_AGE,
diff --git a/app/lib/computation/QuickSync.ts b/app/lib/computation/QuickSync.ts
index 466ecf7f97d79dd7f5cb86e81e613fcbee5662e3..4ebe5a28823a08b4bd4c25c78a3f3399a1896859 100644
--- a/app/lib/computation/QuickSync.ts
+++ b/app/lib/computation/QuickSync.ts
@@ -30,7 +30,6 @@ let sync_mindex: any[] = [];
 let sync_cindex: any[] = [];
 let sync_sindex: any[] = [];
 let sync_bindexSize = 0;
-let sync_allBlocks: BlockDTO[] = [];
 let sync_expires: number[] = [];
 let sync_nextExpiring = 0;
 let sync_currConf: CurrencyConfDTO;
@@ -55,10 +54,6 @@ export class QuickSynchronizer {
   }
 
   async saveBlocksInMainBranch(blocks: BlockDTO[]): Promise<void> {
-    // VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
-    if (blocks[0].number == 0) {
-      await this.blockchain.saveParametersForRoot(blocks[0], this.conf, this.dal)
-    }
     // Helper to retrieve a block with local cache
     const getBlock = async (number: number): Promise<BlockDTO> => {
       const firstLocalNumber = blocks[0].number;
@@ -107,10 +102,13 @@ export class QuickSynchronizer {
   async quickApplyBlocks(blocks:BlockDTO[], to: number): Promise<void> {
 
     sync_memoryDAL.sindexDAL = { getAvailableForConditions: (conditions:string) => this.dal.sindexDAL.getAvailableForConditions(conditions) }
-    let blocksToSave: BlockDTO[] = [];
 
     for (const block of blocks) {
-      sync_allBlocks.push(block);
+
+      // VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
+      if (block.number == 0) {
+        await this.blockchain.saveParametersForRoot(block, this.conf, this.dal)
+      }
 
       // The new kind of object stored
       const dto = BlockDTO.fromJSONObject(block)
@@ -120,7 +118,6 @@ export class QuickSynchronizer {
       }
 
       if (block.number <= to - this.conf.forksize) {
-        blocksToSave.push(dto);
         const index:any = Indexer.localIndex(dto, sync_currConf);
         const local_iindex = Indexer.iindex(index);
         const local_cindex = Indexer.cindex(index);
         sync_iindex = sync_iindex.concat(local_iindex);
         sync_cindex = sync_cindex.concat(local_cindex);
         sync_mindex = sync_mindex.concat(local_mindex);
-        const HEAD = await Indexer.quickCompleteGlobalScope(block, sync_currConf, sync_bindex, sync_iindex, sync_mindex, sync_cindex, ({
-          async getBlock(number: number) {
-            return sync_allBlocks[number]
-          },
-          async getBlockByBlockstamp(blockstamp: string) {
-            return sync_allBlocks[parseInt(blockstamp)]
-          }
-        }) as any);
+        const HEAD = await Indexer.quickCompleteGlobalScope(block, sync_currConf, sync_bindex, sync_iindex, sync_mindex, sync_cindex, this.dal)
         sync_bindex.push(HEAD);
 
         // Remember expiration dates
@@ -148,9 +138,6 @@ export class QuickSynchronizer {
           if (entry.revokes_on) {
             sync_expires.push(entry.revokes_on)
           }
-          if (entry.expires_on || entry.revokes_on) {
-            sync_expires = _.uniq(sync_expires)
-          }
         }
 
         await this.blockchain.createNewcomers(local_iindex, this.dal, this.logger)
@@ -243,11 +230,6 @@ export class QuickSynchronizer {
           }
         } else {
 
-          if (blocksToSave.length) {
-            await this.saveBlocksInMainBranch(blocksToSave);
-          }
-          blocksToSave = [];
-
           // Save the INDEX
           await this.dal.bindexDAL.insertBatch(sync_bindex);
           await this.dal.mindexDAL.insertBatch(sync_mindex);
@@ -279,14 +261,10 @@ export class QuickSynchronizer {
           sync_cindex = [];
           sync_sindex = [];
           sync_bindexSize = 0;
-          sync_allBlocks = [];
           sync_expires = [];
           sync_nextExpiring = 0;
           // sync_currConf = {};
         }
       }
     }
-    if (blocksToSave.length) {
-      await this.saveBlocksInMainBranch(blocksToSave);
-    }
   }
 }
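The inline provider deleted above resolved blocks from the in-memory sync_allBlocks cache; passing this.dal instead works because downloaded chunks are now persisted immediately (see the sync.ts hunk further down). For reference, the contract the removed object fulfilled, reconstructed from the deleted lines (the parameter type actually expected by Indexer.quickCompleteGlobalScope may be wider; BlockDTO is the type already imported by QuickSync.ts):

    interface BlockProvider {
      getBlock(number: number): Promise<BlockDTO>
      getBlockByBlockstamp(blockstamp: string): Promise<BlockDTO>
    }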
diff --git a/app/lib/dal/drivers/LokiFsAdapter.ts b/app/lib/dal/drivers/LokiFsAdapter.ts
new file mode 100644
index 0000000000000000000000000000000000000000..de478b937fab2014c438cb139dc601788a4ee85b
--- /dev/null
+++ b/app/lib/dal/drivers/LokiFsAdapter.ts
@@ -0,0 +1,323 @@
+/*
+  Loki (node) fs structured Adapter (require this script to instantiate and use it).
+
+  This adapter will save the database container and each collection to separate files, and
+  save a collection only if it is dirty. It is also designed to use a destructured serialization
+  method intended to lower the memory overhead of json serialization.
+
+  This adapter utilizes ES6 generator/iterator functionality to stream output and
+  uses the node readline module to stream input. This should lower memory pressure
+  in addition to individual object serializations rather than loki's default deep object
+  serialization.
+*/
+
+import {RealFS} from "../../system/directory"
+import {DataErrors} from "../../common-libs/errors"
+import {CFSCore} from "../fileDALs/CFSCore"
+import {getNanosecondsTime} from "../../../ProcessCpuProfiler"
+
+const fs = require('fs');
+const readline = require('readline');
+const stream = require('stream');
+
+interface Iterator<T> {
+  next(value?: any): IteratorResult<T>
+  return?(value?: any): IteratorResult<T>
+  throw?(e?: any): IteratorResult<T>
+}
+
+interface IteratorResult<T> {
+  done: boolean
+  value: T
+}
+
+export interface DBCommit {
+  indexFile:string,
+  collections: {
+    [coll:string]: string
+  }
+}
+
+export class LokiFsAdapter {
+
+  private static COMMIT_FILE = "commit.json"
+  private cfs:CFSCore
+
+  protected mode = "reference"
+  protected dbref = null
+  protected dirtyPartitions: string[] = [];
+
+  constructor(dbDir:string) {
+    this.cfs = new CFSCore(dbDir, RealFS())
+  }
+
+  /**
+   * Main method to manually pilot the DB saving to disk.
+   * @param loki
+   * @returns {Promise}
+   */
+  async flush(loki:any) {
+    return new Promise(res => loki.saveDatabaseInternal(res))
+  }
+
+  /**
+   *
+   * Method indirectly called by `flush`.
+   *
+   * Loki reference adapter interface function. Saves structured json via loki database object reference.
+   *
+   * @param {string} dbname - the name to give the serialized database within the catalog.
+   * @param {object} dbref - the loki database object reference to save.
+   * @param {function} callback - completion callback, called with null on success.
+   * @memberof LokiFsStructuredAdapter
+   */
+  public async exportDatabase(dbname:string, dbref:any, callback:any) {
+
+    this.dbref = dbref
+
+    // create (dirty) partition generator/iterator
+    let pi = this.getPartition()
+
+    // Prepare the commit: inherit from existing commit
+    let commit:DBCommit = {
+      indexFile: 'index.db.' + getNanosecondsTime() + ".json",
+      collections: {}
+    }
+    if (await this.cfs.exists(LokiFsAdapter.COMMIT_FILE)) {
+      commit.collections = ((await this.cfs.readJSON(LokiFsAdapter.COMMIT_FILE)) as DBCommit).collections
+    }
+
+    // Create the tree if it does not exist yet
+    await this.cfs.makeTree('/')
+
+    this.saveNextPartition(commit, pi, async () => {
+
+      // Write the new commit file. If the process gets interrupted during this phase, the DB will likely get corrupted.
+      await this.cfs.writeJSON(LokiFsAdapter.COMMIT_FILE, commit)
+
+      const remainingFiles = [
+        LokiFsAdapter.COMMIT_FILE,
+        commit.indexFile
+      ].concat(Object.keys(commit.collections).map(k => commit.collections[k]))
+
+      // Clean obsolete DB files
+      const list = await this.cfs.list('/')
+      for (const f of list) {
+        if (remainingFiles.indexOf(f) === -1) {
+          await this.cfs.remove(f)
+        }
+      }
+
+      // Finish
+      callback(null)
+    })
+  }
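For illustration, here is what a resulting commit file could contain after two collections have been flushed (a sketch: the timestamp-based file names are invented, and the collection names come from the DALs wired up in fileDAL.ts below):

    const exampleCommit: DBCommit = {
      indexFile: "index.db.1519900000000123456.json",
      collections: {
        blockchain: "blockchain.1519900000000234567.json",
        iindex: "iindex.1519900000000345678.json"
      }
    }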
+  /**
+   * Generator for yielding sequence of dirty partition indices to iterate.
+   *
+   * @memberof LokiFsStructuredAdapter
+   */
+  private *getPartition(): Iterator<string> {
+    let idx,
+      clen = (this.dbref as any).collections.length
+
+    // since database container (partition -1) doesn't have dirty flag at db level, always save
+    yield "";
+
+    // yield list of dirty partitions for iteration
+    for(idx=0; idx<clen; idx++) {
+      const coll:any = (this.dbref as any).collections[idx]
+      if (coll.dirty) {
+        yield coll.name
+      }
+    }
+  }
+
+  /**
+   * Utility method for queueing one save at a time
+   */
+  private saveNextPartition(commit:DBCommit, pi:Iterator<string>, callback:any) {
+    let li;
+    let filename;
+    let self = this;
+    let pinext = pi.next();
+
+    if (pinext.done) {
+      callback();
+      return;
+    }
+
+    // db container (partition -1) uses just the commit's index file name,
+    // otherwise the collection name plus a nanosecond timestamp is used
+    filename = (pinext.value === "") ? commit.indexFile : ((pinext.value + "." + getNanosecondsTime()) + ".json")
+
+    // We map the collection name to a particular file
+    if (pinext.value) {
+      commit.collections[pinext.value] = filename
+    }
+
+    let wstream = fs.createWriteStream(this.cfs.getPath(filename))
+
+    wstream.on('close', function() {
+      self.saveNextPartition(commit, pi, callback);
+    });
+
+    li = this.generateDestructured({ partition: pinext.value });
+
+    // iterate each of the lines generated by generateDestructured()
+    for(let outline of li) {
+      wstream.write(outline + "\n");
+    }
+
+    wstream.end();
+  };
+
+  /**
+   * Generator for constructing lines for file streaming output of db container or collection.
+   *
+   * @param {object=} options - output format options for use externally to loki
+   * @param {int=} options.partition - can be used to only output an individual collection or db (-1)
+   *
+   * @returns {string|array} A custom, restructured aggregation of independent serializations.
+   * @memberof LokiFsStructuredAdapter
+   */
+  *generateDestructured(options = { partition: "" }) {
+    let idx
+    let dbcopy;
+
+    // if partition is -1 we will return database container with no data
+    if (options.partition === "") {
+      // instantiate lightweight clone and remove its collection data
+      dbcopy = (this.dbref as any).copy();
+
+      for(idx=0; idx < dbcopy.collections.length; idx++) {
+        dbcopy.collections[idx].data = [];
+      }
+
+      yield dbcopy.serialize({
+        serializationMethod: "normal"
+      });
+
+      return;
+    }
+
+    // 'partitioned' along with 'partition' of 0 or greater is a request for single collection serialization
+    if (options.partition) {
+      let doccount,
+        docidx;
+
+      // dbref collections have all data so work against that
+      const coll = (this.dbref as any).collections.filter((c:any) => c.name === options.partition)[0]
+      doccount = coll.data.length;
+
+      for(docidx=0; docidx<doccount; docidx++) {
+        yield JSON.stringify(coll.data[docidx]);
+      }
+    }
+  };
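Since generateDestructured is public, the on-disk format can be previewed directly; a sketch of consuming it (the directory is invented, and the internal dbref must have been set by a prior export or load):

    const adapter = new LokiFsAdapter('/tmp/loki-example')
    // ... after exportDatabase() or loadDatabase() has populated the internal dbref ...
    for (const line of adapter.generateDestructured({ partition: 'blockchain' })) {
      process.stdout.write(line + "\n") // NDJSON: one serialized document per line
    }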
+  /**
+   *
+   * Automatically called by Loki.js on startup.
+   *
+   * Loki persistence adapter interface function which outputs an un-prototyped db object reference to load from.
+   *
+   * @memberof LokiFsStructuredAdapter
+   */
+  public async loadDatabase(loki:any) {
+    let instream,
+      outstream,
+      rl,
+      self=this;
+
+    this.dbref = null;
+
+    // Load the database according to the commit file (look for valid DB files)
+    let commitObj:DBCommit
+    if (!(await this.cfs.exists(LokiFsAdapter.COMMIT_FILE))) {
+      return
+    }
+    commitObj = await this.cfs.readJSON(LokiFsAdapter.COMMIT_FILE)
+
+    // make sure file exists
+    const dbname = this.cfs.getPath(commitObj.indexFile)
+    return new Promise((res, rej) => {
+      fs.stat(dbname, function (err:any, stats:any) {
+        if (!err && stats.isFile()) {
+          instream = fs.createReadStream(dbname);
+          outstream = new stream();
+          rl = readline.createInterface(instream, outstream);
+
+          // first, load db container component
+          rl.on('line', function(line:string) {
+            // it should be a single JSON object (a one-line file)
+            if (self.dbref === null && line !== "") {
+              self.dbref = JSON.parse(line);
+            }
+          });
+
+          // when that is done, examine its collection array to sequence loading each
+          rl.on('close', function() {
+            if ((self.dbref as any).collections.length > 0) {
+              self.loadNextCollection(commitObj.collections, 0, function(err:any) {
+                if (err) return rej(err)
+                loki.loadJSONObject(self.dbref)
+                res()
+              });
+            }
+          });
+        }
+        else {
+          // file does not exist, we throw as the commit file is not respected
+          rej(Error(DataErrors[DataErrors.CORRUPTED_DATABASE]))
+        }
+      })
+    })
+  };
+
+
+  /**
+   * Recursive function to chain loading of each collection one at a time.
+   * If at some point I can determine how to make an async-driven generator, this may be converted to a generator.
+   *
+   * @param {object} collectionsMap - Map between the names of the collections and their matching file of the filesystem.
+   * @param {int} collectionIndex - the ordinal position of the collection to load.
+   * @param {function} callback - callback to pass to next invocation or to call when done
+   * @memberof LokiFsStructuredAdapter
+   */
+  async loadNextCollection(collectionsMap:{ [coll:string]: string }, collectionIndex:any, callback:any) {
+    let self=this,
+      obj;
+    const coll = (self.dbref as any).collections[collectionIndex]
+    if (!collectionsMap[coll.name] || !(await this.cfs.exists(collectionsMap[coll.name]))) {
+      return callback(Error(DataErrors[DataErrors.CORRUPTED_DATABASE]))
+    }
+    let instream = fs.createReadStream(this.cfs.getPath(collectionsMap[coll.name]))
+    let outstream = new stream();
+    let rl = readline.createInterface(instream, outstream);
+
+    rl.on('line', (line:string) => {
+      if (line !== "") {
+        obj = JSON.parse(line);
+        coll.data.push(obj);
+      }
+    })
+
+    rl.on('close', () => {
+      instream = null;
+      outstream = null;
+      rl = null;
+      obj = null;
+
+      // if there are more collections, load the next one
+      if (++collectionIndex < (self.dbref as any).collections.length) {
+        self.loadNextCollection(collectionsMap, collectionIndex, callback);
+      }
+      // otherwise we are done, callback to loadDatabase so it can return the new db object representation.
+      else {
+        callback();
+      }
+    });
+  };
+}
\ No newline at end of file
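The LokiJsDriver added just below is the consumer of this adapter; a minimal end-to-end sketch of the resulting API (inside an async context, with a made-up directory), mirroring test/dal/loki.ts at the end of this diff:

    const driver = new LokiJsDriver('/tmp/duniter-loki-example')
    await driver.loadDatabase()      // replays commit.json and the files it references, if present
    const blocks = driver.getLokiInstance().addCollection('block')
    blocks.insert({ number: 0 })
    await driver.commitData()        // writes dirty collection files, then the new commit.json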
diff --git a/app/lib/dal/drivers/LokiJsDriver.ts b/app/lib/dal/drivers/LokiJsDriver.ts
new file mode 100644
index 0000000000000000000000000000000000000000..1be76ad45e773a2d62bac5cc2b9cc83d60b426c0
--- /dev/null
+++ b/app/lib/dal/drivers/LokiJsDriver.ts
@@ -0,0 +1,33 @@
+import {LokiFsAdapter} from "./LokiFsAdapter"
+
+const loki = require('lokijs')
+
+export class LokiJsDriver {
+
+  private readonly lokiInstance:any
+  private adapter: LokiFsAdapter
+
+  constructor(
+    private dbFilePath:string = ''
+  ) {
+    this.adapter = new LokiFsAdapter(dbFilePath)
+    // An empty path means a throwaway in-memory DB with a unique name
+    this.lokiInstance = new loki(dbFilePath ? dbFilePath + '/loki.db' : 'mem' + Date.now() + '.db', {
+      adapter: this.adapter
+    })
+  }
+
+  async loadDatabase() {
+    // We load only non-memory DB
+    if (this.dbFilePath) {
+      await this.adapter.loadDatabase(this.lokiInstance)
+    }
+  }
+
+  getLokiInstance() {
+    return this.lokiInstance
+  }
+
+  async commitData() {
+    return this.adapter.flush(this.lokiInstance)
+  }
+}
diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts
index 397f99ae2966bc7c41336b76762a86138a1f08dc..171dfae91ab5e5f9660f4e7370eeb0057f8dbb2b 100644
--- a/app/lib/dal/fileDAL.ts
+++ b/app/lib/dal/fileDAL.ts
@@ -51,10 +51,14 @@ import {LokiCIndex} from "./indexDAL/loki/LokiCIndex"
 import {LokiMIndex} from "./indexDAL/loki/LokiMIndex";
 import {LokiBIndex} from "./indexDAL/loki/LokiBIndex"
 import {NewLogger} from "../logger"
+import {LokiBlockchain} from "./indexDAL/loki/LokiBlockchain"
+import {BlockchainDAO} from "./indexDAL/abstract/BlockchainDAO"
+import {LokiTransactions} from "./indexDAL/loki/LokiTransactions"
+import {profileFunc} from "../../ProcessCpuProfiler"
+import {TxsDAO} from "./indexDAL/abstract/TxsDAO"
+import {LokiJsDriver} from "./drivers/LokiJsDriver"
 
 const fs = require('fs')
-const loki = require('lokijs')
-const lokiAdapter = require('./lokifsadapater')
 const path = require('path')
 const readline = require('readline')
 const _ = require('underscore');
@@ -66,6 +70,7 @@ export interface FileDALParams {
   home:string
   fs:FileSystem
   dbf:() => SQLiteDriver
+  dbf2: () => LokiJsDriver
   wotb:WoTBInstance
 }
 
@@ -76,13 +81,15 @@ export class FileDAL {
   wotb:WoTBInstance
   profile:string
 
-  loki:any
+  loki:LokiJsDriver
   powDAL:PowDAL
   confDAL:ConfDAL
   metaDAL:MetaDAL
   peerDAL:PeerDAL
-  blockDAL:BlockDAL
-  txsDAL:TxsDAL
+  fakeBlockDAL:BlockDAL
+  fakeTxsDAL:TxsDAL
+  blockDAL:BlockchainDAO
+  txsDAL:TxsDAO
   statDAL:StatDAL
   idtyDAL:IdentityDAL
   certDAL:CertDAL
@@ -101,50 +108,36 @@ export class FileDAL {
 
   constructor(params:FileDALParams) {
     this.rootPath = params.home
    this.sqliteDriver = params.dbf()
+    this.loki = params.dbf2()
    this.wotb = params.wotb
    this.profile = 'DAL'
-    const that = this
-    this.loki = new loki(path.join(this.rootPath, Directory.INDEX_DB_FILE), {
-      adapter: new lokiAdapter(),
-      autoload: true,
-      autoloadCallback : () => {
-        const dals = [
-          that.bindexDAL,
-          that.mindexDAL,
-          that.iindexDAL,
-          that.sindexDAL,
-          that.cindexDAL,
-        ]
-        for (const indexDAL of dals) {
-          indexDAL.triggerInit()
-        }
-      },
-      autosave: true,
-      autosaveInterval: 4000
-    })
 
    // DALs
    this.powDAL = new PowDAL(this.rootPath, params.fs)
    this.confDAL = new ConfDAL(this.rootPath, params.fs)
    this.metaDAL = new (require('./sqliteDAL/MetaDAL').MetaDAL)(this.sqliteDriver);
    this.peerDAL = new (require('./sqliteDAL/PeerDAL').PeerDAL)(this.sqliteDriver);
-    this.blockDAL = new (require('./sqliteDAL/BlockDAL').BlockDAL)(this.sqliteDriver);
-    this.txsDAL = new
(require('./sqliteDAL/TxsDAL').TxsDAL)(this.sqliteDriver); + this.fakeBlockDAL = new (require('./sqliteDAL/BlockDAL').BlockDAL)(this.sqliteDriver); + this.blockDAL = new LokiBlockchain(this.loki.getLokiInstance()) + this.fakeTxsDAL = new (require('./sqliteDAL/TxsDAL').TxsDAL)(this.sqliteDriver); + this.txsDAL = new LokiTransactions(this.loki.getLokiInstance()) this.statDAL = new StatDAL(this.rootPath, params.fs) this.idtyDAL = new (require('./sqliteDAL/IdentityDAL').IdentityDAL)(this.sqliteDriver); this.certDAL = new (require('./sqliteDAL/CertDAL').CertDAL)(this.sqliteDriver); this.msDAL = new (require('./sqliteDAL/MembershipDAL').MembershipDAL)(this.sqliteDriver); this.walletDAL = new (require('./sqliteDAL/WalletDAL').WalletDAL)(this.sqliteDriver); - this.bindexDAL = new LokiBIndex(this.loki) - this.mindexDAL = new LokiMIndex(this.loki) - this.iindexDAL = new LokiIIndex(this.loki) - this.sindexDAL = new LokiSIndex(this.loki) - this.cindexDAL = new LokiCIndex(this.loki) + this.bindexDAL = new LokiBIndex(this.loki.getLokiInstance()) + this.mindexDAL = new LokiMIndex(this.loki.getLokiInstance()) + this.iindexDAL = new LokiIIndex(this.loki.getLokiInstance()) + this.sindexDAL = new LokiSIndex(this.loki.getLokiInstance()) + this.cindexDAL = new LokiCIndex(this.loki.getLokiInstance()) this.newDals = { 'powDAL': this.powDAL, 'metaDAL': this.metaDAL, 'blockDAL': this.blockDAL, + 'fakeBlockDAL': this.fakeBlockDAL, + 'fakeTxsDAL': this.fakeTxsDAL, 'certDAL': this.certDAL, 'msDAL': this.msDAL, 'idtyDAL': this.idtyDAL, @@ -162,6 +155,20 @@ export class FileDAL { } async init(conf:ConfDTO) { + // Init LokiJS + await this.loki.loadDatabase() + const dals = [ + this.blockDAL, + this.txsDAL, + this.bindexDAL, + this.mindexDAL, + this.iindexDAL, + this.sindexDAL, + this.cindexDAL, + ] + for (const indexDAL of dals) { + indexDAL.triggerInit() + } const dalNames = _.keys(this.newDals); for (const dalName of dalNames) { const dal = this.newDals[dalName]; @@ -290,9 +297,6 @@ export class FileDAL { } // Block - lastUDBlock() { - return this.blockDAL.lastBlockWithDividend() - } getRootBlock() { return this.getBlock(0) @@ -935,11 +939,11 @@ export class FileDAL { } async trimIndexes(maxNumber:number) { - await this.bindexDAL.trimBlocks(maxNumber); - await this.iindexDAL.trimRecords(maxNumber); - await this.mindexDAL.trimRecords(maxNumber); - await this.cindexDAL.trimExpiredCerts(maxNumber); - await this.sindexDAL.trimConsumedSource(maxNumber); + await profileFunc('[loki][bindex][trim]', () => this.bindexDAL.trimBlocks(maxNumber)) + await profileFunc('[loki][iindex][trim]', () => this.iindexDAL.trimRecords(maxNumber)) + await profileFunc('[loki][mindex][trim]', () => this.mindexDAL.trimRecords(maxNumber)) + await profileFunc('[loki][cindex][trim]', () => this.cindexDAL.trimExpiredCerts(maxNumber)) + await profileFunc('[loki][sindex][trim]', () => this.sindexDAL.trimConsumedSource(maxNumber)) return true; } @@ -1056,7 +1060,7 @@ export class FileDAL { } async getUniqueIssuersBetween(start:number, end:number) { - const current = await this.blockDAL.getCurrent(); + const current = (await this.blockDAL.getCurrent()) as DBBlock const firstBlock = Math.max(0, start); const lastBlock = Math.max(0, Math.min(current.number, end)); const blocks = await this.blockDAL.getBlocks(firstBlock, lastBlock); diff --git a/app/lib/dal/fileDALs/CFSCore.ts b/app/lib/dal/fileDALs/CFSCore.ts index faa8c3a33be4ff1ba1a12882424499ed1bbb5f52..26695f0be49829565ca9cadffd90be6e41b843ba 100644 --- a/app/lib/dal/fileDALs/CFSCore.ts +++ 
b/app/lib/dal/fileDALs/CFSCore.ts @@ -227,4 +227,8 @@ export class CFSCore { private toRemoveFileName(filePath:string) { return path.normalize(filePath).replace(/\//g, '__').replace(/\\/g, '__'); } + + getPath(file: string) { + return path.join(this.rootPath, file) + } } diff --git a/app/lib/dal/indexDAL/abstract/BlockchainDAO.ts b/app/lib/dal/indexDAL/abstract/BlockchainDAO.ts new file mode 100644 index 0000000000000000000000000000000000000000..fa992be09000ec490e67873b28b906e8038ae52b --- /dev/null +++ b/app/lib/dal/indexDAL/abstract/BlockchainDAO.ts @@ -0,0 +1,33 @@ +import {GenericDAO} from "./GenericDAO" +import {DBBlock} from "../../../db/DBBlock" + +export interface BlockchainDAO extends GenericDAO<DBBlock> { + + getCurrent(): Promise<DBBlock|null> + + getBlock(number:string | number): Promise<DBBlock|null> + + getAbsoluteBlock(number:number, hash:string): Promise<DBBlock|null> + + saveBlock(block:DBBlock): Promise<DBBlock> + + saveSideBlock(block:DBBlock): Promise<DBBlock> + + getPotentialRoots(): Promise<DBBlock[]> + + getBlocks(start:number, end:number): Promise<DBBlock[]> + + getNextForkBlocks(number:number, hash:string): Promise<DBBlock[]> + + getPotentialForkBlocks(numberStart:number, medianTimeStart:number, maxNumber:number): Promise<DBBlock[]> + + lastBlockOfIssuer(issuer:string): Promise<DBBlock|null> + + getCountOfBlocksIssuedBy(issuer:string): Promise<number> + + saveBunch(blocks:DBBlock[]): Promise<void> + + dropNonForkBlocksAbove(number: number): Promise<void> + + setSideBlock(number:number, previousBlock:DBBlock|null): Promise<void> +} diff --git a/app/lib/dal/indexDAL/abstract/TxsDAO.ts b/app/lib/dal/indexDAL/abstract/TxsDAO.ts new file mode 100644 index 0000000000000000000000000000000000000000..40baff42a7d0724e04378a5413c83a9e0ca81ab1 --- /dev/null +++ b/app/lib/dal/indexDAL/abstract/TxsDAO.ts @@ -0,0 +1,31 @@ +import {GenericDAO} from "./GenericDAO" +import {DBTx} from "../../sqliteDAL/TxsDAL" +import {TransactionDTO} from "../../../dto/TransactionDTO" +import {SandBox} from "../../sqliteDAL/SandBox" + +export interface TxsDAO extends GenericDAO<DBTx> { + + insertBatchOfTxs(txs:DBTx[]): Promise<void> + + trimExpiredNonWrittenTxs(limitTime:number): Promise<void> + + getAllPending(versionMin:number): Promise<DBTx[]> + + getTX(hash:string): Promise<DBTx> + + addLinked(tx:TransactionDTO, block_number:number, time:number): Promise<DBTx> + + addPending(dbTx:DBTx): Promise<DBTx> + + getLinkedWithIssuer(pubkey:string): Promise<DBTx[]> + + getLinkedWithRecipient(pubkey:string): Promise<DBTx[]> + + getPendingWithIssuer(pubkey:string): Promise<DBTx[]> + + getPendingWithRecipient(pubkey:string): Promise<DBTx[]> + + removeTX(hash:string): Promise<DBTx|null> + + sandbox:SandBox<{ issuers: string[], output_base:number, output_amount:number }> +} diff --git a/app/lib/dal/indexDAL/loki/LokiBIndex.ts b/app/lib/dal/indexDAL/loki/LokiBIndex.ts index 2454e9bad6af6ef21b91abeb88dc3068644294a7..59aa3c87dd961f0b7c37a75b764b80bfe8930ddd 100644 --- a/app/lib/dal/indexDAL/loki/LokiBIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiBIndex.ts @@ -2,6 +2,7 @@ import {LokiIndex} from "./LokiIndex" import {DBHead} from "../../../db/DBHead" import {BIndexDAO} from "../abstract/BIndexDAO" import {NewLogger} from "../../../logger" +import {getDurationInMicroSeconds, getMicrosecondsTime} from "../../../../ProcessCpuProfiler" const logger = NewLogger() @@ -82,10 +83,10 @@ export class LokiBIndex extends LokiIndex<DBHead> implements BIndexDAO { } async getWrittenOn(blockstamp: string): Promise<DBHead[]> { - 
const now = Date.now()
+    const now = getMicrosecondsTime()
     const criterion:any = { number: parseInt(blockstamp) }
     const res = this.collection.find(criterion)
-    logger.trace('[loki][%s][getWrittenOn] %sms', this.collectionName, (Date.now() - now), blockstamp)
+    logger.trace('[loki][%s][getWrittenOn] %sµs', this.collectionName, getDurationInMicroSeconds(now), blockstamp)
     return res
   }
 }
diff --git a/app/lib/dal/indexDAL/loki/LokiBlockchain.ts b/app/lib/dal/indexDAL/loki/LokiBlockchain.ts
new file mode 100644
index 0000000000000000000000000000000000000000..99ca9e0f4fab059e1017d486ea65c24e4606cbc3
--- /dev/null
+++ b/app/lib/dal/indexDAL/loki/LokiBlockchain.ts
@@ -0,0 +1,169 @@
+import {LokiIndex} from "./LokiIndex"
+import {NewLogger} from "../../../logger"
+import {BlockchainDAO} from "../abstract/BlockchainDAO"
+import {DBBlock} from "../../../db/DBBlock"
+import {getMicrosecondsTime} from "../../../../ProcessCpuProfiler"
+
+const logger = NewLogger()
+
+export class LokiBlockchain extends LokiIndex<DBBlock> implements BlockchainDAO {
+
+  private current:DBBlock|null = null
+
+  constructor(loki:any) {
+    super(loki, 'blockchain', ['number', 'hash', 'fork'])
+  }
+
+  async getCurrent() {
+    if (this.current) {
+      // Cached
+      return this.current
+    } else {
+      // Costly method, as a fallback
+      return this.collection
+        .chain()
+        .find({
+          fork: false
+        })
+        .simplesort('number', true)
+        .data()[0]
+    }
+  }
+
+  async getBlock(number:string | number) {
+    const now = getMicrosecondsTime()
+    const b = this.collection
+      .chain()
+      .find({
+        number: parseInt(String(number)),
+        fork: false
+      })
+      .data()[0]
+    logger.trace('[loki][%s][getBlock] %sµs', this.collectionName, (getMicrosecondsTime() - now), number)
+    return b
+  }
+
+  async getPotentialRoots() {
+    return this.collection
+      .chain()
+      .find({ number: 0, fork: true })
+      .data()
+  }
+
+  async saveBunch(blocks:DBBlock[]) {
+    return this.insertBatch(blocks)
+  }
+
+  async insert(record: DBBlock): Promise<void> {
+    // Only a non-fork block with a higher number may become the cached current block
+    if (!record.fork && (!this.current || this.current.number < record.number)) {
+      this.current = record
+    }
+    return super.insert(record);
+  }
+
+  async removeBlock(blockstamp: string): Promise<void> {
+    // Never remove blocks
+  }
+
+  async getAbsoluteBlock(number: number, hash: string): Promise<DBBlock | null> {
+    return this.collection
+      .chain()
+      .find({
+        number,
+        hash
+      })
+      .data()[0]
+  }
+
+  async getBlocks(start: number, end: number): Promise<DBBlock[]> {
+    return this.collection
+      .chain()
+      .find({
+        number: { $between: [start, end] },
+        fork: false
+      })
+      .simplesort('number')
+      .data()
+  }
+
+  async getCountOfBlocksIssuedBy(issuer: string): Promise<number> {
+    return this.collection
+      .chain()
+      .find({
+        issuer,
+        fork: false
+      })
+      .data()
+      .length
+  }
+
+  async getNextForkBlocks(number: number, hash: string): Promise<DBBlock[]> {
+    return this.collection
+      .chain()
+      .find({
+        fork: true,
+        number: number + 1,
+        previousHash: hash
+      })
+      .simplesort('number')
+      .data()
+  }
+
+  async getPotentialForkBlocks(numberStart: number, medianTimeStart: number, maxNumber: number): Promise<DBBlock[]> {
+    return this.collection
+      .chain()
+      .find({
+        fork: true,
+        number: { $between: [numberStart, maxNumber] },
+        medianTime: { $gt: medianTimeStart }
+      })
+      .simplesort('number')
+      .data()
+  }
+
+  async lastBlockOfIssuer(issuer: string): Promise<DBBlock | null> {
+    return this.collection
+      .chain()
+      .find({
+        fork: false,
+        issuer
+      })
+      .simplesort('number', true)
+      .data()[0]
+  }
+
+  async saveBlock(block: DBBlock): Promise<DBBlock> {
+    block.fork = false
+    await this.insert(block)
+    if (!this.current ||
this.current.number < block.number) { + this.current = block + } + return block + } + + async saveSideBlock(block: DBBlock): Promise<DBBlock> { + block.fork = true + await this.insert(block) + return block + } + + async dropNonForkBlocksAbove(number: number): Promise<void> { + this.collection + .chain() + .find({ + fork: false, + number: { $gt: number } + }) + .remove() + } + + async setSideBlock(number: number, previousBlock: DBBlock | null): Promise<void> { + this.collection + .chain() + .find({ + number + }) + .update((b:DBBlock) => { + b.fork = true + }) + } + +} diff --git a/app/lib/dal/indexDAL/loki/LokiCollection.ts b/app/lib/dal/indexDAL/loki/LokiCollection.ts index d5e6797bef9281310fb785a5063649bc2dbba085..6b22804f8c325be8bdf88b33c83dc561081894d3 100644 --- a/app/lib/dal/indexDAL/loki/LokiCollection.ts +++ b/app/lib/dal/indexDAL/loki/LokiCollection.ts @@ -1,5 +1,6 @@ import {LokiChainableFind, LokiCollection} from "./LokiTypes" import {NewLogger} from "../../../logger" +import {getDurationInMicroSeconds, getMicrosecondsTime} from "../../../../ProcessCpuProfiler" const logger = NewLogger() @@ -25,9 +26,9 @@ export class LokiProxyCollection<T> implements LokiCollection<T> { } find(criterion:{ [t in keyof T|'$or'|'$and']?: any }) { - const now = Date.now() + const now = getMicrosecondsTime() const res = this.collection.find(criterion) - logger.trace('[loki][%s][find] %sms', this.collectionName, (Date.now() - now), criterion) + // logger.trace('[loki][%s][find] %sµs', this.collectionName, getDurationInMicroSeconds(now), criterion) return res } diff --git a/app/lib/dal/indexDAL/loki/LokiIIndex.ts b/app/lib/dal/indexDAL/loki/LokiIIndex.ts index 1179ef77d5bc61258e1e3c494a3c9f90218969e7..e8576214c6d6d558c6a0f6dace7d295c38add5dc 100644 --- a/app/lib/dal/indexDAL/loki/LokiIIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiIIndex.ts @@ -2,6 +2,8 @@ import {FullIindexEntry, IindexEntry, Indexer} from "../../../indexer" import {IIndexDAO} from "../abstract/IIndexDAO" import {OldIindexEntry} from "../../sqliteDAL/index/IIndexDAL" import {LokiPubkeySharingIndex} from "./LokiPubkeySharingIndex" +import {getDurationInMicroSeconds, getMicrosecondsTime} from "../../../../ProcessCpuProfiler" +import {NewLogger} from "../../../logger" export class LokiIIndex extends LokiPubkeySharingIndex<IindexEntry> implements IIndexDAO { @@ -129,7 +131,10 @@ export class LokiIIndex extends LokiPubkeySharingIndex<IindexEntry> implements I } async getMembersPubkeys(): Promise<{ pub: string }[]> { - return (await this.getMembers()).map(m => ({ pub: m.pubkey })) + const now = getMicrosecondsTime() + const res = (await this.getMembers()).map(m => ({ pub: m.pubkey })) + NewLogger().trace('[getMembersPubkeys] %sµs', getDurationInMicroSeconds(now)) + return res } async getToBeKickedPubkeys(): Promise<string[]> { diff --git a/app/lib/dal/indexDAL/loki/LokiIndex.ts b/app/lib/dal/indexDAL/loki/LokiIndex.ts index d80f630d548e1705dfcd17907b8efc08c3448368..2c6d45cf002f3c6d1e6479d09bdaa4ebe6376e13 100644 --- a/app/lib/dal/indexDAL/loki/LokiIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiIndex.ts @@ -6,6 +6,7 @@ import {Initiable} from "../../sqliteDAL/Initiable" import {GenericDAO} from "../abstract/GenericDAO" import {NewLogger} from "../../../logger" import {LokiProxyCollection} from "./LokiCollection" +import {getMicrosecondsTime} from "../../../../ProcessCpuProfiler" const logger = NewLogger() @@ -22,7 +23,7 @@ export abstract class LokiIndex<T extends IndexData> implements GenericDAO<T> { public constructor( protected loki:any, - 
protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex', + protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex'|'blockchain'|'txs', protected indices: (keyof T)[]) { this.collectionIsInitialized = new Promise<void>(res => this.resolveCollection = res) } @@ -42,38 +43,38 @@ export abstract class LokiIndex<T extends IndexData> implements GenericDAO<T> { } async insert(record: T): Promise<void> { - const now = Date.now() + const now = getMicrosecondsTime() this.collection.insert(record) - // logger.trace('[loki][%s][insert] %sms', this.collectionName, (Date.now() - now), JSON.stringify(record, null, ' ')) + // logger.trace('[loki][%s][insert] %sµs', this.collectionName, (getMicrosecondsTime() - now)) } async findRaw(criterion?:any) { - const now = Date.now() + const now = getMicrosecondsTime() const res = this.collection.find(criterion) - logger.trace('[loki][%s][findRaw] %sms', this.collectionName, (Date.now() - now), criterion) + logger.trace('[loki][%s][findRaw] => %sµs', this.collectionName, (getMicrosecondsTime() - now), criterion) return res } async insertBatch(records: T[]): Promise<void> { - const now = Date.now() + const now = getMicrosecondsTime() records.map(r => this.insert(r)) if (records.length) { - logger.trace('[loki][%s][insertBatch] %s record(s) in %sms', this.collectionName, records.length, (Date.now() - now)) + logger.trace('[loki][%s][insertBatch] %s record(s) in %sµs', this.collectionName, records.length, getMicrosecondsTime() - now) } } async getWrittenOn(blockstamp: string): Promise<T[]> { - const now = Date.now() + const now = getMicrosecondsTime() const criterion:any = { writtenOn: parseInt(blockstamp) } const res = this.collection.find(criterion) - logger.trace('[loki][%s][getWrittenOn] %sms', this.collectionName, (Date.now() - now), blockstamp) + logger.trace('[loki][%s][getWrittenOn] %sµs', this.collectionName, (getMicrosecondsTime() - now), blockstamp) return res } async removeBlock(blockstamp: string): Promise<void> { - const now = Date.now() + const now = getMicrosecondsTime() const data = await this.getWrittenOn(blockstamp) data.map(d => this.collection.remove(d)) - logger.trace('[loki][%s][removeBlock] %sms', this.collectionName, (Date.now() - now), blockstamp) + logger.trace('[loki][%s][removeBlock] %sµs', this.collectionName, (getMicrosecondsTime() - now), blockstamp) } } diff --git a/app/lib/dal/indexDAL/loki/LokiPubkeySharingIndex.ts b/app/lib/dal/indexDAL/loki/LokiPubkeySharingIndex.ts index 5693a0ce96ca266bf46306e07006c1e1183db6c1..36236c07fda6d0db7b23fcea0d9916c03d6fc940 100644 --- a/app/lib/dal/indexDAL/loki/LokiPubkeySharingIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiPubkeySharingIndex.ts @@ -25,12 +25,12 @@ export class LokiPubkeySharingIndex<T extends { written_on:string, writtenOn:num for (const pub of Object.keys(trimmableByPub)) { if (trimmableByPub[pub].length > 1) { // Remove the existing records - // for (const t of trimmableByPub[pub]) { - // this.collection.remove(t) - // } - // // Insert a new one that gathers them - // const reduced = Indexer.DUP_HELPERS.reduce(trimmableByPub[pub]) - // this.collection.insert(reduced) + for (const t of trimmableByPub[pub]) { + this.collection.remove(t) + } + // Insert a new one that gathers them + const reduced = Indexer.DUP_HELPERS.reduce(trimmableByPub[pub]) + this.collection.insert(reduced) } } } diff --git a/app/lib/dal/indexDAL/loki/LokiSIndex.ts b/app/lib/dal/indexDAL/loki/LokiSIndex.ts index 
bf7ec836bf0913f7bfd4249cd7f509fdd3f07b76..3a8a795f84dc98293e366127a0c0c9fe2c9b41a7 100644 --- a/app/lib/dal/indexDAL/loki/LokiSIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiSIndex.ts @@ -7,7 +7,7 @@ const _ = require('underscore') export class LokiSIndex extends LokiIndex<SindexEntry> implements SIndexDAO { constructor(loki:any) { - super(loki, 'sindex', ['identifier', 'pos', 'conditions']) + super(loki, 'sindex', ['identifier', 'conditions', 'writtenOn']) } async findByIdentifierPosAmountBase(identifier: string, pos: number, amount: number, base: number): Promise<SindexEntry[]> { @@ -84,12 +84,9 @@ export class LokiSIndex extends LokiIndex<SindexEntry> implements SIndexDAO { } async trimConsumedSource(belowNumber: number): Promise<void> { - const consumed = this.collection.find({ - $and: [ - { consumed: true }, - { writtenOn: { $lt: belowNumber }}, - ] - }) + const consumed = this.collection + .find({ writtenOn: { $lt: belowNumber }}) + .filter(s => s.consumed) for (const e of consumed) { this.collection .chain() diff --git a/app/lib/dal/indexDAL/loki/LokiTransactions.ts b/app/lib/dal/indexDAL/loki/LokiTransactions.ts new file mode 100644 index 0000000000000000000000000000000000000000..dc8997aeff6d7e5fee26a2692dad6777be8b0e45 --- /dev/null +++ b/app/lib/dal/indexDAL/loki/LokiTransactions.ts @@ -0,0 +1,161 @@ +import {LokiIndex} from "./LokiIndex" +import {NewLogger} from "../../../logger" +import {BlockchainDAO} from "../abstract/BlockchainDAO" +import {DBBlock} from "../../../db/DBBlock" +import {DBTx} from "../../sqliteDAL/TxsDAL" +import {TxsDAO} from "../abstract/TxsDAO" +import {SandBox} from "../../sqliteDAL/SandBox" +import {TransactionDTO} from "../../../dto/TransactionDTO" + +const _ = require('underscore') +const moment = require('moment') +const constants = require('../../../constants') + +export class LokiTransactions extends LokiIndex<DBTx> implements TxsDAO { + + constructor(loki: any) { + super(loki, 'txs', []) + this.sandbox = new SandBox( + constants.SANDBOX_SIZE_TRANSACTIONS, + () => this.getSandboxTxs(), + (compared: { issuers: string[], output_base: number, output_amount: number }, + reference: { issuers: string[], output_base: number, output_amount: number } + ) => { + if (compared.output_base < reference.output_base) { + return -1; + } + else if (compared.output_base > reference.output_base) { + return 1; + } + else if (compared.output_amount > reference.output_amount) { + return -1; + } + else if (compared.output_amount < reference.output_amount) { + return 1; + } + else { + return 0; + } + }) + } + + async insertBatchOfTxs(txs: DBTx[]) { + return this.insertBatch(txs) + } + + sandbox: SandBox<{ issuers: string[]; output_base: number; output_amount: number }> + + async addLinked(tx: TransactionDTO, block_number: number, time: number): Promise<DBTx> { + const dbTx = DBTx.fromTransactionDTO(tx) + dbTx.block_number = block_number + dbTx.time = time + dbTx.received = moment().unix() + dbTx.written = true + dbTx.removed = false + dbTx.hash = tx.getHash() + await this.insert(dbTx) + return dbTx + } + + async addPending(dbTx: DBTx): Promise<DBTx> { + dbTx.received = moment().unix() + dbTx.written = false + dbTx.removed = false + await this.insert(dbTx) + return dbTx + } + + async getAllPending(versionMin: number): Promise<DBTx[]> { + return this.findRaw({ + written: false, + removed: false, + version: {$gte: versionMin} + }) + } + + async getLinkedWithIssuer(pubkey: string): Promise<DBTx[]> { + return this.findRaw({ + issuers: {$contains: pubkey}, + written: true + }) + } + 
+  async getLinkedWithRecipient(pubkey: string): Promise<DBTx[]> {
+    const rows = await this.findRaw({
+      recipients: {$contains: pubkey},
+      written: true
+    })
+    // Which does not contain the key as issuer
+    return _.filter(rows, (row: DBTx) => row.issuers.indexOf(pubkey) === -1);
+  }
+
+  async getPendingWithIssuer(pubkey: string): Promise<DBTx[]> {
+    return this.findRaw({
+      issuers: {$contains: pubkey},
+      written: false,
+      removed: false
+    })
+  }
+
+  async getPendingWithRecipient(pubkey: string): Promise<DBTx[]> {
+    return this.findRaw({
+      recipients: {$contains: pubkey},
+      written: false,
+      removed: false
+    })
+  }
+
+  async getTX(hash: string): Promise<DBTx> {
+    return (await this.findRaw({
+      hash: hash
+    }))[0]
+  }
+
+  async removeTX(hash: string): Promise<DBTx | null> {
+    const tx = (await this.findRaw({
+      hash: hash
+    }))[0]
+    if (tx) {
+      // Flag the document as removed in place: the object returned by findRaw
+      // already lives in the collection, so it must not be inserted a second time
+      this.collection
+        .chain()
+        .find({ hash: hash })
+        .update((t:DBTx) => {
+          t.removed = true
+        })
+    }
+    return tx
+  }
+
+  async trimExpiredNonWrittenTxs(limitTime: number): Promise<void> {
+    await this.collection
+      .chain()
+      .find({
+        written: false,
+        blockstampTime: {$lte: limitTime}
+      })
+      .remove()
+  }
+
+  /**************************
+   * SANDBOX STUFF
+   */
+
+  async getSandboxTxs() {
+    // SELECT * FROM txs WHERE NOT written AND NOT removed ORDER BY output_base DESC, output_amount DESC
+    // return this.query('SELECT * FROM sandbox_txs LIMIT ' + (this.sandbox.maxSize), [])
+    return this.collection
+      .chain()
+      .find({
+        written: false,
+        removed: false
+      })
+      .compoundsort(['output_base', ['output_amount', true]])
+      .limit(this.sandbox.maxSize)
+      .data()
+  }
+
+  getSandboxRoom() {
+    return this.sandbox.getSandboxRoom()
+  }
+
+  setSandboxSize(maxSize: number) {
+    this.sandbox.maxSize = maxSize
+  }
+
+}
diff --git a/app/lib/dal/indexDAL/loki/LokiTypes.ts b/app/lib/dal/indexDAL/loki/LokiTypes.ts
index c7b19d9731b732cfa1db2a27d587bb70b3a6501e..f61fea8d9b54e72385871cc75187de9b5fb26793 100644
--- a/app/lib/dal/indexDAL/loki/LokiTypes.ts
+++ b/app/lib/dal/indexDAL/loki/LokiTypes.ts
@@ -20,7 +20,13 @@ export interface LokiChainableFind<T> {
 
   simplesort(prop:keyof T, desc?:boolean): LokiChainableFind<T>
 
+  limit(l:number): LokiChainableFind<T>
+
+  update(cb:(t:T) => void): LokiChainableFind<T>
+
   remove(): LokiChainableFind<T>
 
+  compoundsort(sort:((string|((string|boolean)[]))[])): LokiChainableFind<T>
+
   data(): T[]
 }
\ No newline at end of file
diff --git a/app/lib/dal/sqliteDAL/AbstractSQLite.ts b/app/lib/dal/sqliteDAL/AbstractSQLite.ts
index c7012733ef8543a4677b826d8e6a7c6b993139ef..4a240425613410f3d2a77829f14f931d79be9f34 100644
--- a/app/lib/dal/sqliteDAL/AbstractSQLite.ts
+++ b/app/lib/dal/sqliteDAL/AbstractSQLite.ts
@@ -13,6 +13,7 @@
 import {SQLiteDriver} from "../drivers/SQLiteDriver"
 import {Initiable} from "./Initiable"
+import {getDurationInMicroSeconds, getMicrosecondsTime} from "../../../ProcessCpuProfiler"
 
 /**
  * Created by cgeek on 22/08/15.
@@ -44,10 +45,10 @@ export abstract class AbstractSQLite<T> extends Initiable { async query(sql:string, params: any[] = []): Promise<T[]> { try { - //logger.trace(sql, JSON.stringify(params || [])); - const start = Date.now() + const start = getMicrosecondsTime() const res = await this.driver.executeAll(sql, params || []); - const duration = Date.now() - start; + const duration = getDurationInMicroSeconds(start) + logger.trace('[sqlite][query] %s %s %sµs', sql, JSON.stringify(params || []), duration) const entities = res.map((t:T) => this.toEntity(t)) // Display result let msg = sql + ' | %s\t==> %s rows in %s ms'; @@ -147,10 +148,12 @@ export abstract class AbstractSQLite<T> extends Initiable { await this.query('DELETE FROM ' + this.table + ' WHERE ' + conditions, condValues) } - exec(sql:string): Promise<void> { + async exec(sql:string) { try { - //console.warn(sql); - return this.driver.executeSql(sql); + const start = getMicrosecondsTime() + await this.driver.executeSql(sql); + const duration = getDurationInMicroSeconds(start) + logger.trace('[sqlite][exec] %s %sµs', sql.substring(0, 50) + '...', duration) } catch (e) { //console.error('ERROR >> %s', sql); throw e; diff --git a/app/lib/dal/sqliteDAL/BlockDAL.ts b/app/lib/dal/sqliteDAL/BlockDAL.ts index 406d34a26225133f512fcf56f5bcfb4e6b01e71c..9909bb20714edc02b43f61088c351c3ac60a250d 100644 --- a/app/lib/dal/sqliteDAL/BlockDAL.ts +++ b/app/lib/dal/sqliteDAL/BlockDAL.ts @@ -112,10 +112,6 @@ export class BlockDAL extends AbstractSQLite<DBBlock> { return this.query('SELECT * FROM block WHERE number BETWEEN ? and ? and NOT fork ORDER BY number ASC', [start, end]); } - async lastBlockWithDividend() { - return (await this.query('SELECT * FROM block WHERE dividend > 0 and NOT fork ORDER BY number DESC LIMIT 1'))[0]; - } - async lastBlockOfIssuer(issuer:string) { return (await this.query('SELECT * FROM block WHERE issuer = ? and NOT fork ORDER BY number DESC LIMIT 1', [issuer]))[0] } @@ -125,10 +121,6 @@ export class BlockDAL extends AbstractSQLite<DBBlock> { return res[0].quantity; } - getForkBlocks() { - return this.query('SELECT * FROM block WHERE fork ORDER BY number'); - } - getPotentialForkBlocks(numberStart:number, medianTimeStart:number, maxNumber:number) { return this.query('SELECT * FROM block WHERE fork AND number >= ? AND number <= ? AND medianTime >= ? 
ORDER BY number DESC', [numberStart, maxNumber, medianTimeStart]); } @@ -137,10 +129,6 @@ export class BlockDAL extends AbstractSQLite<DBBlock> { return this.query('SELECT * FROM block WHERE fork AND number = ?', [0]) } - getDividendBlocks() { - return this.query('SELECT * FROM block WHERE dividend IS NOT NULL ORDER BY number'); - } - async saveBunch(blocks:DBBlock[]) { let queries = "INSERT INTO block (" + this.fields.join(',') + ") VALUES "; for (let i = 0, len = blocks.length; i < len; i++) { diff --git a/app/lib/dal/sqliteDAL/TxsDAL.ts b/app/lib/dal/sqliteDAL/TxsDAL.ts index f1f1fc4bb416e37c00a56a2e801bcf15632200f9..58b7c616788fc0f954a4d4e2703bd8c0b70b9c39 100644 --- a/app/lib/dal/sqliteDAL/TxsDAL.ts +++ b/app/lib/dal/sqliteDAL/TxsDAL.ts @@ -41,6 +41,8 @@ export class DBTx { received: number output_base: number output_amount: number + written_on: string + writtenOn: number static fromTransactionDTO(tx:TransactionDTO) { const dbTx = new DBTx() diff --git a/app/lib/db/DBBlock.ts b/app/lib/db/DBBlock.ts index 9e9a7d7c950a2fd60cc18867fb6c8381d678e5d4..abcd5542166188e7f5191d363456dadf7d024c04 100644 --- a/app/lib/db/DBBlock.ts +++ b/app/lib/db/DBBlock.ts @@ -47,6 +47,8 @@ export class DBBlock { monetaryMass: number dividend: number | null UDTime: number + writtenOn: number + written_on: string wrong = false constructor( @@ -90,6 +92,8 @@ export class DBBlock { dbb.nonce = b.nonce dbb.UDTime = b.UDTime dbb.monetaryMass = b.monetaryMass + dbb.writtenOn = b.number + dbb.written_on = [b.number, b.hash].join('-') return dbb } } \ No newline at end of file diff --git a/app/lib/system/directory.ts b/app/lib/system/directory.ts index 01bac4ded07cc668d09abdf48d35f89c77d69eef..0031143d0cc3f38815b83a4f0cdeb622a394005b 100644 --- a/app/lib/system/directory.ts +++ b/app/lib/system/directory.ts @@ -11,15 +11,16 @@ // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. +import * as path from "path" +import * as fs from "fs" import {SQLiteDriver} from "../dal/drivers/SQLiteDriver" import {CFSCore} from "../dal/fileDALs/CFSCore" import {WoTBInstance, WoTBObject} from "../wot" import {FileDALParams} from "../dal/fileDAL" +import {LokiJsDriver} from "../dal/drivers/LokiJsDriver" const opts = require('optimist').argv; -const path = require('path'); const qfs = require('q-io/fs'); -const fs = require('fs'); const DEFAULT_DOMAIN = "duniter_default"; const DEFAULT_HOME = (process.platform == 'win32' ? 
process.env.USERPROFILE : process.env.HOME) + '/.config/duniter/'; @@ -88,8 +89,8 @@ export const Directory = { INSTANCE_NAME: getDomain(opts.mdb), INSTANCE_HOME: getHomePath(opts.mdb, opts.home), INSTANCE_HOMELOG_FILE: getLogsPath(opts.mdb, opts.home), - INDEX_DB_FILE: 'index.db', DUNITER_DB_NAME: 'duniter', + LOKI_DB_DIR: 'loki', WOTB_FILE: 'wotb.bin', getHome: (profile:string|null = null, directory:string|null = null) => getHomePath(profile, directory), @@ -110,11 +111,18 @@ export const Directory = { const params = await Directory.getHomeFS(isMemory, theHome) const home = params.home; let dbf: () => SQLiteDriver + let dbf2: () => LokiJsDriver let wotb: WoTBInstance if (isMemory) { + + // Memory DB dbf = () => new SQLiteDriver(':memory:'); + dbf2 = () => new LokiJsDriver() wotb = WoTBObject.memoryInstance(); + } else { + + // File DB const sqlitePath = path.join(home, Directory.DUNITER_DB_NAME + '.db'); dbf = () => new SQLiteDriver(sqlitePath); const wotbFilePath = path.join(home, Directory.WOTB_FILE); @@ -122,12 +130,14 @@ export const Directory = { if (!existsFile) { fs.closeSync(fs.openSync(wotbFilePath, 'w')); } + dbf2 = () => new LokiJsDriver(path.join(home, Directory.LOKI_DB_DIR)) wotb = WoTBObject.fileInstance(wotbFilePath); } return { home: params.home, fs: params.fs, dbf, + dbf2, wotb } }, diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index ab361ecbe18076656653359f5afb7a327068d4e7..8d17886d214f5ff910e3c74ee3a2495bfaeb4d62 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -713,6 +713,7 @@ class P2PDownloader { // Chunk is COMPLETE this.logger.warn("Chunk #%s is COMPLETE from %s", realIndex, [this.handler[realIndex].host, this.handler[realIndex].port].join(':')); this.chunks[realIndex] = blocks; + await this.dal.blockDAL.insertBatch(blocks.map((b:any) => BlockDTO.fromJSONObject(b))) this.resultsDeferers[realIndex].resolve(this.chunks[realIndex]); } else { this.logger.warn("Chunk #%s DOES NOT CHAIN CORRECTLY from %s", realIndex, [this.handler[realIndex].host, this.handler[realIndex].port].join(':')); diff --git a/server.ts b/server.ts index 23856c0bdfe72ff85a709cc6e34d46e82328ad54..cd32a0172c62a6110e932f65720302ffcb6c7865 100644 --- a/server.ts +++ b/server.ts @@ -329,10 +329,10 @@ export class Server extends stream.Duplex implements HookableServer { let head_1 = await this.dal.bindexDAL.head(1); if (head_1) { // Case 1: b_index < block - await this.dal.blockDAL.exec('DELETE FROM block WHERE NOT fork AND number > ' + head_1.number); + await this.dal.blockDAL.dropNonForkBlocksAbove(head_1.number) // Case 2: b_index > block const current = await this.dal.blockDAL.getCurrent(); - const nbBlocksToRevert = (head_1.number - current.number); + const nbBlocksToRevert = (head_1.number - (current as DBBlock).number); for (let i = 0; i < nbBlocksToRevert; i++) { await this.revert(); } @@ -375,15 +375,15 @@ export class Server extends stream.Duplex implements HookableServer { async resetAll(done:any = null) { await this.resetDataHook() await this.resetConfigHook() - const files = ['stats', 'cores', 'current', Directory.INDEX_DB_FILE, Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE, 'export.zip', 'import.zip', 'conf']; - const dirs = ['blocks', 'blockchain', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; + const files = ['stats', 'cores', 'current', Directory.DUNITER_DB_NAME, 
Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE, 'export.zip', 'import.zip', 'conf']; + const dirs = ['loki', 'blocks', 'blockchain', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return this.resetFiles(files, dirs, done); } async resetData(done:any = null) { await this.resetDataHook() - const files = ['stats', 'cores', 'current', Directory.INDEX_DB_FILE, Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE]; - const dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; + const files = ['stats', 'cores', 'current', Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log', Directory.WOTB_FILE]; + const dirs = ['loki', 'blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; await this.resetFiles(files, dirs, done); } @@ -436,8 +436,8 @@ export class Server extends stream.Duplex implements HookableServer { async cleanDBData() { await this.dal.cleanCaches(); this.dal.wotb.resetWoT(); - const files = ['stats', 'cores', 'current', Directory.INDEX_DB_FILE, Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log']; - const dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; + const files = ['stats', 'cores', 'current', Directory.DUNITER_DB_NAME, Directory.DUNITER_DB_NAME + '.db', Directory.DUNITER_DB_NAME + '.log']; + const dirs = ['loki', 'blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return this.resetFiles(files, dirs); } diff --git a/test/dal/loki.ts b/test/dal/loki.ts new file mode 100644 index 0000000000000000000000000000000000000000..58a857de8e0dbd7285dda4dcbbbd1761145b124a --- /dev/null +++ b/test/dal/loki.ts @@ -0,0 +1,79 @@ +// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. 
+
+import {LokiJsDriver} from "../../app/lib/dal/drivers/LokiJsDriver"
+import {getNanosecondsTime} from "../../app/ProcessCpuProfiler"
+import * as os from "os"
+import * as path from "path"
+import * as assert from "assert"
+import {RealFS} from "../../app/lib/system/directory"
+import {shouldThrow} from "../unit-tools"
+import {DBCommit} from "../../app/lib/dal/drivers/LokiFsAdapter"
+
+describe("Loki data layer", () => {
+
+  let driver:LokiJsDriver
+  let dbPath = path.join(os.tmpdir(), 'duniter' + getNanosecondsTime())
+
+  it('should be able to create a new instance', async () => {
+    driver = new LokiJsDriver(dbPath)
+    await driver.loadDatabase()
+  })
+
+  it('should be able to commit data', async () => {
+    const coll = driver.getLokiInstance().addCollection('block')
+    coll.insert({ a: 1 })
+    coll.insert({ b: 2 })
+    await driver.commitData()
+  })
+
+  it('should be able to restart the DB and read the data', async () => {
+    const driver2 = new LokiJsDriver(dbPath)
+    await driver2.loadDatabase()
+    const coll = driver2.getLokiInstance().getCollection('block')
+    assert.notEqual(null, coll)
+    assert.equal(coll.find().length, 2)
+  })
+
+  it('should not see any data if commit file is absent', async () => {
+    const rfs = RealFS()
+    await rfs.fsUnlink(path.join(dbPath, 'commit.json'))
+    const driver3 = new LokiJsDriver(dbPath)
+    await driver3.loadDatabase()
+    const coll = driver3.getLokiInstance().getCollection('block')
+    assert.equal(null, coll)
+  })
+
+  it('should throw if commit file contains unknown index file', async () => {
+    const rfs = RealFS()
+    await rfs.fsWrite(path.join(dbPath, 'commit.json'), JSON.stringify({
+      indexFile: 'non-existing.index.json'
+    }))
+    const driver4 = new LokiJsDriver(dbPath)
+    await shouldThrow(driver4.loadDatabase())
+  })
+
+  it('should throw if commit file contains unknown data files', async () => {
+    const rfs = RealFS()
+    await rfs.fsRemoveTree(dbPath)
+    const driver4 = new LokiJsDriver(dbPath)
+    const coll = driver4.getLokiInstance().addCollection('block')
+    coll.insert({ a: 1 })
+    coll.insert({ b: 2 })
+    await driver4.commitData()
+    const oldCommit:DBCommit = JSON.parse(await rfs.fsReadFile(path.join(dbPath, 'commit.json')))
+    oldCommit.collections['block'] = 'wrong-file.json'
+    await rfs.fsWrite(path.join(dbPath, 'commit.json'), JSON.stringify(oldCommit))
+    const driver5 = new LokiJsDriver(dbPath)
+    await shouldThrow(driver5.loadDatabase())
+  })
+})
diff --git a/test/unit-tools.ts b/test/unit-tools.ts
new file mode 100644
index 0000000000000000000000000000000000000000..b0f100b204906d1ac1d5f668e718b7176ae2e78d
--- /dev/null
+++ b/test/unit-tools.ts
@@ -0,0 +1,23 @@
+// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1
+// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com>
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+
+export async function shouldThrow(promise:Promise<any>) {
+  let error = false
+  try {
+    await promise
+  } catch (e) {
+    error = true
+  }
+  await promise.should.be.rejected()
+  error.should.equal(true)
+}
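shouldThrow complements the should.js assertions already used across the test suite; a usage sketch (hypothetical test body, path invented):

    it('rejects when the commit file is corrupted', async () => {
      await shouldThrow(new LokiJsDriver('/some/corrupted/path').loadDatabase())
    })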