From f302be99a76b34ed28dcf8ff04e9f58f9c397c16 Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Sat, 27 Oct 2018 11:19:11 +0200 Subject: [PATCH] [enh] Upgrading to LevelDB for IIndex --- app/lib/common-libs/errors.ts | 4 + app/lib/common-libs/reduce.ts | 16 ++ app/lib/common-libs/reduceConcat.ts | 3 - app/lib/dal/fileDAL.ts | 4 +- app/lib/dal/indexDAL/abstract/IIndexDAO.ts | 2 +- app/lib/dal/indexDAL/leveldb/LevelDBCindex.ts | 2 +- app/lib/dal/indexDAL/leveldb/LevelDBIindex.ts | 230 ++++++++++++++++++ app/lib/dal/indexDAL/leveldb/LevelDBTable.ts | 8 + .../leveldb/generic/LevelDBDataIndex.ts | 10 + .../indexers/LevelDBWrittenOnIndexer.ts | 55 +++++ .../indexers/LevelIIndexHashIndexer.ts | 27 ++ .../indexers/LevelIIndexKickIndexer.ts | 100 ++++++++ .../leveldb/indexers/LevelIIndexUidIndexer.ts | 27 ++ app/lib/indexer.ts | 14 +- app/lib/system/directory.ts | 2 +- .../crawler/lib/sync/v2/GlobalIndexStream.ts | 5 - .../identity/identity-revocation-test.ts | 2 +- 17 files changed, 495 insertions(+), 16 deletions(-) create mode 100644 app/lib/common-libs/reduce.ts delete mode 100644 app/lib/common-libs/reduceConcat.ts create mode 100644 app/lib/dal/indexDAL/leveldb/LevelDBIindex.ts create mode 100644 app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts create mode 100644 app/lib/dal/indexDAL/leveldb/indexers/LevelDBWrittenOnIndexer.ts create mode 100644 app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexHashIndexer.ts create mode 100644 app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexKickIndexer.ts create mode 100644 app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexUidIndexer.ts diff --git a/app/lib/common-libs/errors.ts b/app/lib/common-libs/errors.ts index a5be43790..f40687717 100755 --- a/app/lib/common-libs/errors.ts +++ b/app/lib/common-libs/errors.ts @@ -1,5 +1,9 @@ export enum DataErrors { + INVALID_LEVELDB_IINDEX_DATA_WAS_KICKED, + INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED, + IDENTITY_UID_NOT_FOUND, + INVALID_TRIMMABLE_DATA, SYNC_FAST_MEM_ERROR_DURING_INJECTION, CANNOT_GET_VALIDATION_BLOCK_FROM_REMOTE, REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE, diff --git a/app/lib/common-libs/reduce.ts b/app/lib/common-libs/reduce.ts new file mode 100644 index 000000000..2d674e4ed --- /dev/null +++ b/app/lib/common-libs/reduce.ts @@ -0,0 +1,16 @@ +export function reduceConcat<T>(cumulated: T[], arr: T[]) { + return cumulated.concat(arr) +} + +export type GroupResult<T> = { [k:string]: T[] } + +export function reduceGroupBy<T, K extends keyof T>(arr: T[], k: K): GroupResult<T> { + return arr.reduce((cumulated: GroupResult<T>, t: T) => { + const key: string = String(t[k]) + if (!cumulated[key]) { + cumulated[key] = [] + } + cumulated[key].push(t) + return cumulated + }, {} as GroupResult<T>) +} diff --git a/app/lib/common-libs/reduceConcat.ts b/app/lib/common-libs/reduceConcat.ts deleted file mode 100644 index 20cde246e..000000000 --- a/app/lib/common-libs/reduceConcat.ts +++ /dev/null @@ -1,3 +0,0 @@ -export function reduceConcat<T>(cumulated: T[], arr: T[]) { - return cumulated.concat(arr) -} diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts index 023298a83..faa1bfd1d 100644 --- a/app/lib/dal/fileDAL.ts +++ b/app/lib/dal/fileDAL.ts @@ -72,7 +72,6 @@ import {GenericDAO} from "./indexDAL/abstract/GenericDAO" import {LokiDAO} from "./indexDAL/loki/LokiDAO" import {MonitorExecutionTime} from "../debug/MonitorExecutionTime" import {SqliteMIndex} from "./indexDAL/sqlite/SqliteMIndex" -import {SqliteIIndex} from "./indexDAL/sqlite/SqliteIIndex" import {LevelDBDividend} from "./indexDAL/leveldb/LevelDBDividend" import {LevelDBBindex} from "./indexDAL/leveldb/LevelDBBindex" @@ -83,6 +82,7 @@ import {SqliteTransactions} from "./indexDAL/sqlite/SqliteTransactions" import {SqlitePeers} from "./indexDAL/sqlite/SqlitePeers" import {LevelDBWallet} from "./indexDAL/leveldb/LevelDBWallet" import {LevelDBCindex} from "./indexDAL/leveldb/LevelDBCindex" +import {LevelDBIindex} from "./indexDAL/leveldb/LevelDBIindex" const readline = require('readline') const indexer = require('../indexer').Indexer @@ -166,7 +166,7 @@ export class FileDAL { this.walletDAL = new LevelDBWallet(getLevelDB) this.bindexDAL = new LevelDBBindex(getLevelDB) this.mindexDAL = new SqliteMIndex(getSqliteDB) - this.iindexDAL = new SqliteIIndex(getSqliteDB) + this.iindexDAL = new LevelDBIindex(getLevelDB) this.sindexDAL = new LevelDBSindex(getLevelDB) this.cindexDAL = new LevelDBCindex(getLevelDB) this.dividendDAL = new LevelDBDividend(getLevelDB) diff --git a/app/lib/dal/indexDAL/abstract/IIndexDAO.ts b/app/lib/dal/indexDAL/abstract/IIndexDAO.ts index e167e9835..bec034fdf 100644 --- a/app/lib/dal/indexDAL/abstract/IIndexDAO.ts +++ b/app/lib/dal/indexDAL/abstract/IIndexDAO.ts @@ -24,7 +24,7 @@ export interface IIndexDAO extends ReduceableDAO<IindexEntry> { getFullFromPubkey(pub:string): Promise<FullIindexEntry> - getFullFromHash(hash:string): Promise<FullIindexEntry> + getFullFromHash(hash:string): Promise<FullIindexEntry|null> getToBeKickedPubkeys(): Promise<string[]> } diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBCindex.ts b/app/lib/dal/indexDAL/leveldb/LevelDBCindex.ts index e235de381..d8dfd0ada 100644 --- a/app/lib/dal/indexDAL/leveldb/LevelDBCindex.ts +++ b/app/lib/dal/indexDAL/leveldb/LevelDBCindex.ts @@ -5,7 +5,7 @@ import {LevelDBTable} from "./LevelDBTable" import {Underscore} from "../../../common-libs/underscore" import {pint} from "../../../common-libs/pint" import {CIndexDAO} from "../abstract/CIndexDAO" -import {reduceConcat} from "../../../common-libs/reduceConcat" +import {reduceConcat} from "../../../common-libs/reduce" export interface LevelDBCindexEntry { received: string[] diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBIindex.ts b/app/lib/dal/indexDAL/leveldb/LevelDBIindex.ts new file mode 100644 index 000000000..3fd3c87f7 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/LevelDBIindex.ts @@ -0,0 +1,230 @@ +import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime" +import {FullIindexEntry, IindexEntry, reduce, reduceForDBTrimming} from "../../../indexer" +import {LevelUp} from 'levelup' +import {LevelDBTable} from "./LevelDBTable" +import {Underscore} from "../../../common-libs/underscore" +import {pint} from "../../../common-libs/pint" +import {IIndexDAO} from "../abstract/IIndexDAO" +import {LevelIIndexHashIndexer} from "./indexers/LevelIIndexHashIndexer" +import {reduceConcat, reduceGroupBy} from "../../../common-libs/reduce" +import {LevelDBWrittenOnIndexer} from "./indexers/LevelDBWrittenOnIndexer" +import {OldIindexEntry} from "../../../db/OldIindexEntry" +import {LevelIIndexUidIndexer} from "./indexers/LevelIIndexUidIndexer" +import {LevelIIndexKickIndexer} from "./indexers/LevelIIndexKickIndexer" +import {DataErrors} from "../../../common-libs/errors" +import {OldTransformers} from "../common/OldTransformer" + +export class LevelDBIindex extends LevelDBTable<IindexEntry[]> implements IIndexDAO { + + private indexForHash: LevelIIndexHashIndexer + private indexForUid: LevelIIndexUidIndexer + private indexForKick: LevelIIndexKickIndexer + private indexForWrittenOn: LevelDBWrittenOnIndexer<IindexEntry> + + constructor(protected getLevelDB: (dbName: string)=> Promise<LevelUp>) { + super('level_iindex', getLevelDB) + } + + /** + * TECHNICAL + */ + + async init(): Promise<void> { + await super.init() + this.indexForHash = new LevelIIndexHashIndexer('level_iindex/hash', this.getLevelDB) + this.indexForUid = new LevelIIndexUidIndexer('level_iindex/uid', this.getLevelDB) + this.indexForKick = new LevelIIndexKickIndexer('level_iindex/kick', this.getLevelDB) + this.indexForWrittenOn = new LevelDBWrittenOnIndexer('level_iindex/writtenOn', this.getLevelDB, i => i.pub) + await this.indexForHash.init() + await this.indexForUid.init() + await this.indexForKick.init() + await this.indexForWrittenOn.init() + } + + async close(): Promise<void> { + await super.close() + await this.indexForHash.close() + await this.indexForUid.close() + await this.indexForKick.close() + await this.indexForWrittenOn.close() + } + + /** + * INSERT + */ + + @MonitorExecutionTime() + async insert(record: IindexEntry): Promise<void> { + await this.insertBatch([record]) + } + + @MonitorExecutionTime() + async insertBatch(records: IindexEntry[]): Promise<void> { + // Database insertion + const recordsByPub = reduceGroupBy(records, 'pub') + await Promise.all(Underscore.keys(recordsByPub).map(async pub => { + const existing = (await this.getOrNull(pub)) || [] + await this.put(pub, existing.concat(recordsByPub[pub])) + })) + // Indexation + await this.indexForHash.onInsert(records) + await this.indexForUid.onInsert(records) + await this.indexForKick.onInsert(records) + await this.indexForWrittenOn.onInsert(records) + } + + + /** + * Reduceable DAO + */ + + async trimRecords(belowNumber: number): Promise<void> { + // Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values + const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteBelow(belowNumber))) + // For each entry, we trim the records of our INDEX + await Promise.all(pubkeys.map(async pub => { + const oldEntries = await this.get(pub) + const newEntries = reduceForDBTrimming(oldEntries, belowNumber) + await this.put(pub, newEntries) + })) + await this.indexForHash.onTrimming(belowNumber) + await this.indexForUid.onTrimming(belowNumber) + await this.indexForKick.onTrimming(belowNumber) + } + + /** + * Generic DAO + */ + + async findRawWithOrder(criterion: { pub?: string }, sort: (string | (string | boolean)[])[]): Promise<IindexEntry[]> { + const rows: IindexEntry[] = (await this.findAllValues()).reduce(reduceConcat, []) + return Underscore.sortBy(rows, r => `${String(r.writtenOn).padStart(10, '0')}-${String(r.wotb_id).padStart(10, '0')}`) + } + + async getWrittenOn(blockstamp: string): Promise<IindexEntry[]> { + const ids = (await this.indexForWrittenOn.getWrittenOnKeys(pint(blockstamp))) || [] + return (await Promise.all(ids.map(id => this.get(id)))).reduce(reduceConcat, []).filter(e => e.written_on === blockstamp) + } + + async removeBlock(blockstamp: string): Promise<void> { + // Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values + const writteOn = pint(blockstamp) + const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteAt(writteOn))) + let removedRecords: IindexEntry[] = [] + // For each entry, we trim the records of our INDEX + await Promise.all(pubkeys.map(async pub => { + const records = await this.get(pub) + const keptRecords = records.filter(e => e.written_on !== blockstamp) + removedRecords = removedRecords.concat(records.filter(e => e.written_on === blockstamp)) + await this.put(pub, keptRecords) + })) + // Update indexes + await this.indexForHash.onRemove(removedRecords) + await this.indexForUid.onRemove(removedRecords) + await this.indexForKick.onRemove(removedRecords) + } + + async findByPub(pub: string): Promise<IindexEntry[]> { + if (!pub) { + return [] + } + return (await this.getOrNull(pub)) || [] + } + + async findByUid(uid: string): Promise<IindexEntry[]> { + const pub = await this.indexForUid.getPubByUid(uid) + if (!pub) { + return [] + } + return this.get(pub) + } + + async getFromPubkey(pub: string): Promise<FullIindexEntry | null> { + const entries = (await this.getOrNull(pub)) || [] + if (!entries || entries.length === 0) { + return null + } + return reduce(entries) as FullIindexEntry + } + + async getFromPubkeyOrUid(search: string): Promise<FullIindexEntry | null> { + const fromPub = await this.getFromPubkey(search) + const fromUid = await this.getFromUID(search) + return fromPub || fromUid + } + + async getFromUID(uid: string): Promise<FullIindexEntry | null> { + const pub = await this.indexForUid.getPubByUid(uid) + if (!pub) { + return null + } + const entries = (await this.getOrNull(pub)) || [] + if (!entries || entries.length === 0) { + return null + } + return reduce(entries) as FullIindexEntry + } + + async getFullFromHash(hash: string): Promise<FullIindexEntry|null> { + const pub = await this.indexForHash.getByHash(hash) as string + if (!pub) { + return null + } + const entries = await this.get(pub) + return OldTransformers.iindexEntityOrNull(entries) as Promise<FullIindexEntry> + } + + async getFullFromPubkey(pub: string): Promise<FullIindexEntry> { + const entries = await this.get(pub) + return reduce(entries) as FullIindexEntry + } + + async getFullFromUID(uid: string): Promise<FullIindexEntry> { + const pub = await this.indexForUid.getPubByUid(uid) + if (!pub) { + throw Error(DataErrors[DataErrors.IDENTITY_UID_NOT_FOUND]) + } + const entries = await this.get(pub) + return reduce(entries) as FullIindexEntry + } + + // Full scan + async getMembers(): Promise<{ pubkey: string; uid: string | null }[]> { + const members: IindexEntry[] = [] + await this.findWhere(e => { + if (reduce(e).member as boolean) { + members.push(e[0]) + } + return false + }) + return members.map(m => ({ + pubkey: m.pub, + uid: m.uid + })) + } + + async getToBeKickedPubkeys(): Promise<string[]> { + return this.indexForKick.getAll() + } + + async reducable(pub: string): Promise<IindexEntry[]> { + return this.findByPub(pub) + } + + // Full scan + async searchThoseMatching(search: string): Promise<OldIindexEntry[]> { + const uidKeys = await this.indexForUid.findAllKeys() + const pubKeys = await this.findAllKeys() + const uids = (uidKeys).filter(u => u.includes(search)) + const pubs = (pubKeys).filter(p => p.includes(search)) + const uidIdentities = await Promise.all(uids.map(async uid => OldTransformers.toOldIindexEntry(reduce(await this.findByUid(uid))))) + const pubIdentities = await Promise.all(pubs.map(async pub => OldTransformers.toOldIindexEntry(reduce(await this.findByPub(pub))))) + return uidIdentities + .filter(u => u.pub) + .concat( + pubIdentities + .filter(p => p.pub) + ) + } + +} diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts b/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts index 85169fd5c..8ad9ff357 100644 --- a/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts +++ b/app/lib/dal/indexDAL/leveldb/LevelDBTable.ts @@ -135,6 +135,14 @@ export class LevelDBTable<T> { return deletedKv } + public async findAllKeys(options?: AbstractIteratorOptions): Promise<string[]> { + const data: string[] = [] + await this.readAllKeyValue(kv => { + data.push(kv.key) + }, options) + return data + } + public async findAllValues(options?: AbstractIteratorOptions): Promise<T[]> { const data: T[] = [] await this.readAllKeyValue(kv => { diff --git a/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts b/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts new file mode 100644 index 000000000..da1821791 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/generic/LevelDBDataIndex.ts @@ -0,0 +1,10 @@ +import {LevelDBTable} from "../LevelDBTable" + +export abstract class LevelDBDataIndex<T, R> extends LevelDBTable<T> { + + public abstract onInsert(records: R[]): Promise<void> + + public abstract onRemove(records: R[]): Promise<void> + + public async onTrimming(belowNumber: number): Promise<void> {} +} diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelDBWrittenOnIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelDBWrittenOnIndexer.ts new file mode 100644 index 000000000..bdbd5af51 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelDBWrittenOnIndexer.ts @@ -0,0 +1,55 @@ +import {LevelUp} from "levelup"; +import {reduceConcat, reduceGroupBy} from "../../../../common-libs/reduce" +import {Underscore} from "../../../../common-libs/underscore" +import {LevelDBTable} from "../LevelDBTable" +import {pint} from "../../../../common-libs/pint" + +export interface WrittenOnData { + writtenOn: number +} + +export class LevelDBWrittenOnIndexer<T extends WrittenOnData> extends LevelDBTable<string[]> { + + constructor( + name: string, + getLevelDB: (dbName: string)=> Promise<LevelUp>, + protected toKey: (t: T) => string) { + super(name, getLevelDB) + } + + async onInsert(records: T[]): Promise<void> { + const byWrittenOn = reduceGroupBy(records, 'writtenOn') + await Promise.all(Underscore.keys(byWrittenOn).map(async writtenOn => { + await this.put(LevelDBWrittenOnIndexer.trimWrittenOnKey(pint(writtenOn)), byWrittenOn[writtenOn].map(e => this.toKey(e))) + })) + } + + getWrittenOnKeys(writtenOn: number): Promise<string[]|null> { + return this.getOrNull(LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn)) + } + + trim(writtenOn: number): Promise<void> { + return this.del(LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn)) + } + + private static trimWrittenOnKey(writtenOn: number) { + return String(writtenOn).padStart(10, '0') + } + + async deleteBelow(writtenOn: number): Promise<string[]> { + return (await this.deleteWhere({ lt: LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn) })) + .map(kv => kv.value) + .reduce(reduceConcat, []) + } + + async deleteAt(writtenOn: number): Promise<string[]> { + const k = LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn) + const value = await this.getOrNull(k) + if (!value) { + // Nothing to delete, nothing to return + return [] + } + await this.del(k) + return value + } +} diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexHashIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexHashIndexer.ts new file mode 100644 index 000000000..5cd3a9dd7 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexHashIndexer.ts @@ -0,0 +1,27 @@ +import {LevelDBDataIndex} from "../generic/LevelDBDataIndex" +import {IindexEntry} from "../../../../indexer" + +export type Hash = string +export type Pubkey = string + +export class LevelIIndexHashIndexer extends LevelDBDataIndex<Pubkey[], IindexEntry> { + + async onInsert(records: IindexEntry[]): Promise<void> { + await Promise.all(records + .filter(e => e.op === 'CREATE' && e.hash) + .map(async e => this.put(e.hash as string, [e.pub])) + ) + } + + async onRemove(records: IindexEntry[]): Promise<void> { + await Promise.all(records + .filter(e => e.op === 'CREATE' && e.hash) + .map(async e => this.del(e.hash as string)) + ) + } + + async getByHash(hash: Hash): Promise<Pubkey|null> { + const res = await this.getOrNull(hash) + return res && res[0] + } +} diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexKickIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexKickIndexer.ts new file mode 100644 index 000000000..127e2386b --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexKickIndexer.ts @@ -0,0 +1,100 @@ +import {LevelDBDataIndex} from "../generic/LevelDBDataIndex" +import {IindexEntry} from "../../../../indexer" +import {DataErrors} from "../../../../common-libs/errors" + +export type Pubkey = string + +export interface KickEntry { + on: number|undefined // The next time that the identity must be kicked + done: number[] // The revertion history +} + +export class LevelIIndexKickIndexer extends LevelDBDataIndex<KickEntry, IindexEntry> { + + async onInsert(records: IindexEntry[]): Promise<void> { + // Case 1: to be kicked + await Promise.all(records + .filter(e => e.kick) + .map(async e => { + let entry = await this.getOrNull(e.pub) + if (!entry) { + entry = { + on: e.writtenOn, + done: [] + } + } + entry.on = e.writtenOn + await this.put(e.pub, entry) + }) + ) + // Case 2: just kicked + await Promise.all(records + .filter(e => e.member === false) + .map(async e => { + const entry = await this.getOrNull(e.pub) + if (entry && entry.on === e.writtenOn - 1) { // Members are excluded at B# +1 + entry.done.push(entry.on) + entry.on = undefined + await this.put(e.pub, entry) + } + // Otherwise it is not a kicking + }) + ) + } + + async onRemove(records: IindexEntry[]): Promise<void> { + // Case 1: to be kicked => unkicked + await Promise.all(records + .filter(e => e.kick) + .map(async e => { + const entry = await this.get(e.pub) + if (entry.on === e.writtenOn) { + entry.on = entry.done.pop() + if (entry.on === undefined) { + // No more kicking left + await this.del(e.pub) + } + // Some kicks left + await this.put(e.pub, entry) // TODO: test this, can occur, probably not covered + } else { + throw Error(DataErrors[DataErrors.INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED]) + } + }) + ) + + // Case 2: just kicked => to be kicked + await Promise.all(records + .filter(e => e.member === false) + .map(async e => { + const entry = await this.getOrNull(e.pub) + if (entry && entry.done.includes(e.writtenOn - 1)) { + // It was a kicking + entry.on = entry.done.pop() + if (!entry.on) { + throw Error(DataErrors[DataErrors.INVALID_LEVELDB_IINDEX_DATA_WAS_KICKED]) + } + await this.put(e.pub, entry) + } + }) + ) + } + + async onTrimming(belowNumber: number): Promise<void> { + await this.applyAllKeyValue(async kv => { + const initialLength = kv.value.done.length + kv.value.done = kv.value.done.filter(e => e >= belowNumber) + if (kv.value.done.length !== initialLength && kv.value.done.length > 0) { + // We simply update the entry which was pruned + await this.put(kv.key, kv.value) + } + else if (kv.value.done.length !== initialLength && kv.value.done.length === 0 && !kv.value.on) { + // We remove the entry, no more necessary + await this.del(kv.key) + } + }) + } + + async getAll(): Promise<Pubkey[]> { + return this.findWhereTransform(t => !!t.on, kv => kv.key) + } +} diff --git a/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexUidIndexer.ts b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexUidIndexer.ts new file mode 100644 index 000000000..69b4756f7 --- /dev/null +++ b/app/lib/dal/indexDAL/leveldb/indexers/LevelIIndexUidIndexer.ts @@ -0,0 +1,27 @@ +import {LevelDBDataIndex} from "../generic/LevelDBDataIndex" +import {IindexEntry} from "../../../../indexer" + +export type Uid = string +export type Pubkey = string + +export class LevelIIndexUidIndexer extends LevelDBDataIndex<Pubkey[], IindexEntry> { + + async onInsert(records: IindexEntry[]): Promise<void> { + await Promise.all(records + .filter(e => e.op === 'CREATE' && e.uid) + .map(async e => this.put(e.uid as string, [e.pub])) + ) + } + + async onRemove(records: IindexEntry[]): Promise<void> { + await Promise.all(records + .filter(e => e.op === 'CREATE' && e.uid) + .map(async e => this.del(e.uid as string)) + ) + } + + async getPubByUid(uid: Uid): Promise<Pubkey|null> { + const res = await this.getOrNull(uid) + return res && res[0] + } +} diff --git a/app/lib/indexer.ts b/app/lib/indexer.ts index cb9e9d8d3..d64bdc842 100644 --- a/app/lib/indexer.ts +++ b/app/lib/indexer.ts @@ -1963,14 +1963,24 @@ function blockstamp(aNumber: number, aHash: string) { return [aNumber, aHash].join('-'); } -function reduceOrNull<T>(records: T[]): T|null { +export function reduceOrNull<T>(records: T[]): T|null { if (records.length === 0) { return null } return reduce(records) } -function reduce<T>(records: T[]): T { +export function reduceForDBTrimming<T extends { writtenOn: number }>(records: T[], belowNumber: number): T[] { + if (records.length === 0) { + throw Error(DataErrors[DataErrors.INVALID_TRIMMABLE_DATA]) + } + const reducableRecords = records.filter(r => r.writtenOn < belowNumber) + const nonReducableRecords = records.filter(r => r.writtenOn >= belowNumber) + const reduced = reduce(reducableRecords) as T + return [reduced].concat(nonReducableRecords) +} + +export function reduce<T>(records: T[]): T { return records.reduce((obj:T, record) => { const keys = Object.keys(record) as (keyof T)[] for (const k of keys) { diff --git a/app/lib/system/directory.ts b/app/lib/system/directory.ts index 5eaab89f7..d4cd6cdbb 100644 --- a/app/lib/system/directory.ts +++ b/app/lib/system/directory.ts @@ -118,7 +118,7 @@ export const MemFS = (initialTree:{ [folder:string]: { [file:string]: string }} export const Directory = { DATA_FILES: ['mindex.db', 'c_mindex.db', 'iindex.db', 'cindex.db', 'sindex.db', 'wallet.db', 'dividend.db', 'txs.db', 'peers.db'], - DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_cindex', 'level_wallet'], + DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_cindex', 'level_iindex', 'level_wallet'], INSTANCE_NAME: getDomain(opts.mdb), INSTANCE_HOME: getHomePath(opts.mdb, opts.home), diff --git a/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts b/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts index 61dac936a..6a07bb972 100644 --- a/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts +++ b/app/modules/crawler/lib/sync/v2/GlobalIndexStream.ts @@ -24,7 +24,6 @@ import {cliprogram} from "../../../../../lib/common-libs/programOptions" import {DBHead} from "../../../../../lib/db/DBHead" import {Watcher} from "../Watcher" import {LokiMIndex} from "../../../../../lib/dal/indexDAL/loki/LokiMIndex" -import {LokiIIndex} from "../../../../../lib/dal/indexDAL/loki/LokiIIndex" import {LokiDividend} from "../../../../../lib/dal/indexDAL/loki/LokiDividend" import {DataErrors} from "../../../../../lib/common-libs/errors" import {ProtocolIndexesStream} from "./ProtocolIndexesStream" @@ -103,7 +102,6 @@ export class GlobalIndexStream extends Duplex { this.mindexLokiInjection = (async () => { await this.injectLoki(this.dal, 'mindexDAL', new LokiMIndex(new loki())) - await this.injectLoki(this.dal, 'iindexDAL', new LokiIIndex(new loki())) await this.injectLoki(this.dal, 'dividendDAL', new LokiDividend(new loki())) })() } @@ -440,9 +438,6 @@ export class GlobalIndexStream extends Duplex { await inject(this.dal, 'mindexDAL', () => this.dal.mindexDAL.findRawWithOrder({}, [['writtenOn',false]])) - await inject(this.dal, 'iindexDAL', - () => this.dal.iindexDAL.findRawWithOrder({}, [['writtenOn',false]])) - await inject(this.dal, 'dividendDAL', () => this.dal.dividendDAL.listAll()) diff --git a/test/integration/identity/identity-revocation-test.ts b/test/integration/identity/identity-revocation-test.ts index 14e5feed0..8e6178fd8 100644 --- a/test/integration/identity/identity-revocation-test.ts +++ b/test/integration/identity/identity-revocation-test.ts @@ -40,7 +40,7 @@ const commonConf = { let s1:TestingServer, s2:TestingServer, cat:TestUser, tic:TestUser, toc:TestUser, tacOnS1:TestUser, tacOnS2:TestUser -describe("Revocation", function() { +describe("Revocation behavior", function() { before(async () => { -- GitLab