diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index faa1bfd1d006b8e540eeee04f785c598eba14aa8..df7cd4eeb7c160818cc67b938fc8e62f85e97d8e 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -71,7 +71,6 @@ import {HttpSource, HttpUD} from "../../modules/bma/lib/dtos" import {GenericDAO} from "./indexDAL/abstract/GenericDAO" import {LokiDAO} from "./indexDAL/loki/LokiDAO" import {MonitorExecutionTime} from "../debug/MonitorExecutionTime" -import {SqliteMIndex} from "./indexDAL/sqlite/SqliteMIndex" import {LevelDBDividend} from "./indexDAL/leveldb/LevelDBDividend" import {LevelDBBindex} from "./indexDAL/leveldb/LevelDBBindex" @@ -83,6 +82,7 @@ import {SqlitePeers} from "./indexDAL/sqlite/SqlitePeers" import {LevelDBWallet} from "./indexDAL/leveldb/LevelDBWallet" import {LevelDBCindex} from "./indexDAL/leveldb/LevelDBCindex" import {LevelDBIindex} from "./indexDAL/leveldb/LevelDBIindex" +import {LevelDBMindex} from "./indexDAL/leveldb/LevelDBMindex" const readline = require('readline') const indexer = require('../indexer').Indexer @@ -165,7 +165,7 @@ export class FileDAL { this.peerDAL = new SqlitePeers(getSqliteDB) this.walletDAL = new LevelDBWallet(getLevelDB) this.bindexDAL = new LevelDBBindex(getLevelDB) - this.mindexDAL = new SqliteMIndex(getSqliteDB) + this.mindexDAL = new LevelDBMindex(getLevelDB) this.iindexDAL = new LevelDBIindex(getLevelDB) this.sindexDAL = new LevelDBSindex(getLevelDB) this.cindexDAL = new LevelDBCindex(getLevelDB) @@ -761,8 +761,8 @@ export class FileDAL { i.pubkey = i.pub return i })); - return await Promise.all<DBIdentity>(found.map(async (f:any) => { - const ms = await this.mindexDAL.getReducedMSForImplicitRevocation(f.pub); + return await Promise.all<DBIdentity>(found.map(async (f) => { + const ms = await this.mindexDAL.getReducedMSForImplicitRevocation(f.pubkey); if (ms) { f.revoked_on = null; if (ms.revoked_on) { @@ -770,7 +770,7 @@ export class FileDAL { f.revoked_on = blockOfRevocation.medianTime } f.revoked = !!f.revoked_on; - f.revocation_sig = ms.revocation || null; + f.revocation_sig = f.revocation_sig || ms.revocation || null; } return f; })) diff --git a/app/lib/dal/indexDAL/abstract/MIndexDAO.ts b/app/lib/dal/indexDAL/abstract/MIndexDAO.ts index e578ee21f451c9c75eb2ccd1a17f0a752c83ab9a..2f79c3850fb6a91d028fa1124a75305ff89b1881 100644 --- a/app/lib/dal/indexDAL/abstract/MIndexDAO.ts +++ b/app/lib/dal/indexDAL/abstract/MIndexDAO.ts @@ -9,9 +9,9 @@ export interface MIndexDAO extends ReduceableDAO<MindexEntry> { findByPubAndChainableOnGt(pub:string, medianTime:number): Promise<MindexEntry[]> - findRevokesOnLteAndRevokedOnIsNull(medianTime:number): Promise<MindexEntry[]> + findRevokesOnLteAndRevokedOnIsNull(medianTime:number): Promise<string[]> - findExpiresOnLteAndRevokesOnGt(medianTime:number): Promise<MindexEntry[]> + findExpiresOnLteAndRevokesOnGt(medianTime:number): Promise<string[]> getReducedMS(pub:string): Promise<FullMindexEntry|null> diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBMindex.ts b/app/lib/dal/indexDAL/leveldb/LevelDBMindex.ts new file mode 100644 index 0000000000000000000000000000000000000000..6ce9d3e8057ea8b2eca0c604424b7c72d2d64aa8 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/LevelDBMindex.ts @@ -0,0 +1,168 @@ +import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime" +import {FullMindexEntry, MindexEntry, reduce, reduceForDBTrimming, reduceOrNull} from "../../../indexer" +import {LevelUp} from 'levelup' +import {LevelDBTable} from "./LevelDBTable" +import {Underscore} from "../../../common-libs/underscore" +import {pint} from "../../../common-libs/pint" +import {reduceConcat, reduceGroupBy} from "../../../common-libs/reduce" +import {LevelDBWrittenOnIndexer} from "./indexers/LevelDBWrittenOnIndexer" +import {MIndexDAO} from "../abstract/MIndexDAO" +import {LevelMIndexRevokesOnIndexer} from "./indexers/LevelMIndexRevokesOnIndexer" +import {LevelMIndexExpiresOnIndexer} from "./indexers/LevelMIndexExpiresOnIndexer" + +export class LevelDBMindex extends LevelDBTable<MindexEntry[]> implements MIndexDAO { + + private indexForExpiresOn: LevelMIndexExpiresOnIndexer + private indexForRevokesOn: LevelMIndexRevokesOnIndexer + private indexForWrittenOn: LevelDBWrittenOnIndexer<MindexEntry> + + constructor(protected getLevelDB: (dbName: string)=> Promise<LevelUp>) { + super('level_mindex', getLevelDB) + } + + /** + * TECHNICAL + */ + + async init(): Promise<void> { + await super.init() + this.indexForExpiresOn = new LevelMIndexExpiresOnIndexer('level_mindex/expiresOn', this.getLevelDB) + this.indexForRevokesOn = new LevelMIndexRevokesOnIndexer('level_mindex/revokesOn', this.getLevelDB) + this.indexForWrittenOn = new LevelDBWrittenOnIndexer('level_mindex/writtenOn', this.getLevelDB, i => i.pub) + await this.indexForExpiresOn.init() + await this.indexForRevokesOn.init() + await this.indexForWrittenOn.init() + } + + async close(): Promise<void> { + await super.close() + await this.indexForExpiresOn.close() + await this.indexForRevokesOn.close() + await this.indexForWrittenOn.close() + } + + /** + * INSERT + */ + + @MonitorExecutionTime() + async insert(record: MindexEntry): Promise<void> { + await this.insertBatch([record]) + } + + @MonitorExecutionTime() + async insertBatch(records: MindexEntry[]): Promise<void> { + // Database insertion + let prevRecords: MindexEntry[] = [] + const recordsByPub = reduceGroupBy(records, 'pub') + await Promise.all(Underscore.keys(recordsByPub).map(async pub => { + const existing = (await this.getOrNull(pub)) || [] + prevRecords = prevRecords.concat(existing) + await this.put(pub, existing.concat(recordsByPub[pub])) + })) + // Indexation + await this.indexForExpiresOn.onInsert(records, prevRecords) + await this.indexForRevokesOn.onInsert(records, prevRecords) + await this.indexForWrittenOn.onInsert(records) + } + + + /** + * Reduceable DAO + */ + + async trimRecords(belowNumber: number): Promise<void> { + // Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values + const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteBelow(belowNumber))) + // For each entry, we trim the records of our INDEX + await Promise.all(pubkeys.map(async pub => { + const oldEntries = await this.get(pub) + const newEntries = reduceForDBTrimming(oldEntries, belowNumber) + await this.put(pub, newEntries) + })) + await this.indexForExpiresOn.onTrimming(belowNumber) + await this.indexForRevokesOn.onTrimming(belowNumber) + } + + /** + * Generic DAO + */ + + async findRawWithOrder(criterion: { pub?: string }, sort: (string | (string | boolean)[])[]): Promise<MindexEntry[]> { + const rows: MindexEntry[] = (await this.findAllValues()).reduce(reduceConcat, []) + return Underscore.sortBy(rows, r => `${String(r.writtenOn).padStart(10, '0')}-${r.pub}`) + } + + async getWrittenOn(blockstamp: string): Promise<MindexEntry[]> { + const ids = (await this.indexForWrittenOn.getWrittenOnKeys(pint(blockstamp))) || [] + return (await Promise.all(ids.map(id => this.get(id)))).reduce(reduceConcat, []).filter(e => e.written_on === blockstamp) + } + + async removeBlock(blockstamp: string): Promise<void> { + // Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values + let newStateRecords: MindexEntry[] = [] + const writteOn = pint(blockstamp) + const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteAt(writteOn))) + let removedRecords: MindexEntry[] = [] + // For each entry, we trim the records of our INDEX + await Promise.all(pubkeys.map(async pub => { + const records = await this.get(pub) + const keptRecords = records.filter(e => e.written_on !== blockstamp) + removedRecords = removedRecords.concat(records.filter(e => e.written_on === blockstamp)) + newStateRecords = newStateRecords.concat(keptRecords) + await this.put(pub, keptRecords) + })) + // Update indexes + await this.indexForExpiresOn.onRemove(removedRecords, newStateRecords) + await this.indexForRevokesOn.onRemove(removedRecords, newStateRecords) + } + + //------------- DAO QUERIES -------------- + + async findByPubAndChainableOnGt(pub: string, medianTime: number): Promise<MindexEntry[]> { + return (await this.reducable(pub)).filter(e => e.chainable_on && e.chainable_on > medianTime) + } + + async findExpiresOnLteAndRevokesOnGt(medianTime: number): Promise<string[]> { + return this.indexForExpiresOn.findExpiresOnLte(medianTime) + } + + async findPubkeysThatShouldExpire(medianTime: number): Promise<{ pub: string; created_on: string }[]> { + const results: { pub: string; created_on: string }[] = [] + const pubkeys = await this.findExpiresOnLteAndRevokesOnGt(medianTime) + for (const pub of pubkeys) { + const MS = await this.getReducedMS(pub) as FullMindexEntry // We are sure because `memberships` already comes from the MINDEX + const hasRenewedSince = MS.expires_on > medianTime; + if (!MS.expired_on && !hasRenewedSince) { + results.push({ + pub: MS.pub, + created_on: MS.created_on, + }) + } + } + return results + } + + async findRevokesOnLteAndRevokedOnIsNull(medianTime: number): Promise<string[]> { + return this.indexForRevokesOn.findRevokesOnLte(medianTime) + } + + async getReducedMS(pub: string): Promise<FullMindexEntry | null> { + const reducable = await this.reducable(pub) + return reduceOrNull(reducable) as FullMindexEntry + } + + async getReducedMSForImplicitRevocation(pub: string): Promise<FullMindexEntry | null> { + return this.getReducedMS(pub) + } + + async getRevokedPubkeys(): Promise<string[]> { + return this.findWhereTransform(v => !!reduce(v).revoked_on, kv => kv.key) + } + + async reducable(pub: string): Promise<MindexEntry[]> { + return (await this.getOrNull(pub)) || [] + } + + +} diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts b/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts index 8ad9ff357f9bd0f609e8e3d890e4b62ea82c6f26..6bba36ffe6e43178c75151358a3e0b24ca6508f9 100644 --- a/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts +++ b/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts @@ -33,7 +33,6 @@ export class LevelDBTable<T> { } public async getOrNull(k: string): Promise<T|null> { - try { const data = await this.db.get(k) return JSON.parse(String(data)) as any diff --git a/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts b/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts index da18217915e5fc29dcffa5d6bb1c4d05e41da7a7..5763232221d496007dc81e8926b58ddbebfc455e 100644 --- a/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts +++ b/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts @@ -2,9 +2,9 @@ import {LevelDBTable} from "../LevelDBTable" export abstract class LevelDBDataIndex<T, R> extends LevelDBTable<T> { - public abstract onInsert(records: R[]): Promise<void> + public abstract onInsert(records: R[], newState: R[]): Promise<void> - public abstract onRemove(records: R[]): Promise<void> + public abstract onRemove(records: R[], newState: R[]): Promise<void> public async onTrimming(belowNumber: number): Promise<void> {} } diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexExpiresOnIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexExpiresOnIndexer.ts new file mode 100644 index 0000000000000000000000000000000000000000..483a8595155c0de2aa35fde3288e4e952f4ccd7e --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexExpiresOnIndexer.ts @@ -0,0 +1,108 @@ +import {LevelDBDataIndex} from "../generic/LevelDBDataIndex" +import {MindexEntry, reduce} from "../../../../indexer" +import {reduceConcat, reduceGroupBy} from "../../../../common-libs/reduce" +import {pint} from "../../../../common-libs/pint" +import {Underscore} from "../../../../common-libs/underscore" + +export type Pubkey = string + +export class LevelMIndexExpiresOnIndexer extends LevelDBDataIndex<Pubkey[], MindexEntry> { + + async onInsert(records: MindexEntry[], prevState: MindexEntry[]): Promise<void> { + + const prevStateByPub = reduceGroupBy(prevState, 'pub') + + // Case 1: expires_on change (when MS JOIN|RENEW) + const byExpiresOn = reduceGroupBy(records.filter(e => e.expires_on), "expires_on") + await Promise.all(Underscore.keys(byExpiresOn) + .map(async expiresOn => { + const pubkeys = byExpiresOn[expiresOn].map(e => e.pub) + // 1. If the key had a previous revokes_on, we remove it + const reducedWhosExpiresOnChanges = pubkeys.filter(p => prevStateByPub[p]) + .map(p => reduce(prevStateByPub[p])) + .filter(r => r.expires_on && !r.expired_on) + for (const reduced of reducedWhosExpiresOnChanges) { + await this.removeAllKeysFromExpiresOn(reduced.expires_on as number, [reduced.pub]) + } + // 2. We put the new value + await this.addAllKeysToExpiresOn(pint(expiresOn), byExpiresOn[expiresOn].map(e => e.pub)) + }) + ) + // Case 2: expiration occurs + const pubkeysToexpire = Underscore.uniq(records.filter(e => e.expired_on).map(r => r.pub)) + const prevStateFM = Underscore.values(prevStateByPub).map(reduce) + const byExpiresOnPrevState = reduceGroupBy(prevStateFM.filter(r => pubkeysToexpire.includes(r.pub)), 'expires_on') + await Promise.all(Underscore.keys(byExpiresOnPrevState) + .map(async expiresOn => this.removeAllKeysFromExpiresOn(pint(expiresOn), byExpiresOnPrevState[expiresOn].map(e => e.pub))) + ) + } + + async onRemove(records: MindexEntry[], newState: MindexEntry[]): Promise<void> { + + const newStateByPub = reduceGroupBy(newState, 'pub') + + // Case 1: expires_on change REVERT + const byExpiresOn = reduceGroupBy(records.filter(e => e.expires_on), "expires_on") + await Promise.all(Underscore.keys(byExpiresOn) + .map(async expiresOn => { + const pubkeys = byExpiresOn[expiresOn].map(e => e.pub) + // 1. Remove the existing value + await this.removeAllKeysFromExpiresOn(pint(expiresOn), pubkeys) + // 2. Put back the old one if it exists + const reduced = pubkeys + .filter(p => newStateByPub[p]) + .map(p => newStateByPub[p]) + .map(reduce) + .filter(r => r.expires_on) + for (const r of reduced) { + await this.addAllKeysToExpiresOn(r.expires_on as number, [r.pub]) + + } + }) + ) + // Case 2: expiration REVERT + const values: MindexEntry[] = Underscore.values(newStateByPub).map(entries => reduce(entries)) + const byExpiredOn = reduceGroupBy(values, "expired_on") + await Promise.all(Underscore.keys(byExpiredOn) + .map(async expiresOn => this.addAllKeysToExpiresOn(pint(expiresOn), byExpiredOn[expiresOn].map(e => e.pub))) + ) + } + + async addAllKeysToExpiresOn(expiresOn: number, pubkeys: Pubkey[]): Promise<void> { + const key = LevelMIndexExpiresOnIndexer.trimKey(expiresOn) + let entry = await this.getOrNull(key) + if (!entry) { + entry = [] + } + for (const pub of pubkeys) { + entry.push(pub) + } + await this.put(key, entry) + } + + async removeAllKeysFromExpiresOn(expiresOn: number, pubkeys: Pubkey[]): Promise<void> { + // We remove the "expires_on" indexed values + const key = LevelMIndexExpiresOnIndexer.trimKey(expiresOn) + const entry = await this.get(key) + for (const pub of pubkeys) { + if (entry.includes(pub)) { + entry.splice(entry.indexOf(pub), 1) + } + } + if (entry.length) { + // Some expirations left + await this.put(key, entry) // TODO: test this, can occur, probably not covered + } else { + // No more expirations left + await this.del(key) + } + } + + async findExpiresOnLte(medianTime: number) { + return (await this.findAllValues({ lte: LevelMIndexExpiresOnIndexer.trimKey(medianTime) })).reduce(reduceConcat, []) + } + + private static trimKey(expiresOn: number) { + return String(expiresOn).padStart(10, '0') + } +} diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexRevokesOnIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexRevokesOnIndexer.ts new file mode 100644 index 0000000000000000000000000000000000000000..5a62f742eeb485f71a8635c78a73bf4faa1d80d2 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelMIndexRevokesOnIndexer.ts @@ -0,0 +1,108 @@ +import {LevelDBDataIndex} from "../generic/LevelDBDataIndex" +import {MindexEntry, reduce} from "../../../../indexer" +import {reduceConcat, reduceGroupBy} from "../../../../common-libs/reduce" +import {Underscore} from "../../../../common-libs/underscore" +import {pint} from "../../../../common-libs/pint" + +export type Pubkey = string + +export class LevelMIndexRevokesOnIndexer extends LevelDBDataIndex<Pubkey[], MindexEntry> { + + async onInsert(records: MindexEntry[], prevState: MindexEntry[]): Promise<void> { + + const prevStateByPub = reduceGroupBy(prevState, 'pub') + + // Case 1: revokes_on change (when MS JOIN|RENEW) + const byRevokesOn = reduceGroupBy(records.filter(e => e.revokes_on), "revokes_on") + await Promise.all(Underscore.keys(byRevokesOn) + .map(async revokesOn => { + const pubkeys = byRevokesOn[revokesOn].map(e => e.pub) + // 1. If the key had a previous revokes_on, we remove it + const reducedWhosRevokesOnChanges = pubkeys.filter(p => prevStateByPub[p]) + .map(p => reduce(prevStateByPub[p])) + .filter(r => r.revokes_on && !r.revoked_on) + for (const reduced of reducedWhosRevokesOnChanges) { + await this.removeAllKeysFromRevokesOn(reduced.revokes_on as number, [reduced.pub]) + } + // 2. We put the new value + await this.addAllKeysToRevokesOn(pint(revokesOn), byRevokesOn[revokesOn].map(e => e.pub)) + }) + ) + // Case 2: revocation occurs + const pubkeysToRevoke = Underscore.uniq(records.filter(e => e.revoked_on).map(r => r.pub)) + const prevStateFM = Underscore.values(prevStateByPub).map(reduce) + const byRevokesOnPrevState = reduceGroupBy(prevStateFM.filter(r => pubkeysToRevoke.includes(r.pub)), 'revokes_on') + await Promise.all(Underscore.keys(byRevokesOnPrevState) + .map(async revokesOn => this.removeAllKeysFromRevokesOn(pint(revokesOn), byRevokesOnPrevState[revokesOn].map(e => e.pub))) + ) + } + + async onRemove(records: MindexEntry[], newState: MindexEntry[]): Promise<void> { + + const newStateByPub = reduceGroupBy(newState, 'pub') + + // Case 1: revokes_on change REVERT + const byRevokesOn = reduceGroupBy(records.filter(e => e.revokes_on), "revokes_on") + await Promise.all(Underscore.keys(byRevokesOn) + .map(async revokesOn => { + const pubkeys = byRevokesOn[revokesOn].map(e => e.pub) + // 1. Remove the existing value + await this.removeAllKeysFromRevokesOn(pint(revokesOn), pubkeys) + // 2. Put back the old one if it exists + const reduced = pubkeys + .filter(p => newStateByPub[p]) + .map(p => newStateByPub[p]) + .map(reduce) + .filter(r => r.revokes_on) + for (const r of reduced) { + await this.addAllKeysToRevokesOn(r.revokes_on as number, [r.pub]) + + } + }) + ) + // Case 2: revocation REVERT + const values: MindexEntry[] = Underscore.values(newStateByPub).map(entries => reduce(entries)) + const byExpiredOn = reduceGroupBy(values, "revoked_on") + await Promise.all(Underscore.keys(byExpiredOn) + .map(async revokesOn => this.addAllKeysToRevokesOn(pint(revokesOn), byExpiredOn[revokesOn].map(e => e.pub))) + ) + } + + async addAllKeysToRevokesOn(revokesOn: number, pubkeys: Pubkey[]): Promise<void> { + const key = LevelMIndexRevokesOnIndexer.trimKey(revokesOn) + let entry = await this.getOrNull(key) + if (!entry) { + entry = [] + } + for (const pub of pubkeys) { + entry.push(pub) + } + await this.put(key, entry) + } + + async removeAllKeysFromRevokesOn(revokesOn: number, pubkeys: Pubkey[]): Promise<void> { + // We remove the "revokes_on" indexed values + const key = LevelMIndexRevokesOnIndexer.trimKey(revokesOn) + const entry = await this.get(key) + for (const pub of pubkeys) { + if (entry.includes(pub)) { + entry.splice(entry.indexOf(pub), 1) + } + } + if (entry.length) { + // Some revocations left + await this.put(key, entry) // TODO: test this, can occur, probably not covered + } else { + // No more revocations left + await this.del(key) + } + } + + async findRevokesOnLte(revokesOn: number): Promise<Pubkey[]> { + return (await this.findAllValues({ lte: LevelMIndexRevokesOnIndexer.trimKey(revokesOn) })).reduce(reduceConcat, []) + } + + private static trimKey(revokesOn: number) { + return String(revokesOn).padStart(10, '0') + } +} diff --git a/app/lib/dal/indexDAL/loki/LokiMIndex.ts b/app/lib/dal/indexDAL/loki/LokiMIndex.ts index b2a5a72f7b113427f5cdcce02f2121506aebb11f..bbd189abcf317ccba2082da36d996324995d042c 100644 --- a/app/lib/dal/indexDAL/loki/LokiMIndex.ts +++ b/app/lib/dal/indexDAL/loki/LokiMIndex.ts @@ -1,4 +1,4 @@ -import {FullMindexEntry, Indexer, MindexEntry, reduceBy} from "../../../indexer" +import {FullMindexEntry, Indexer, MindexEntry} from "../../../indexer" import {MIndexDAO} from "../abstract/MIndexDAO" import {LokiPubkeySharingIndex} from "./LokiPubkeySharingIndex" import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime" @@ -21,7 +21,7 @@ export class LokiMIndex extends LokiPubkeySharingIndex<MindexEntry> implements M } @MonitorExecutionTime() - async findExpiresOnLteAndRevokesOnGt(medianTime: number): Promise<MindexEntry[]> { + async findExpiresOnLteAndRevokesOnGt(medianTime: number): Promise<string[]> { return this.collection .find({ $and: [ @@ -29,10 +29,11 @@ export class LokiMIndex extends LokiPubkeySharingIndex<MindexEntry> implements M { revokes_on: { $gt: medianTime } }, ] }) + .map(e => e.pub) } @MonitorExecutionTime() - async findRevokesOnLteAndRevokedOnIsNull(medianTime: number): Promise<MindexEntry[]> { + async findRevokesOnLteAndRevokedOnIsNull(medianTime: number): Promise<string[]> { return this.collection .find({ $and: [ @@ -40,6 +41,7 @@ export class LokiMIndex extends LokiPubkeySharingIndex<MindexEntry> implements M { revoked_on: null }, ] }) + .map(e => e.pub) } @MonitorExecutionTime() async getReducedMS(pub: string): Promise<FullMindexEntry | null> { @@ -70,9 +72,9 @@ export class LokiMIndex extends LokiPubkeySharingIndex<MindexEntry> implements M async findPubkeysThatShouldExpire(medianTime: number): Promise<{ pub: string; created_on: string }[]> { const results: { pub: string; created_on: string }[] = [] - const memberships: MindexEntry[] = reduceBy(await this.findExpiresOnLteAndRevokesOnGt(medianTime), ['pub']) - for (const POTENTIAL of memberships) { - const MS = await this.getReducedMS(POTENTIAL.pub) as FullMindexEntry // We are sure because `memberships` already comes from the MINDEX + const pubkeys = await this.findExpiresOnLteAndRevokesOnGt(medianTime) + for (const pub of pubkeys) { + const MS = await this.getReducedMS(pub) as FullMindexEntry // We are sure because `memberships` already comes from the MINDEX const hasRenewedSince = MS.expires_on > medianTime; if (!MS.expired_on && !hasRenewedSince) { results.push({ diff --git a/app/lib/dal/indexDAL/sqlite/SqliteMIndex.ts b/app/lib/dal/indexDAL/sqlite/SqliteMIndex.ts index 8ae1796986307fdb249bcde83e0492e8ae33e9b4..3b2c5fa9de265defcca64108ab2b6616df3aae67 100644 --- a/app/lib/dal/indexDAL/sqlite/SqliteMIndex.ts +++ b/app/lib/dal/indexDAL/sqlite/SqliteMIndex.ts @@ -205,8 +205,9 @@ export class SqliteMIndex extends SqliteTable<MindexEntry> implements MIndexDAO } @MonitorExecutionTime() - async findRevokesOnLteAndRevokedOnIsNull(medianTime: number): Promise<MindexEntry[]> { - return this.find('SELECT * FROM mindex WHERE revokes_on <= ? AND revoked_on IS NULL', [medianTime]) + async findRevokesOnLteAndRevokedOnIsNull(medianTime: number): Promise<string[]> { + return (await this.find('SELECT * FROM mindex WHERE revokes_on <= ? AND revoked_on IS NULL', [medianTime])) + .map(e => e.pub) } @MonitorExecutionTime() @@ -271,7 +272,7 @@ export class SqliteMIndex extends SqliteTable<MindexEntry> implements MIndexDAO return this.findEntities('SELECT * FROM mindex WHERE pub = ? order by writtenOn ASC', [pub]) } - async findExpiresOnLteAndRevokesOnGt(medianTime: number): Promise<MindexEntry[]> { + async findExpiresOnLteAndRevokesOnGt(medianTime: number): Promise<string[]> { return [] } diff --git a/app/lib/indexer.ts b/app/lib/indexer.ts index d64bdc842deee007b4608d773c67063a0491f631..81826649994abbb4b02b0d22c0eef52ba66c9376 100644 --- a/app/lib/indexer.ts +++ b/app/lib/indexer.ts @@ -807,9 +807,9 @@ export class Indexer { if (HEAD.number > 0) { await Promise.all(mindex.map(async (ENTRY: MindexEntry) => { if (ENTRY.revocation === null) { - const rows = await dal.mindexDAL.findByPubAndChainableOnGt(ENTRY.pub, HEAD_1.medianTime) // This rule will be enabled on if (HEAD.medianTime >= 1498860000) { + const rows = await dal.mindexDAL.findByPubAndChainableOnGt(ENTRY.pub, HEAD_1.medianTime) ENTRY.unchainables = count(rows); } } @@ -1837,12 +1837,12 @@ export class Indexer { static async ruleIndexGenImplicitRevocation(HEAD: DBHead, dal:FileDAL) { const revocations = []; const pending = await dal.mindexDAL.findRevokesOnLteAndRevokedOnIsNull(HEAD.medianTime) - for (const MS of pending) { - const REDUCED = (await dal.mindexDAL.getReducedMSForImplicitRevocation(MS.pub)) as FullMindexEntry + for (const pub of pending) { + const REDUCED = (await dal.mindexDAL.getReducedMSForImplicitRevocation(pub)) as FullMindexEntry if (REDUCED.revokes_on <= HEAD.medianTime && !REDUCED.revoked_on) { revocations.push({ op: 'UPDATE', - pub: MS.pub, + pub: pub, created_on: REDUCED.created_on, written_on: [HEAD.number, HEAD.hash].join('-'), writtenOn: HEAD.number, diff --git a/app/lib/system/directory.ts b/app/lib/system/directory.ts index d4cd6cdbb2a9c5c1d766b58a6588f61049c74309..0057a15bd18076ab89e3c7339bd3ef4734cf5ac1 100644 --- a/app/lib/system/directory.ts +++ b/app/lib/system/directory.ts @@ -118,7 +118,7 @@ export const MemFS = (initialTree:{ [folder:string]: { [file:string]: string }} export const Directory = { DATA_FILES: ['mindex.db', 'c_mindex.db', 'iindex.db', 'cindex.db', 'sindex.db', 'wallet.db', 'dividend.db', 'txs.db', 'peers.db'], - DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_cindex', 'level_iindex', 'level_wallet'], + DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_cindex', 'level_iindex', 'level_mindex', 'level_wallet'], INSTANCE_NAME: getDomain(opts.mdb), INSTANCE_HOME: getHomePath(opts.mdb, opts.home), diff --git a/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts b/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts index 6a07bb972517f5445c60b57c795218c08278f994..82b962fe81a1f990667ad23e269070c0e145c117 100644 --- a/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts +++ b/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts @@ -23,7 +23,6 @@ import {AbstractSynchronizer} from "../AbstractSynchronizer" import {cliprogram} from "../../../../../lib/common-libs/programOptions" import {DBHead} from "../../../../../lib/db/DBHead" import {Watcher} from "../Watcher" -import {LokiMIndex} from "../../../../../lib/dal/indexDAL/loki/LokiMIndex" import {LokiDividend} from "../../../../../lib/dal/indexDAL/loki/LokiDividend" import {DataErrors} from "../../../../../lib/common-libs/errors" import {ProtocolIndexesStream} from "./ProtocolIndexesStream" @@ -101,8 +100,7 @@ export class GlobalIndexStream extends Duplex { } this.mindexLokiInjection = (async () => { - await this.injectLoki(this.dal, 'mindexDAL', new LokiMIndex(new loki())) - await this.injectLoki(this.dal, 'dividendDAL', new LokiDividend(new loki())) + await this.injectLoki(this.dal, 'dividendDAL', new LokiDividend(new loki())) // TODO })() } @@ -435,9 +433,6 @@ export class GlobalIndexStream extends Duplex { } } - await inject(this.dal, 'mindexDAL', - () => this.dal.mindexDAL.findRawWithOrder({}, [['writtenOn',false]])) - await inject(this.dal, 'dividendDAL', () => this.dal.dividendDAL.listAll())