From 46291caef11c1ec01ff990e9a12dbb736812487b Mon Sep 17 00:00:00 2001 From: cgeek <cem.moreau@gmail.com> Date: Thu, 29 Oct 2015 21:46:22 +0100 Subject: [PATCH] LokiJS: synchronization --- app/lib/dal/fileDAL.js | 36 ++++++++++++++-------- app/lib/dal/fileDALs/AbstractLoki.js | 5 ++-- app/lib/dal/fileDALs/BlockDAL.js | 45 +++++++++++++++++++--------- app/lib/sync.js | 19 +++++++----- app/service/BlockchainService.js | 3 +- bin/ucoind | 44 ++++++++++++++++++++++----- server.js | 9 ++---- 7 files changed, 107 insertions(+), 54 deletions(-) diff --git a/app/lib/dal/fileDAL.js b/app/lib/dal/fileDAL.js index e1adb6573..9b7d43830 100644 --- a/app/lib/dal/fileDAL.js +++ b/app/lib/dal/fileDAL.js @@ -35,17 +35,26 @@ module.exports = { file: function(profile, forConf) { return getHomeFS(profile, false) .then(function(params) { - let loki; - if (forConf) { - // Memory only service dals - loki = new lokijs('temp', { autosave: false }); - } else { - loki = new lokijs(path.join(params.home, 'ucoin.json'), { - autosave: true, - autosaveInterval: 300 + return Q.Promise(function(resolve){ + let loki; + if (forConf) { + // Memory only service dals + loki = new lokijs('temp', { autosave: false }); + resolve(loki); + } else { + loki = new lokijs(path.join(params.home, 'ucoin.json'), { + autoload: true, + autosave: true, + autosaveInterval: 30000, + autoloadCallback: function() { + resolve(loki); + } + }); + } + }) + .then(function(loki){ + return new FileDAL(profile, params.home, "", params.fs, null, 'fileDal', null, loki); }); - } - return new FileDAL(profile, params.home, "", params.fs, null, 'fileDal', null, loki); }); }, FileDAL: FileDAL @@ -1082,6 +1091,7 @@ function FileDAL(profile, home, localDir, myFS, parentFileDAL, dalName, core, lo .then(function(conf){ conf = _(conf).extend(overrideConf || {}); currency = conf.currency; + that.blockDAL.setConf(conf); return conf; }); }; @@ -1100,17 +1110,17 @@ function FileDAL(profile, home, localDir, myFS, parentFileDAL, dalName, core, lo this.saveStat = that.statDAL.saveStat; this.close = function() { - // TODO + return Q.nbind(loki.saveDatabase, loki)(); }; this.resetAll = function(done) { - var files = ['stats', 'cores', 'current', 'conf']; + var files = ['stats', 'cores', 'current', 'conf', 'ucoin']; var dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return resetFiles(files, dirs, done); }; this.resetData = function(done) { - var files = ['stats', 'cores', 'current']; + var files = ['stats', 'cores', 'current', 'ucoin']; var dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return resetFiles(files, dirs, done); }; diff --git a/app/lib/dal/fileDALs/AbstractLoki.js b/app/lib/dal/fileDALs/AbstractLoki.js index e59d77eaa..079990b6f 100644 --- a/app/lib/dal/fileDALs/AbstractLoki.js +++ b/app/lib/dal/fileDALs/AbstractLoki.js @@ -127,7 +127,6 @@ function AbstractLoki(collection, fileDAL, viewFields, loki) { // Save in fork branch: overrides meta data existing.metaData[that.metaKey()] = _.pick(entity, this.metaProps); } - console.log(existing); collection.update(existing); } else { entity.metaData = {}; @@ -137,7 +136,6 @@ function AbstractLoki(collection, fileDAL, viewFields, loki) { _.pluck(entity, this.metaProps).forEach(function(metaProp){ entity[metaProp] = false; }); - console.log(entity); collection.insert(entity); } return Q(entity); @@ -151,7 +149,8 @@ function AbstractLoki(collection, fileDAL, viewFields, loki) { } p = p.parentDAL; } - return _.sortBy(theCores, (b) => b.forkPointNumber); + return _.sortBy(theCores, (b) => + b.forkPointNumber); } function getView() { diff --git a/app/lib/dal/fileDALs/BlockDAL.js b/app/lib/dal/fileDALs/BlockDAL.js index a18fea9df..c58bf511c 100644 --- a/app/lib/dal/fileDALs/BlockDAL.js +++ b/app/lib/dal/fileDALs/BlockDAL.js @@ -23,22 +23,38 @@ function BlockDAL(fileDAL, loki) { this.blocksDB = blocksDB; this.collection = collection; - this.getLastSavedBlockFileNumber = () => Q(collection.chain().simplesort('number', true).limit(1).data()[0]); + this.getLastSavedBlockFileNumber = () => { + let last = collection.chain().simplesort('number', true).limit(1).data()[0]; + if (last) return Q(last.number); + return Q(-1); + }; + + let conf; + + this.setConf = (conf2) => conf = conf2; this.saveBlock = (block) => { - _.extend(block, { fork: !!fileDAL.parentDAL }); - let existing = collection.find({ - $and: [{ - number: block.number - }, { - hash: block.hash - }] - })[0]; - if (existing) { - existing = _.extend(existing, block); - collection.update(existing); - } else { + if (conf && conf.branchesWindowSize == 0) { + block.fork = false; + //console.log('--> MAIN -->', _.pick(block, 'number', 'hash')); collection.insert(block); + } else { + block.fork = !!fileDAL.parentDAL; + let existing; + existing = collection.find({ + $and: [{ + number: block.number + }, { + hash: block.hash + }] + })[0]; + if (existing) { + existing.fork = block.fork; + collection.update(existing); + } else { + //console.log('--> BRAN -->', _.pick(block, 'number', 'hash')); + collection.insert(block); + } } return Q(block); }; @@ -59,7 +75,8 @@ function BlockDAL(fileDAL, loki) { p = p.parentDAL; } blocks = _.sortBy(blocks, (b) => b.number); - let conditions = blocks.map((b) => { return { $and: [{ + let conditions = blocks.map((b) => { + return { $and: [{ fork: true }, { number: b.forkPointNumber diff --git a/app/lib/sync.js b/app/lib/sync.js index 0b49362e8..f87cd8df4 100644 --- a/app/lib/sync.js +++ b/app/lib/sync.js @@ -7,12 +7,14 @@ var superagent = require("superagent"); var sha1 = require('sha1'); var moment = require('moment'); var vucoin = require('vucoin'); +var lokijs = require('lokijs'); var dos2unix = require('./dos2unix'); var localValidator = require('./localValidator'); var logger = require('./logger')('sync'); var rawer = require('../lib/rawer'); var constants = require('../lib/constants'); var Peer = require('../lib/entity/peer'); +var BlockDAL = require('../lib/dal/fileDALs/BlockDAL'); var multimeter = require('multimeter'); var CONST_BLOCKS_CHUNK = 500; @@ -45,14 +47,17 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { timeout: constants.NETWORK.SYNC_LONG_TIMEOUT }; - this.sync = (to, nocautious, done) => { + this.sync = (to, nocautious) => { var cautious = !nocautious, logInterval; logger.info('Connecting remote host...'); return co(function *() { var node = yield getVucoin(host, port, vucoinOptions); logger.info('Sync started.'); - var lastSavedNumber = yield dal.getLastSavedBlockFileNumber(); + let loki = new lokijs('download', { autosave: false }); + let downloadedDAL = new BlockDAL({}, loki); + downloadedDAL.setConf({ branchesWindowSize: 0 }); + var lastSavedNumber = yield server.dal.getLastSavedBlockFileNumber(); var lCurrent = yield dal.getCurrentBlockOrNull(); //============ @@ -127,7 +132,7 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { var blocks = yield Q.nfcall(node.blockchain.blocks, chunk[1] - chunk[0] + 1, chunk[0]); watcher.downloadPercent(Math.floor(chunk[1] / remoteNumber * 100)); for (let i = 0; i < blocks.length; i++) { - yield server.dal.saveBlockInFile(blocks[i], false); + yield downloadedDAL.saveBlock(blocks[i]); } }) // Resolve the promise @@ -143,7 +148,7 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { let range = yield toApply[i].promise; // Apply downloaded blocks for (var j = range[0]; j < range[1] + 1; j++) { - yield server.dal.getBlock(j).then((block) => applyGivenBlock(cautious, remoteNumber)(block)); + yield downloadedDAL.getBlock(j).then((block) => applyGivenBlock(cautious, remoteNumber)(block)); } } yield Q.all(toApply).then(() => watcher.appliedPercent(100.0)); @@ -187,7 +192,6 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { .then(() => { watcher.end(); logger.info('Sync finished.'); - done(); }) .catch((err) => { if (logInterval) { @@ -195,8 +199,7 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { } err && watcher.writeStatus(err.message || String(err)); watcher.end(); - logger.info('Sync finished.'); - done(err); + throw err; }); }; @@ -236,7 +239,7 @@ module.exports = function Synchroniser (server, host, port, conf, interactive) { // Enables again branching for the lasts blocks conf.branchesWindowSize = initialForkSize; } - return BlockchainService.submitBlock(block, cautious); + return BlockchainService.submitBlock(_.omit(block, '$loki', 'meta'), cautious); }; } diff --git a/app/service/BlockchainService.js b/app/service/BlockchainService.js index f3db97774..6c263dd21 100644 --- a/app/service/BlockchainService.js +++ b/app/service/BlockchainService.js @@ -265,7 +265,7 @@ function BlockchainService (conf, mainDAL, pair) { function startPruning(highest, cores, current, forkWindowSize, doCheck) { var distanceFromMain = current && highest.forkPointNumber - current.number; var distanceFromVoid = highest.forkPointNumber + 1; - var branchSize = distanceFromMain || distanceFromVoid; + var branchSize = distanceFromMain !== undefined ? distanceFromMain : distanceFromVoid; var toPruneCount = Math.max(0, branchSize - forkWindowSize); if (!toPruneCount) { // Fork window still has some room or is just full @@ -395,6 +395,7 @@ function BlockchainService (conf, mainDAL, pair) { return basedCore.dal.fork(obj) .then(function(coreDAL){ that.currentDal = coreDAL; + coreDAL.blockDAL.setConf(conf); return blockchainCtx(conf, coreDAL); }) .then((ctx) => _.extend(ctx, coreObj)) diff --git a/bin/ucoind b/bin/ucoind index 5d6177632..ac2237737 100755 --- a/bin/ucoind +++ b/bin/ucoind @@ -115,17 +115,25 @@ program if (!port) { throw 'Port is required.'; } - sync(server, host, port, conf, to); + sync(server, host, port, conf, to) + .then(function(){ + logger.info('Sync finished.'); + }) + .catch(function(err){ + console.error(err.stack || err.message); + }) + .then(function(){ + return server.disconnect().catch(() => null); + }) + .then(function(){ + process.exit(); + }); })); function sync(server, host, port, conf, to) { - async.series([ - function (next){ - // Synchronize - var remote = new Synchroniser(server, host, port, conf, !program.nointeractive); - remote.sync(parseInt(to), program.nocautious, next); - } - ], logIfErrorAndExit(server, 'Error during sync: ')); + // Synchronize + var remote = new Synchroniser(server, host, port, conf, !program.nointeractive); + return remote.sync(parseInt(to), program.nocautious); } program @@ -709,6 +717,16 @@ function connect(callback, forConf) { server.disconnect(); process.exit(); }); + + // If ever the process gets interrupted + process.on('SIGINT', function() { + // Save DB + server.disconnect() + .catch(() => null) + .then(function(){ + process.exit(); + }); + }); }; } @@ -732,6 +750,16 @@ function service(callback) { server.disconnect(); process.exit(); }); + + // If ever the process gets interrupted + process.on('SIGINT', function() { + // Save DB + server.disconnect() + .catch(() => null) + .then(function(){ + process.exit(); + }); + }); }; } diff --git a/server.js b/server.js index 6f0e51d71..f1f779edc 100644 --- a/server.js +++ b/server.js @@ -262,13 +262,8 @@ function Server (dbConf, overrideConf) { return that.dal.resetConf(done); }; - this.disconnect = function(done) { - that.dal.close(function (err) { - if(err) - logger.error(err); - if (typeof done == 'function') - done(err); - }); + this.disconnect = function() { + return that.dal.close(); }; this.initServices = function() { -- GitLab