Commit 3db06c0e authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] Upgrading to LevelDB for CIndex

parent 7dbb4bff
export function reduceConcat<T>(cumulated: T[], arr: T[]) {
return cumulated.concat(arr)
}
......@@ -13,19 +13,30 @@
import * as levelup from 'levelup'
import {LevelUp} from 'levelup'
import {AbstractLevelDOWN} from 'abstract-leveldown'
import {AbstractLevelDOWN, ErrorCallback} from 'abstract-leveldown'
import * as leveldown from 'leveldown'
import * as memdown from 'memdown'
export const LevelDBDriver = {
newMemoryInstance: (): LevelUp => {
newMemoryInstance: (): Promise<LevelUp> => {
const impl: any = memdown.default()
return levelup.default(impl)
return new Promise((res, rej) => {
const db: LevelUp = levelup.default(impl, undefined, (err: Error) => {
if (err) return rej(err)
res(db)
})
})
},
newFileInstance: (path: string): LevelUp => {
return levelup.default(leveldown.default(path))
newFileInstance: (path: string): Promise<LevelUp> => {
const impl: any = leveldown.default(path)
return new Promise((res, rej) => {
const db: LevelUp = levelup.default(impl, undefined, (err: Error) => {
if (err) return rej(err)
res(db)
})
})
}
}
......@@ -73,7 +73,6 @@ import {LokiDAO} from "./indexDAL/loki/LokiDAO"
import {MonitorExecutionTime} from "../debug/MonitorExecutionTime"
import {SqliteMIndex} from "./indexDAL/sqlite/SqliteMIndex"
import {SqliteIIndex} from "./indexDAL/sqlite/SqliteIIndex"
import {SqliteCIndex} from "./indexDAL/sqlite/SqliteCIndex"
import {LevelDBDividend} from "./indexDAL/leveldb/LevelDBDividend"
import {LevelDBBindex} from "./indexDAL/leveldb/LevelDBBindex"
......@@ -83,6 +82,7 @@ import {LevelDBSindex} from "./indexDAL/leveldb/LevelDBSindex"
import {SqliteTransactions} from "./indexDAL/sqlite/SqliteTransactions"
import {SqlitePeers} from "./indexDAL/sqlite/SqlitePeers"
import {LevelDBWallet} from "./indexDAL/leveldb/LevelDBWallet"
import {LevelDBCindex} from "./indexDAL/leveldb/LevelDBCindex"
const readline = require('readline')
const indexer = require('../indexer').Indexer
......@@ -168,7 +168,7 @@ export class FileDAL {
this.mindexDAL = new SqliteMIndex(getSqliteDB)
this.iindexDAL = new SqliteIIndex(getSqliteDB)
this.sindexDAL = new LevelDBSindex(getLevelDB)
this.cindexDAL = new SqliteCIndex(getSqliteDB)
this.cindexDAL = new LevelDBCindex(getLevelDB)
this.dividendDAL = new LevelDBDividend(getLevelDB)
this.newDals = {
......
......@@ -7,7 +7,7 @@ export interface CIndexDAO extends ReduceableDAO<CindexEntry> {
getValidLinksFrom(issuer:string): Promise<CindexEntry[]>
findExpired(medianTime:number): Promise<CindexEntry[]>
findExpiresOnLteNotExpiredYet(medianTime:number): Promise<CindexEntry[]>
findByIssuerAndReceiver(issuer: string, receiver: string): Promise<CindexEntry[]>
......
import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime"
import {CindexEntry, FullCindexEntry, Indexer, reduceBy} from "../../../indexer"
import {LevelUp} from 'levelup'
import {LevelDBTable} from "./LevelDBTable"
import {Underscore} from "../../../common-libs/underscore"
import {pint} from "../../../common-libs/pint"
import {CIndexDAO} from "../abstract/CIndexDAO"
import {reduceConcat} from "../../../common-libs/reduceConcat"
export interface LevelDBCindexEntry {
received: string[]
issued: CindexEntry[]
}
export class LevelDBCindex extends LevelDBTable<LevelDBCindexEntry> implements CIndexDAO {
private indexForExpiresOn: LevelDBTable<string[]>
private indexForWrittenOn: LevelDBTable<string[]>
constructor(protected getLevelDB: (dbName: string)=> Promise<LevelUp>) {
super('level_cindex', getLevelDB)
}
/**
* TECHNICAL
*/
async init(): Promise<void> {
await super.init()
this.indexForExpiresOn = new LevelDBTable<string[]>('level_cindex/expiresOn', this.getLevelDB)
this.indexForWrittenOn = new LevelDBTable<string[]>('level_cindex/writtenOn', this.getLevelDB)
await this.indexForExpiresOn.init()
await this.indexForWrittenOn.init()
}
async close(): Promise<void> {
await super.close()
await this.indexForExpiresOn.close()
await this.indexForWrittenOn.close()
}
/**
* INSERT
*/
@MonitorExecutionTime()
async insert(record: CindexEntry): Promise<void> {
await this.insertBatch([record])
}
@MonitorExecutionTime()
async insertBatch(records: CindexEntry[]): Promise<void> {
for (const r of records) {
const existingIssuer = await this.getOrNull(r.issuer)
const existingReceiver = await this.getOrNull(r.receiver)
let newValue4Issuer = existingIssuer || {
received: [],
issued: []
}
let newValue4Receiver = existingReceiver || {
received: [],
issued: []
}
newValue4Issuer.issued.push(r)
if (!newValue4Receiver.received.includes(r.issuer) && r.op === 'CREATE') {
newValue4Receiver.received.push(r.issuer)
}
await Promise.all([
this.put(r.issuer, newValue4Issuer),
this.put(r.receiver, newValue4Receiver)
])
}
await this.indexRecords(records)
}
/**
* Reduceable DAO
*/
async trimRecords(belowNumber: number): Promise<void> {
// Trim writtenOn: we remove from the index the blocks below `belowNumber`, and keep track of the deleted values
let issuers: string[] = Underscore.uniq((await this.indexForWrittenOn.deleteWhere({ lt: LevelDBCindex.trimWrittenOnKey(belowNumber) }))
.map(kv => kv.value)
.reduce(reduceConcat, []))
// Trim expired certs that won't be rolled back + we remember the max value of expired_on that was trimmed
let maxExpired = 0
issuers = Underscore.uniq(issuers)
await Promise.all(issuers.map(async issuer => {
const entry = await this.get(issuer)
const fullEntries = reduceBy(entry.issued, ['issuer', 'receiver', 'created_on'])
const toRemove: string[] = []
// We remember the maximum value of expired_on, for efficient trimming search
fullEntries
.filter(f => f.expired_on && f.writtenOn < belowNumber)
.forEach(f => {
maxExpired = Math.max(maxExpired, f.expired_on)
toRemove.push(LevelDBCindex.trimFullKey(f.issuer, f.receiver, f.created_on))
})
if (toRemove.length) {
// Trim the expired certs that won't be rolled back ever
entry.issued = entry.issued.filter(entry => !toRemove.includes(LevelDBCindex.trimFullKey(entry.issuer, entry.receiver, entry.created_on)))
await this.put(issuer, entry)
}
}))
// Finally, we trim the expiredOn index
await this.indexForExpiresOn.deleteWhere({ lte: LevelDBCindex.trimExpiredOnKey(maxExpired) })
}
/**
* Generic DAO
*/
async findRawWithOrder(criterion: { pub?: string }, sort: (string | (string | boolean)[])[]): Promise<CindexEntry[]> {
const rows: CindexEntry[] = (await this.findAllValues()).map(r => r.issued).reduce(reduceConcat, [])
return Underscore.sortBy(rows, r => LevelDBCindex.trimDumpSortKey(r.written_on, r.issuer, r.receiver))
}
async getWrittenOn(blockstamp: string): Promise<CindexEntry[]> {
const ids = (await this.indexForWrittenOn.getOrNull(LevelDBCindex.trimWrittenOnKey(pint(blockstamp)))) || []
return (await Promise.all(ids.map(async id => (await this.get(id)).issued))).reduce(reduceConcat, []).filter(e => e.written_on === blockstamp)
}
async removeBlock(blockstamp: string): Promise<void> {
const writtenOn = pint(blockstamp)
const issuers = (await this.indexForWrittenOn.getOrNull(LevelDBCindex.trimWrittenOnKey(writtenOn))) || []
const toRemove: CindexEntry[] = []
for (const issuer of issuers) {
// Remove the entries
const entry = await this.get(issuer)
const previousLength = entry.issued.length
entry.issued = entry.issued.filter(e => {
const shouldBeDeleted = e.written_on === blockstamp
if (shouldBeDeleted) {
toRemove.push(e)
}
return !shouldBeDeleted
})
if (entry.issued.length !== previousLength) {
// Update the entry
await this.put(issuer, entry)
}
}
// Remove the "received" arrays
await Promise.all(toRemove.map(async e => {
const entry = await this.get(e.receiver)
// Remove the certification
entry.received = entry.received.filter(issuer => issuer !== e.issuer)
// Persist
await this.put(e.receiver, entry)
}))
// Remove the expires_on index entries
const expires = Underscore.uniq(toRemove.filter(e => e.expires_on).map(e => e.expires_on))
await Promise.all(expires.map(async e => this.indexForExpiresOn.del(LevelDBCindex.trimExpiredOnKey(e))))
}
private static trimExpiredOnKey(writtenOn: number) {
return String(writtenOn).padStart(10, '0')
}
private static trimWrittenOnKey(writtenOn: number) {
return String(writtenOn).padStart(10, '0')
}
private static trimFullKey(issuer: string, receiver: string, created_on: number) {
return `${issuer}-${receiver}-${String(created_on).padStart(10, '0')}`
}
private static trimDumpSortKey(written_on: string, issuer: string, receiver: string) {
return `${written_on.padStart(100, '0')}-${issuer}-${receiver}`
}
private async indexRecords(records: CindexEntry[]) {
const byExpiresOn: { [k: number]: CindexEntry[] } = {}
const byWrittenOn: { [k: number]: CindexEntry[] } = {}
records
.filter(r => r.expires_on)
.forEach(r => (byExpiresOn[r.expires_on] || (byExpiresOn[r.expires_on] = [])).push(r))
records
.forEach(r => (byWrittenOn[r.writtenOn] || (byWrittenOn[r.writtenOn] = [])).push(r))
// Index expires_on => issuers
for (const k of Underscore.keys(byExpiresOn)) {
const issuers: string[] = ((await this.indexForExpiresOn.getOrNull(LevelDBCindex.trimExpiredOnKey(k))) || [])
.concat(byExpiresOn[k].map(r => r.issuer))
await this.indexForExpiresOn.put(LevelDBCindex.trimExpiredOnKey(k), issuers)
}
// Index writtenOn => issuers
for (const k of Underscore.keys(byWrittenOn)) {
await this.indexForWrittenOn.put(LevelDBCindex.trimWrittenOnKey(k), byWrittenOn[k].map(r => r.issuer))
}
}
async existsNonReplayableLink(issuer: string, receiver: string): Promise<boolean> {
const entries = await this.findByIssuer(issuer)
const reduced = Indexer.DUP_HELPERS.reduceBy(entries, ['issuer', 'receiver', 'created_on'])
return reduced.filter(e => e.receiver === receiver && !e.expired_on).length > 0
}
async findByIssuer(issuer: string): Promise<CindexEntry[]> {
return (await this.getOrNull(issuer) || { issued: [], received: [] }).issued
}
async findByIssuerAndChainableOnGt(issuer: string, medianTime: number): Promise<CindexEntry[]> {
return (await this.findByIssuer(issuer)).filter(e => e.chainable_on > medianTime)
}
async findByIssuerAndReceiver(issuer: string, receiver: string): Promise<CindexEntry[]> {
return (await this.findByIssuer(issuer)).filter(e => e.receiver === receiver)
}
async findByReceiverAndExpiredOn(pub: string, expired_on: number): Promise<CindexEntry[]> {
const receiver = (await this.getOrNull(pub)) || { issued: [], received: [] }
const issuers = receiver.received
return (await Promise.all(issuers.map(async issuer => {
return (await this.get(issuer)).issued.filter(e => e.receiver === pub && e.expired_on === 0)
}))).reduce(reduceConcat, [])
}
async findExpiresOnLteNotExpiredYet(medianTime: number): Promise<CindexEntry[]> {
const issuers: string[] = Underscore.uniq((await this.indexForExpiresOn.findAllValues({ lte: LevelDBCindex.trimExpiredOnKey(medianTime) })).reduce(reduceConcat, []))
return (await Promise.all(issuers.map(async issuer => {
const fullEntries = Indexer.DUP_HELPERS.reduceBy((await this.get(issuer)).issued, ['issuer', 'receiver', 'created_on'])
return fullEntries.filter(e => e.expires_on <= medianTime && !e.expired_on)
}))).reduce(reduceConcat, [])
}
async getReceiversAbove(minsig: number): Promise<string[]> {
return this.findWhereTransform(i => i.received.length >= minsig, i => i.key)
}
async getValidLinksFrom(issuer: string): Promise<CindexEntry[]> {
const fullEntries = Indexer.DUP_HELPERS.reduceBy(((await this.getOrNull(issuer)) || { issued: [] }).issued, ['issuer', 'receiver', 'created_on'])
return fullEntries.filter(e => !e.expired_on)
}
async getValidLinksTo(receiver: string): Promise<CindexEntry[]> {
const issuers: string[] = ((await this.getOrNull(receiver)) || { issued: [], received: [] }).received
return (await Promise.all(issuers.map(async issuer => {
const fullEntries = Indexer.DUP_HELPERS.reduceBy((await this.get(issuer)).issued, ['issuer', 'receiver', 'created_on'])
return fullEntries.filter(e => e.receiver === receiver && !e.expired_on)
}))).reduce(reduceConcat, [])
}
async reducablesFrom(from: string): Promise<FullCindexEntry[]> {
const entries = ((await this.getOrNull(from)) || { issued: [], received: [] }).issued
return Indexer.DUP_HELPERS.reduceBy(entries, ['issuer', 'receiver', 'created_on'])
}
trimExpiredCerts(belowNumber: number): Promise<void> {
return this.trimRecords(belowNumber)
}
}
......@@ -123,6 +123,18 @@ export class LevelDBTable<T> {
await Promise.all(ops)
}
public async deleteWhere(options?: AbstractIteratorOptions) {
const deletedKv: {
key: string,
value: T
}[] = []
await this.applyAllKeyValue(async kv => {
deletedKv.push(kv)
await this.del(kv.key)
}, options)
return deletedKv
}
public async findAllValues(options?: AbstractIteratorOptions): Promise<T[]> {
const data: T[] = []
await this.readAllKeyValue(kv => {
......@@ -132,19 +144,32 @@ export class LevelDBTable<T> {
}
public async findWhere(filter: (t: T) => boolean): Promise<T[]> {
const data: T[] = []
return this.findWhereTransform<T>(filter, t => t.value)
}
public async findWhereTransform<R>(filter: (t: T) => boolean, transform: (t: {
key: string,
value: T
}) => R): Promise<R[]> {
const data: R[] = []
await this.readAllKeyValue(kv => {
if (filter(kv.value)) {
data.push(kv.value)
if (!filter || filter(kv.value)) {
data.push(transform(kv))
}
}, {})
return data
}
async dump(dumpValue: (value: T) => any = (v) => v): Promise<number> {
async dump(dumpValue: (t: {
key: string,
value: T
}) => any = (v) => v): Promise<number> {
let count = 0
await this.readAllKeyValue(entry => {
console.log(entry.key, dumpValue(entry.value))
console.log(entry.key, dumpValue({
key: entry.key,
value: entry.value
}))
count++
})
return count
......
......@@ -70,7 +70,7 @@ export class LokiCIndex extends LokiProtocolIndex<CindexEntry> implements CIndex
}
@MonitorExecutionTime()
async findExpired(medianTime: number): Promise<CindexEntry[]> {
async findExpiresOnLteNotExpiredYet(medianTime: number): Promise<CindexEntry[]> {
return this.collection
.chain()
.find({ expires_on: { $lte: medianTime } })
......
......@@ -149,7 +149,7 @@ export class SqliteCIndex extends SqliteTable<CindexEntry> implements CIndexDAO
'AND expired_on = ?', [pub, expired_on])
}
findExpired(medianTime: number): Promise<CindexEntry[]> {
findExpiresOnLteNotExpiredYet(medianTime: number): Promise<CindexEntry[]> {
return this.find('SELECT * FROM cindex c1 ' +
'WHERE c1.expires_on <= ? ' +
'AND NOT EXISTS (' +
......
......@@ -1759,7 +1759,7 @@ export class Indexer {
// BR_G92
static async ruleIndexGenCertificationExpiry(HEAD: DBHead, dal:FileDAL) {
const expiries = [];
const certs = await dal.cindexDAL.findExpired(HEAD.medianTime);
const certs = await dal.cindexDAL.findExpiresOnLteNotExpiredYet(HEAD.medianTime);
for (const CERT of certs) {
expiries.push({
op: 'UPDATE',
......
......@@ -118,7 +118,7 @@ export const MemFS = (initialTree:{ [folder:string]: { [file:string]: string }}
export const Directory = {
DATA_FILES: ['mindex.db', 'c_mindex.db', 'iindex.db', 'cindex.db', 'sindex.db', 'wallet.db', 'dividend.db', 'txs.db', 'peers.db'],
DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_wallet'],
DATA_DIRS: ['level_dividend', 'level_bindex', 'level_blockchain', 'level_sindex', 'level_cindex', 'level_wallet'],
INSTANCE_NAME: getDomain(opts.mdb),
INSTANCE_HOME: getHomePath(opts.mdb, opts.home),
......
......@@ -24,7 +24,6 @@ import {cliprogram} from "../../../../../lib/common-libs/programOptions"
import {DBHead} from "../../../../../lib/db/DBHead"
import {Watcher} from "../Watcher"
import {LokiMIndex} from "../../../../../lib/dal/indexDAL/loki/LokiMIndex"
import {LokiCIndex} from "../../../../../lib/dal/indexDAL/loki/LokiCIndex"
import {LokiIIndex} from "../../../../../lib/dal/indexDAL/loki/LokiIIndex"
import {LokiDividend} from "../../../../../lib/dal/indexDAL/loki/LokiDividend"
import {DataErrors} from "../../../../../lib/common-libs/errors"
......@@ -105,7 +104,6 @@ export class GlobalIndexStream extends Duplex {
this.mindexLokiInjection = (async () => {
await this.injectLoki(this.dal, 'mindexDAL', new LokiMIndex(new loki()))
await this.injectLoki(this.dal, 'iindexDAL', new LokiIIndex(new loki()))
await this.injectLoki(this.dal, 'cindexDAL', new LokiCIndex(new loki()))
await this.injectLoki(this.dal, 'dividendDAL', new LokiDividend(new loki()))
})()
}
......@@ -445,9 +443,6 @@ export class GlobalIndexStream extends Duplex {
await inject(this.dal, 'iindexDAL',
() => this.dal.iindexDAL.findRawWithOrder({}, [['writtenOn',false]]))
await inject(this.dal, 'cindexDAL',
() => this.dal.cindexDAL.findRawWithOrder({}, [['writtenOn',false]]))
await inject(this.dal, 'dividendDAL',
() => this.dal.dividendDAL.listAll())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment