Commit f302be99 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] Upgrading to LevelDB for IIndex

parent 3db06c0e
export enum DataErrors {
INVALID_LEVELDB_IINDEX_DATA_WAS_KICKED,
INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED,
IDENTITY_UID_NOT_FOUND,
INVALID_TRIMMABLE_DATA,
SYNC_FAST_MEM_ERROR_DURING_INJECTION,
CANNOT_GET_VALIDATION_BLOCK_FROM_REMOTE,
REJECT_WAIT_FOR_AVAILABLE_NODES_BUT_CONTINUE,
......
export function reduceConcat<T>(cumulated: T[], arr: T[]) {
return cumulated.concat(arr)
}
export type GroupResult<T> = { [k:string]: T[] }
export function reduceGroupBy<T, K extends keyof T>(arr: T[], k: K): GroupResult<T> {
return arr.reduce((cumulated: GroupResult<T>, t: T) => {
const key: string = String(t[k])
if (!cumulated[key]) {
cumulated[key] = []
}
cumulated[key].push(t)
return cumulated
}, {} as GroupResult<T>)
}
export function reduceConcat<T>(cumulated: T[], arr: T[]) {
return cumulated.concat(arr)
}
......@@ -72,7 +72,6 @@ import {GenericDAO} from "./indexDAL/abstract/GenericDAO"
import {LokiDAO} from "./indexDAL/loki/LokiDAO"
import {MonitorExecutionTime} from "../debug/MonitorExecutionTime"
import {SqliteMIndex} from "./indexDAL/sqlite/SqliteMIndex"
import {SqliteIIndex} from "./indexDAL/sqlite/SqliteIIndex"
import {LevelDBDividend} from "./indexDAL/leveldb/LevelDBDividend"
import {LevelDBBindex} from "./indexDAL/leveldb/LevelDBBindex"
......@@ -83,6 +82,7 @@ import {SqliteTransactions} from "./indexDAL/sqlite/SqliteTransactions"
import {SqlitePeers} from "./indexDAL/sqlite/SqlitePeers"
import {LevelDBWallet} from "./indexDAL/leveldb/LevelDBWallet"
import {LevelDBCindex} from "./indexDAL/leveldb/LevelDBCindex"
import {LevelDBIindex} from "./indexDAL/leveldb/LevelDBIindex"
const readline = require('readline')
const indexer = require('../indexer').Indexer
......@@ -166,7 +166,7 @@ export class FileDAL {
this.walletDAL = new LevelDBWallet(getLevelDB)
this.bindexDAL = new LevelDBBindex(getLevelDB)
this.mindexDAL = new SqliteMIndex(getSqliteDB)
this.iindexDAL = new SqliteIIndex(getSqliteDB)
this.iindexDAL = new LevelDBIindex(getLevelDB)
this.sindexDAL = new LevelDBSindex(getLevelDB)
this.cindexDAL = new LevelDBCindex(getLevelDB)
this.dividendDAL = new LevelDBDividend(getLevelDB)
......
......@@ -24,7 +24,7 @@ export interface IIndexDAO extends ReduceableDAO<IindexEntry> {
getFullFromPubkey(pub:string): Promise<FullIindexEntry>
getFullFromHash(hash:string): Promise<FullIindexEntry>
getFullFromHash(hash:string): Promise<FullIindexEntry|null>
getToBeKickedPubkeys(): Promise<string[]>
}
......@@ -5,7 +5,7 @@ import {LevelDBTable} from "./LevelDBTable"
import {Underscore} from "../../../common-libs/underscore"
import {pint} from "../../../common-libs/pint"
import {CIndexDAO} from "../abstract/CIndexDAO"
import {reduceConcat} from "../../../common-libs/reduceConcat"
import {reduceConcat} from "../../../common-libs/reduce"
export interface LevelDBCindexEntry {
received: string[]
......
import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime"
import {FullIindexEntry, IindexEntry, reduce, reduceForDBTrimming} from "../../../indexer"
import {LevelUp} from 'levelup'
import {LevelDBTable} from "./LevelDBTable"
import {Underscore} from "../../../common-libs/underscore"
import {pint} from "../../../common-libs/pint"
import {IIndexDAO} from "../abstract/IIndexDAO"
import {LevelIIndexHashIndexer} from "./indexers/LevelIIndexHashIndexer"
import {reduceConcat, reduceGroupBy} from "../../../common-libs/reduce"
import {LevelDBWrittenOnIndexer} from "./indexers/LevelDBWrittenOnIndexer"
import {OldIindexEntry} from "../../../db/OldIindexEntry"
import {LevelIIndexUidIndexer} from "./indexers/LevelIIndexUidIndexer"
import {LevelIIndexKickIndexer} from "./indexers/LevelIIndexKickIndexer"
import {DataErrors} from "../../../common-libs/errors"
import {OldTransformers} from "../common/OldTransformer"
export class LevelDBIindex extends LevelDBTable<IindexEntry[]> implements IIndexDAO {
private indexForHash: LevelIIndexHashIndexer
private indexForUid: LevelIIndexUidIndexer
private indexForKick: LevelIIndexKickIndexer
private indexForWrittenOn: LevelDBWrittenOnIndexer<IindexEntry>
constructor(protected getLevelDB: (dbName: string)=> Promise<LevelUp>) {
super('level_iindex', getLevelDB)
}
/**
* TECHNICAL
*/
async init(): Promise<void> {
await super.init()
this.indexForHash = new LevelIIndexHashIndexer('level_iindex/hash', this.getLevelDB)
this.indexForUid = new LevelIIndexUidIndexer('level_iindex/uid', this.getLevelDB)
this.indexForKick = new LevelIIndexKickIndexer('level_iindex/kick', this.getLevelDB)
this.indexForWrittenOn = new LevelDBWrittenOnIndexer('level_iindex/writtenOn', this.getLevelDB, i => i.pub)
await this.indexForHash.init()
await this.indexForUid.init()
await this.indexForKick.init()
await this.indexForWrittenOn.init()
}
async close(): Promise<void> {
await super.close()
await this.indexForHash.close()
await this.indexForUid.close()
await this.indexForKick.close()
await this.indexForWrittenOn.close()
}
/**
* INSERT
*/
@MonitorExecutionTime()
async insert(record: IindexEntry): Promise<void> {
await this.insertBatch([record])
}
@MonitorExecutionTime()
async insertBatch(records: IindexEntry[]): Promise<void> {
// Database insertion
const recordsByPub = reduceGroupBy(records, 'pub')
await Promise.all(Underscore.keys(recordsByPub).map(async pub => {
const existing = (await this.getOrNull(pub)) || []
await this.put(pub, existing.concat(recordsByPub[pub]))
}))
// Indexation
await this.indexForHash.onInsert(records)
await this.indexForUid.onInsert(records)
await this.indexForKick.onInsert(records)
await this.indexForWrittenOn.onInsert(records)
}
/**
* Reduceable DAO
*/
async trimRecords(belowNumber: number): Promise<void> {
// Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values
const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteBelow(belowNumber)))
// For each entry, we trim the records of our INDEX
await Promise.all(pubkeys.map(async pub => {
const oldEntries = await this.get(pub)
const newEntries = reduceForDBTrimming(oldEntries, belowNumber)
await this.put(pub, newEntries)
}))
await this.indexForHash.onTrimming(belowNumber)
await this.indexForUid.onTrimming(belowNumber)
await this.indexForKick.onTrimming(belowNumber)
}
/**
* Generic DAO
*/
async findRawWithOrder(criterion: { pub?: string }, sort: (string | (string | boolean)[])[]): Promise<IindexEntry[]> {
const rows: IindexEntry[] = (await this.findAllValues()).reduce(reduceConcat, [])
return Underscore.sortBy(rows, r => `${String(r.writtenOn).padStart(10, '0')}-${String(r.wotb_id).padStart(10, '0')}`)
}
async getWrittenOn(blockstamp: string): Promise<IindexEntry[]> {
const ids = (await this.indexForWrittenOn.getWrittenOnKeys(pint(blockstamp))) || []
return (await Promise.all(ids.map(id => this.get(id)))).reduce(reduceConcat, []).filter(e => e.written_on === blockstamp)
}
async removeBlock(blockstamp: string): Promise<void> {
// Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values
const writteOn = pint(blockstamp)
const pubkeys: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteAt(writteOn)))
let removedRecords: IindexEntry[] = []
// For each entry, we trim the records of our INDEX
await Promise.all(pubkeys.map(async pub => {
const records = await this.get(pub)
const keptRecords = records.filter(e => e.written_on !== blockstamp)
removedRecords = removedRecords.concat(records.filter(e => e.written_on === blockstamp))
await this.put(pub, keptRecords)
}))
// Update indexes
await this.indexForHash.onRemove(removedRecords)
await this.indexForUid.onRemove(removedRecords)
await this.indexForKick.onRemove(removedRecords)
}
async findByPub(pub: string): Promise<IindexEntry[]> {
if (!pub) {
return []
}
return (await this.getOrNull(pub)) || []
}
async findByUid(uid: string): Promise<IindexEntry[]> {
const pub = await this.indexForUid.getPubByUid(uid)
if (!pub) {
return []
}
return this.get(pub)
}
async getFromPubkey(pub: string): Promise<FullIindexEntry | null> {
const entries = (await this.getOrNull(pub)) || []
if (!entries || entries.length === 0) {
return null
}
return reduce(entries) as FullIindexEntry
}
async getFromPubkeyOrUid(search: string): Promise<FullIindexEntry | null> {
const fromPub = await this.getFromPubkey(search)
const fromUid = await this.getFromUID(search)
return fromPub || fromUid
}
async getFromUID(uid: string): Promise<FullIindexEntry | null> {
const pub = await this.indexForUid.getPubByUid(uid)
if (!pub) {
return null
}
const entries = (await this.getOrNull(pub)) || []
if (!entries || entries.length === 0) {
return null
}
return reduce(entries) as FullIindexEntry
}
async getFullFromHash(hash: string): Promise<FullIindexEntry|null> {
const pub = await this.indexForHash.getByHash(hash) as string
if (!pub) {
return null
}
const entries = await this.get(pub)
return OldTransformers.iindexEntityOrNull(entries) as Promise<FullIindexEntry>
}
async getFullFromPubkey(pub: string): Promise<FullIindexEntry> {
const entries = await this.get(pub)
return reduce(entries) as FullIindexEntry
}
async getFullFromUID(uid: string): Promise<FullIindexEntry> {
const pub = await this.indexForUid.getPubByUid(uid)
if (!pub) {
throw Error(DataErrors[DataErrors.IDENTITY_UID_NOT_FOUND])
}
const entries = await this.get(pub)
return reduce(entries) as FullIindexEntry
}
// Full scan
async getMembers(): Promise<{ pubkey: string; uid: string | null }[]> {
const members: IindexEntry[] = []
await this.findWhere(e => {
if (reduce(e).member as boolean) {
members.push(e[0])
}
return false
})
return members.map(m => ({
pubkey: m.pub,
uid: m.uid
}))
}
async getToBeKickedPubkeys(): Promise<string[]> {
return this.indexForKick.getAll()
}
async reducable(pub: string): Promise<IindexEntry[]> {
return this.findByPub(pub)
}
// Full scan
async searchThoseMatching(search: string): Promise<OldIindexEntry[]> {
const uidKeys = await this.indexForUid.findAllKeys()
const pubKeys = await this.findAllKeys()
const uids = (uidKeys).filter(u => u.includes(search))
const pubs = (pubKeys).filter(p => p.includes(search))
const uidIdentities = await Promise.all(uids.map(async uid => OldTransformers.toOldIindexEntry(reduce(await this.findByUid(uid)))))
const pubIdentities = await Promise.all(pubs.map(async pub => OldTransformers.toOldIindexEntry(reduce(await this.findByPub(pub)))))
return uidIdentities
.filter(u => u.pub)
.concat(
pubIdentities
.filter(p => p.pub)
)
}
}
......@@ -135,6 +135,14 @@ export class LevelDBTable<T> {
return deletedKv
}
public async findAllKeys(options?: AbstractIteratorOptions): Promise<string[]> {
const data: string[] = []
await this.readAllKeyValue(kv => {
data.push(kv.key)
}, options)
return data
}
public async findAllValues(options?: AbstractIteratorOptions): Promise<T[]> {
const data: T[] = []
await this.readAllKeyValue(kv => {
......
import {LevelDBTable} from "../LevelDBTable"
export abstract class LevelDBDataIndex<T, R> extends LevelDBTable<T> {
public abstract onInsert(records: R[]): Promise<void>
public abstract onRemove(records: R[]): Promise<void>
public async onTrimming(belowNumber: number): Promise<void> {}
}
import {LevelUp} from "levelup";
import {reduceConcat, reduceGroupBy} from "../../../../common-libs/reduce"
import {Underscore} from "../../../../common-libs/underscore"
import {LevelDBTable} from "../LevelDBTable"
import {pint} from "../../../../common-libs/pint"
export interface WrittenOnData {
writtenOn: number
}
export class LevelDBWrittenOnIndexer<T extends WrittenOnData> extends LevelDBTable<string[]> {
constructor(
name: string,
getLevelDB: (dbName: string)=> Promise<LevelUp>,
protected toKey: (t: T) => string) {
super(name, getLevelDB)
}
async onInsert(records: T[]): Promise<void> {
const byWrittenOn = reduceGroupBy(records, 'writtenOn')
await Promise.all(Underscore.keys(byWrittenOn).map(async writtenOn => {
await this.put(LevelDBWrittenOnIndexer.trimWrittenOnKey(pint(writtenOn)), byWrittenOn[writtenOn].map(e => this.toKey(e)))
}))
}
getWrittenOnKeys(writtenOn: number): Promise<string[]|null> {
return this.getOrNull(LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn))
}
trim(writtenOn: number): Promise<void> {
return this.del(LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn))
}
private static trimWrittenOnKey(writtenOn: number) {
return String(writtenOn).padStart(10, '0')
}
async deleteBelow(writtenOn: number): Promise<string[]> {
return (await this.deleteWhere({ lt: LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn) }))
.map(kv => kv.value)
.reduce(reduceConcat, [])
}
async deleteAt(writtenOn: number): Promise<string[]> {
const k = LevelDBWrittenOnIndexer.trimWrittenOnKey(writtenOn)
const value = await this.getOrNull(k)
if (!value) {
// Nothing to delete, nothing to return
return []
}
await this.del(k)
return value
}
}
import {LevelDBDataIndex} from "../generic/LevelDBDataIndex"
import {IindexEntry} from "../../../../indexer"
export type Hash = string
export type Pubkey = string
export class LevelIIndexHashIndexer extends LevelDBDataIndex<Pubkey[], IindexEntry> {
async onInsert(records: IindexEntry[]): Promise<void> {
await Promise.all(records
.filter(e => e.op === 'CREATE' && e.hash)
.map(async e => this.put(e.hash as string, [e.pub]))
)
}
async onRemove(records: IindexEntry[]): Promise<void> {
await Promise.all(records
.filter(e => e.op === 'CREATE' && e.hash)
.map(async e => this.del(e.hash as string))
)
}
async getByHash(hash: Hash): Promise<Pubkey|null> {
const res = await this.getOrNull(hash)
return res && res[0]
}
}
import {LevelDBDataIndex} from "../generic/LevelDBDataIndex"
import {IindexEntry} from "../../../../indexer"
import {DataErrors} from "../../../../common-libs/errors"
export type Pubkey = string
export interface KickEntry {
on: number|undefined // The next time that the identity must be kicked
done: number[] // The revertion history
}
export class LevelIIndexKickIndexer extends LevelDBDataIndex<KickEntry, IindexEntry> {
async onInsert(records: IindexEntry[]): Promise<void> {
// Case 1: to be kicked
await Promise.all(records
.filter(e => e.kick)
.map(async e => {
let entry = await this.getOrNull(e.pub)
if (!entry) {
entry = {
on: e.writtenOn,
done: []
}
}
entry.on = e.writtenOn
await this.put(e.pub, entry)
})
)
// Case 2: just kicked
await Promise.all(records
.filter(e => e.member === false)
.map(async e => {
const entry = await this.getOrNull(e.pub)
if (entry && entry.on === e.writtenOn - 1) { // Members are excluded at B# +1
entry.done.push(entry.on)
entry.on = undefined
await this.put(e.pub, entry)
}
// Otherwise it is not a kicking
})
)
}
async onRemove(records: IindexEntry[]): Promise<void> {
// Case 1: to be kicked => unkicked
await Promise.all(records
.filter(e => e.kick)
.map(async e => {
const entry = await this.get(e.pub)
if (entry.on === e.writtenOn) {
entry.on = entry.done.pop()
if (entry.on === undefined) {
// No more kicking left
await this.del(e.pub)
}
// Some kicks left
await this.put(e.pub, entry) // TODO: test this, can occur, probably not covered
} else {
throw Error(DataErrors[DataErrors.INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED])
}
})
)
// Case 2: just kicked => to be kicked
await Promise.all(records
.filter(e => e.member === false)
.map(async e => {
const entry = await this.getOrNull(e.pub)
if (entry && entry.done.includes(e.writtenOn - 1)) {
// It was a kicking
entry.on = entry.done.pop()
if (!entry.on) {
throw Error(DataErrors[DataErrors.INVALID_LEVELDB_IINDEX_DATA_WAS_KICKED])
}
await this.put(e.pub, entry)
}
})
)
}
async onTrimming(belowNumber: number): Promise<void> {
await this.applyAllKeyValue(async kv => {
const initialLength = kv.value.done.length
kv.value.done = kv.value.done.filter(e => e >= belowNumber)
if (kv.value.done.length !== initialLength && kv.value.done.length > 0) {
// We simply update the entry which was pruned
await this.put(kv.key, kv.value)
}
else if (kv.value.done.length !== initialLength && kv.value.done.length === 0 && !kv.value.on) {
// We remove the entry, no more necessary
await this.del(kv.key)
}
})
}
async getAll(): Promise<Pubkey[]> {
return this.findWhereTransform(t => !!t.on, kv => kv.key)
}
}
import {LevelDBDataIndex} from "../generic/LevelDBDataIndex"
import {IindexEntry} from "../../../../indexer"
export type Uid = string
export type Pubkey = string
export class LevelIIndexUidIndexer extends LevelDBDataIndex<Pubkey[], IindexEntry> {
async onInsert(records: IindexEntry[]): Promise<void> {
await Promise.all(records
.filter(e => e.op === 'CREATE' && e.uid)
.map(async e => this.put(e.uid as string, [e.pub]))
)
}
async onRemove(records: IindexEntry[]): Promise<void> {
await Promise.all(records
.filter(e => e.op === 'CREATE' && e.uid)
.map(async e => this.del(e.uid as string))
)
}
async getPubByUid(uid: Uid): Promise<Pubkey|null> {
const res = await this.getOrNull(uid)
return res && res[0]
}
}
......@@ -1963,14 +1963,24 @@ function blockstamp(aNumber: number, aHash: string) {
return [aNumber, aHash].join('-');
}
function reduceOrNull<T>(records: T[]): T|null {
export function reduceOrNull<T>(records: T[]): T|null {
if (records.length === 0) {
return null
}
return reduce(records)
}
function reduce<T>(records: T[]): T {
export function reduceForDBTrimming<T extends { writtenOn: number }>(records: T[], belowNumber: number): T[] {
if (records.length === 0) {
throw Error(DataErrors[DataErrors.INVALID_TRIMMABLE_DATA])
}
const reducableRecords = records.filter(r => r.writtenOn < belowNumber)
const nonReducableRecords = records.filter(r => r.writtenOn >= belowNumber)
const reduced = reduce(reducableRecords) as T
return [reduced].concat(nonReducableRecords)
}
export functi