diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index ea94bc43110a945c4e87a0ffe1ee1b913b1e3acb..8902b070772281337c9554f8069ce7d3395b2f6d 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -27,7 +27,7 @@ import { IndexEntry, SindexEntry } from "../indexer" -import {DBPeer, PeerDAL} from "./sqliteDAL/PeerDAL" +import {DBPeer} from "./sqliteDAL/PeerDAL" import {TransactionDTO} from "../dto/TransactionDTO" import {CertDAL, DBCert} from "./sqliteDAL/CertDAL" import {DBWallet} from "./sqliteDAL/WalletDAL" @@ -64,6 +64,8 @@ import {TxsDAO} from "./indexDAL/abstract/TxsDAO" import {LokiJsDriver} from "./drivers/LokiJsDriver" import {WalletDAO} from "./indexDAL/abstract/WalletDAO" import {LokiWallet} from "./indexDAL/loki/LokiWallet" +import {PeerDAO} from "./indexDAL/abstract/PeerDAO" +import {LokiPeer} from "./indexDAL/loki/LokiPeer" const fs = require('fs') const path = require('path') @@ -100,7 +102,6 @@ export class FileDAL { // SQLite DALs metaDAL:MetaDAL - peerDAL:PeerDAL idtyDAL:IdentityDAL certDAL:CertDAL msDAL:MembershipDAL @@ -108,6 +109,7 @@ export class FileDAL { // New DAO entities blockDAL:BlockchainDAO txsDAL:TxsDAO + peerDAL:PeerDAO walletDAL:WalletDAO bindexDAL:BIndexDAO mindexDAL:MIndexDAO @@ -130,7 +132,6 @@ export class FileDAL { this.powDAL = new PowDAL(this.rootPath, params.fs) this.confDAL = new ConfDAL(this.rootPath, params.fs) this.metaDAL = new (require('./sqliteDAL/MetaDAL').MetaDAL)(this.sqliteDriver); - this.peerDAL = new (require('./sqliteDAL/PeerDAL').PeerDAL)(this.sqliteDriver); this.fakeBlockDAL = new (require('./sqliteDAL/BlockDAL').BlockDAL)(this.sqliteDriver); this.blockDAL = new LokiBlockchain(this.loki.getLokiInstance()) this.fakeTxsDAL = new (require('./sqliteDAL/TxsDAL').TxsDAL)(this.sqliteDriver); @@ -139,6 +140,7 @@ export class FileDAL { this.idtyDAL = new (require('./sqliteDAL/IdentityDAL').IdentityDAL)(this.sqliteDriver); this.certDAL = new (require('./sqliteDAL/CertDAL').CertDAL)(this.sqliteDriver); this.msDAL = new (require('./sqliteDAL/MembershipDAL').MembershipDAL)(this.sqliteDriver); + this.peerDAL = new LokiPeer(this.loki.getLokiInstance()) this.walletDAL = new LokiWallet(this.loki.getLokiInstance()) this.bindexDAL = new LokiBIndex(this.loki.getLokiInstance()) this.mindexDAL = new LokiMIndex(this.loki.getLokiInstance()) @@ -174,6 +176,7 @@ export class FileDAL { const dals = [ this.blockDAL, this.txsDAL, + this.peerDAL, this.walletDAL, this.bindexDAL, this.mindexDAL, diff --git a/app/lib/dal/indexDAL/abstract/PeerDAO.ts b/app/lib/dal/indexDAL/abstract/PeerDAO.ts new file mode 100644 index 0000000000000000000000000000000000000000..9e3423b43b5bf43a925626193b99904223e5b99c --- /dev/null +++ b/app/lib/dal/indexDAL/abstract/PeerDAO.ts @@ -0,0 +1,61 @@ +import {Initiable} from "../../sqliteDAL/Initiable" +import {DBPeer} from "../../sqliteDAL/PeerDAL" + +export interface PeerDAO extends Initiable { + + /** + * Trigger the initialization of the DAO. Called when the underlying DB is ready. + */ + triggerInit(): void + + listAll(): Promise<DBPeer[]> + + withUPStatus(): Promise<DBPeer[]> + + /** + * Saves a wallet. + * @param {DBPeer} peer + * @returns {Promise<DBPeer>} + */ + savePeer(peer:DBPeer): Promise<DBPeer> + + /** + * Find a wallet based on conditions. + * @param {string} pubkey + * @returns {Promise<DBPeer>} + */ + getPeer(pubkey:string): Promise<DBPeer> + + /** + * Find all peers with at least one endpoint matching given parameter. + * @param {string} ep + * @returns {Promise<DBPeer[]>} + */ + getPeersWithEndpointsLike(ep:string): Promise<DBPeer[]> + + /** + * Make a batch insert. + * @param records The records to insert as a batch. + */ + insertBatch(records:DBPeer[]): Promise<void> + + /** + * Remove a peer by its pubkey. + * @param {string} pubkey + * @returns {Promise<void>} + */ + removePeerByPubkey(pubkey:string): Promise<void> + + /** + * Remove peers that were set down before a certain datetime. + * @param {number} thresholdTime + * @returns {Promise<void>} + */ + removePeersDownBefore(thresholdTime:number): Promise<void> + + /** + * Remove all the peers. + * @returns {Promise<void>} + */ + removeAll(): Promise<void> +} diff --git a/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts b/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts index ad040eed84ed38a2f75235a95a7d046f0cfc2327..f922c0fcfacf170a3685d5219cc4d5686813dfbc 100644 --- a/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts +++ b/app/lib/dal/indexDAL/loki/LokiCollectionManager.ts @@ -12,7 +12,7 @@ export abstract class LokiCollectionManager<T> { public constructor( protected loki:any, - protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex'|'blockchain'|'txs'|'wallet', + protected collectionName:'iindex'|'mindex'|'cindex'|'sindex'|'bindex'|'blockchain'|'txs'|'wallet'|'peer', protected indices: (keyof T)[]) { this.collectionIsInitialized = new Promise<void>(res => this.resolveCollection = res) } diff --git a/app/lib/dal/indexDAL/loki/LokiPeer.ts b/app/lib/dal/indexDAL/loki/LokiPeer.ts new file mode 100644 index 0000000000000000000000000000000000000000..2614d0ac737ba5295893ec92fbe1b0d86525981b --- /dev/null +++ b/app/lib/dal/indexDAL/loki/LokiPeer.ts @@ -0,0 +1,94 @@ +import {LokiCollectionManager} from "./LokiCollectionManager" +import {PeerDAO} from "../abstract/PeerDAO" +import {DBPeer} from "../../sqliteDAL/PeerDAL" + +export class LokiPeer extends LokiCollectionManager<DBPeer> implements PeerDAO { + + constructor(loki:any) { + super(loki, 'peer', ['pubkey']) + } + + cleanCache(): void { + } + + async listAll(): Promise<DBPeer[]> { + return this.collection + .find({}) + } + + async withUPStatus(): Promise<DBPeer[]> { + return this.collection + .find({ status: 'UP' }) + } + + async getPeer(pubkey: string): Promise<DBPeer> { + return this.collection + .find({ pubkey })[0] + } + + async insertBatch(peers: DBPeer[]): Promise<void> { + for (const p of peers) { + this.collection.insert(p) + } + } + + async savePeer(peer: DBPeer): Promise<DBPeer> { + let updated = false + this.collection + .chain() + .find({ pubkey: peer.pubkey }) + .update(p => { + p.version = peer.version + p.currency = peer.currency + p.status = peer.status + p.statusTS = peer.statusTS + p.hash = peer.hash + p.first_down = peer.first_down + p.last_try = peer.last_try + p.pubkey = peer.pubkey + p.block = peer.block + p.signature = peer.signature + p.endpoints = peer.endpoints + p.raw = peer.raw + updated = true + }) + if (!updated) { + await this.insertBatch([peer]) + } + return peer + } + + async removePeerByPubkey(pubkey:string): Promise<void> { + this.collection + .chain() + .find({ pubkey }) + .remove() + } + + async removePeersDownBefore(thresholdTime:number): Promise<void> { + this.collection + .chain() + .find({ + $and: [ + { first_down: { $lt: thresholdTime } }, + { first_down: { $gt: 0 } }, + ] + }) + .remove() + } + + async removeAll(): Promise<void> { + this.collection + .chain() + .find({}) + .remove() + } + + async getPeersWithEndpointsLike(ep: string): Promise<DBPeer[]> { + return this.collection + .chain() + .find({}) + .where(p => p.endpoints.filter(ep => ep.indexOf(ep) !== -1).length > 0) + .data() + } +} \ No newline at end of file diff --git a/app/lib/dal/indexDAL/loki/LokiTypes.ts b/app/lib/dal/indexDAL/loki/LokiTypes.ts index f61fea8d9b54e72385871cc75187de9b5fb26793..26150bbe1254d77b0730e7a429b9894c329723d7 100644 --- a/app/lib/dal/indexDAL/loki/LokiTypes.ts +++ b/app/lib/dal/indexDAL/loki/LokiTypes.ts @@ -24,6 +24,8 @@ export interface LokiChainableFind<T> { update(cb:(t:T) => void): LokiChainableFind<T> + where(filter:(t:T) => boolean): LokiChainableFind<T> + remove(): LokiChainableFind<T> compoundsort(sort:((string|((string|boolean)[]))[])): LokiChainableFind<T> diff --git a/app/modules/crawler/index.ts b/app/modules/crawler/index.ts index 653466f47b4476b51f104a78159ff94efeccd78c..ddf6fc1fe79a43f8ddb624ea2ed107a561308cef 100644 --- a/app/modules/crawler/index.ts +++ b/app/modules/crawler/index.ts @@ -159,7 +159,7 @@ export const CrawlerDependency = { const toPort = params[4]; const logger = server.logger; try { - const peers = fromHost && fromPort ? [{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.query('SELECT * FROM peer WHERE status = ?', ['UP']) + const peers = fromHost && fromPort ? [{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.withUPStatus() // Memberships for (const p of peers) { const peer = PeerDTO.fromJSONObject(p) @@ -312,7 +312,7 @@ export const CrawlerDependency = { const fromPort = params[3] const logger = server.logger; try { - const peers = fromHost && fromPort ? [{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.query('SELECT * FROM peer WHERE status = ?', ['UP']) + const peers = fromHost && fromPort ? [{ endpoints: [['BASIC_MERKLED_API', fromHost, fromPort].join(' ')] }] : await server.dal.peerDAL.withUPStatus() // Memberships for (const p of peers) { const peer = PeerDTO.fromJSONObject(p) diff --git a/app/modules/crawler/lib/garbager.ts b/app/modules/crawler/lib/garbager.ts index f748a63a66320d97dfb3e6f5729efa73bd56b74d..852d33187eb872fe9b768b9c50c12690dae73f09 100644 --- a/app/modules/crawler/lib/garbager.ts +++ b/app/modules/crawler/lib/garbager.ts @@ -16,5 +16,5 @@ import {Server} from "../../../../server" export const cleanLongDownPeers = async (server:Server, now:number) => { const first_down_limit = now - CrawlerConstants.PEER_LONG_DOWN * 1000; - await server.dal.peerDAL.query('DELETE FROM peer WHERE first_down < ' + first_down_limit) + await server.dal.peerDAL.removePeersDownBefore(first_down_limit) } diff --git a/app/modules/crawler/lib/sync.ts b/app/modules/crawler/lib/sync.ts index 424b89010c710d2fbdcbc32579dfa4b5513f9195..239be33cd32106dc25ce5920712ecb329e4ae5ef 100644 --- a/app/modules/crawler/lib/sync.ts +++ b/app/modules/crawler/lib/sync.ts @@ -328,6 +328,9 @@ export class Synchroniser extends stream.Duplex { //======= await this.syncPeers(nopeers, fullSync, this.host, this.port, to) + // Trim the loki data + await this.server.dal.loki.flushAndTrimData() + this.watcher.end(); this.push({ sync: true }); this.logger.info('Sync finished.'); diff --git a/test/fast/modules/crawler/peers_garbaging.js b/test/fast/modules/crawler/peers_garbaging.js index 4d44184db5b6171a2dff1dbb7ba8aaddb8cd9489..9e29d392ac7c040f3c4671be247f412900ad8f64 100644 --- a/test/fast/modules/crawler/peers_garbaging.js +++ b/test/fast/modules/crawler/peers_garbaging.js @@ -38,10 +38,10 @@ describe('Peers garbaging', () => { yield server.dal.peerDAL.savePeer({ pubkey: 'B', version: 1, currency: 'c', first_down: 1484827199999, statusTS: 1485000000000, block: '2393-H' }); yield server.dal.peerDAL.savePeer({ pubkey: 'C', version: 1, currency: 'c', first_down: 1484827200000, statusTS: 1485000000000, block: '2393-H' }); yield server.dal.peerDAL.savePeer({ pubkey: 'D', version: 1, currency: 'c', first_down: 1484820000000, statusTS: 1485000000000, block: '2393-H' }); - (yield server.dal.peerDAL.sqlListAll()).should.have.length(4); + (yield server.dal.peerDAL.listAll()).should.have.length(4); const now = 1485000000000; yield garbager.cleanLongDownPeers(server, now); - (yield server.dal.peerDAL.sqlListAll()).should.have.length(2); + (yield server.dal.peerDAL.listAll()).should.have.length(2); }) }] } diff --git a/test/integration/v1.0-modules-api.js b/test/integration/v1.0-modules-api.js index 9c6b3b76bbde6431aef9c29f4bb3129d680273b8..1cee0379a3398af691f82645dab85ed04b423797 100644 --- a/test/integration/v1.0-modules-api.js +++ b/test/integration/v1.0-modules-api.js @@ -28,7 +28,7 @@ describe("v1.0 Module API", () => { it('should be able to execute `hello` command with quickRun', () => co(function*() { duniter.statics.setOnRunDone(() => { /* Do not exit the process */ }) const absolutePath = path.join(__dirname, './scenarios/hello-plugin.js') - process.argv = ['', absolutePath, 'hello-world'] + process.argv = ['', absolutePath, 'hello-world', '--memory'] const res = yield duniter.statics.quickRun(absolutePath) res.should.equal('Hello world! from within Duniter.') })) @@ -111,7 +111,7 @@ describe("v1.0 Module API", () => { cli: [{ name: 'gimme-conf', desc: 'Returns the configuration object.', - onDatabaseExecute: (server, conf, program, params, startServices, stopServices) => co(function*() { + onConfiguredExecute: (server, conf, program, params, startServices, stopServices) => co(function*() { // Gimme the conf! return conf; })