From b5391ffb3ce3c1a2d6509c72444c0c20911aa4cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Moreau?= <cem.moreau@gmail.com> Date: Fri, 4 May 2018 15:52:37 +0200 Subject: [PATCH] [enh] Refactoring: type FileDAL.walletDAL --- app/lib/dal/drivers/LokiFsAdapter.ts | 132 +++++++----------- app/lib/dal/fileDAL.ts | 28 ++-- app/lib/dal/fileDALs/CFSCore.ts | 4 + app/lib/dal/indexDAL/abstract/WalletDAO.ts | 30 ++++ app/lib/dal/indexDAL/loki/LokiBlockchain.ts | 5 + .../indexDAL/loki/LokiCollectionManager.ts | 33 +++++ app/lib/dal/indexDAL/loki/LokiIndex.ts | 30 +--- app/lib/dal/indexDAL/loki/LokiWallet.ts | 39 ++++++ app/lib/system/directory.ts | 5 + test/integration/http_api.js | 8 +- 10 files changed, 189 insertions(+), 125 deletions(-) create mode 100644 app/lib/dal/indexDAL/abstract/WalletDAO.ts create mode 100644 app/lib/dal/indexDAL/loki/LokiCollectionManager.ts create mode 100644 app/lib/dal/indexDAL/loki/LokiWallet.ts diff --git a/app/lib/dal/drivers/LokiFsAdapter.ts b/app/lib/dal/drivers/LokiFsAdapter.ts index d19e58bf9..d703da890 100644 --- a/app/lib/dal/drivers/LokiFsAdapter.ts +++ b/app/lib/dal/drivers/LokiFsAdapter.ts @@ -1,15 +1,15 @@ -/* - Loki (node) fs structured Adapter (need to require this script to instance and use it). - - This adapter will save database container and each collection to separate files and - save collection only if it is dirty. It is also designed to use a destructured serialization - method intended to lower the memory overhead of json serialization. - - This adapter utilizes ES6 generator/iterator functionality to stream output and - uses node linereader module to stream input. This should lower memory pressure - in addition to individual object serializations rather than loki's default deep object - serialization. -*/ +// Source file from duniter: Crypto-currency software to manage libre currency such as Ğ1 +// Copyright (C) 2018 Cedric Moreau <cem.moreau@gmail.com> +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. import {FileSystem} from "../../system/directory" import {DataErrors} from "../../common-libs/errors" @@ -17,10 +17,6 @@ import {CFSCore} from "../fileDALs/CFSCore" import {getNanosecondsTime} from "../../../ProcessCpuProfiler" import {NewLogger} from "../../logger" -const fs = require('fs'); -const readline = require('readline'); -const stream = require('stream'); - interface Iterator<T> { next(value?: any): IteratorResult<T> return?(value?: any): IteratorResult<T> @@ -166,7 +162,7 @@ export class LokiFsAdapter { /** * Utility method for queueing one save at a time */ - private saveNextPartition(commit:DBCommit, pi:Iterator<string>, callback:any) { + private async saveNextPartition(commit:DBCommit, pi:Iterator<string>, callback:any) { let li; let filename; let self = this; @@ -186,20 +182,14 @@ export class LokiFsAdapter { commit.collections[pinext.value] = filename } - let wstream = fs.createWriteStream(this.cfs.getPath(filename)) - - wstream.on('close', function() { - self.saveNextPartition(commit, pi, callback); - }); - li = this.generateDestructured({ partition: pinext.value }); // iterate each of the lines generated by generateDestructured() for(let outline of li) { - wstream.write(outline + "\n"); + await this.cfs.appendFile(filename, outline + "\n") } - wstream.end(); + self.saveNextPartition(commit, pi, callback) }; /** @@ -271,41 +261,25 @@ export class LokiFsAdapter { commitObj = await this.cfs.readJSON(LokiFsAdapter.COMMIT_FILE) // make sure file exists - const dbname = this.cfs.getPath(commitObj.indexFile) + const dbname = commitObj.indexFile // Trimmed data first - await new Promise((res, rej) => { - fs.stat(dbname, function (err:any, stats:any) { - if (!err && stats.isFile()) { - instream = fs.createReadStream(dbname); - outstream = new stream(); - rl = readline.createInterface(instream, outstream); - - // first, load db container component - rl.on('line', function(line:string) { - // it should single JSON object (a one line file) - if (self.dbref === null && line !== "") { - self.dbref = JSON.parse(line); - } - }); - - // when that is done, examine its collection array to sequence loading each - rl.on('close', function() { - if ((self.dbref as any).collections.length > 0) { - self.loadNextCollection(commitObj.collections, 0, function(err:any) { - if (err) return rej(err) - loki.loadJSONObject(self.dbref) - res() - }); - } - }); - } - else { - // file does not exist, we throw as the commit file is not respected - rej(Error(DataErrors[DataErrors.CORRUPTED_DATABASE])) - } - }) - }) + if (await this.cfs.exists(dbname)) { + const line = await this.cfs.read(dbname) + // it should single JSON object (a one line file) + if (self.dbref === null && line) { + self.dbref = JSON.parse(line) + } + + // when that is done, examine its collection array to sequence loading each + if ((self.dbref as any).collections.length > 0) { + await self.loadNextCollection(commitObj.collections, 0) + loki.loadJSONObject(self.dbref) + } + } else { + // file does not exist, we throw as the commit file is not respected + throw Error(DataErrors[DataErrors.CORRUPTED_DATABASE]) + } // Changes data for (const changeFile of commitObj.changes) { @@ -339,38 +313,28 @@ export class LokiFsAdapter { * @param {function} callback - callback to pass to next invocation or to call when done * @memberof LokiFsStructuredAdapter */ - async loadNextCollection(collectionsMap:{ [coll:string]: string }, collectionIndex:any, callback:any) { + async loadNextCollection(collectionsMap:{ [coll:string]: string }, collectionIndex:any) { let self=this, obj; const coll = (self.dbref as any).collections[collectionIndex] if (!collectionsMap[coll.name] || !(await this.cfs.exists(collectionsMap[coll.name]))) { - return callback(Error(DataErrors[DataErrors.CORRUPTED_DATABASE])) + throw Error(DataErrors[DataErrors.CORRUPTED_DATABASE]) } - let instream = fs.createReadStream(this.cfs.getPath(collectionsMap[coll.name])) - let outstream = new stream(); - let rl = readline.createInterface(instream, outstream); - - rl.on('line', (line:string) => { - if (line !== "") { - obj = JSON.parse(line); - coll.data.push(obj); + const filename = collectionsMap[coll.name] + const content = await this.cfs.read(filename) + if (content) { + const lines = content.split('\n') + for (const line of lines) { + if (line !== "") { + obj = JSON.parse(line); + coll.data.push(obj); + } } - }) - - rl.on('close', () => { - instream = null; - outstream = null; - rl = null; - obj = null; + } - // if there are more collections, load the next one - if (++collectionIndex < (self.dbref as any).collections.length) { - self.loadNextCollection(collectionsMap, collectionIndex, callback); - } - // otherwise we are done, callback to loadDatabase so it can return the new db object representation. - else { - callback(); - } - }); + // if there are more collections, load the next one + if (++collectionIndex < (self.dbref as any).collections.length) { + await self.loadNextCollection(collectionsMap, collectionIndex) + } }; } \ No newline at end of file diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index 775fd9e9b..ea94bc431 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -30,7 +30,7 @@ import { import {DBPeer, PeerDAL} from "./sqliteDAL/PeerDAL" import {TransactionDTO} from "../dto/TransactionDTO" import {CertDAL, DBCert} from "./sqliteDAL/CertDAL" -import {DBWallet, WalletDAL} from "./sqliteDAL/WalletDAL" +import {DBWallet} from "./sqliteDAL/WalletDAL" import {DBTx, TxsDAL} from "./sqliteDAL/TxsDAL" import {DBBlock} from "../db/DBBlock" import {DBMembership, MembershipDAL} from "./sqliteDAL/MembershipDAL" @@ -62,6 +62,8 @@ import {LokiTransactions} from "./indexDAL/loki/LokiTransactions" import {profileFunc} from "../../ProcessCpuProfiler" import {TxsDAO} from "./indexDAL/abstract/TxsDAO" import {LokiJsDriver} from "./drivers/LokiJsDriver" +import {WalletDAO} from "./indexDAL/abstract/WalletDAO" +import {LokiWallet} from "./indexDAL/loki/LokiWallet" const fs = require('fs') const path = require('path') @@ -82,24 +84,31 @@ export interface FileDALParams { export class FileDAL { rootPath:string + loki:LokiJsDriver sqliteDriver:SQLiteDriver wotb:WoTBInstance profile:string - loki:LokiJsDriver + // Simple file accessors powDAL:PowDAL confDAL:ConfDAL - metaDAL:MetaDAL - peerDAL:PeerDAL + statDAL:StatDAL + + // DALs to be removed fakeBlockDAL:BlockDAL fakeTxsDAL:TxsDAL - blockDAL:BlockchainDAO - txsDAL:TxsDAO - statDAL:StatDAL + + // SQLite DALs + metaDAL:MetaDAL + peerDAL:PeerDAL idtyDAL:IdentityDAL certDAL:CertDAL msDAL:MembershipDAL - walletDAL:WalletDAL + + // New DAO entities + blockDAL:BlockchainDAO + txsDAL:TxsDAO + walletDAL:WalletDAO bindexDAL:BIndexDAO mindexDAL:MIndexDAO iindexDAL:IIndexDAO @@ -130,7 +139,7 @@ export class FileDAL { this.idtyDAL = new (require('./sqliteDAL/IdentityDAL').IdentityDAL)(this.sqliteDriver); this.certDAL = new (require('./sqliteDAL/CertDAL').CertDAL)(this.sqliteDriver); this.msDAL = new (require('./sqliteDAL/MembershipDAL').MembershipDAL)(this.sqliteDriver); - this.walletDAL = new (require('./sqliteDAL/WalletDAL').WalletDAL)(this.sqliteDriver); + this.walletDAL = new LokiWallet(this.loki.getLokiInstance()) this.bindexDAL = new LokiBIndex(this.loki.getLokiInstance()) this.mindexDAL = new LokiMIndex(this.loki.getLokiInstance()) this.iindexDAL = new LokiIIndex(this.loki.getLokiInstance()) @@ -165,6 +174,7 @@ export class FileDAL { const dals = [ this.blockDAL, this.txsDAL, + this.walletDAL, this.bindexDAL, this.mindexDAL, this.iindexDAL, diff --git a/app/lib/dal/fileDALs/CFSCore.ts b/app/lib/dal/fileDALs/CFSCore.ts index 26695f0be..9e0e0d3ad 100644 --- a/app/lib/dal/fileDALs/CFSCore.ts +++ b/app/lib/dal/fileDALs/CFSCore.ts @@ -231,4 +231,8 @@ export class CFSCore { getPath(file: string) { return path.join(this.rootPath, file) } + + appendFile(filename: string, content: string) { + return this.qfs.fsAppend(path.join(this.rootPath, filename), content) + } } diff --git a/app/lib/dal/indexDAL/abstract/WalletDAO.ts b/app/lib/dal/indexDAL/abstract/WalletDAO.ts new file mode 100644 index 000000000..4e9a34ff8 --- /dev/null +++ b/app/lib/dal/indexDAL/abstract/WalletDAO.ts @@ -0,0 +1,30 @@ +import {Initiable} from "../../sqliteDAL/Initiable" +import {DBWallet} from "../../sqliteDAL/WalletDAL" + +export interface WalletDAO extends Initiable { + + /** + * Trigger the initialization of the DAO. Called when the underlying DB is ready. + */ + triggerInit(): void + + /** + * Saves a wallet. + * @param {DBWallet} wallet + * @returns {Promise<DBWallet>} + */ + saveWallet(wallet:DBWallet): Promise<DBWallet> + + /** + * Find a wallet based on conditions. + * @param {string} conditions + * @returns {Promise<DBWallet>} + */ + getWallet(conditions:string): Promise<DBWallet> + + /** + * Make a batch insert. + * @param records The records to insert as a batch. + */ + insertBatch(records:DBWallet[]): Promise<void> +} diff --git a/app/lib/dal/indexDAL/loki/LokiBlockchain.ts b/app/lib/dal/indexDAL/loki/LokiBlockchain.ts index 4fe1109be..80d37f88c 100644 --- a/app/lib/dal/indexDAL/loki/LokiBlockchain.ts +++ b/app/lib/dal/indexDAL/loki/LokiBlockchain.ts @@ -14,6 +14,11 @@ export class LokiBlockchain extends LokiIndex<DBBlock> implements BlockchainDAO super(loki, 'blockchain', ['number', 'hash', 'fork']) } + cleanCache(): void { + super.cleanCache() + this.current = null + } + async getCurrent() { if (this.current) { // Cached diff --git a/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts b/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts new file mode 100644 index 000000000..ad040eed8 --- /dev/null +++ b/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts @@ -0,0 +1,33 @@ +import {LokiCollection} from "./LokiTypes" +import {LokiProxyCollection} from "./LokiCollection" +import {NewLogger} from "../../../logger" + +const logger = NewLogger() + +export abstract class LokiCollectionManager<T> { + + protected collection:LokiCollection<T> + protected collectionIsInitialized: Promise<void> + protected resolveCollection: () => void + + public constructor( + protected loki:any, + protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex'|'blockchain'|'txs'|'wallet', + protected indices: (keyof T)[]) { + this.collectionIsInitialized = new Promise<void>(res => this.resolveCollection = res) + } + + public triggerInit() { + const coll = this.loki.addCollection(this.collectionName, { + indices: this.indices, + disableChangesApi: false + }) + this.collection = new LokiProxyCollection(coll, this.collectionName) + this.resolveCollection() + } + + async init(): Promise<void> { + await this.collectionIsInitialized + logger.info('Collection %s ready', this.collectionName) + } +} \ No newline at end of file diff --git a/app/lib/dal/indexDAL/loki/LokiIndex.ts b/app/lib/dal/indexDAL/loki/LokiIndex.ts index 8e978153a..1197d56c7 100644 --- a/app/lib/dal/indexDAL/loki/LokiIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiIndex.ts @@ -1,8 +1,7 @@ -import {LokiCollection} from "./LokiTypes" import {GenericDAO} from "../abstract/GenericDAO" import {NewLogger} from "../../../logger" -import {LokiProxyCollection} from "./LokiCollection" import {getMicrosecondsTime} from "../../../../ProcessCpuProfiler" +import {LokiCollectionManager} from "./LokiCollectionManager" const logger = NewLogger() @@ -11,32 +10,7 @@ export interface IndexData { writtenOn: number } -export abstract class LokiIndex<T extends IndexData> implements GenericDAO<T> { - - protected collection:LokiCollection<T> - protected collectionIsInitialized: Promise<void> - private resolveCollection: () => void - - public constructor( - protected loki:any, - protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex'|'blockchain'|'txs', - protected indices: (keyof T)[]) { - this.collectionIsInitialized = new Promise<void>(res => this.resolveCollection = res) - } - - public triggerInit() { - const coll = this.loki.addCollection(this.collectionName, { - indices: this.indices, - disableChangesApi: false - }) - this.collection = new LokiProxyCollection(coll, this.collectionName) - this.resolveCollection() - } - - async init(): Promise<void> { - await this.collectionIsInitialized - logger.info('Collection %s ready', this.collectionName) - } +export abstract class LokiIndex<T extends IndexData> extends LokiCollectionManager<T> implements GenericDAO<T> { cleanCache(): void { } diff --git a/app/lib/dal/indexDAL/loki/LokiWallet.ts b/app/lib/dal/indexDAL/loki/LokiWallet.ts new file mode 100644 index 000000000..cd0e3c7a3 --- /dev/null +++ b/app/lib/dal/indexDAL/loki/LokiWallet.ts @@ -0,0 +1,39 @@ +import {WalletDAO} from "../abstract/WalletDAO" +import {DBWallet} from "../../sqliteDAL/WalletDAL" +import {LokiCollectionManager} from "./LokiCollectionManager" + +export class LokiWallet extends LokiCollectionManager<DBWallet> implements WalletDAO { + + constructor(loki:any) { + super(loki, 'wallet', ['conditions']) + } + + cleanCache(): void { + } + + async getWallet(conditions: string): Promise<DBWallet> { + return this.collection + .find({ conditions })[0] + } + + async insertBatch(records: DBWallet[]): Promise<void> { + for (const w of records) { + this.collection.insert(w) + } + } + + async saveWallet(wallet: DBWallet): Promise<DBWallet> { + let updated = false + this.collection + .chain() + .find({ conditions: wallet.conditions }) + .update(w => { + w.balance = wallet.balance + updated = true + }) + if (!updated) { + await this.insertBatch([wallet]) + } + return wallet + } +} \ No newline at end of file diff --git a/app/lib/system/directory.ts b/app/lib/system/directory.ts index 931d98a9b..8254b4299 100644 --- a/app/lib/system/directory.ts +++ b/app/lib/system/directory.ts @@ -41,6 +41,7 @@ export interface FileSystem { fsWrite(file:string, content:string): Promise<void> fsMakeDirectory(dir:string): Promise<void> fsRemoveTree(dir:string): Promise<void> + fsAppend(file: string, content: string): Promise<void> } class QioFileSystem implements FileSystem { @@ -67,6 +68,10 @@ class QioFileSystem implements FileSystem { return this.qio.write(file, content) } + fsAppend(file: string, content: string): Promise<void> { + return this.qio.append(file, content) + } + fsMakeDirectory(dir: string): Promise<void> { return this.qio.makeTree(dir) } diff --git a/test/integration/http_api.js b/test/integration/http_api.js index 22ee16aca..d793281ec 100644 --- a/test/integration/http_api.js +++ b/test/integration/http_api.js @@ -309,19 +309,19 @@ describe("HTTP API", function() { const p5 = new Promise(res => resolve5 = res) const p6 = new Promise(res => resolve6 = res) server.addEndpointsDefinitions(() => Promise.resolve("BASIC_MERKLED_API localhost 7777")) - const p1 = yield server.PeeringService.generateSelfPeer({ - currency: server.conf.currency - }, 0) client.on('message', function message(data) { const peer = JSON.parse(data); if (peer.block.match(/2-/)) { server2.PeeringService.generateSelfPeer(server.conf) return resolve5(peer) } - if (peer.block.match(/1-/) && peer.pubkey === 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') { + if ((peer.block.match(/1-/) || peer.block.match(/3-/)) && peer.pubkey === 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') { return resolve6(peer) } }) + const p1 = yield server.PeeringService.generateSelfPeer({ + currency: server.conf.currency + }, 0) yield server2.writeRawPeer(PeerDTO.fromJSONObject(p1).getRawSigned()) const b5 = yield p5 should(b5).have.property('version', 10) -- GitLab