Skip to content
Snippets Groups Projects
Commit 062e0b7b authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[mod] Monit now uses a local blockchain storage periodically indexed

parent 0863718a
Branches
No related tags found
No related merge requests found
......@@ -2,64 +2,158 @@ import {Server} from 'duniter/server'
import {DBBlock} from 'duniter/app/lib/db/DBBlock'
import {MonitorExecutionTime} from './MonitorExecutionTime'
import {LevelDBIindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBIindex";
import {IindexEntry, reduce} from "duniter/app/lib/indexer";
import {FullIindexEntry, IindexEntry, Indexer, reduce} from "duniter/app/lib/indexer";
import {LevelDBBlockchain} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBBlockchain";
import {Underscore} from "./underscore";
import {CFSBlockchainArchive} from "duniter/app/lib/dal/indexDAL/CFSBlockchainArchive";
import {MonitDBBlock, SqliteBlockchain} from "./SqliteBlockchain";
import {LevelDBCindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBCindex";
import {reduceConcat} from "duniter/app/lib/common-libs/reduce";
import {LevelDBMindex} from "duniter/app/lib/dal/indexDAL/leveldb/LevelDBMindex";
/**
 * Builds the DB objects, optionally wipes previously indexed data, then runs
 * a first indexation.
 * @param duniterServer The server to index blockchain from.
 * @param resetData When true, drop all Monit-indexed data before reindexing.
 */
export async function initMonitDB(duniterServer: Server, resetData: boolean = false) {
  DataFinder.createInstance(duniterServer)
  const finder = DataFinder.getInstance()
  if (resetData) {
    await finder.resetIndexedData()
  }
  await finder.index()
}
/**
* Abstraction layer for data access (SQL + LevelDB of Duniter).
*/
export class DataFinder {
private static instance: DataFinder
private static reindexing: Promise<void> = Promise.resolve()
/**
 * Singleton constructor: creates the shared instance on first call,
 * later calls are no-ops.
 * @param duniterServer Duniter server whose DALs will be queried.
 */
public static createInstance(duniterServer: Server) {
  if (DataFinder.instance === undefined) {
    DataFinder.instance = new DataFinder(duniterServer)
  }
}
/**
 * Singleton getter.
 * NOTE(review): returns undefined when createInstance() has not been called
 * yet — callers assume prior initialization via initMonitDB().
 */
public static getInstance() {
return DataFinder.instance
}
/**
 * Retrieve the singleton + reindex Monit data if current HEAD is not up-to-date.
 *
 * NOTE(review): the Monit HEAD is read *before* awaiting any in-flight
 * reindexing, so `currentMonit` may be stale by the time the comparison runs
 * and a redundant reindex can be triggered — confirm whether this is intended.
 */
public static async getInstanceReindexedIfNecessary() {
const currentMonit = await DataFinder.instance.getHighestBlock()
const currentDuniter = await DataFinder.instance.blockchainDao.getCurrent()
// Wait any already triggered reindexing
await DataFinder.reindexing
// Index only when opportune: Duniter has a HEAD and Monit is behind it
if (currentDuniter && (!currentMonit || currentMonit.number < currentDuniter.number)) {
console.log('Duniter current = ', currentDuniter.number)
console.log('Monit current = ', currentMonit && currentMonit.number || -1)
// Store the promise so concurrent callers await the same indexation
DataFinder.reindexing = DataFinder.instance.index()
// Wait end of indexing
await DataFinder.reindexing
}
return DataFinder.instance
}
private dbArchives: SqliteBlockchain;
private memCache: {
[cacheName: string]: {
[k: string]: any
}
} = {};
private dbInited: Promise<any>
// Cache
private intemporalWot: Promise<IindexEntry[]>;
private wotmap: Promise<WotMap>;
constructor(protected duniterServer: Server) {
private constructor(protected duniterServer: Server) {
this.dbArchives = new SqliteBlockchain(duniterServer.dal.getSqliteDB)
this.dbInited = this.dbArchives.init()
}
/** Wipe all the blocks Monit has indexed locally (awaits DB init first). */
async resetIndexedData() {
await this.dbInited
console.log('Reseting all Monit data...')
await this.dbArchives.deleteAll()
}
/**
 * Mirror the Duniter archives for long term storage.
 * Renew periodically the non-archived part (in which forks may have occurred).
 */
async index() {
console.log('Reindexing blockchain...')
await this.dbInited
// 1. Look at first out-of-fork-window block in Duniter: archive in Monit all the blocks < to this number
const firstOutOfFork = await this.getFirstOutOfForkBlockInDuniter();
const newCeil = await this.archiveBlocksInMonit(firstOutOfFork)
// 2. Add all the blocks >= to this number (still forkable, stored as non-archived)
await this.addForkWindowBlocks(newCeil, firstOutOfFork)
console.log('Reindexing done.')
}
@MonitorExecutionTime()
findPendingMembers() {
return this.query('SELECT `buid`,`pubkey`,`uid`,`hash`,`expires_on`,`revocation_sig` FROM identities_pending WHERE `member`=0')
return this.duniterServer.dal.idtyDAL.query('SELECT `buid`,`pubkey`,`uid`,`hash`,`expires_on`,`revocation_sig` FROM identities_pending WHERE `member`=0')
}
@MonitorExecutionTime()
findPendingCertsToTarget(toPubkey: string, hash: string) {
return this.getFromCacheOrDB('findPendingCertsToTarget', [toPubkey, hash].join('-'), () => this.query(
return this.getFromCacheOrDB('findPendingCertsToTarget', [toPubkey, hash].join('-'), () => this.duniterServer.dal.certDAL.query(
'SELECT `from`,`block_number`,`block_hash`,`expires_on` FROM certifications_pending WHERE `to`=\''+toPubkey+'\' AND `target`=\''+hash+'\' ORDER BY `expires_on` DESC'))
}
@MonitorExecutionTime()
getWotexInfos(uid: string) {
return this.duniterServer.dal.idtyDAL.query('' +
'SELECT hash, uid, pub, wotb_id FROM i_index WHERE uid = ? ' +
'UNION ALL ' + 'SELECT hash, uid, pubkey as pub, (SELECT NULL) AS wotb_id FROM idty WHERE uid = ?', [uid, uid])
async getWotexInfos(uid: string): Promise<{ hash: string }[]> {
const pendingIdentities: { hash: string }[] = await this.duniterServer.dal.idtyDAL.query('' +
'SELECT hash, uid, pubkey as pub, (SELECT NULL) AS wotb_id FROM idty WHERE uid = ?', [uid])
const eventualMember: { hash: string }|null = await this.iindex.getFromUID(uid)
if (eventualMember) {
pendingIdentities.push(eventualMember)
}
return pendingIdentities
}
@MonitorExecutionTime()
async getBlock(block_number: number): Promise<DBBlock|undefined> {
async getBlock(block_number: number): Promise<DBBlock> {
return (await this.getFromCacheOrDB('getBlock', String(block_number),() => this.duniterServer.dal.getBlock(block_number))) || undefined
}
@MonitorExecutionTime()
getUidOfPub(pub: string): Promise<{ uid: string }[]> {
return this.getFromCacheOrDB('getUidOfPub', pub, () => this.query('SELECT `uid` FROM i_index WHERE `pub`=\''+pub+'\' LIMIT 1'))
return this.getFromCacheOrDB('getUidOfPub', pub, () => this.iindex.getFullFromPubkey(pub))
}
@MonitorExecutionTime()
async getWotbIdByIssuerPubkey(issuerPubkey: string) {
return this.getFromCacheOrDB('getWotbIdByIssuerPubkey', issuerPubkey, async () => (await this.query('SELECT wotb_id FROM i_index WHERE pub = ? AND wotb_id IS NOT NULL', [issuerPubkey]))[0].wotb_id)
return this.getFromCacheOrDB('getWotbIdByIssuerPubkey', issuerPubkey, async () => (await this.iindex.getFullFromPubkey(issuerPubkey)).wotb_id)
}
@MonitorExecutionTime()
getChainableOnByIssuerPubkey(issuerPubkey: string) {
return this.query('SELECT `chainable_on` FROM c_index WHERE `issuer`=\''+issuerPubkey+'\' ORDER BY `chainable_on` DESC LIMIT 1')
async getChainableOnByIssuerPubkey(issuerPubkey: string) {
const reduced = await this.cindex.reducablesFrom(issuerPubkey);
return Underscore.sortBy(reduced, r => -r.chainable_on);
}
@MonitorExecutionTime()
getChainableOnByIssuerPubkeyByExpOn(from: string) {
return this.getFromCacheOrDB('getChainableOnByIssuerPubkeyByExpOn', from, () => this.query('SELECT `chainable_on` FROM c_index WHERE `issuer`=\''+from+'\' ORDER BY `expires_on` DESC LIMIT 1'))
return this.getFromCacheOrDB('getChainableOnByIssuerPubkeyByExpOn', from, async () => {
const reduced = await this.cindex.reducablesFrom(from);
return Underscore.sortBy(reduced, r => -r.expires_on)[0];
})
}
@MonitorExecutionTime()
......@@ -69,30 +163,36 @@ export class DataFinder {
@MonitorExecutionTime()
findCertsOfIssuer(pub: string, tmpOrder: string) {
return this.getFromCacheOrDB('findCertsOfIssuer', [pub, tmpOrder].join('-'), () => this.query(
'SELECT `receiver`,`written_on`,`expires_on` FROM c_index WHERE `issuer`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder))
return this.getFromCacheOrDB('findCertsOfIssuer', [pub, tmpOrder].join('-'), async () => {
const reduced = await this.cindex.reducablesFrom(pub);
return Underscore.sortBy(reduced, r => tmpOrder === 'DESC' ? -r.expires_on : r.expires_on);
})
}
@MonitorExecutionTime()
findCertsOfReceiver(pub: any, tmpOrder: string) {
return this.getFromCacheOrDB('findCertsOfReceiver', [pub, tmpOrder].join('-'), () => this.query(
'SELECT `issuer`,`written_on`,`expires_on` FROM c_index WHERE `receiver`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder))
return this.getFromCacheOrDB('findCertsOfReceiver', [pub, tmpOrder].join('-'), async () => {
const reduced = await this.reducablesTo(pub);
return Underscore.sortBy(reduced, r => tmpOrder === 'DESC' ? -r.expires_on : r.expires_on);
})
}
@MonitorExecutionTime()
getProtagonist(pub: string) {
return this.getFromCacheOrDB('getProtagonist', pub, () => this.query('SELECT `uid`,`wotb_id` FROM i_index WHERE `pub`=\''+pub+'\' LIMIT 1'))
return this.getFromCacheOrDB('getProtagonist', pub, async (): Promise<FullIindexEntry> => {
return (await this.iindex.getFromPubkey(pub)) as FullIindexEntry;
})
}
@MonitorExecutionTime()
getCertsPending(pub: string, tmpOrder: string) {
return this.getFromCacheOrDB('getCertsPending', [pub, tmpOrder].join('-'), () => this.query(
return this.getFromCacheOrDB('getCertsPending', [pub, tmpOrder].join('-'), () => this.duniterServer.dal.certDAL.query(
'SELECT `from`,`to`,`block_number`,`expires_on` FROM certifications_pending WHERE `from`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder))
}
@MonitorExecutionTime()
getCertsPendingFromTo(pub: any, tmpOrder: string) {
return this.getFromCacheOrDB('getCertsPendingFromTo', [pub, tmpOrder].join('-'), () => this.query(
return this.getFromCacheOrDB('getCertsPendingFromTo', [pub, tmpOrder].join('-'), () => this.duniterServer.dal.certDAL.query(
'SELECT `from`,`block_number`,`block_hash`,`expires_on` FROM certifications_pending WHERE `to`=\''+pub+'\' ORDER BY `expires_on` '+tmpOrder))
}
......@@ -106,13 +206,9 @@ export class DataFinder {
@MonitorExecutionTime()
membershipWrittenOnExpiresOn(pub: string) {
return this.getFromCacheOrDB('membershipWrittenOnExpiresOn', pub, () => this.query(
'SELECT `written_on`,`expires_on` FROM m_index WHERE `pub`=\''+pub+'\' ORDER BY `expires_on` DESC LIMIT 1'))
}
@MonitorExecutionTime()
query(sql: string, params?: any[]): Promise<any> {
throw Error('Unhandled in Duniter 1.7.x')
return this.getFromCacheOrDB('membershipWrittenOnExpiresOn', pub, async () => {
return this.mindex.getReducedMS(pub);
})
}
@MonitorExecutionTime()
......@@ -132,19 +228,19 @@ export class DataFinder {
@MonitorExecutionTime()
getBlockWhereMedianTimeGt(previousBlockchainTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeGt', String(previousBlockchainTime),
() => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeLte(medianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLte', [medianTime].join('-'),
() => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC'))
getBlockWhereMedianTimeLte(newEndTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLte', [newEndTime].join('-'),
() => this.dbArchives.query('SELECT `medianTime`,`number` FROM block WHERE `fork`=0 AND `medianTime` <= \''+newEndTime+'\' ORDER BY `medianTime` DESC LIMIT 1 '))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeLteNoLimit(medianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLteNoLimit', [medianTime].join('-'),
() => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`issuersCount`,`powMin` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
......@@ -159,47 +255,95 @@ export class DataFinder {
@MonitorExecutionTime()
getBlockWhereMedianTimeLteAndGtNoLimit(currentBlockTime: number, medianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGtNoLimit', [currentBlockTime, medianTime].join('-'),
() => this.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`joiners`,`actives`,`revoked` FROM block WHERE `fork`=0 AND `medianTime` > '+currentBlockTime+' AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `hash`,`membersCount`,`medianTime`,`number`,`certifications`,`joiners`,`actives`,`revoked` FROM block WHERE `fork`=0 AND `medianTime` > '+currentBlockTime+' AND `medianTime` <= '+medianTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeLteAndGte(endMedianTime: number, beginMedianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGte', [endMedianTime, beginMedianTime].join('-'),
() => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeGte(previousBlockchainTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeGte', String(previousBlockchainTime),
() => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+previousBlockchainTime+' ORDER BY `medianTime` ASC'))
getBlockWhereMedianTimeGte(beginTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeGte', String(beginTime),
() => this.dbArchives.query('SELECT `medianTime`,`number` FROM block WHERE `fork`=0 AND `medianTime` >= \''+beginTime+'\' ORDER BY `medianTime` ASC LIMIT 1 '))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeLteAndGt(medianTime: number, previousBlockchainTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGt', [medianTime, previousBlockchainTime].join('-'),
() => this.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `issuer`,`membersCount`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+medianTime+' AND `medianTime` > '+previousBlockchainTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeLteAndGteNoLimit(endMedianTime: number, beginMedianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeLteAndGteNoLimit', [endMedianTime, beginMedianTime].join('-'),
() => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` <= '+endMedianTime+' AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
}
@MonitorExecutionTime()
getBlockWhereMedianTimeGtNoLimit(beginMedianTime: number) {
return this.getFromCacheOrDB('getBlockWhereMedianTimeGtNoLimit', String(beginMedianTime),
() => this.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
() => this.dbArchives.query('SELECT `issuer`,`membersCount`,`monetaryMass`,`medianTime`,`dividend`,`number`,`nonce` FROM block WHERE `fork`=0 AND `medianTime` >= '+beginMedianTime+' ORDER BY `medianTime` ASC'))
}
// Delegate identity search (pending + written) to Duniter's DAL.
searchIdentities(search: string) {
return this.duniterServer.dal.searchJustIdentities(search)
}
/**
 * Get the highest block known by Monit.
 * @returns The top block, or null when the local archive is still empty.
 */
async getHighestBlock() {
  const top = await this.dbArchives.getHighestBlockNumber()
  return top < 0 ? null : this.dbArchives.getBlock(top)
}
/**
 * Get the highest block number known by Monit (-1 when the archive is empty).
 */
async getHighestBlockNumber() {
  const highest = await this.dbArchives.getHighestBlockNumber()
  return highest
}
/**
 * Get the highest archived block number known by Monit (-1 when none).
 */
async getHighestArchivedBlockNumber() {
  const highest = await this.dbArchives.getHighestArchivedBlockNumber()
  return highest
}
@MonitorExecutionTime()
async findRemainingBlocksInForkZone(criteria: (b: DBBlock) => boolean) {
  // Only blocks above Monit's archived ceiling are still in Duniter's fork zone;
  // with an empty archive (ceiling -1) every block matches.
  const ceiling = await this.getHighestBlock()
  const ceilingNumber = ceiling ? ceiling.number : -1
  return this.blockchainDao.findWhere(b => b.number > ceilingNumber && criteria(b))
}
/**
 * Number of the first block in Duniter that can no longer be forked away,
 * i.e. current HEAD number minus the fork-window size.
 *
 * Fix: the previous expression `(current && current.number || -1)` collapsed a
 * legitimate block #0 to -1, because 0 is falsy in JavaScript.
 * @returns The first out-of-fork block number (may be negative on a young chain).
 */
async getFirstOutOfForkBlockInDuniter(): Promise<number> {
  const current = await this.blockchainDao.getCurrent()
  const head = current ? current.number : -1
  return head - this.duniterServer.conf.forksize
}
// Duniter's block DAL, narrowed to its LevelDB implementation.
get blockchainDao() {
return this.duniterServer.dal.blockDAL as LevelDBBlockchain
}
// Duniter's identity index DAL, narrowed to its LevelDB implementation.
get iindex() {
return this.duniterServer.dal.iindexDAL as LevelDBIindex
}
// Duniter's membership index DAL, narrowed to its LevelDB implementation.
get mindex() {
return this.duniterServer.dal.mindexDAL as LevelDBMindex
}
// Duniter's certification index DAL, narrowed to its LevelDB implementation.
get cindex() {
return this.duniterServer.dal.cindexDAL as LevelDBCindex
}
/**
* Singleton de fetching de la wotmap
*/
......@@ -234,6 +378,121 @@ export class DataFinder {
});
return wotmap;
}
// Extracted from Duniter `getValidLinksTo`, adapted to return even non-valid links
private async reducablesTo(receiver: any) {
  // All pubkeys that ever certified this receiver (empty when unknown)
  const entry = await this.cindex.getOrNull(receiver)
  const issuers: string[] = entry ? entry.received : []
  // For each issuer, reduce its issued certifications and keep those aimed at the receiver
  const perIssuer = await Promise.all(issuers.map(async issuer => {
    const issued = (await this.cindex.get(issuer)).issued
    const reduced = Indexer.DUP_HELPERS.reduceBy(issued, ['issuer', 'receiver'])
    return reduced.filter(cert => cert.receiver === receiver)
  }))
  return perIssuer.reduce(reduceConcat, [])
}
/**
 * Save as archived blocks in Monit blocks the blocks that are not supposed to change
 * ever in Duniter (non-fork blocks).
 * @param targetCeil Number of the first non-forkable block (archive everything up to it).
 * @returns The new highest block number stored in dbArchives.
 */
private async archiveBlocksInMonit(targetCeil: number) {
console.log(`[Archives] Compiling archives up to #${targetCeil} (first non-forkable block)...`)
// Trim all the blocks above the ceil (should be the non-archived blocks)
console.log(`[Archives] Removing forkable blocks`)
await this.dbArchives.trimNonArchived()
// Check what is our new ceil
let currentCeil = await this.dbArchives.getHighestBlockNumber()
// Copy the blocks available from Duniter archives (they were stored during a sync)
currentCeil = await this.copyFromDuniterArchives(currentCeil, targetCeil)
// Then copy the blocks available in classical Duniter DB (a part stored during the sync, the other during the node's life)
currentCeil = await this.copyFromDuniterDB(currentCeil, targetCeil)
// Re-read the ceiling from the DB rather than trusting currentCeil
return this.dbArchives.getHighestBlockNumber()
}
/**
 * Save as non-archived blocks in Monit blocks the blocks that are in fork window of Duniter.
 * @param newCeil Highest block number already archived in Monit.
 * @param firstOutOfFork Number of the first block out of Duniter's fork window.
 */
private async addForkWindowBlocks(newCeil: number, firstOutOfFork: number) {
console.log(`[Forkables] Copying DB blocks from #${newCeil + 1} to #${firstOutOfFork}...`)
const current = (await this.blockchainDao.getCurrent()) as DBBlock
// Fetch memory blocks above our new ceil
const nonArchived: MonitDBBlock[] = await this.blockchainDao.getBlocks(newCeil + 1, firstOutOfFork) as any
// Mark them as archived: they are at or below the first out-of-fork block, hence immutable
nonArchived.forEach(b => b.archived = true)
console.log(`[Forkables] Copying ${nonArchived.length} blocks.`)
await this.dbArchives.insertBatch(nonArchived)
console.log(`[Forkables] Copying DB forkable blocks from #${firstOutOfFork + 1} to #${current.number}...`)
// Fetch memory blocks above our new ceil
const nonArchivedForkable: MonitDBBlock[] = await this.blockchainDao.getBlocks(firstOutOfFork + 1, current.number) as any
// Mark them as non-archived because they are forkable
nonArchivedForkable.forEach(b => b.archived = false)
// And finally store them
console.log(`[Forkables] Copying ${nonArchivedForkable.length} blocks.`)
await this.dbArchives.insertBatch(nonArchivedForkable)
}
/**
 * Extract blocks from Duniter archives zone.
 * @param currentCeil Our current ceil block in dbArchives.
 * @param targetCeil Our target block in dbArchives (block to reach).
 * @returns The new ceil reached (may stop short of targetCeil if a chunk is missing).
 */
private async copyFromDuniterArchives(currentCeil: number, targetCeil: number) {
console.log(`[Archives] Copying from Duniter archives from #${currentCeil + 1}...#${targetCeil}`)
while (currentCeil < targetCeil) {
// Get the chunk that contains the block following our current ceil
const chunk: MonitDBBlock[]|null = (await (this.duniterServer.dal.blockchainArchiveDAL as CFSBlockchainArchive<DBBlock>).getChunkForBlock(currentCeil + 1)) as any[];
const toArchive: MonitDBBlock[] = [];
if (!chunk) {
// Not in the archives: the caller falls back to the regular Duniter DB
break;
}
for (const block of chunk) {
if (block.number > currentCeil) {
// Archive it
block.archived = true;
toArchive.push(block);
currentCeil = block.number
}
}
if (toArchive.length) {
console.log(`[Archives] Copying from Duniter archives block #${toArchive[0].number}...#${toArchive[toArchive.length-1].number}`)
await this.dbArchives.insertBatch(toArchive)
// Force journal writing, otherwise we will have to wait for all the writings later on.
// I prefer to wait now, to follow the progress using logs
await this.dbArchives.getHighestBlockNumber()
}
}
// Flag everything up to the reached ceil as archived in one SQL statement
await this.dbArchives.setArchived(currentCeil)
console.log(`[Archives] Copying from Duniter archives done.`)
return currentCeil
}
/**
 * Extract blocks from Duniter database zone.
 * @param currentCeil Our current ceil block in dbArchives.
 * @param targetCeil Our target block in dbArchives (block to reach).
 * @returns The new ceil reached.
 */
private async copyFromDuniterDB(currentCeil: number, targetCeil: number) {
console.log('[Archives] Copying from Duniter DB...')
const duniterCurrent = await this.blockchainDao.getCurrent()
if (duniterCurrent) {
// Get all the remaining blocks
console.log(`[Archives] Copying from Duniter DB block #${currentCeil + 1}...#${targetCeil}`)
const chunk: MonitDBBlock[]|null = (await this.blockchainDao.getBlocks(currentCeil + 1, targetCeil)) as any[];
const toStore: MonitDBBlock[] = [];
for (const block of chunk) {
// Only take contiguous non-fork blocks: the number check keeps the sequence gapless
if (!block.fork && block.number === currentCeil + 1) {
// Store it
// NOTE(review): blocks come from getBlocks(..targetCeil) which should be <= HEAD,
// so this comparison looks always true — confirm whether it guards anything real
block.archived = block.number <= duniterCurrent.number;
toStore.push(block);
currentCeil = block.number
}
}
console.log(`[Archives] Copying ${toStore.length} blocks...`)
if (toStore.length) {
await this.dbArchives.insertBatch(toStore)
}
}
console.log('[Archives] Copying from Duniter DB done.')
return currentCeil
}
}
interface WotMap {
......
import {SqliteTable} from "duniter/app/lib/dal/indexDAL/sqlite/SqliteTable";
import {SQLiteDriver} from "duniter/app/lib/dal/drivers/SQLiteDriver";
import {
SqlNotNullableFieldDefinition,
SqlNullableFieldDefinition
} from "duniter/app/lib/dal/indexDAL/sqlite/SqlFieldDefinition";
import {MonitorExecutionTime} from "./MonitorExecutionTime";
import {DBBlock} from "duniter/app/lib/db/DBBlock";
/**
 * Local SQLite-backed mirror of the blockchain, used by Monit for long-term storage.
 * Rows are DBBlock records plus an `archived` flag telling whether the block is
 * out of Duniter's fork window (and thus immutable).
 */
export class SqliteBlockchain extends SqliteTable<MonitDBBlock> {

  constructor(getSqliteDB: (dbName: string)=> Promise<SQLiteDriver>) {
    super(
      'monit',
      {
        'archived': new SqlNotNullableFieldDefinition('BOOLEAN', true),
        'fork': new SqlNotNullableFieldDefinition('BOOLEAN', true),
        'hash': new SqlNotNullableFieldDefinition('VARCHAR', false, 64),
        'inner_hash': new SqlNotNullableFieldDefinition('VARCHAR', false, 64),
        'signature': new SqlNotNullableFieldDefinition('VARCHAR', false, 100),
        'currency': new SqlNotNullableFieldDefinition('VARCHAR', false, 50),
        'issuer': new SqlNotNullableFieldDefinition('VARCHAR', false, 50),
        'version': new SqlNotNullableFieldDefinition('INT', false),
        'membersCount': new SqlNotNullableFieldDefinition('INT', false),
        'medianTime': new SqlNotNullableFieldDefinition('INT', true), // DATETIME?
        'time': new SqlNotNullableFieldDefinition('INT', false), // DATETIME?
        'powMin': new SqlNotNullableFieldDefinition('INT', false),
        'number': new SqlNotNullableFieldDefinition('INT', false),
        'nonce': new SqlNotNullableFieldDefinition('INT', false),
        'issuersCount': new SqlNotNullableFieldDefinition('INT', false),
        'parameters': new SqlNullableFieldDefinition('VARCHAR', false, 255),
        'previousHash': new SqlNullableFieldDefinition('VARCHAR', false, 64),
        'previousIssuer': new SqlNullableFieldDefinition('VARCHAR', false, 50),
        'monetaryMass': new SqlNullableFieldDefinition('VARCHAR', false, 100),
        'UDTime': new SqlNullableFieldDefinition('INT', false), // DATETIME
        'dividend': new SqlNullableFieldDefinition('INT', false), // DEFAULT \'0\'
        'unitbase': new SqlNullableFieldDefinition('INT', false),
        'transactions': new SqlNullableFieldDefinition('TEXT', false),
        'certifications': new SqlNullableFieldDefinition('TEXT', false),
        'identities': new SqlNullableFieldDefinition('TEXT', false),
        'joiners': new SqlNullableFieldDefinition('TEXT', false),
        'actives': new SqlNullableFieldDefinition('TEXT', false),
        'leavers': new SqlNullableFieldDefinition('TEXT', false),
        'revoked': new SqlNullableFieldDefinition('TEXT', false),
        'excluded': new SqlNullableFieldDefinition('TEXT', false),
      },
      getSqliteDB
    );
    // The underlying table is named `block`, not `monit`.
    this.name = 'block'
  }

  /**
   * Insert blocks in one batch. Array-valued fields are serialized to JSON
   * strings in place, because the table stores them as TEXT.
   */
  @MonitorExecutionTime()
  async insertBatch(records: MonitDBBlock[]): Promise<void> {
    records.forEach((b: any) => {
      for (const prop of ['joiners', 'actives', 'leavers', 'identities', 'certifications', 'transactions', 'revoked', 'excluded']) {
        b[prop] = JSON.stringify(b[prop]);
      }
    });
    if (records.length) {
      return this.insertBatchInTable(this.driver, records)
    }
  }

  /** Raw read-only SQL passthrough. */
  @MonitorExecutionTime()
  async query(sql: string, params?: any[]): Promise<any> {
    return this.driver.sqlRead(sql, params || [])
  }

  /** Fetch a single block by number, or null when absent. */
  async getBlock(number: number): Promise<MonitDBBlock|null> {
    const blocks = await this.driver.sqlRead('SELECT * FROM block WHERE number = ?', [number])
    return blocks.length ? blocks[0] : null
  }

  /** Highest stored block, or null when the table is empty. */
  async getHighestBlock(): Promise<MonitDBBlock|null> {
    const blocks = await this.driver.sqlRead('SELECT * FROM block ORDER BY number DESC LIMIT 1', [])
    return blocks.length ? blocks[0] : null
  }

  /**
   * Highest stored block number, or -1 when the table is empty.
   * Fix: the previous `block && block.number || -1` returned -1 for a
   * legitimate block #0, because 0 is falsy.
   */
  async getHighestBlockNumber(): Promise<number> {
    const block = await this.getHighestBlock()
    return block ? block.number : -1
  }

  /**
   * Highest archived block number, or -1 when none is archived.
   * Fix: same falsy-zero pitfall as getHighestBlockNumber().
   */
  async getHighestArchivedBlockNumber(): Promise<number> {
    const block = await this.driver.sqlRead('SELECT * FROM block WHERE archived ORDER BY number DESC LIMIT 1', [])
    return block.length ? block[0].number : -1
  }

  /** Delete every non-archived (still forkable) block. */
  trimNonArchived() {
    return this.driver.sqlWrite('DELETE FROM block WHERE NOT archived', [])
  }

  /** Flag all blocks up to `currentCeil` (inclusive) as archived. */
  setArchived(currentCeil: number) {
    return this.driver.sqlWrite('UPDATE block SET archived = ? WHERE number <= ? AND NOT archived', [true, currentCeil])
  }

  /** Wipe the whole table. */
  deleteAll() {
    return this.driver.sqlWrite('DELETE FROM block', [])
  }
}
// A Duniter DBBlock extended with Monit's archival status.
export interface MonitDBBlock extends DBBlock {
// true when the block is out of the fork window and will never change
archived: boolean
}
......@@ -4,7 +4,7 @@ const co = require('co');
const os = require('os');
const fs = require('fs');
const webserver = require(__dirname + '/webserver.js');
const webserver = require(__dirname + '/webserver2.js');
const timestampToDatetime = require(__dirname + '/timestampToDatetime.js');
/****************************
......
......@@ -3,6 +3,7 @@
import {DataFinder} from "./DataFinder";
import {DBBlock} from "duniter/app/lib/db/DBBlock";
import {MonitConstants} from "./constants2";
import {Server} from "duniter/server";
const co = require('co');
......@@ -12,9 +13,9 @@ const co = require('co');
*/
module.exports = async (req:any, res:any, next:any) => {
var { duniterServer, cache } = req.app.locals
var { duniterServer, cache } = req.app.locals as { duniterServer: Server, cache: MonitCache };
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// Définition des constantes
......@@ -110,7 +111,7 @@ module.exports = async (req:any, res:any, next:any) => {
else if (req.query.begin > cache.endBlock[0].number)
{
let beginTime = cache.endBlock[0].medianTime-(parseInt(cache.step)*unitTime*MonitConstants.STEP_COUNT_MIN);
cache.beginBlock = [await dataFinder.getBlockWhereMedianTimeGte(beginTime)];
cache.beginBlock = await dataFinder.getBlockWhereMedianTimeGte(beginTime);
}
else { cache.beginBlock = [await dataFinder.getBlock(req.query.begin)]; }
......@@ -125,20 +126,22 @@ module.exports = async (req:any, res:any, next:any) => {
} else {
cache.adaptMaxPoints = "begin";
}
if (!cache.beginBlock || !cache.beginBlock[0]) {
throw Error("No begin block")
}
// Apply nbMaxPoints and adaptMaxPoints
if (cache.adaptMaxPoints == "begin")
{
if ( Math.ceil((cache.endBlock[0].medianTime-cache.beginBlock[0].medianTime)/(cache.step*unitTime)) > cache.nbMaxPoints )
{
let newBeginTime = cache.endBlock[0].medianTime-cache.step*cache.nbMaxPoints*unitTime;
cache.beginBlock = [await dataFinder.getBlockWhereMedianTimeGte(newBeginTime)];
cache.beginBlock = await dataFinder.getBlockWhereMedianTimeGte(newBeginTime);
}
} else if (cache.adaptMaxPoints == "step") {
cache.step = Math.ceil((cache.endBlock[0].medianTime-cache.beginBlock[0].medianTime)/(MonitConstants.STEP_COUNT_MAX*unitTime));
} else {
let newEndTime = cache.beginBlock[0].medianTime+cache.step*cache.nbMaxPoints*unitTime;
cache.endBlock = [await dataFinder.getBlockWhereMedianTimeLte(newEndTime)];
cache.endBlock = await dataFinder.getBlockWhereMedianTimeLte(newEndTime);
}
// Calculate stepTime
......@@ -293,3 +296,7 @@ module.exports = async (req:any, res:any, next:any) => {
}
}
interface MonitCache {
[k: string]: any
beginBlock: null|DBBlock[]
}
"use strict";
import {Server} from "duniter/server";
import {initMonitDB} from "./DataFinder";
const fs = require('fs');
//const util = require('util');
const Q = require('q');
......@@ -13,13 +16,13 @@ const bodyParser = require('body-parser');
const routes = require(__dirname + '/../routes');
const tpl = require(__dirname + '/tplit.js');
module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset, cache) => {
module.exports = (host: any, port: any, appParente: any, duniterServer: Server, monitDatasPath: any, offset: any, cache: any, resetData: boolean = false) => {
var app = express();
app.use(morgan('\x1b[90m:remote-addr :remote-user [:date[clf]] :method :url HTTP/:http-version :status :res[content-length] - :response-time ms\x1b[0m', {
stream: {
write: function(message){
write: function(message: any){
message && console.log(message.replace(/\n$/,''));
}
}
......@@ -44,7 +47,7 @@ module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset,
/***************************************
* CSV des membres calculants
***************************************/
app.get('/csvCalculatorsRank', function(req, res) {
app.get('/csvCalculatorsRank', function(req: any, res: any) {
let files = fs.readdirSync(monitDatasPath + '/calculators_rank/')
let maxTimestamp = 0
for (let file of files) {
......@@ -61,35 +64,46 @@ module.exports = (host, port, appParente, duniterServer, monitDatasPath, offset,
if ( appParente == null )
{
let httpServer = http.createServer(app);
httpServer.on('error', function(err) {
httpServer.on('error', function(err: any) {
httpServer.errorPropagates(err);
});
return {
openConnection: () => co(function *() {
openConnection: async () => {
try {
yield Q.Promise((resolve, reject) => {
await Q.Promise((resolve: any, reject: any) => {
// Weird the need of such a hack to catch an exception...
httpServer.errorPropagates = function(err) {
httpServer.errorPropagates = function(err: any) {
reject(err);
};
httpServer.listen(port, host, (err) => {
httpServer.listen(port, host, (err: any) => {
if (err) return reject(err);
resolve(httpServer);
});
});
// Init + first incremental indexation
await initMonitDB(duniterServer, resetData)
console.log('Server listening on http://' + host + ':' + port);
} catch (e) {
console.warn('Could NOT listen to http://' + host + ':' + port);
console.warn(e);
}
}),
},
};
}
else
{
appParente.use("/currency-monit", app);
return {
openConnection: async () => {
console.log('No connection to open')
}
};
}
......
......@@ -15,9 +15,9 @@ var previousBlockchainTime= 0;
module.exports = async (req: any, res: any, next: any) => {
var { duniterServer, monitDatasPath } = req.app.locals
var { monitDatasPath } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get GET parameters
......
......@@ -11,7 +11,7 @@ module.exports = async (req:any, res:any, next:any) => {
var { duniterServer } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get GET parameters
......
......@@ -45,7 +45,7 @@ module.exports = async (req: any, res: any, next: any) => {
var { duniterServer } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// Initaliser les constantes
......@@ -181,7 +181,7 @@ module.exports = async (req: any, res: any, next: any) => {
for (let m=0;m<membersList.length;m++)
{
// Récupérer les blockstamp d'écriture et date d'expiration du membership courant du membre m
let tmpQueryResult = await dataFinder.membershipWrittenOnExpiresOn(membersList[m].pub);
let tmpQueryResult = [await dataFinder.membershipWrittenOnExpiresOn(membersList[m].pub)];
membershipsExpireTimeList.push(tmpQueryResult[0].expires_on);
// Extraire le numéro de bloc du blockstamp d'écriture du membership courant
......@@ -274,11 +274,11 @@ module.exports = async (req: any, res: any, next: any) => {
let tmpQueryGetUidProtagonistCert
if (mode == 'emitted')
{
tmpQueryGetUidProtagonistCert = await dataFinder.getProtagonist(tmpQueryCertifsList[i].receiver)
tmpQueryGetUidProtagonistCert = [await dataFinder.getProtagonist(tmpQueryCertifsList[i].receiver)]
}
else
{
tmpQueryGetUidProtagonistCert = await dataFinder.getProtagonist(tmpQueryCertifsList[i].issuer)
tmpQueryGetUidProtagonistCert = [await dataFinder.getProtagonist(tmpQueryCertifsList[i].issuer)]
}
let tmpBlockWrittenOn = tmpQueryCertifsList[i].written_on.split("-");
......
......@@ -11,9 +11,9 @@ const getLang = require(__dirname + '/../lib/getLang')
module.exports = async (req: any, res: any, next: any) => {
var { duniterServer, cache } = req.app.locals
var { cache } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get GET parameters
......
......@@ -9,7 +9,7 @@ module.exports = async (req:any, res:any, next:any) => {
var { duniterServer } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get GET parameters
......
......@@ -27,7 +27,7 @@ module.exports = async (req: any, res: any, next: any) => {
const locals: { duniterServer: Server } = req.app.locals
const duniterServer = locals.duniterServer
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get blockchain timestamp
......
......@@ -10,7 +10,7 @@ module.exports = async (req:any, res:any, next:any) => {
var { duniterServer } = req.app.locals
const dataFinder = new DataFinder(duniterServer)
const dataFinder = await DataFinder.getInstanceReindexedIfNecessary()
try {
// get GET parameters
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment