Commit 07505ece authored by Cédric Moreau's avatar Cédric Moreau

[enh] #1037 Migrate SQLite DALs

parent 3321b9de
......@@ -6,5 +6,7 @@ app/lib/dto/*.js
app/lib/indexer.js
app/lib/common.js
app/lib/dal/drivers/*.js
app/lib/dal/sqliteDAL/*.js
app/lib/dal/sqliteDAL/index/*.js
test/blockchain/*.js
test/blockchain/lib/*.js
\ No newline at end of file
......@@ -35,7 +35,7 @@
"comma-dangle": [1],
"eol-last": [1],
"no-shadow": [1],
"no-unused-vars": ["warning", { "varsIgnorePattern": "should"}],
"no-unused-vars": [1, { "varsIgnorePattern": "should"}],
"space-infix-ops": [1],
"handle-callback-err": [1],
"no-extra-semi": [1]
......
......@@ -42,4 +42,6 @@ app/lib/common.js*
app/lib/db/*.js*
app/lib/dto/*.js*
app/lib/indexer.js*
app/lib/dal/drivers/*.js*
\ No newline at end of file
app/lib/dal/drivers/*.js*
app/lib/dal/sqliteDAL/*.js*
app/lib/dal/sqliteDAL/index/*.js*
\ No newline at end of file
"use strict"
import {IndexOperator} from "./interfaces/IndexOperator"
import {AbstractIndex} from "../dal/sqliteDAL/AbstractIndex";
const IndexDAL = require('../dal/sqliteDAL/IndexDAL')
const _ = require('underscore')
export class SQLIndex implements IndexOperator {
......@@ -19,18 +19,14 @@ export class SQLIndex implements IndexOperator {
this.indexes[k] = this.definitions[k].handler
} else {
// Internal table: managed here
const indexTable = new IndexDAL(this.db);
const pk = pkFields[k].pk
indexTable.table = k
indexTable.fields = this.definitions[k].fields
indexTable.booleans = this.definitions[k].booleans
const indexTable = new AbstractIndex<any>(this.db, k, [], this.definitions[k].fields, [], this.definitions[k].booleans)
this.indexes[k] = indexTable
indexTable.init = () => {
return indexTable.exec('BEGIN;' +
'CREATE TABLE IF NOT EXISTS ' + indexTable.table + ' (' +
this.definitions[k].sqlFields.join(',') +
');' +
'COMMIT;', [])
'COMMIT;')
}
await indexTable.init()
}
......
......@@ -31,7 +31,7 @@ const sync_memoryDAL = {
}
},
sindexDAL: {
getAvailableForConditions: null
getAvailableForConditions: (conditions:string) => null
}
}
......@@ -90,7 +90,7 @@ export class QuickSynchronizer {
async quickApplyBlocks(blocks:BlockDTO[], to: number | null): Promise<void> {
sync_memoryDAL.sindexDAL = { getAvailableForConditions: this.dal.sindexDAL.getAvailableForConditions }
sync_memoryDAL.sindexDAL = { getAvailableForConditions: (conditions:string) => this.dal.sindexDAL.getAvailableForConditions(conditions) }
let blocksToSave: BlockDTO[] = [];
for (const block of blocks) {
......
......@@ -8,6 +8,7 @@ const logger = require('../logger')('filedal');
const Configuration = require('../entity/configuration');
const Merkle = require('../entity/merkle');
const Transaction = require('../entity/transaction');
const TransactionDTO = require('../dto/TransactionDTO').TransactionDTO
const constants = require('../constants');
const ConfDAL = require('./fileDALs/confDAL');
const StatDAL = require('./fileDALs/statDAL');
......@@ -29,20 +30,20 @@ function FileDAL(params) {
// DALs
this.confDAL = new ConfDAL(rootPath, myFS, null, that, CFSStorage);
this.metaDAL = new (require('./sqliteDAL/MetaDAL'))(sqliteDriver);
this.peerDAL = new (require('./sqliteDAL/PeerDAL'))(sqliteDriver);
this.blockDAL = new (require('./sqliteDAL/BlockDAL'))(sqliteDriver);
this.txsDAL = new (require('./sqliteDAL/TxsDAL'))(sqliteDriver);
this.metaDAL = new (require('./sqliteDAL/MetaDAL').MetaDAL)(sqliteDriver);
this.peerDAL = new (require('./sqliteDAL/PeerDAL').PeerDAL)(sqliteDriver);
this.blockDAL = new (require('./sqliteDAL/BlockDAL').BlockDAL)(sqliteDriver);
this.txsDAL = new (require('./sqliteDAL/TxsDAL').TxsDAL)(sqliteDriver);
this.statDAL = new StatDAL(rootPath, myFS, null, that, CFSStorage);
this.idtyDAL = new (require('./sqliteDAL/IdentityDAL'))(sqliteDriver);
this.certDAL = new (require('./sqliteDAL/CertDAL'))(sqliteDriver);
this.msDAL = new (require('./sqliteDAL/MembershipDAL'))(sqliteDriver);
this.walletDAL = new (require('./sqliteDAL/WalletDAL'))(sqliteDriver);
this.bindexDAL = new (require('./sqliteDAL/index/BIndexDAL'))(sqliteDriver);
this.mindexDAL = new (require('./sqliteDAL/index/MIndexDAL'))(sqliteDriver);
this.iindexDAL = new (require('./sqliteDAL/index/IIndexDAL'))(sqliteDriver);
this.sindexDAL = new (require('./sqliteDAL/index/SIndexDAL'))(sqliteDriver);
this.cindexDAL = new (require('./sqliteDAL/index/CIndexDAL'))(sqliteDriver);
this.idtyDAL = new (require('./sqliteDAL/IdentityDAL').IdentityDAL)(sqliteDriver);
this.certDAL = new (require('./sqliteDAL/CertDAL').CertDAL)(sqliteDriver);
this.msDAL = new (require('./sqliteDAL/MembershipDAL').MembershipDAL)(sqliteDriver);
this.walletDAL = new (require('./sqliteDAL/WalletDAL').WalletDAL)(sqliteDriver);
this.bindexDAL = new (require('./sqliteDAL/index/BIndexDAL').BIndexDAL)(sqliteDriver);
this.mindexDAL = new (require('./sqliteDAL/index/MIndexDAL').MIndexDAL)(sqliteDriver);
this.iindexDAL = new (require('./sqliteDAL/index/IIndexDAL').IIndexDAL)(sqliteDriver);
this.sindexDAL = new (require('./sqliteDAL/index/SIndexDAL').SIndexDAL)(sqliteDriver);
this.cindexDAL = new (require('./sqliteDAL/index/CIndexDAL').CIndexDAL)(sqliteDriver);
this.newDals = {
'metaDAL': that.metaDAL,
......@@ -527,7 +528,7 @@ function FileDAL(params) {
block.wrong = false;
yield [
that.saveBlockInFile(block),
that.saveTxsInFiles(block.transactions, {block_number: block.number, time: block.medianTime, currency: block.currency })
that.saveTxsInFiles(block.transactions, block.number, block.medianTime)
];
});
......@@ -591,14 +592,13 @@ function FileDAL(params) {
this.saveSideBlockInFile = (block) => that.writeSideFileOfBlock(block);
this.saveTxsInFiles = (txs, extraProps) => {
this.saveTxsInFiles = (txs, block_number, medianTime) => {
return Q.all(txs.map((tx) => co(function*() {
_.extend(tx, extraProps);
const sp = tx.blockstamp.split('-');
tx.blockstampTime = (yield that.getBlockByNumberAndHash(sp[0], sp[1])).medianTime;
const txEntity = new Transaction(tx);
txEntity.computeAllHashes();
return that.txsDAL.addLinked(txEntity);
return that.txsDAL.addLinked(TransactionDTO.fromJSONObject(txEntity), block_number, medianTime);
})));
};
......@@ -628,7 +628,7 @@ function FileDAL(params) {
this.registerNewCertification = (cert) => that.certDAL.saveNewCertification(cert);
this.saveTransaction = (tx) => that.txsDAL.addPending(tx);
this.saveTransaction = (tx) => that.txsDAL.addPending(TransactionDTO.fromJSONObject(tx))
this.getTransactionsHistory = (pubkey) => co(function*() {
const history = {
......
/**
* Created by cgeek on 22/08/15.
*/
const _ = require('underscore');
const co = require('co');
const indexer = require('../../indexer').Indexer
module.exports = AbstractIndex;
function AbstractIndex() {
"use strict";
const that = this;
this.getWrittenOn = (blockstamp) => that.query('SELECT * FROM ' + that.table + ' WHERE written_on = ?', [blockstamp]);
this.trimRecords = (belowNumber) => co(function*() {
const belowRecords = yield that.query('SELECT COUNT(*) as nbRecords, pub FROM ' + that.table + ' ' +
'WHERE CAST(written_on as int) < ? ' +
'GROUP BY pub ' +
'HAVING nbRecords > 1', [belowNumber]);
const reducedByPub = indexer.DUP_HELPERS.reduceBy(belowRecords, ['pub']);
for (const record of reducedByPub) {
const recordsOfPub = yield that.query('SELECT * FROM ' + that.table + ' WHERE pub = ?', [record.pub]);
const toReduce = _.filter(recordsOfPub, (rec) => parseInt(rec.written_on) < belowNumber);
if (toReduce.length) {
// Clean the records in the DB
yield that.exec('DELETE FROM ' + that.table + ' WHERE pub = \'' + record.pub + '\'');
const nonReduced = _.filter(recordsOfPub, (rec) => parseInt(rec.written_on) >= belowNumber);
const reduced = indexer.DUP_HELPERS.reduce(toReduce);
// Persist
yield that.insertBatch([reduced].concat(nonReduced));
}
}
});
}
import {AbstractSQLite, BeforeSaveHook} from "./AbstractSQLite";
import {SQLiteDriver} from "../drivers/SQLiteDriver";
import {IndexEntry, Indexer} from "../../indexer";
const _ = require('underscore');
export class AbstractIndex<T extends IndexEntry> extends AbstractSQLite<T> {
constructor(
driver:SQLiteDriver,
table: string,
pkFields: string[] = [],
fields: string[] = [],
arrays: string[] = [],
booleans: string[] = [],
bigintegers: string[] = [],
transientFields: string[] = [],
beforeSaveHook: BeforeSaveHook<T> | null = null
) {
super(driver, table, pkFields, fields, arrays, booleans, bigintegers, transientFields, beforeSaveHook)
}
public async init() {}
getWrittenOn(blockstamp:string) {
return this.query('SELECT * FROM ' + this.table + ' WHERE written_on = ?', [blockstamp])
}
async trimRecords(belowNumber:number) {
const belowRecords:T[] = await this.query('SELECT COUNT(*) as nbRecords, pub FROM ' + this.table + ' ' +
'WHERE CAST(written_on as int) < ? ' +
'GROUP BY pub ' +
'HAVING nbRecords > 1', [belowNumber]);
const reducedByPub = Indexer.DUP_HELPERS.reduceBy(belowRecords, ['pub']);
for (const record of reducedByPub) {
const recordsOfPub = await this.query('SELECT * FROM ' + this.table + ' WHERE pub = ?', [record.pub]);
const toReduce = _.filter(recordsOfPub, (rec:T) => parseInt(rec.written_on) < belowNumber);
if (toReduce.length) {
// Clean the records in the DB
await this.exec('DELETE FROM ' + this.table + ' WHERE pub = \'' + record.pub + '\'');
const nonReduced = _.filter(recordsOfPub, (rec:T) => parseInt(rec.written_on) >= belowNumber);
const reduced = Indexer.DUP_HELPERS.reduce(toReduce);
// Persist
await this.insertBatch([reduced].concat(nonReduced));
}
}
}
}
/**
* Created by cgeek on 22/08/15.
*/
import {AbstractSQLite} from "./AbstractSQLite";
import {SQLiteDriver} from "../drivers/SQLiteDriver";
const Q = require('q');
const co = require('co');
const constants = require('../../constants');
const AbstractSQLite = require('./AbstractSQLite');
module.exports = BlockDAL;
const IS_FORK = true;
const IS_NOT_FORK = false;
function BlockDAL(driver) {
"use strict";
AbstractSQLite.call(this, driver);
let current = null;
let that = this;
export interface DBBlock {
fork: boolean
hash: string
inner_hash: string
signature: string
currency: string
issuer: string
parameters: string
previousHash: string
previousIssuer: string
version: string
membersCount: string
monetaryMass: string
UDTime: string
medianTime: string
dividend: string
unitbase: string
time: string
powMin: string
number: string
nonce: string
transactions: string
certifications: string
identities: string
joiners: string
actives: string
leavers: string
revoked: string
excluded: string
created: string
updated: string
}
this.table = 'block';
this.fields = ['fork', 'hash', 'inner_hash', 'signature', 'currency', 'issuer', 'issuersCount', 'issuersFrame', 'issuersFrameVar', 'parameters', 'previousHash', 'previousIssuer', 'version', 'membersCount', 'monetaryMass', 'UDTime', 'medianTime', 'dividend', 'unitbase', 'time', 'powMin', 'number', 'nonce', 'transactions', 'certifications', 'identities', 'joiners', 'actives', 'leavers', 'revoked', 'excluded', 'len'];
this.arrays = ['identities','certifications','actives','revoked','excluded','leavers','joiners','transactions'];
this.bigintegers = ['monetaryMass'];
this.booleans = ['wrong'];
this.pkFields = ['number','hash'];
export class BlockDAL extends AbstractSQLite<DBBlock> {
private current: any
constructor(driver:SQLiteDriver) {
super(
driver,
'block',
// PK fields
['number','hash'],
// Fields
['fork', 'hash', 'inner_hash', 'signature', 'currency', 'issuer', 'issuersCount', 'issuersFrame', 'issuersFrameVar', 'parameters', 'previousHash', 'previousIssuer', 'version', 'membersCount', 'monetaryMass', 'UDTime', 'medianTime', 'dividend', 'unitbase', 'time', 'powMin', 'number', 'nonce', 'transactions', 'certifications', 'identities', 'joiners', 'actives', 'leavers', 'revoked', 'excluded', 'len'],
// Arrays
['identities','certifications','actives','revoked','excluded','leavers','joiners','transactions'],
// Booleans
['wrong'],
// BigIntegers
['monetaryMass'],
// Transient
[]
)
/**
* Periodically cleans the current block cache.
* It seems the cache is not always correct and may stuck the node, so it is preferable to reset it periodically.
*/
setInterval(this.cleanCache, constants.CURRENT_BLOCK_CACHE_DURATION);
}
this.init = () => co(function *() {
return that.exec('BEGIN;' +
'CREATE TABLE IF NOT EXISTS ' + that.table + ' (' +
async init() {
await this.exec('BEGIN;' +
'CREATE TABLE IF NOT EXISTS ' + this.table + ' (' +
'fork BOOLEAN NOT NULL,' +
'hash VARCHAR(64) NOT NULL,' +
'inner_hash VARCHAR(64) NOT NULL,' +
......@@ -65,94 +105,89 @@ function BlockDAL(driver) {
');' +
'CREATE INDEX IF NOT EXISTS idx_block_hash ON block (hash);' +
'CREATE INDEX IF NOT EXISTS idx_block_fork ON block (fork);' +
'COMMIT;', []);
});
this.cleanCache = () => current = null;
'COMMIT;')
}
/**
* Periodically cleans the current block cache.
* It seems the cache is not always correct and may stuck the node, so it is preferable to reset it periodically.
*/
setInterval(this.cleanCache, constants.CURRENT_BLOCK_CACHE_DURATION);
cleanCache() {
this.current = null
}
this.getCurrent = () => co(function *() {
if (!current) {
current = (yield that.query('SELECT * FROM block WHERE NOT fork ORDER BY number DESC LIMIT 1'))[0];
async getCurrent() {
if (!this.current) {
this.current = (await this.query('SELECT * FROM block WHERE NOT fork ORDER BY number DESC LIMIT 1'))[0];
}
return Q(current);
});
return Q(this.current);
}
this.getBlock = (number) => co(function *() {
return (yield that.query('SELECT * FROM block WHERE number = ? and NOT fork', [parseInt(number)]))[0];
});
async getBlock(number:string | number) {
return (await this.query('SELECT * FROM block WHERE number = ? and NOT fork', [parseInt(String(number))]))[0];
}
this.getAbsoluteBlock = (number, hash) => co(function *() {
return (yield that.query('SELECT * FROM block WHERE number = ? and hash = ?', [parseInt(number), hash]))[0];
});
async getAbsoluteBlock(number:number, hash:string) {
return (await this.query('SELECT * FROM block WHERE number = ? and hash = ?', [number, hash]))[0];
}
this.getBlocks = (start, end) => {
return that.query('SELECT * FROM block WHERE number BETWEEN ? and ? and NOT fork ORDER BY number ASC', [start, end]);
};
getBlocks(start:number, end:number) {
return this.query('SELECT * FROM block WHERE number BETWEEN ? and ? and NOT fork ORDER BY number ASC', [start, end]);
}
this.lastBlockWithDividend = () => co(function *() {
return (yield that.query('SELECT * FROM block WHERE dividend > 0 and NOT fork ORDER BY number DESC LIMIT 1'))[0];
});
async lastBlockWithDividend() {
return (await this.query('SELECT * FROM block WHERE dividend > 0 and NOT fork ORDER BY number DESC LIMIT 1'))[0];
}
this.lastBlockOfIssuer = (issuer) => co(function *() {
return (yield that.query('SELECT * FROM block WHERE issuer = ? and NOT fork ORDER BY number DESC LIMIT 1', [issuer]))[0];
});
async lastBlockOfIssuer(issuer:string) {
return (await this.query('SELECT * FROM block WHERE issuer = ? and NOT fork ORDER BY number DESC LIMIT 1', [issuer]))[0]
}
this.getCountOfBlocksIssuedBy = (issuer) => co(function *() {
let res = yield that.query('SELECT COUNT(*) as quantity FROM block WHERE issuer = ? and NOT fork', [issuer]);
async getCountOfBlocksIssuedBy(issuer:string) {
let res: any = await this.query('SELECT COUNT(*) as quantity FROM block WHERE issuer = ? and NOT fork', [issuer]);
return res[0].quantity;
});
}
this.getForkBlocks = () => {
return that.query('SELECT * FROM block WHERE fork ORDER BY number');
};
getForkBlocks() {
return this.query('SELECT * FROM block WHERE fork ORDER BY number');
}
this.getDividendBlocks = () => {
return that.query('SELECT * FROM block WHERE dividend IS NOT NULL ORDER BY number');
};
getDividendBlocks() {
return this.query('SELECT * FROM block WHERE dividend IS NOT NULL ORDER BY number');
}
this.saveBunch = (blocks) => co(function *() {
let queries = "INSERT INTO block (" + that.fields.join(',') + ") VALUES ";
async saveBunch(blocks:DBBlock[]) {
let queries = "INSERT INTO block (" + this.fields.join(',') + ") VALUES ";
for (let i = 0, len = blocks.length; i < len; i++) {
let block = blocks[i];
queries += that.toInsertValues(block);
queries += this.toInsertValues(block);
if (i + 1 < len) {
queries += ",\n";
}
}
yield that.exec(queries);
that.cleanCache();
});
this.saveBlock = (block) => co(function *() {
let saved = yield saveBlockAs(block, IS_NOT_FORK);
if (!current || current.number < block.number) {
current = block;
await this.exec(queries);
this.cleanCache();
}
async saveBlock(block:DBBlock) {
let saved = await this.saveBlockAs(block, IS_NOT_FORK);
if (!this.current || this.current.number < block.number) {
this.current = block;
}
return saved;
});
}
this.saveSideBlock = (block) =>
saveBlockAs(block, IS_FORK);
saveSideBlock(block:DBBlock) {
return this.saveBlockAs(block, IS_FORK)
}
function saveBlockAs(block, fork) {
return co(function *() {
block.fork = fork;
return yield that.saveEntity(block);
});
private async saveBlockAs(block:DBBlock, fork:boolean) {
block.fork = fork;
return await this.saveEntity(block);
}
this.setSideBlock = (number, previousBlock) => co(function *() {
yield that.query('UPDATE block SET fork = ? WHERE number = ?', [true, number]);
current = previousBlock;
});
async setSideBlock(number:number, previousBlock:DBBlock) {
await this.query('UPDATE block SET fork = ? WHERE number = ?', [true, number]);
this.current = previousBlock;
}
this.getNextForkBlocks = (number, hash) => {
return that.query('SELECT * FROM block WHERE fork AND number = ? AND previousHash like ? ORDER BY number', [number + 1, hash]);
};
getNextForkBlocks(number:number, hash:string) {
return this.query('SELECT * FROM block WHERE fork AND number = ? AND previousHash like ? ORDER BY number', [number + 1, hash]);
}
}
/**
* Created by cgeek on 22/08/15.
*/
import {SQLiteDriver} from "../drivers/SQLiteDriver";
import {AbstractSQLite} from "./AbstractSQLite";
import {SandBox} from "./SandBox";
const Q = require('q');
const co = require('co');
const AbstractSQLite = require('./AbstractSQLite');
const constants = require('../../constants');
const SandBox = require('./SandBox');
module.exports = CertDAL;
function CertDAL(driver) {
"use strict";
AbstractSQLite.call(this, driver);
const that = this;
export interface DBCert {
linked:boolean
written:boolean
written_block:null
written_hash:null
sig:string
block_number:number
block_hash:string
target:string
to:string
from:string
block:number
expired: boolean | null
expires_on: number
}
this.table = 'cert';
this.fields = [
'linked',
'written',
'written_block',
'written_hash',
'sig',
'block_number',
'block_hash',
'target',
'to',
'from',
'block',
'expired',
'expires_on'
];
this.arrays = [];
this.booleans = ['linked', 'written'];
this.pkFields = ['from','target','sig'];
this.translated = {};
export class CertDAL extends AbstractSQLite<DBCert> {
constructor(driver:SQLiteDriver) {
super(
driver,
'cert',
// PK fields
['from','target','sig'],
// Fields
[
'linked',
'written',
'written_block',
'written_hash',
'sig',
'block_number',
'block_hash',
'target',
'to',
'from',
'block',
'expired',
'expires_on'
],
// Arrays
[],
// Booleans
['linked', 'written'],
// BigIntegers
[],
// Transient
[],
(entity:DBCert) => {
entity.written = entity.written || !!(entity.written_hash)
}