diff --git a/lib/DataFinder.ts b/lib/DataFinder.ts index cc3fa4e00ac8c64f4e0a6bf0dbbed7caeb5c42a7..9d70d0f2ba14798f3f1eae35f7ecef802b16b697 100644 --- a/lib/DataFinder.ts +++ b/lib/DataFinder.ts @@ -2,64 +2,158 @@ import {Server} from 'duniter/server' import {DBBlock} from 'duniter/app/lib/db/DBBlock' import {MonitorExecutionTime} from './MonitorExecutionTime' import {LevelDBIindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBIindex"; -import {IindexEntry, reduce} from "duniter/app/lib/indexer"; +import {FullIindexEntry, IindexEntry, Indexer, reduce} from "duniter/app/lib/indexer"; +import {LevelDBBlockchain} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBBlockchain"; +import {Underscore} from "./underscore"; +import {CFSBlockchainArchive} from "duniter/app/lib/dal/indexDAL/CFSBlockchainArchive"; +import {MonitDBBlock, SqliteBlockchain} from "./SqliteBlockchain"; +import {LevelDBCindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBCindex"; +import {reduceConcat} from "duniter/app/lib/common-libs/reduce"; +import {LevelDBMindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBMindex"; + +/** + * Creates the DB objects + reset data + launches a first indexation + * @param duniterServer The server to index blockchain from. + */ +export async function initMonitDB(duniterServer: Server, resetData: boolean = false) { + DataFinder.createInstance(duniterServer) + if (resetData) { + await DataFinder.getInstance().resetIndexedData() + } + await DataFinder.getInstance().index() +} +/** + * Abstraction layer for data access (SQL + LevelDB of Duniter). + */ export class DataFinder { + private static instance: DataFinder + private static reindexing: Promise<void> = Promise.resolve() + + /** + * Singleton constructor + * @param duniterServer + */ + public static createInstance(duniterServer: Server) { + if (!DataFinder.instance) { + DataFinder.instance = new DataFinder(duniterServer) + } + } + + /** + * Singleton getter + */ + public static getInstance() { + return DataFinder.instance + } + + /** + * Retrieve the singleton + reindex Monit data if current HEAD is not up-to-date. + */ + public static async getInstanceReindexedIfNecessary() { + const currentMonit = await DataFinder.instance.getHighestBlock() + const currentDuniter = await DataFinder.instance.blockchainDao.getCurrent() + // Wait any already triggered reindexing + await DataFinder.reindexing + // Index only when opportune + if (currentDuniter && (!currentMonit || currentMonit.number < currentDuniter.number)) { + console.log('Duniter current = ', currentDuniter.number) + console.log('Monit current = ', currentMonit && currentMonit.number || -1) + DataFinder.reindexing = DataFinder.instance.index() + // Wait end of indexing + await DataFinder.reindexing + } + return DataFinder.instance + } + + private dbArchives: SqliteBlockchain; private memCache: { [cacheName: string]: { [k: string]: any } } = {}; + private dbInited: Promise<any> // Cache private intemporalWot: Promise<IindexEntry[]>; private wotmap: Promise<WotMap>; - constructor(protected duniterServer: Server) { + private constructor(protected duniterServer: Server) { + this.dbArchives = new SqliteBlockchain(duniterServer.dal.getSqliteDB) + this.dbInited = this.dbArchives.init() + } + + async resetIndexedData() { + await this.dbInited + console.log('Reseting all Monit data...') + await this.dbArchives.deleteAll() + } + + /** + * Mirror the Duniter archives for long term storage + * Renew periodically the non-archived part (in which forks may have occurred) + */ + async index() { + console.log('Reindexing blockchain...') + await this.dbInited + // 1. Look at first out-of-for-window block in Duniter: archive in Monit all the blocks < to this number + const firstOutOfFork = await this.getFirstOutOfForkBlockInDuniter(); + const newCeil = await this.archiveBlocksInMonit(firstOutOfFork) + // 2. Add all the blocks >= to this number + await this.addForkWindowBlocks(newCeil, firstOutOfFork) + console.log('Reindexing done.') } @MonitorExecutionTime() findPendingMembers() { - return this.query('SELECT `buid`,`pubkey`,`uid`,`hash`,`expires_on`,`revocation_sig` FROM identities_pending WHERE `member`=0') + return this.duniterServer.dal.idtyDAL.query('SELECT `buid`,`pubkey`,`uid`,`hash`,`expires_on`,`revocation_sig` FROM identities_pending WHERE `member`=0') } @MonitorExecutionTime() findPendingCertsToTarget(toPubkey: string, hash: string) { - return this.getFromCacheOrDB('findPendingCertsToTarget', [toPubkey, hash].join('-'), () => this.query( + return this.getFromCacheOrDB('findPendingCertsToTarget', [toPubkey, hash].join('-'), () => this.duniterServer.dal.certDAL.query( 'SELECT `from`,`block_number`,`block_hash`,`expires_on` FROM certifications_pending WHERE `to`=\''+toPubkey+'\' AND `target`=\''+hash+'\' ORDER BY `expires_on` DESC')) } @MonitorExecutionTime() - getWotexInfos(uid: string) { - return this.duniterServer.dal.idtyDAL.query('' + - 'SELECT hash, uid, pub, wotb_id FROM i_index WHERE uid = ? ' + - 'UNION ALL ' + 'SELECT hash, uid, pubkey as pub, (SELECT NULL) AS wotb_id FROM idty WHERE uid = ?', [uid, uid]) + async getWotexInfos(uid: string): Promise<{ hash: string }[]> { + const pendingIdentities: { hash: string }[] = await this.duniterServer.dal.idtyDAL.query('' + + 'SELECT hash, uid, pubkey as pub, (SELECT NULL) AS wotb_id FROM idty WHERE uid = ?', [uid]) + const eventualMember: { hash: string }|null = await this.iindex.getFromUID(uid) + if (eventualMember) { + pendingIdentities.push(eventualMember) + } + return pendingIdentities } @MonitorExecutionTime() - async getBlock(block_number: number): Promise<DBBlock|undefined> { + async getBlock(block_number: number): Promise<DBBlock> { return (await this.getFromCacheOrDB('getBlock', String(block_number),() => this.duniterServer.dal.getBlock(block_number))) || undefined } @MonitorExecutionTime() getUidOfPub(pub: string): Promise<{ uid: string }[]> { - return this.getFromCacheOrDB('getUidOfPub', pub, () => this.query('SELECT `uid` FROM i_index WHERE `pub`=\''+pub+'\' LIMIT 1')) + return this.getFromCacheOrDB('getUidOfPub', pub, () => this.iindex.getFullFromPubkey(pub)) } @MonitorExecutionTime() async getWotbIdByIssuerPubkey(issuerPubkey: string) { - return this.getFromCacheOrDB('getWotbIdByIssuerPubkey', issuerPubkey, async () => (await this.query('SELECT wotb_id FROM i_index WHERE pub = ? AND wotb_id IS NOT NULL', [issuerPubkey]))[0].wotb_id) + return this.getFromCacheOrDB('getWotbIdByIssuerPubkey', issuerPubkey, async () => (await this.iindex.getFullFromPubkey(issuerPubkey)).wotb_id) } @MonitorExecutionTime() - getChainableOnByIssuerPubkey(issuerPubkey: string) { - return this.query('SELECT `chainable_on` FROM c_index WHERE `issuer`=\''+issuerPubkey+'\' ORDER BY `chainable_on` DESC LIMIT 1') + async getChainableOnByIssuerPubkey(issuerPubkey: string) { + const reduced = await this.cindex.reducablesFrom(issuerPubkey); + return Underscore.sortBy(reduced, r => -r.chainable_on); } @MonitorExecutionTime() getChainableOnByIssuerPubkeyByExpOn(from: string) { - return this.getFromCacheOrDB('getChainableOnByIssuerPubkeyByExpOn', from, () => this.query('SELECT `chainable_on` FROM c_index WHERE `issuer`=\''+from+'\' ORDER BY `expires_on` DESC LIMIT 1')) + return this.getFromCacheOrDB('getChainableOnByIssuerPubkeyByExpOn', from, async () => { + const reduced = await this.cindex.reducablesFrom(from); + return Underscore.sortBy(reduced, r => -r.expires_on)[0]; + }) } @MonitorExecutionTime() @@ -69,30 +163,36 @@ export class DataFinder { @MonitorExecutionTime() findCertsOfIssuer(pub: string, tmpOrder: string) { - return this.getFromCacheOrDB('findCertsOfIssuer', [pub, tmpOrder].join('-'), () => this.query( - 'SELECT `receiver`,`written_on`,`expires_on` FROM c_index WHERE `issuer`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder)) + return this.getFromCacheOrDB('findCertsOfIssuer', [pub, tmpOrder].join('-'), async () => { + const reduced = await this.cindex.reducablesFrom(pub); + return Underscore.sortBy(reduced, r => tmpOrder === 'DESC' ? -r.expires_on : r.expires_on); + }) } @MonitorExecutionTime() findCertsOfReceiver(pub: any, tmpOrder: string) { - return this.getFromCacheOrDB('findCertsOfReceiver', [pub, tmpOrder].join('-'), () => this.query( - 'SELECT `issuer`,`written_on`,`expires_on` FROM c_index WHERE `receiver`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder)) + return this.getFromCacheOrDB('findCertsOfReceiver', [pub, tmpOrder].join('-'), async () => { + const reduced = await this.reducablesTo(pub); + return Underscore.sortBy(reduced, r => tmpOrder === 'DESC' ? -r.expires_on : r.expires_on); + }) } @MonitorExecutionTime() getProtagonist(pub: string) { - return this.getFromCacheOrDB('getProtagonist', pub, () => this.query('SELECT `uid`,`wotb_id` FROM i_index WHERE `pub`=\''+pub+'\' LIMIT 1')) + return this.getFromCacheOrDB('getProtagonist', pub, async (): Promise<FullIindexEntry> => { + return (await this.iindex.getFromPubkey(pub)) as FullIindexEntry; + }) } @MonitorExecutionTime() getCertsPending(pub: string, tmpOrder: string) { - return this.getFromCacheOrDB('getCertsPending', [pub, tmpOrder].join('-'), () => this.query( + return this.getFromCacheOrDB('getCertsPending', [pub, tmpOrder].join('-'), () => this.duniterServer.dal.certDAL.query( 'SELECT `from`,`to`,`block_number`,`expires_on` FROM certifications_pending WHERE `from`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder)) } @MonitorExecutionTime() getCertsPendingFromTo(pub: any, tmpOrder: string) { - return this.getFromCacheOrDB('getCertsPendingFromTo', [pub, tmpOrder].join('-'), () => this.query( + return this.getFromCacheOrDB('getCertsPendingFromTo', [pub, tmpOrder].join('-'), () => this.duniterServer.dal.certDAL.query( 'SELECT `from`,`block_number`,`block_hash`,`expires_on` FROM certifications_pending WHERE `to`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder)) } @@ -106,13 +206,9 @@ export class DataFinder { @MonitorExecutionTime() membershipWrittenOnExpiresOn(pub: string) { - return this.getFromCacheOrDB('membershipWrittenOnExpiresOn', pub, () => this.query( - 'SELECT `written_on`,`expires_on` FROM m_index WHERE `pub`=\''+pub+'\' ORDER BY `expires_on` DESC LIMIT 1')) - } - - @MonitorExecutionTime() - query(sql: string, params?: any[]): Promise<any> { - throw Error('Unhandled in Duniter 1.7.x') + return this.getFromCacheOrDB('membershipWrittenOnExpiresOn', pub, async () => { + return this.mindex.getReducedMS(pub); + }) } @MonitorExecutionTime() @@ -132,19 +228,19 @@ export class DataFinder { @MonitorExecutionTime() getBlockWhereMedianTimeGt(previousBlockchainTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeGt', String(previousBlockchainTime), - () => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() - getBlockWhereMedianTimeLte(medianTime: number) { - return this.getFromCacheOrDB('getBlockWhereMedianTimeLte', [medianTime].join('-'), - () => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC')) + getBlockWhereMedianTimeLte(newEndTime: number) { + return this.getFromCacheOrDB('getBlockWhereMedianTimeLte', [newEndTime].join('-'), + () => this.dbArchives.query('SELECT `medianTime`,`number` FROM block WHERE `fork`=0 AND `medianTime` <= \''+newEndTime+'\' ORDER BY `medianTime` DESC LIMIT 1 ')) } @MonitorExecutionTime() getBlockWhereMedianTimeLteNoLimit(medianTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeLteNoLimit', [medianTime].join('-'), - () => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() @@ -159,47 +255,95 @@ export class DataFinder { @MonitorExecutionTime() getBlockWhereMedianTimeLteAndGtNoLimit(currentBlockTime: number, medianTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGtNoLimit', [currentBlockTime, medianTime].join('-'), - () => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`joiners`,`actives`,`revoked` FROM block WHERE `fork`=0 AND `medianTime` > '+currentBlockTime+' AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`joiners`,`actives`,`revoked` FROM block WHERE `fork`=0 AND `medianTime` > '+currentBlockTime+' AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() getBlockWhereMedianTimeLteAndGte(endMedianTime: number, beginMedianTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGte', [endMedianTime, beginMedianTime].join('-'), - () => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() - getBlockWhereMedianTimeGte(previousBlockchainTime: number) { - return this.getFromCacheOrDB('getBlockWhereMedianTimeGte', String(previousBlockchainTime), - () => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+previousBlockchainTime+' ORDER BY `medianTime` ASC')) + getBlockWhereMedianTimeGte(beginTime: number) { + return this.getFromCacheOrDB('getBlockWhereMedianTimeGte', String(beginTime), + () => this.dbArchives.query('SELECT `medianTime`,`number` FROM block WHERE `fork`=0 AND `medianTime` >= \''+beginTime+'\' ORDER BY `medianTime` ASC LIMIT 1 ')) } @MonitorExecutionTime() getBlockWhereMedianTimeLteAndGt(medianTime: number, previousBlockchainTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGt', [medianTime, previousBlockchainTime].join('-'), - () => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() getBlockWhereMedianTimeLteAndGteNoLimit(endMedianTime: number, beginMedianTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGteNoLimit', [endMedianTime, beginMedianTime].join('-'), - () => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) } @MonitorExecutionTime() getBlockWhereMedianTimeGtNoLimit(beginMedianTime: number) { return this.getFromCacheOrDB('getBlockWhereMedianTimeGtNoLimit', String(beginMedianTime), - () => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) + () => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC')) } searchIdentities(search: string) { return this.duniterServer.dal.searchJustIdentities(search) } + /** + * Get the highest block known by Monit + */ + async getHighestBlock() { + const number = await this.dbArchives.getHighestBlockNumber() + if (number < 0) { + return null + } + return this.dbArchives.getBlock(number) + } + + /** + * Get the highest block number known by Monit + */ + async getHighestBlockNumber() { + return await this.dbArchives.getHighestBlockNumber() + } + + /** + * Get the highest archived block number known by Monit + */ + async getHighestArchivedBlockNumber() { + return await this.dbArchives.getHighestArchivedBlockNumber() + } + + @MonitorExecutionTime() + async findRemainingBlocksInForkZone(criteria: (b: DBBlock) => boolean) { + const topArchived = await this.getHighestBlock() + return await this.blockchainDao.findWhere(block => (!topArchived || block.number > topArchived.number) && criteria(block)) + } + + async getFirstOutOfForkBlockInDuniter(): Promise<number> { + const current = (await this.blockchainDao.getCurrent()) + return (current && current.number || -1) - this.duniterServer.conf.forksize + } + + get blockchainDao() { + return this.duniterServer.dal.blockDAL as LevelDBBlockchain + } + get iindex() { return this.duniterServer.dal.iindexDAL as LevelDBIindex } + get mindex() { + return this.duniterServer.dal.mindexDAL as LevelDBMindex + } + + get cindex() { + return this.duniterServer.dal.cindexDAL as LevelDBCindex + } + /** * Singleton de fetching de la wotmap */ @@ -234,6 +378,121 @@ export class DataFinder { }); return wotmap; } + + // Extracted from Duniter `getValidLinksTo`, adapted to return even non-valid links + private async reducablesTo(receiver: any) { + const issuers: string[] = ((await this.cindex.getOrNull(receiver)) || { issued: [], received: [] }).received + return (await Promise.all(issuers.map(async issuer => { + const fullEntries = Indexer.DUP_HELPERS.reduceBy((await this.cindex.get(issuer)).issued, ['issuer', 'receiver']) + return fullEntries.filter(e => e.receiver === receiver ) + }))).reduce(reduceConcat, []) + } + + /** + * Save as archived blocks in Monit blocks the blocks that are not supposed to change + * ever in Duniter (non-fork blocks). + */ + private async archiveBlocksInMonit(targetCeil: number) { + console.log(`[Archives] Compiling archives up to #${targetCeil} (first non-forkable block)...`) + // Trim all the blocks above the ceil (should be the non-archived blocks) + console.log(`[Archives] Removing forkable blocks`) + await this.dbArchives.trimNonArchived() + // Check what is our new ceil + let currentCeil = await this.dbArchives.getHighestBlockNumber() + // Copy the blocks available from Duniter archives (they were stored during a sync) + currentCeil = await this.copyFromDuniterArchives(currentCeil, targetCeil) + // Then copy the bocks available in classical Duniter DB (a part stored during the sync, the other during the node's life) + currentCeil = await this.copyFromDuniterDB(currentCeil, targetCeil) + return this.dbArchives.getHighestBlockNumber() + } + + /** + * Save as non-archived blocks in Monit blocks the blocks that are in fork window of Duniter. + */ + private async addForkWindowBlocks(newCeil: number, firstOutOfFork: number) { + console.log(`[Forkables] Copying DB blocks from #${newCeil + 1} to #${firstOutOfFork}...`) + const current = (await this.blockchainDao.getCurrent()) as DBBlock + // Fetch memory blocks above our new ceil + const nonArchived: MonitDBBlock[] = await this.blockchainDao.getBlocks(newCeil + 1, firstOutOfFork) as any + // Mark them as non-archived + nonArchived.forEach(b => b.archived = true) + console.log(`[Forkables] Copying ${nonArchived.length} blocks.`) + await this.dbArchives.insertBatch(nonArchived) + console.log(`[Forkables] Copying DB forkable blocks from #${firstOutOfFork + 1} to #${current.number}...`) + // Fetch memory blocks above our new ceil + const nonArchivedForkable: MonitDBBlock[] = await this.blockchainDao.getBlocks(firstOutOfFork + 1, current.number) as any + // Mark them as non-archived because they are forkable + nonArchivedForkable.forEach(b => b.archived = false) + // And finally store them + console.log(`[Forkables] Copying ${nonArchivedForkable.length} blocks.`) + await this.dbArchives.insertBatch(nonArchivedForkable) + } + + /** + * Extract blocks from Duniter archives zone. + * @param currentCeil Our current ceil block in dbArchives. + * @param targetCeil Our target block in dbArchives (block to reach). + */ + private async copyFromDuniterArchives(currentCeil: number, targetCeil: number) { + console.log(`[Archives] Copying from Duniter archives from #${currentCeil + 1}...#${targetCeil}`) + while (currentCeil < targetCeil) { + // Get the chunk that contains the block following our current ceil + const chunk: MonitDBBlock[]|null = (await (this.duniterServer.dal.blockchainArchiveDAL as CFSBlockchainArchive<DBBlock>).getChunkForBlock(currentCeil + 1)) as any[]; + const toArchive: MonitDBBlock[] = []; + if (!chunk) { + // Not in the archives + break; + } + for (const block of chunk) { + if (block.number > currentCeil) { + // Archive it + block.archived = true; + toArchive.push(block); + currentCeil = block.number + } + } + if (toArchive.length) { + console.log(`[Archives] Copying from Duniter archives block #${toArchive[0].number}...#${toArchive[toArchive.length-1].number}`) + await this.dbArchives.insertBatch(toArchive) + // Force journal writing, otherwise we will have to wait for all the writings later on. + // I prefer to wait now, to follow the progress using logs + await this.dbArchives.getHighestBlockNumber() + } + } + await this.dbArchives.setArchived(currentCeil) + console.log(`[Archives] Copying from Duniter archives done.`) + return currentCeil + } + + /** + * Extract blocks from Duniter database zone. + * @param currentCeil Our current ceil block in dbArchives. + * @param targetCeil Our target block in dbArchives (block to reach). + */ + private async copyFromDuniterDB(currentCeil: number, targetCeil: number) { + console.log('[Archives] Copying from Duniter DB...') + const duniterCurrent = await this.blockchainDao.getCurrent() + if (duniterCurrent) { + // Get all the remaining blocks + console.log(`[Archives] Copying from Duniter DB block #${currentCeil + 1}...#${targetCeil}`) + const chunk: MonitDBBlock[]|null = (await this.blockchainDao.getBlocks(currentCeil + 1, targetCeil)) as any[]; + const toStore: MonitDBBlock[] = []; + for (const block of chunk) { + if (!block.fork && block.number === currentCeil + 1) { + // Store it + block.archived = block.number <= duniterCurrent.number; + toStore.push(block); + currentCeil = block.number + } + } + console.log(`[Archives] Copying ${toStore.length} blocks...`) + if (toStore.length) { + await this.dbArchives.insertBatch(toStore) + } + } + console.log('[Archives] Copying from Duniter DB done.') + return currentCeil + } } interface WotMap { diff --git a/lib/SqliteBlockchain.ts b/lib/SqliteBlockchain.ts new file mode 100644 index 0000000000000000000000000000000000000000..f0bf86a00856c74561147c67fef0f371a9803f28 --- /dev/null +++ b/lib/SqliteBlockchain.ts @@ -0,0 +1,105 @@ +import {SqliteTable} from "duniter/app/lib/dal/indexDAL/sqlite/SqliteTable"; +import {SQLiteDriver} from "duniter/app/lib/dal/drivers/SQLiteDriver"; +import { + SqlNotNullableFieldDefinition, + SqlNullableFieldDefinition +} from "duniter/app/lib/dal/indexDAL/sqlite/SqlFieldDefinition"; +import {MonitorExecutionTime} from "./MonitorExecutionTime"; +import {DBBlock} from "duniter/app/lib/db/DBBlock"; + +export class SqliteBlockchain extends SqliteTable<MonitDBBlock> { + + constructor(getSqliteDB: (dbName: string)=> Promise<SQLiteDriver>) { + super( + 'monit', + { + 'archived': new SqlNotNullableFieldDefinition('BOOLEAN', true), + 'fork': new SqlNotNullableFieldDefinition('BOOLEAN', true), + 'hash': new SqlNotNullableFieldDefinition('VARCHAR', false, 64), + 'inner_hash': new SqlNotNullableFieldDefinition('VARCHAR', false, 64), + 'signature': new SqlNotNullableFieldDefinition('VARCHAR', false, 100), + 'currency': new SqlNotNullableFieldDefinition('VARCHAR', false, 50), + 'issuer': new SqlNotNullableFieldDefinition('VARCHAR', false, 50), + 'version': new SqlNotNullableFieldDefinition('INT', false), + 'membersCount': new SqlNotNullableFieldDefinition('INT', false), + 'medianTime': new SqlNotNullableFieldDefinition('INT', true), // DATETIME? + 'time': new SqlNotNullableFieldDefinition('INT', false), // DATETIME? + 'powMin': new SqlNotNullableFieldDefinition('INT', false), + 'number': new SqlNotNullableFieldDefinition('INT', false), + 'nonce': new SqlNotNullableFieldDefinition('INT', false), + 'issuersCount': new SqlNotNullableFieldDefinition('INT', false), + 'parameters': new SqlNullableFieldDefinition('VARCHAR', false, 255), + 'previousHash': new SqlNullableFieldDefinition('VARCHAR', false, 64), + 'previousIssuer': new SqlNullableFieldDefinition('VARCHAR', false, 50), + 'monetaryMass': new SqlNullableFieldDefinition('VARCHAR', false, 100), + 'UDTime': new SqlNullableFieldDefinition('INT', false), // DATETIME + 'dividend': new SqlNullableFieldDefinition('INT', false), // DEFAULT \'0\' + 'unitbase': new SqlNullableFieldDefinition('INT', false), + 'transactions': new SqlNullableFieldDefinition('TEXT', false), + 'certifications': new SqlNullableFieldDefinition('TEXT', false), + 'identities': new SqlNullableFieldDefinition('TEXT', false), + 'joiners': new SqlNullableFieldDefinition('TEXT', false), + 'actives': new SqlNullableFieldDefinition('TEXT', false), + 'leavers': new SqlNullableFieldDefinition('TEXT', false), + 'revoked': new SqlNullableFieldDefinition('TEXT', false), + 'excluded': new SqlNullableFieldDefinition('TEXT', false), + }, + getSqliteDB + ); + this.name = 'block' + } + + @MonitorExecutionTime() + async insertBatch(records: MonitDBBlock[]): Promise<void> { + records.forEach((b:any) => { + for (const prop of ['joiners', 'actives', 'leavers', 'identities', 'certifications', 'transactions', 'revoked', 'excluded']) { + b[prop] = JSON.stringify(b[prop]); + } + return b + }); + if (records.length) { + return this.insertBatchInTable(this.driver, records) + } + } + + @MonitorExecutionTime() + async query(sql: string, params?: any[]): Promise<any> { + return this.driver.sqlRead(sql, params || []) + } + + async getBlock(number: number): Promise<MonitDBBlock|null> { + const blocks = await this.driver.sqlRead('SELECT * FROM block WHERE number = ?', [number]) + return blocks.length ? blocks[0] : null + } + + async getHighestBlock(): Promise<MonitDBBlock|null> { + const blocks = await this.driver.sqlRead('SELECT * FROM block ORDER BY number DESC LIMIT 1', []) + return blocks.length ? blocks[0] : null + } + + async getHighestBlockNumber(): Promise<number> { + const block = await this.getHighestBlock() + return block && block.number || -1 + } + + async getHighestArchivedBlockNumber(): Promise<number> { + const block = await this.driver.sqlRead('SELECT * FROM block WHERE archived ORDER BY number DESC LIMIT 1', []) + return block.length && block[0].number || -1 + } + + trimNonArchived() { + return this.driver.sqlWrite('DELETE FROM block WHERE NOT archived', []) + } + + setArchived(currentCeil: number) { + return this.driver.sqlWrite('UPDATE block SET archived = ? WHERE number <= ? AND NOT archived', [true, currentCeil]) + } + + deleteAll() { + return this.driver.sqlWrite('DELETE FROM block', []) + } +} + +export interface MonitDBBlock extends DBBlock { + archived: boolean +} diff --git a/lib/main.js b/lib/main.js index 0877d4da2b411e1f4e10b970921ebf724d7be61e..48cc399f2d256a47ff97d4ea28251e0c14417758 100755 --- a/lib/main.js +++ b/lib/main.js @@ -4,7 +4,7 @@ const co = require('co'); const os = require('os'); const fs = require('fs'); -const webserver = require(__dirname + '/webserver.js'); +const webserver = require(__dirname + '/webserver2.js'); const timestampToDatetime = require(__dirname + '/timestampToDatetime.js'); /**************************** diff --git a/lib/updateCache2.ts b/lib/updateCache2.ts index 9d7bc72efb9b415eb41a7a54a89c481b9bd63678..851d231add0a4584948d97388b151980e03e927e 100755 --- a/lib/updateCache2.ts +++ b/lib/updateCache2.ts @@ -3,6 +3,7 @@ import {DataFinder} from "./DataFinder"; import {DBBlock} from "duniter/app/lib/db/DBBlock"; import {MonitConstants} from "./constants2"; +import {Server} from "duniter/server"; const co = require('co'); @@ -12,9 +13,9 @@ const co = require('co'); */ module.exports = async (req:any, res:any, next:any) => { - var { duniterServer, cache } = req.app.locals + var { duniterServer, cache } = req.app.locals as { duniterServer: Server, cache: MonitCache }; - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // Définition des constantes @@ -110,7 +111,7 @@ module.exports = async (req:any, res:any, next:any) => { else if (req.query.begin > cache.endBlock[0].number) { let beginTime = cache.endBlock[0].medianTime-(parseInt(cache.step)*unitTime*MonitConstants.STEP_COUNT_MIN); - cache.beginBlock = [await dataFinder.getBlockWhereMedianTimeGte(beginTime)]; + cache.beginBlock = await dataFinder.getBlockWhereMedianTimeGte(beginTime); } else { cache.beginBlock = [await dataFinder.getBlock(req.query.begin)]; } @@ -125,20 +126,22 @@ module.exports = async (req:any, res:any, next:any) => { } else { cache.adaptMaxPoints = "begin"; } - + if (!cache.beginBlock || !cache.beginBlock[0]) { + throw Error("No begin block") + } // Apply nbMaxPoints and adaptMaxPoints if (cache.adaptMaxPoints == "begin") { if ( Math.ceil((cache.endBlock[0].medianTime-cache.beginBlock[0].medianTime)/(cache.step*unitTime)) > cache.nbMaxPoints ) { let newBeginTime = cache.endBlock[0].medianTime-cache.step*cache.nbMaxPoints*unitTime; - cache.beginBlock = [await dataFinder.getBlockWhereMedianTimeGte(newBeginTime)]; + cache.beginBlock = await dataFinder.getBlockWhereMedianTimeGte(newBeginTime); } } else if (cache.adaptMaxPoints == "step") { cache.step = Math.ceil((cache.endBlock[0].medianTime-cache.beginBlock[0].medianTime)/(MonitConstants.STEP_COUNT_MAX*unitTime)); } else { let newEndTime = cache.beginBlock[0].medianTime+cache.step*cache.nbMaxPoints*unitTime; - cache.endBlock = [await dataFinder.getBlockWhereMedianTimeLte(newEndTime)]; + cache.endBlock = await dataFinder.getBlockWhereMedianTimeLte(newEndTime); } // Calculate stepTime @@ -293,3 +296,7 @@ module.exports = async (req:any, res:any, next:any) => { } } +interface MonitCache { + [k: string]: any + beginBlock: null|DBBlock[] +} diff --git a/lib/webserver.js b/lib/webserver2.ts similarity index 63% rename from lib/webserver.js rename to lib/webserver2.ts index 567b5acc4343c27b04e1a06c14f8a3c120aa05a6..64e2e42376251e61d37ef926736565ecbf7c8a4c 100755 --- a/lib/webserver.js +++ b/lib/webserver2.ts @@ -1,5 +1,8 @@ "use strict"; +import {Server} from "duniter/server"; +import {initMonitDB} from "./DataFinder"; + const fs = require('fs'); //const util = require('util'); const Q = require('q'); @@ -13,13 +16,13 @@ const bodyParser = require('body-parser'); const routes = require(__dirname + '/../routes'); const tpl = require(__dirname + '/tplit.js'); -module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset, cache) => { +module.exports = (host: any, port: any, appParente: any, duniterServer: Server, monitDatasPath: any, offset: any, cache: any, resetData: boolean = false) => { var app = express(); app.use(morgan('\x1b[90m:remote-addr :remote-user [:date[clf]] :method :url HTTP/:http-version :status :res[content-length] - :response-time ms\x1b[0m', { stream: { - write: function(message){ + write: function(message: any){ message && console.log(message.replace(/\n$/,'')); } } @@ -44,7 +47,7 @@ module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset, /*************************************** * CSV des membres calculants ***************************************/ - app.get('/csvCalculatorsRank', function(req, res) { + app.get('/csvCalculatorsRank', function(req: any, res: any) { let files = fs.readdirSync(monitDatasPath + '/calculators_rank/') let maxTimestamp = 0 for (let file of files) { @@ -61,35 +64,46 @@ module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset, if ( appParente == null ) { let httpServer = http.createServer(app); - httpServer.on('error', function(err) { + httpServer.on('error', function(err: any) { httpServer.errorPropagates(err); }); return { - openConnection: () => co(function *() { - try { - yield Q.Promise((resolve, reject) => { - // Weird the need of such a hack to catch an exception... - httpServer.errorPropagates = function(err) { - reject(err); - }; + openConnection: async () => { + try { + await Q.Promise((resolve: any, reject: any) => { + // Weird the need of such a hack to catch an exception... + httpServer.errorPropagates = function(err: any) { + reject(err); + }; + + httpServer.listen(port, host, (err: any) => { + if (err) return reject(err); + resolve(httpServer); + }); + }); + + // Init + first incremental indexation + await initMonitDB(duniterServer, resetData) + + console.log('Server listening on http://' + host + ':' + port); - httpServer.listen(port, host, (err) => { - if (err) return reject(err); - resolve(httpServer); - }); - }); - console.log('Server listening on http://' + host + ':' + port); - } catch (e) { - console.warn('Could NOT listen to http://' + host + ':' + port); - console.warn(e); - } - }), + } catch (e) { + console.warn('Could NOT listen to http://' + host + ':' + port); + console.warn(e); + } + }, }; } else { appParente.use("/currency-monit", app); + + return { + openConnection: async () => { + console.log('No connection to open') + } + }; } diff --git a/routes/blockCount2.ts b/routes/blockCount2.ts index 97fa18acc0d2969c2395312adbce90a83e61b049..3f8bf888a30150ca9668df1c23320067bf1b187b 100755 --- a/routes/blockCount2.ts +++ b/routes/blockCount2.ts @@ -15,9 +15,9 @@ var previousBlockchainTime= 0; module.exports = async (req: any, res: any, next: any) => { - var { duniterServer, monitDatasPath } = req.app.locals + var { monitDatasPath } = req.app.locals - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // get GET parameters diff --git a/routes/gaussianWotQuality2.ts b/routes/gaussianWotQuality2.ts index 7c4650951cc9a52b764f0eb6595ec6f450f19646..691d4953e0348d9a6aa65f50f3423c242c1700af 100644 --- a/routes/gaussianWotQuality2.ts +++ b/routes/gaussianWotQuality2.ts @@ -11,7 +11,7 @@ module.exports = async (req:any, res:any, next:any) => { var { duniterServer } = req.app.locals - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // get GET parameters diff --git a/routes/members2.ts b/routes/members2.ts index d290cd2dba17744b25465b62fe6300d0a177c1bd..aa761d5a556d0584001ba5bb87149b754452c3e3 100755 --- a/routes/members2.ts +++ b/routes/members2.ts @@ -45,7 +45,7 @@ module.exports = async (req: any, res: any, next: any) => { var { duniterServer } = req.app.locals - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // Initaliser les constantes @@ -181,7 +181,7 @@ module.exports = async (req: any, res: any, next: any) => { for (let m=0;m<membersList.length;m++) { // Récupérer les blockstamp d'écriture et date d'expiration du membership courant du membre m - let tmpQueryResult = await dataFinder.membershipWrittenOnExpiresOn(membersList[m].pub); + let tmpQueryResult = [await dataFinder.membershipWrittenOnExpiresOn(membersList[m].pub)]; membershipsExpireTimeList.push(tmpQueryResult[0].expires_on); // Extraire le numéro de bloc du blockstamp d'écriture du membership courant @@ -274,11 +274,11 @@ module.exports = async (req: any, res: any, next: any) => { let tmpQueryGetUidProtagonistCert if (mode == 'emitted') { - tmpQueryGetUidProtagonistCert = await dataFinder.getProtagonist(tmpQueryCertifsList[i].receiver) + tmpQueryGetUidProtagonistCert = [await dataFinder.getProtagonist(tmpQueryCertifsList[i].receiver)] } else { - tmpQueryGetUidProtagonistCert = await dataFinder.getProtagonist(tmpQueryCertifsList[i].issuer) + tmpQueryGetUidProtagonistCert = [await dataFinder.getProtagonist(tmpQueryCertifsList[i].issuer)] } let tmpBlockWrittenOn = tmpQueryCertifsList[i].written_on.split("-"); diff --git a/routes/membersCount2.ts b/routes/membersCount2.ts index bb3458f43ccccca0df15a56d96e1b15fb4f6d1a4..936dfb1487de7cddcad5a19c57478c1b704f7e54 100755 --- a/routes/membersCount2.ts +++ b/routes/membersCount2.ts @@ -11,9 +11,9 @@ const getLang = require(__dirname + '/../lib/getLang') module.exports = async (req: any, res: any, next: any) => { - var { duniterServer, cache } = req.app.locals + var { cache } = req.app.locals - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // get GET parameters diff --git a/routes/monetaryMass2.ts b/routes/monetaryMass2.ts index b17ab59d2a7a3a0b0a24eaaf13ab6b3226f184ae..362ccef4dbe59b937dd9fda329a7fb2c43377861 100755 --- a/routes/monetaryMass2.ts +++ b/routes/monetaryMass2.ts @@ -9,8 +9,8 @@ module.exports = async (req:any, res:any, next:any) => { var { duniterServer } = req.app.locals - const dataFinder = new DataFinder(duniterServer) - + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() + try { // get GET parameters var begin = req.query.begin >= 2 && req.query.begin || 2; // Default Value diff --git a/routes/willMembers2.ts b/routes/willMembers2.ts index c1da6873f67df35610b288dbad23e20a8acc3c47..31f92ca59ea0771f7ecda34567052d607efbf915 100755 --- a/routes/willMembers2.ts +++ b/routes/willMembers2.ts @@ -27,7 +27,7 @@ module.exports = async (req: any, res: any, next: any) => { const locals: { duniterServer: Server } = req.app.locals const duniterServer = locals.duniterServer - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // get blockchain timestamp diff --git a/routes/wotex2.ts b/routes/wotex2.ts index a18aa750d7b904a2f73776026a420c07e70a714a..4242a57114a5a78381e9e03c72de1bcc24f01037 100755 --- a/routes/wotex2.ts +++ b/routes/wotex2.ts @@ -10,7 +10,7 @@ module.exports = async (req:any, res:any, next:any) => { var { duniterServer } = req.app.locals - const dataFinder = new DataFinder(duniterServer) + const dataFinder = await DataFinder.getInstanceReindexedIfNecessary() try { // get GET parameters