Skip to content
Snippets Groups Projects
Commit a86c24f9 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #1037 Migrate indexer.js + blockchainContext.js

parent 61bcbbf8
Branches
Tags
No related merge requests found
Showing
with 747 additions and 141 deletions
app/lib/blockchain/*.js
app/lib/blockchain/interfaces/*.js
app/lib/computation/*.js
app/lib/db/*.js
app/lib/dto/*.js
app/lib/indexer.js
app/lib/common.js
test/blockchain/*.js
test/blockchain/lib/*.js
\ No newline at end of file
......@@ -36,3 +36,9 @@ app/lib/blockchain/*.js
app/lib/blockchain/*.js.map
app/lib/blockchain/interfaces/*.js
app/lib/blockchain/interfaces/*.js.map
app/lib/computation/*.js
app/lib/computation/*.js.map
app/lib/common.js*
app/lib/db/*.js*
app/lib/dto/*.js*
app/lib/indexer.js*
\ No newline at end of file
......@@ -9,7 +9,7 @@ export class BasicBlockchain {
/**
* Adds a block at the end of the blockchain.
*/
pushBlock(b) {
pushBlock(b:any) {
return this.op.store(b)
}
......@@ -18,7 +18,7 @@ export class BasicBlockchain {
* @param number block ID.
* @returns {*} Promise<Block>
*/
getBlock(number) {
getBlock(number:number) {
return this.op.read(number)
}
......@@ -44,7 +44,7 @@ export class BasicBlockchain {
* @param n Quantity from top. E.g. `1` = [HEAD], `3` = [HEAD, HEAD~1, HEAD~2], etc.
* @returns {*} Promise<Block>
*/
headRange(n) {
headRange(n:number) {
return this.op.headRange(n)
}
......
This diff is collapsed.
......@@ -2,32 +2,33 @@
import {BasicBlockchain} from "./BasicBlockchain"
import {IndexOperator} from "./interfaces/IndexOperator"
import {BlockchainOperator} from "./interfaces/BlockchainOperator"
import * as _ from "underscore"
const _ = require('underscore')
export class IndexedBlockchain extends BasicBlockchain {
private initIndexer: Promise<void>
constructor(bcOperations: BlockchainOperator, private indexOperations: IndexOperator, private numberField, private pkFields: any) {
constructor(bcOperations: BlockchainOperator, private indexOperations: IndexOperator, private numberField: string, private pkFields: any) {
super(bcOperations)
this.initIndexer = indexOperations.initIndexer(pkFields)
}
async recordIndex(index) {
async recordIndex(index: { [index: string]: any }) {
// Wait indexer init
await this.initIndexer
return this.indexOperations.recordIndex(index)
}
async indexTrim(maxNumber) {
async indexTrim(maxNumber:number) {
// Wait indexer init
await this.initIndexer
const subIndexes = await this.indexOperations.getSubIndexes()
// Trim the subIndexes
const records = {}
const records: { [index: string]: any } = {}
for (const subIndex of subIndexes) {
records[subIndex] = []
const pks = typeof this.pkFields[subIndex].pk !== 'string' && this.pkFields[subIndex].pk.length ? Array.from(this.pkFields[subIndex].pk) : [this.pkFields[subIndex].pk]
......@@ -57,7 +58,7 @@ export class IndexedBlockchain extends BasicBlockchain {
return Promise.resolve()
}
async indexCount(indexName, criterias) {
async indexCount(indexName: string, criterias: { [index: string]: any }) {
// Wait indexer init
await this.initIndexer
......@@ -66,7 +67,7 @@ export class IndexedBlockchain extends BasicBlockchain {
return records.length
}
async indexReduce(indexName, criterias) {
async indexReduce(indexName: string, criterias: { [index: string]: any }) {
// Wait indexer init
await this.initIndexer
......@@ -75,7 +76,7 @@ export class IndexedBlockchain extends BasicBlockchain {
return reduce(records)
}
async indexReduceGroupBy(indexName, criterias, properties) {
async indexReduceGroupBy(indexName: string, criterias: { [index: string]: any }, properties: string[]) {
// Wait indexer init
await this.initIndexer
......@@ -84,17 +85,17 @@ export class IndexedBlockchain extends BasicBlockchain {
return reduceBy(records, properties)
}
async indexRevert(blockNumber) {
async indexRevert(blockNumber:number) {
const subIndexes = await this.indexOperations.getSubIndexes()
for (const subIndex of subIndexes) {
const removeCriterias = {}
const removeCriterias: { [index: string]: any } = {}
removeCriterias[this.numberField] = blockNumber
await this.indexOperations.removeWhere(subIndex, removeCriterias)
}
}
}
function reduce(records) {
function reduce(records: any[]) {
return records.reduce((obj, record) => {
const keys = Object.keys(record);
for (const k of keys) {
......@@ -106,18 +107,18 @@ function reduce(records) {
}, {});
}
function reduceBy(reducables, properties) {
function reduceBy(reducables: any[], properties: string[]) {
const reduced = reducables.reduce((map, entry) => {
const id = properties.map((prop) => entry[prop]).join('-');
map[id] = map[id] || [];
map[id].push(entry);
return map;
}, {});
return _.values(reduced).map((value) => reduce(value))
return _.values(reduced).map((rows: any[]) => reduce(rows))
}
function criteriasFromPks(pks, values) {
const criterias = {}
function criteriasFromPks(pks: string[], values: any): { [index: string]: any } {
const criterias: { [index: string]: any } = {}
for (const key of pks) {
criterias[key] = values[key]
}
......
"use strict"
import {IndexedBlockchain} from "./IndexedBlockchain"
import {SQLIndex} from "./SqlIndex"
import {BlockchainOperator} from "./interfaces/BlockchainOperator"
export class MiscIndexedBlockchain extends IndexedBlockchain {
constructor(blockchainStorage, mindexDAL, iindexDAL, sindexDAL, cindexDAL) {
constructor(blockchainStorage: BlockchainOperator, mindexDAL:any, iindexDAL:any, sindexDAL:any, cindexDAL:any) {
super(blockchainStorage, new SQLIndex(null, {
m_index: { handler: mindexDAL },
i_index: { handler: iindexDAL },
s_index: {
handler: sindexDAL,
findTrimable: (maxNumber) => sindexDAL.query('SELECT * FROM s_index WHERE consumed AND writtenOn < ?', [maxNumber])
findTrimable: (maxNumber:number) => sindexDAL.query('SELECT * FROM s_index WHERE consumed AND writtenOn < ?', [maxNumber])
},
c_index: {
handler: cindexDAL,
findTrimable: (maxNumber) => cindexDAL.query('SELECT * FROM c_index WHERE expired_on > 0 AND writtenOn < ?', [maxNumber])
findTrimable: (maxNumber:number) => cindexDAL.query('SELECT * FROM c_index WHERE expired_on > 0 AND writtenOn < ?', [maxNumber])
}
}), 'writtenOn', {
m_index: {
......
"use strict"
import {BlockchainOperator} from "./interfaces/BlockchainOperator"
const indexer = require('../../lib/indexer')
const indexer = require('../../lib/indexer').Indexer
export class SQLBlockchain implements BlockchainOperator {
......
......@@ -6,7 +6,7 @@ export interface BlockchainOperator {
* Pushes a new block at the top of the blockchain.
* @param b Block.
*/
store(b):Promise<any>
store(b:any):Promise<any>
/**
* Reads the block at index `i`.
......
const common = require('duniter-common')
export function hashf(str:string) {
return common.hashf(str).toUpperCase()
}
"use strict";
import {BlockDTO} from "../dto/BlockDTO"
import {DuniterBlockchain} from "../blockchain/DuniterBlockchain"
import {QuickSynchronizer} from "./QuickSync"
import {DBHead} from "../db/DBHead"
const _ = require('underscore');
const co = require('co');
const indexer = require('../indexer');
const indexer = require('../indexer').Indexer
const constants = require('../constants');
const Block = require('../entity/block');
module.exports = () => { return new BlockchainContext() };
export class BlockchainContext {
function BlockchainContext() {
const that = this;
let conf, dal, logger, blockchain, quickSynchronizer
private conf:any
private dal:any
private logger:any
private blockchain:DuniterBlockchain
private quickSynchronizer:QuickSynchronizer
/**
* The virtual next HEAD. Computed each time a new block is added, because a lot of HEAD variables are deterministic
* and can be computed one, just after a block is added for later controls.
*/
let vHEAD;
private vHEAD:any
/**
* The currently written HEAD, aka. HEAD_1 relatively to incoming HEAD.
*/
let vHEAD_1;
private vHEAD_1:any
let HEADrefreshed = Promise.resolve();
private HEADrefreshed: Promise<any> | null = Promise.resolve();
/**
* Refresh the virtual HEAD value for determined variables of the next coming block, avoiding to recompute them
* each time a new block arrives to check if the values are correct. We can know and store them early on, in vHEAD.
*/
function refreshHead() {
HEADrefreshed = co(function*() {
vHEAD_1 = yield dal.head(1);
private refreshHead(): Promise<void> {
this.HEADrefreshed = (async (): Promise<void> => {
this.vHEAD_1 = await this.dal.head(1);
// We suppose next block will have same version #, and no particular data in the block (empty index)
let block;
// But if no HEAD_1 exist, we must initialize a block with default values
if (!vHEAD_1) {
if (!this.vHEAD_1) {
block = {
version: constants.BLOCK_GENERATED_VERSION,
time: Math.round(Date.now() / 1000),
powMin: conf.powMin || 0,
powMin: this.conf.powMin || 0,
powZeros: 0,
powRemainder: 0,
avgBlockSize: 0
};
} else {
block = { version: vHEAD_1.version };
block = { version: this.vHEAD_1.version };
}
vHEAD = yield indexer.completeGlobalScope(Block.statics.fromJSON(block), conf, [], dal);
});
return HEADrefreshed;
this.vHEAD = await indexer.completeGlobalScope(Block.statics.fromJSON(block), this.conf, [], this.dal);
})()
return this.HEADrefreshed;
}
/**
* Gets a copy of vHEAD, extended with some extra properties.
* @param props The extra properties to add.
*/
this.getvHeadCopy = (props) => co(function*() {
if (!vHEAD) {
yield refreshHead();
async getvHeadCopy(props: any): Promise<any> {
if (!this.vHEAD) {
await this.refreshHead();
}
const copy = {};
const keys = Object.keys(vHEAD);
const copy: any = {};
const keys = Object.keys(this.vHEAD);
for (const k of keys) {
copy[k] = vHEAD[k];
copy[k] = this.vHEAD[k];
}
_.extend(copy, props);
return copy;
});
}
/**
* Get currently written HEAD.
*/
this.getvHEAD_1 = () => co(function*() {
if (!vHEAD) {
yield refreshHead();
async getvHEAD_1(): Promise<any> {
if (!this.vHEAD) {
await this.refreshHead();
}
return this.vHEAD_1
}
return vHEAD_1;
});
/**
* Utility method: gives the personalized difficulty level of a given issuer for next block.
* @param issuer The issuer we want to get the difficulty level.
*/
this.getIssuerPersonalizedDifficulty = (issuer) => co(function *() {
const local_vHEAD = yield that.getvHeadCopy({ issuer });
yield indexer.preparePersonalizedPoW(local_vHEAD, vHEAD_1, dal.range, conf);
async getIssuerPersonalizedDifficulty(issuer: string): Promise<any> {
const local_vHEAD = await this.getvHeadCopy({ issuer });
await indexer.preparePersonalizedPoW(local_vHEAD, this.vHEAD_1, this.dal.range, this.conf)
return local_vHEAD.issuerDiff;
});
this.setConfDAL = (newConf, newDAL, theBlockchain, theQuickSynchronizer) => {
dal = newDAL;
conf = newConf;
blockchain = theBlockchain
quickSynchronizer = theQuickSynchronizer
logger = require('../logger')(dal.profile);
};
}
this.checkBlock = (block, withPoWAndSignature) => blockchain.checkBlock(block, withPoWAndSignature, conf, dal)
setConfDAL(newConf: any, newDAL: any, theBlockchain: DuniterBlockchain, theQuickSynchronizer: QuickSynchronizer): void {
this.dal = newDAL;
this.conf = newConf;
this.blockchain = theBlockchain
this.quickSynchronizer = theQuickSynchronizer
this.logger = require('../logger')(this.dal.profile);
}
checkBlock(block: BlockDTO, withPoWAndSignature = true): Promise<any> {
return this.blockchain.checkBlock(block, withPoWAndSignature, this.conf, this.dal)
}
this.addBlock = (obj, index, HEAD) => co(function*() {
const block = yield blockchain.pushTheBlock(obj, index, HEAD, conf, dal, logger)
vHEAD_1 = vHEAD = HEADrefreshed = null
async addBlock(obj: BlockDTO, index: any, HEAD: DBHead): Promise<any> {
const block = await this.blockchain.pushTheBlock(obj, index, HEAD, this.conf, this.dal, this.logger)
this.vHEAD_1 = this.vHEAD = this.HEADrefreshed = null
return block
})
}
this.addSideBlock = (obj) => blockchain.pushSideBlock(obj, dal, logger)
addSideBlock(obj:BlockDTO): Promise<any> {
return this.blockchain.pushSideBlock(obj, this.dal, this.logger)
}
this.revertCurrentBlock = () => co(function *() {
const head_1 = yield dal.bindexDAL.head(1);
logger.debug('Reverting block #%s...', head_1.number);
const res = yield blockchain.revertBlock(head_1.number, head_1.hash, dal)
logger.debug('Reverted block #%s', head_1.number);
async revertCurrentBlock(): Promise<any> {
const head_1 = await this.dal.bindexDAL.head(1);
this.logger.debug('Reverting block #%s...', head_1.number);
const res = await this.blockchain.revertBlock(head_1.number, head_1.hash, this.dal)
this.logger.debug('Reverted block #%s', head_1.number);
// Invalidates the head, since it has changed.
yield refreshHead();
await this.refreshHead();
return res;
});
}
this.applyNextAvailableFork = () => co(function *() {
const current = yield that.current();
logger.debug('Find next potential block #%s...', current.number + 1);
const forks = yield dal.getForkBlocksFollowing(current);
async applyNextAvailableFork(): Promise<any> {
const current = await this.current();
this.logger.debug('Find next potential block #%s...', current.number + 1);
const forks = await this.dal.getForkBlocksFollowing(current);
if (!forks.length) {
throw constants.ERRORS.NO_POTENTIAL_FORK_AS_NEXT;
}
const block = forks[0];
const { index, HEAD } = yield that.checkBlock(block, constants.WITH_SIGNATURES_AND_POW);
yield that.addBlock(block, index, HEAD);
logger.debug('Applied block #%s', block.number);
});
const { index, HEAD } = await this.checkBlock(block, constants.WITH_SIGNATURES_AND_POW);
await this.addBlock(block, index, HEAD);
this.logger.debug('Applied block #%s', block.number);
}
this.current = () => dal.getCurrentBlockOrNull();
current(): Promise<any> {
return this.dal.getCurrentBlockOrNull()
}
this.checkHaveEnoughLinks = (target, newLinks) => co(function*() {
const links = yield dal.getValidLinksTo(target);
async checkHaveEnoughLinks(target: string, newLinks: any): Promise<any> {
const links = await this.dal.getValidLinksTo(target);
let count = links.length;
if (newLinks[target] && newLinks[target].length)
if (newLinks[target] && newLinks[target].length) {
count += newLinks[target].length;
if (count < conf.sigQty)
throw 'Key ' + target + ' does not have enough links (' + count + '/' + conf.sigQty + ')';
});
}
if (count < this.conf.sigQty) {
throw 'Key ' + target + ' does not have enough links (' + count + '/' + this.conf.sigQty + ')';
}
}
this.quickApplyBlocks = (blocks, to) => quickSynchronizer.quickApplyBlocks(blocks, to)
quickApplyBlocks(blocks:BlockDTO[], to: number | null): Promise<any> {
return this.quickSynchronizer.quickApplyBlocks(blocks, to)
}
}
"use strict"
import {DuniterBlockchain} from "../blockchain/DuniterBlockchain"
import {BlockDTO} from "../dto/BlockDTO"
import {DBTransaction} from "../db/DBTransaction"
import {Indexer} from "../indexer"
import {ConfDTO} from "../dto/ConfDTO"
const Q = require('q');
const _ = require('underscore')
const co = require('co')
const indexer = require('../indexer')
const constants = require('../constants')
const Block = require('../entity/block')
const Transaction = require('../entity/transaction')
const DuniterBlockchain = require('../blockchain/DuniterBlockchain').DuniterBlockchain
module.exports = (blockchain, conf, dal, logger) => {
let sync_bindex = [];
let sync_iindex = [];
let sync_mindex = [];
let sync_cindex = [];
let sync_sindex = [];
let sync_bindex: any [] = [];
let sync_iindex: any[] = [];
let sync_mindex: any[] = [];
let sync_cindex: any[] = [];
let sync_sindex: any[] = [];
let sync_bindexSize = 0;
let sync_allBlocks = [];
let sync_expires = [];
let sync_allBlocks: BlockDTO[] = [];
let sync_expires: number[] = [];
let sync_nextExpiring = 0;
let sync_currConf = {};
const sync_memoryWallets = {}
let sync_currConf: ConfDTO;
const sync_memoryWallets: any = {}
const sync_memoryDAL = {
getWallet: (conditions) => Promise.resolve(sync_memoryWallets[conditions] || { conditions, balance: 0 }),
saveWallet: (wallet) => co(function*() {
getWallet: (conditions: string) => Promise.resolve(sync_memoryWallets[conditions] || { conditions, balance: 0 }),
saveWallet: async (wallet: any) => {
// Make a copy
sync_memoryWallets[wallet.conditions] = {
conditions: wallet.conditions,
balance: wallet.balance
}
})
},
sindexDAL: {
getAvailableForConditions: null
}
}
const saveBlocksInMainBranch = (blocks) => co(function *() {
export class QuickSynchronizer {
constructor(private blockchain:DuniterBlockchain, private conf: any, private dal: any, private logger: any) {
}
async saveBlocksInMainBranch(blocks: BlockDTO[]): Promise<void> {
// VERY FIRST: parameters, otherwise we compute wrong variables such as UDTime
if (blocks[0].number == 0) {
yield blockchain.saveParametersForRoot(blocks[0], conf, dal)
await this.blockchain.saveParametersForRoot(blocks[0], this.conf, this.dal)
}
// Helper to retrieve a block with local cache
const getBlock = (number) => {
const getBlock = (number: number): BlockDTO => {
const firstLocalNumber = blocks[0].number;
if (number >= firstLocalNumber) {
let offset = number - firstLocalNumber;
return Q(blocks[offset]);
}
return dal.getBlock(number);
return this.dal.getBlock(number);
};
const getBlockByNumberAndHash = (number, hash) => co(function*() {
const block = yield getBlock(number);
const getBlockByNumberAndHash = async (number: number, hash: string): Promise<BlockDTO> => {
const block = await getBlock(number);
if (!block || block.hash != hash) {
throw 'Block #' + [number, hash].join('-') + ' not found neither in DB nor in applying blocks';
}
return block;
});
}
for (const block of blocks) {
block.fork = false;
}
// Transactions recording
yield updateTransactionsForBlocks(blocks, getBlockByNumberAndHash);
yield dal.blockDAL.saveBunch(blocks);
yield DuniterBlockchain.pushStatsForBlocks(blocks, dal);
});
await this.updateTransactionsForBlocks(blocks, getBlockByNumberAndHash);
await this.dal.blockDAL.saveBunch(blocks);
await DuniterBlockchain.pushStatsForBlocks(blocks, this.dal);
}
function updateTransactionsForBlocks(blocks, getBlockByNumberAndHash) {
return co(function *() {
let txs = [];
private async updateTransactionsForBlocks(blocks: BlockDTO[], getBlockByNumberAndHash: (number: number, hash: string) => Promise<BlockDTO>): Promise<any> {
let txs: DBTransaction[] = [];
for (const block of blocks) {
const newOnes = [];
const newOnes: DBTransaction[] = [];
for (const tx of block.transactions) {
_.extend(tx, {
block_number: block.number,
time: block.medianTime,
currency: block.currency,
written: true,
removed: false
});
const sp = tx.blockstamp.split('-');
tx.blockstampTime = (yield getBlockByNumberAndHash(sp[0], sp[1])).medianTime;
const txEntity = new Transaction(tx);
txEntity.computeAllHashes();
newOnes.push(txEntity);
const [number, hash] = tx.blockstamp.split('-')
const refBlock: BlockDTO = (await getBlockByNumberAndHash(parseInt(number), hash))
// We force the usage of the reference block's currency
tx.currency = refBlock.currency
tx.hash = tx.getHash()
const dbTx: DBTransaction = DBTransaction.fromTransactionDTO(tx, refBlock.medianTime, true, false, refBlock.number, refBlock.medianTime)
newOnes.push(dbTx)
}
txs = txs.concat(newOnes);
}
return dal.updateTransactions(txs);
})
return this.dal.updateTransactions(txs);
}
const quickApplyBlocks = (blocks, to) => co(function*() {
async quickApplyBlocks(blocks:BlockDTO[], to: number | null): Promise<void> {
sync_memoryDAL.sindexDAL = { getAvailableForConditions: dal.sindexDAL.getAvailableForConditions }
let blocksToSave = [];
sync_memoryDAL.sindexDAL = { getAvailableForConditions: this.dal.sindexDAL.getAvailableForConditions }
let blocksToSave: BlockDTO[] = [];
for (const block of blocks) {
sync_allBlocks.push(block);
// The new kind of object stored
const dto = BlockDTO.fromJSONObject(block)
if (block.number == 0) {
sync_currConf = Block.statics.getConf(block);
}
if (block.number != to) {
blocksToSave.push(block);
const index = indexer.localIndex(block, sync_currConf);
const local_iindex = indexer.iindex(index);
const local_cindex = indexer.cindex(index);
const local_sindex = indexer.sindex(index);
const local_mindex = indexer.mindex(index);
blocksToSave.push(dto);
const index:any = Indexer.localIndex(dto, sync_currConf);
const local_iindex = Indexer.iindex(index);
const local_cindex = Indexer.cindex(index);
const local_sindex = Indexer.sindex(index);
const local_mindex = Indexer.mindex(index);
sync_iindex = sync_iindex.concat(local_iindex);
sync_cindex = sync_cindex.concat(local_cindex);
sync_mindex = sync_mindex.concat(local_mindex);
const HEAD = yield indexer.quickCompleteGlobalScope(block, sync_currConf, sync_bindex, sync_iindex, sync_mindex, sync_cindex, {
getBlock: (number) => {
const HEAD = await Indexer.quickCompleteGlobalScope(block, sync_currConf, sync_bindex, sync_iindex, sync_mindex, sync_cindex, {
getBlock: (number: number) => {
return Promise.resolve(sync_allBlocks[number]);
},
getBlockByBlockstamp: (blockstamp) => {
getBlockByBlockstamp: (blockstamp: string) => {
return Promise.resolve(sync_allBlocks[parseInt(blockstamp)]);
}
});
......@@ -129,7 +132,7 @@ module.exports = (blockchain, conf, dal, logger) => {
}
sync_expires = _.uniq(sync_expires);
yield blockchain.createNewcomers(local_iindex, dal, logger)
await this.blockchain.createNewcomers(local_iindex, this.dal, this.logger)
if (block.dividend
|| block.joiners.length
......@@ -153,58 +156,58 @@ module.exports = (blockchain, conf, dal, logger) => {
const nextExpiringChanged = currentNextExpiring !== sync_nextExpiring
// Fills in correctly the SINDEX
yield _.where(sync_sindex.concat(local_sindex), { op: 'UPDATE' }).map((entry) => co(function*() {
await Promise.all(_.where(sync_sindex.concat(local_sindex), { op: 'UPDATE' }).map(async (entry: any) => {
if (!entry.conditions) {
const src = yield dal.sindexDAL.getSource(entry.identifier, entry.pos);
const src = await this.dal.sindexDAL.getSource(entry.identifier, entry.pos);
entry.conditions = src.conditions;
}
}))
// Flush the INDEX (not bindex, which is particular)
yield dal.mindexDAL.insertBatch(sync_mindex);
yield dal.iindexDAL.insertBatch(sync_iindex);
yield dal.sindexDAL.insertBatch(sync_sindex);
yield dal.cindexDAL.insertBatch(sync_cindex);
await this.dal.mindexDAL.insertBatch(sync_mindex);
await this.dal.iindexDAL.insertBatch(sync_iindex);
await this.dal.sindexDAL.insertBatch(sync_sindex);
await this.dal.cindexDAL.insertBatch(sync_cindex);
sync_mindex = [];
sync_iindex = [];
sync_cindex = [];
sync_sindex = local_sindex;
sync_sindex = sync_sindex.concat(yield indexer.ruleIndexGenDividend(HEAD, dal));
sync_sindex = sync_sindex.concat(yield indexer.ruleIndexGarbageSmallAccounts(HEAD, sync_sindex, sync_memoryDAL));
sync_sindex = sync_sindex.concat(await Indexer.ruleIndexGenDividend(HEAD, this.dal));
sync_sindex = sync_sindex.concat(await Indexer.ruleIndexGarbageSmallAccounts(HEAD, sync_sindex, sync_memoryDAL));
if (nextExpiringChanged) {
sync_cindex = sync_cindex.concat(yield indexer.ruleIndexGenCertificationExpiry(HEAD, dal));
sync_mindex = sync_mindex.concat(yield indexer.ruleIndexGenMembershipExpiry(HEAD, dal));
sync_iindex = sync_iindex.concat(yield indexer.ruleIndexGenExclusionByMembership(HEAD, sync_mindex, dal));
sync_iindex = sync_iindex.concat(yield indexer.ruleIndexGenExclusionByCertificatons(HEAD, sync_cindex, local_iindex, conf, dal));
sync_mindex = sync_mindex.concat(yield indexer.ruleIndexGenImplicitRevocation(HEAD, dal));
sync_cindex = sync_cindex.concat(await Indexer.ruleIndexGenCertificationExpiry(HEAD, this.dal));
sync_mindex = sync_mindex.concat(await Indexer.ruleIndexGenMembershipExpiry(HEAD, this.dal));
sync_iindex = sync_iindex.concat(await Indexer.ruleIndexGenExclusionByMembership(HEAD, sync_mindex, this.dal));
sync_iindex = sync_iindex.concat(await Indexer.ruleIndexGenExclusionByCertificatons(HEAD, sync_cindex, local_iindex, this.conf, this.dal));
sync_mindex = sync_mindex.concat(await Indexer.ruleIndexGenImplicitRevocation(HEAD, this.dal));
}
// Update balances with UD + local garbagings
yield blockchain.updateWallets(sync_sindex, sync_memoryDAL)
await this.blockchain.updateWallets(sync_sindex, sync_memoryDAL)
// --> Update links
yield dal.updateWotbLinks(local_cindex.concat(sync_cindex));
await this.dal.updateWotbLinks(local_cindex.concat(sync_cindex));
// Flush the INDEX again
yield dal.mindexDAL.insertBatch(sync_mindex);
yield dal.iindexDAL.insertBatch(sync_iindex);
yield dal.sindexDAL.insertBatch(sync_sindex);
yield dal.cindexDAL.insertBatch(sync_cindex);
await this.dal.mindexDAL.insertBatch(sync_mindex);
await this.dal.iindexDAL.insertBatch(sync_iindex);
await this.dal.sindexDAL.insertBatch(sync_sindex);
await this.dal.cindexDAL.insertBatch(sync_cindex);
sync_mindex = [];
sync_iindex = [];
sync_cindex = [];
sync_sindex = [];
// Create/Update nodes in wotb
yield blockchain.updateMembers(block, dal)
await this.blockchain.updateMembers(block, this.dal)
}
// Trim the bindex
sync_bindexSize = [
block.issuersCount,
block.issuersFrame,
conf.medianTimeBlocks,
conf.dtDiffEval,
this.conf.medianTimeBlocks,
this.conf.dtDiffEval,
blocks.length
].reduce((max, value) => {
return Math.max(max, value);
......@@ -215,31 +218,31 @@ module.exports = (blockchain, conf, dal, logger) => {
sync_bindex.splice(0, sync_bindexSize);
// Process triming continuously to avoid super long ending of sync
yield dal.trimIndexes(sync_bindex[0].number);
await this.dal.trimIndexes(sync_bindex[0].number);
}
} else {
if (blocksToSave.length) {
yield saveBlocksInMainBranch(blocksToSave);
await this.saveBlocksInMainBranch(blocksToSave);
}
blocksToSave = [];
// Save the INDEX
yield dal.bindexDAL.insertBatch(sync_bindex);
yield dal.mindexDAL.insertBatch(sync_mindex);
yield dal.iindexDAL.insertBatch(sync_iindex);
yield dal.sindexDAL.insertBatch(sync_sindex);
yield dal.cindexDAL.insertBatch(sync_cindex);
await this.dal.bindexDAL.insertBatch(sync_bindex);
await this.dal.mindexDAL.insertBatch(sync_mindex);
await this.dal.iindexDAL.insertBatch(sync_iindex);
await this.dal.sindexDAL.insertBatch(sync_sindex);
await this.dal.cindexDAL.insertBatch(sync_cindex);
// Save the intermediary table of wallets
const conditions = _.keys(sync_memoryWallets)
const nonEmptyKeys = _.filter(conditions, (k) => sync_memoryWallets[k] && sync_memoryWallets[k].balance > 0)
const walletsToRecord = nonEmptyKeys.map((k) => sync_memoryWallets[k])
yield dal.walletDAL.insertBatch(walletsToRecord)
const nonEmptyKeys = _.filter(conditions, (k: any) => sync_memoryWallets[k] && sync_memoryWallets[k].balance > 0)
const walletsToRecord = nonEmptyKeys.map((k: any) => sync_memoryWallets[k])
await this.dal.walletDAL.insertBatch(walletsToRecord)
// Last block: cautious mode to trigger all the INDEX expiry mechanisms
const { index, HEAD } = yield blockchain.checkBlock(block, constants.WITH_SIGNATURES_AND_POW, conf, dal)
yield blockchain.pushTheBlock(block, index, HEAD, conf, dal, logger)
const { index, HEAD } = await this.blockchain.checkBlock(dto, constants.WITH_SIGNATURES_AND_POW, this.conf, this.dal)
await this.blockchain.pushTheBlock(dto, index, HEAD, this.conf, this.dal, this.logger)
// Clean temporary variables
sync_bindex = [];
......@@ -251,15 +254,11 @@ module.exports = (blockchain, conf, dal, logger) => {
sync_allBlocks = [];
sync_expires = [];
sync_nextExpiring = 0;
sync_currConf = {};
// sync_currConf = {};
}
}
if (blocksToSave.length) {
yield saveBlocksInMainBranch(blocksToSave);
await this.saveBlocksInMainBranch(blocksToSave);
}
})
return {
saveBlocksInMainBranch, quickApplyBlocks
}
}
......@@ -3,7 +3,7 @@ const Q = require('q');
const co = require('co');
const _ = require('underscore');
const common = require('duniter-common');
const indexer = require('../indexer');
const indexer = require('../indexer').Indexer
const logger = require('../logger')('filedal');
const Configuration = require('../entity/configuration');
const Merkle = require('../entity/merkle');
......
......@@ -4,7 +4,7 @@
const _ = require('underscore');
const co = require('co');
const indexer = require('../../indexer');
const indexer = require('../../indexer').Indexer
module.exports = AbstractIndex;
......
......@@ -5,7 +5,7 @@
const co = require('co');
const constants = require('./../../../constants');
const common = require('duniter-common');
const indexer = require('../../../indexer');
const indexer = require('../../../indexer').Indexer
const AbstractSQLite = require('./../AbstractSQLite');
const AbstractIndex = require('./../AbstractIndex');
......
......@@ -4,7 +4,7 @@
const co = require('co');
const _ = require('underscore');
const indexer = require('../../../indexer');
const indexer = require('../../../indexer').Indexer
const AbstractSQLite = require('./../AbstractSQLite');
const AbstractIndex = require('./../AbstractIndex');
......
......@@ -3,7 +3,7 @@
*/
const co = require('co');
const indexer = require('../../../indexer');
const indexer = require('../../../indexer').Indexer
const AbstractSQLite = require('./../AbstractSQLite');
const AbstractIndex = require('./../AbstractIndex');
......
......@@ -5,7 +5,7 @@
const _ = require('underscore');
const co = require('co');
const common = require('duniter-common');
const indexer = require('../../../indexer');
const indexer = require('../../../indexer').Indexer
const constants = require('../../../constants');
const AbstractSQLite = require('./../AbstractSQLite');
const AbstractIndex = require('./../AbstractIndex');
......
import {BlockDTO} from "../dto/BlockDTO"
import {TransactionDTO} from "../dto/TransactionDTO"
export class DBBlock {
version: number
number: number
currency: string
hash: string
inner_hash: string
signature: string
previousHash: string
issuer: string
previousIssuer: string
time: number
powMin: number
unitbase: number
membersCount: number
issuersCount: number
issuersFrame: number
issuersFrameVar: number
identities: string[]
joiners: string[]
actives: string[]
leavers: string[]
revoked: string[]
excluded: string[]
certifications: string[]
transactions: TransactionDTO[]
medianTime: number
nonce: number
fork: boolean
parameters: string
monetaryMass: number
dividend: number | null
constructor(
) {
}
static fromBlockDTO(b:BlockDTO) {
const dbb = new DBBlock()
dbb.version = b.version
dbb.number = b.number
dbb.currency = b.currency
dbb.hash = b.hash
dbb.previousHash = b.previousHash
dbb.issuer = b.issuer
dbb.previousIssuer = b.previousIssuer
dbb.dividend = b.dividend
dbb.time = b.time
dbb.powMin = b.powMin
dbb.unitbase = b.unitbase
dbb.membersCount = b.membersCount
dbb.issuersCount = b.issuersCount
dbb.issuersFrame = b.issuersFrame
dbb.issuersFrameVar = b.issuersFrameVar
dbb.identities = b.identities
dbb.joiners = b.joiners
dbb.actives = b.actives
dbb.leavers = b.leavers
dbb.revoked = b.revoked
dbb.excluded = b.excluded
dbb.certifications = b.certifications
dbb.transactions = b.transactions
dbb.medianTime = b.medianTime
dbb.fork = b.fork
dbb.parameters = b.parameters
dbb.inner_hash = b.inner_hash
dbb.signature = b.signature
dbb.nonce = b.nonce
return dbb
}
}
\ No newline at end of file
export class DBHead {
// TODO: some properties are not registered in the DB, we should create another class
version: number
currency: string | null
bsize: number
avgBlockSize: number
udTime: number
udReevalTime: number
massReeval: number
mass: number
hash: string
previousHash: string | null
previousIssuer: string | null
issuer: string
time: number
medianTime: number
number: number
powMin: number
diffNumber: number
issuersCount: number
issuersFrame: number
issuersFrameVar: number
dtDiffEval: number
issuerDiff: number
powZeros: number
powRemainder: number
speed: number
unitBase: number
membersCount: number
dividend: number
new_dividend: number | null
issuerIsMember: boolean
constructor(
) {}
}
\ No newline at end of file
import {TransactionDTO} from "../dto/TransactionDTO"
export class DBTransaction extends TransactionDTO {
constructor(
public version: number,
public currency: string,
public locktime: number,
public hash: string,
public blockstamp: string,
public issuers: string[],
public inputs: string[],
public outputs: string[],
public unlocks: string[],
public signatures: string[],
public comment: string,
public blockstampTime: number,
public written: boolean,
public removed: boolean,
public block_number: number,
public time: number,
) {
super(
version,
currency,
locktime,
hash,
blockstamp,
issuers,
inputs,
outputs,
unlocks,
signatures,
comment
)
}
static fromTransactionDTO(dto:TransactionDTO, blockstampTime:number, written: boolean, removed: boolean, block_number:number, block_medianTime:number) {
return new DBTransaction(
dto.version,
dto.currency,
dto.locktime,
dto.hash,
dto.blockstamp,
dto.issuers,
dto.inputs,
dto.outputs,
dto.unlocks,
dto.signatures,
dto.comment || "",
blockstampTime,
written,
removed,
block_number,
block_medianTime
)
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment