diff --git a/app/lib/dal/fileDAL.js b/app/lib/dal/fileDAL.js index d5a3aa7eaedda3127e02f59682466afebbbaa7a4..1e34ed1506a7e054bacf5a140355d5fa3825a5b2 100644 --- a/app/lib/dal/fileDAL.js +++ b/app/lib/dal/fileDAL.js @@ -2,6 +2,8 @@ var Q = require('q'); var co = require('co'); var _ = require('underscore'); +var fs = require('fs'); +var qfs = require('q-io/fs'); var sha1 = require('sha1'); var path = require('path'); var Configuration = require('../entity/configuration'); @@ -24,33 +26,65 @@ var CFSStorage = require('./fileDALs/AbstractCFS'); var lokijs = require('lokijs'); var logger = require('../../lib/logger')('database'); +const UCOIN_DB_NAME = 'ucoin'; + module.exports = { memory: function(profile) { return getHomeFS(profile, true) .then(function(params) { - let loki = new lokijs('ucoin', { autosave: false }); + let loki = new lokijs(UCOIN_DB_NAME, { autosave: false }); return Q(new FileDAL(profile, params.home, "", params.fs, null, 'fileDal', loki)); }); }, file: function(profile, forConf) { return getHomeFS(profile, false) .then(function(params) { - return Q.Promise(function(resolve){ + return Q.Promise(function(resolve, reject){ let loki; if (forConf) { // Memory only service dals loki = new lokijs('temp', { autosave: false }); resolve(loki); } else { - logger.info('Loading...'); - loki = new lokijs(path.join(params.home, 'ucoin.json'), { - autoload: true, - autosave: true, - autosaveInterval: 30000, - autoloadCallback: function() { - resolve(loki); + let lokiPath = path.join(params.home, UCOIN_DB_NAME + '.json'); + logger.debug('Loading DB at %s...', lokiPath); + co(function *() { + let rawDB; + try { + // Try to read database + rawDB = yield qfs.read(lokiPath); + // Check if content is standard JSON + JSON.parse(rawDB); + } catch (e) { + if (rawDB) { + logger.error('The database could not be loaded, it is probably corrupted due to some system fail.'); + logger.error('Please stop uCoin and re-sync it with another node of the network before restarting, using the following commands:'); + logger.error('> ucoind reset data'); + logger.error('> ucoind sync <some.ucoin.node> <port>'); + logger.error('> ucoind restart'); + // Dirty "won't go any further" + return Q.Promise((resolve) => null); + } } - }); + + return Q.Promise(function(resolve2){ + let lokiDB; + lokiDB = new lokijs(lokiPath, { + autoload: true, + autosave: true, + autosaveInterval: 30000, + adapter: { + loadDatabase: (dbname, callback) => { + callback(rawDB || null); + resolve2(lokiDB); + }, + saveDatabase: (dbname, dbstring, callback) => fs.writeFile(dbname, dbstring, callback) + } + }); + }); + }) + .then(resolve) + .catch(reject); } }) .then(function(loki){ @@ -74,7 +108,7 @@ function getHomeFS(profile, isMemory) { let fs; return someDelayFix() .then(function() { - fs = (isMemory ? require('q-io/fs-mock')({}) : require('q-io/fs')); + fs = (isMemory ? require('q-io/fs-mock')({}) : qfs); return fs.makeTree(home); }) .then(function(){ @@ -1148,7 +1182,6 @@ function FileDAL(profile, home, localDir, myFS, parentFileDAL, dalName, loki) { this.loadStats = that.statDAL.loadStats; this.getStat = that.statDAL.getStat; - this.saveStat = that.statDAL.saveStat; this.pushStats = that.statDAL.pushStats; this.needsSave = function() { @@ -1163,13 +1196,13 @@ function FileDAL(profile, home, localDir, myFS, parentFileDAL, dalName, loki) { }; this.resetAll = function(done) { - var files = ['stats', 'cores', 'current', 'conf', 'ucoin']; + var files = ['stats', 'cores', 'current', 'conf', UCOIN_DB_NAME]; var dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return resetFiles(files, dirs, done); }; this.resetData = function(done) { - var files = ['stats', 'cores', 'current', 'ucoin']; + var files = ['stats', 'cores', 'current', UCOIN_DB_NAME]; var dirs = ['blocks', 'ud_history', 'branches', 'certs', 'txs', 'cores', 'sources', 'links', 'ms', 'identities', 'peers', 'indicators', 'leveldb']; return resetFiles(files, dirs, done); }; diff --git a/app/lib/dal/fileDALs/statDAL.js b/app/lib/dal/fileDALs/statDAL.js index 02f859607b9dea8b943dfe68ed34c30461fabf33..d4a4de9707ae1a8c8cad2aee945ea47f0af9a565 100644 --- a/app/lib/dal/fileDALs/statDAL.js +++ b/app/lib/dal/fileDALs/statDAL.js @@ -21,14 +21,6 @@ function StatDAL(rootPath, qioFS, parentCore, localDAL, AbstractStorage) { this.getStat = (statName) => that.loadStats().then((stats) => (stats && stats[statName]) || { statName: statName, blocks: [], lastParsedBlock: -1 }); - this.saveStat = (stat, name) => { - return co(function *() { - var stats = (yield that.loadStats()) || {}; - stats[name] = stat; - return that.coreFS.writeJSON('stats.json', stats); - }); - }; - this.pushStats = (statsToPush) => { return co(function *() { var stats = (yield that.loadStats()) || {}; diff --git a/app/service/PeeringService.js b/app/service/PeeringService.js index 105ec7cfa22ecbdbfc6591ea85fe7e736dc50510..ec4c4810326200bafc2010dc67601f4115c10770 100644 --- a/app/service/PeeringService.js +++ b/app/service/PeeringService.js @@ -136,80 +136,75 @@ function PeeringService(server, pair, dal) { this.generateSelfPeer = generateSelfPeer; - function generateSelfPeer(conf, signalTimeInterval, done) { - var currency = conf.currency; - var current = null; - async.waterfall([ - function (next) { - blockchainCtx(conf, dal).current(next); - }, - function (currentBlock, next) { - current = currentBlock; - dal.findPeers(selfPubkey).then(_.partial(next, null)).catch(next); - }, - function (peers, next) { - var p1 = { version: 1, currency: currency }; - if(peers.length != 0){ - p1 = _(peers[0]).extend({ version: 1, currency: currency }); - } - var endpoint = 'BASIC_MERKLED_API'; - if (conf.remotehost) { - endpoint += ' ' + conf.remotehost; - } - if (conf.remoteipv4) { - endpoint += ' ' + conf.remoteipv4; - } - if (conf.remoteipv6) { - endpoint += ' ' + conf.remoteipv6; - } - if (conf.remoteport) { - endpoint += ' ' + conf.remoteport; - } - var p2 = { - version: 1, - currency: currency, - pubkey: selfPubkey, - block: current ? [current.number, current.hash].join('-') : constants.PEER.SPECIAL_BLOCK, - endpoints: [endpoint] - }; - var raw1 = new Peer(p1).getRaw().dos2unix(); - var raw2 = new Peer(p2).getRaw().dos2unix(); - logger.info('External access:', new Peer(raw1 == raw2 ? p1 : p2).getURL()); - if (raw1 != raw2) { - logger.debug('Generating server\'s peering entry based on block#%s...', p2.block.split('-')[0]); - async.waterfall([ - function (next){ - server.sign(raw2, next); - }, - function (signature, next) { - p2.signature = signature; - p2.pubkey = selfPubkey; - p2.documentType = 'peer'; - server.submit(p2, false, next); - } - ], function (err) { - next(err); - }); - } else { - p1.documentType = 'peer'; - server.push(p1); - next(); - } - }, - function (next){ - dal.getPeer(selfPubkey).then(_.partial(next, null)).catch(next); - }, - function (peer, next){ - // Set peer's statut to UP - peer.documentType = 'peer'; - that.peer(peer); - server.push(peer); - next(); + function generateSelfPeer(theConf, signalTimeInterval, done) { + return co(function *() { + let current = yield server.dal.getCurrentBlockOrNull(); + let currency = theConf.currency; + let peers = yield dal.findPeers(selfPubkey); + let p1 = { version: 1, currency: currency }; + if(peers.length != 0){ + p1 = _(peers[0]).extend({ version: 1, currency: currency }); + } + let endpoint = 'BASIC_MERKLED_API'; + if (theConf.remotehost) { + endpoint += ' ' + theConf.remotehost; + } + if (theConf.remoteipv4) { + endpoint += ' ' + theConf.remoteipv4; } - ], function(err) { + if (theConf.remoteipv6) { + endpoint += ' ' + theConf.remoteipv6; + } + if (theConf.remoteport) { + endpoint += ' ' + theConf.remoteport; + } + if (!currency || endpoint == 'BASIC_MERKLED_API') { + logger.error('It seems there is an issue with your configuration.'); + logger.error('Please restart your node with:'); + logger.error('$ ucoind restart'); + return Q.Promise((resolve) => null); + } + // Choosing next based-block for our peer record: we basically want the most distant possible from current + let minBlock = current ? current.number - 30 : 0; + // But if already have a peer record within this distance, we need to take the next block of it + if (p1) { + let p1Block = parseInt(p1.block.split('-')[0], 10); + minBlock = Math.max(minBlock, p1Block + 1); + } + // Finally we can't have a negative block + minBlock = Math.max(0, minBlock); + let targetBlock = yield server.dal.getBlockOrNull(minBlock); + var p2 = { + version: 1, + currency: currency, + pubkey: selfPubkey, + block: targetBlock ? [targetBlock.number, targetBlock.hash].join('-') : constants.PEER.SPECIAL_BLOCK, + endpoints: [endpoint] + }; + var raw1 = new Peer(p1).getRaw().dos2unix(); + var raw2 = new Peer(p2).getRaw().dos2unix(); + logger.info('External access:', new Peer(raw1 == raw2 ? p1 : p2).getURL()); + if (raw1 != raw2) { + logger.debug('Generating server\'s peering entry based on block#%s...', p2.block.split('-')[0]); + p2.signature = yield Q.nfcall(server.sign, raw2); + p2.pubkey = selfPubkey; + p2.documentType = 'peer'; + // Submit & share with the network + yield server.submitP(p2, false); + } else { + p1.documentType = 'peer'; + // Share with the network + server.push(p1); + } + let selfPeer = yield dal.getPeer(selfPubkey); + // Set peer's statut to UP + selfPeer.documentType = 'selfPeer'; + that.peer(selfPeer); + server.push(selfPeer); logger.info("Next peering signal in %s min", signalTimeInterval / 1000 / 60); - done(err); - }); + }) + .then(() => done()) + .catch(done); } function testPeers(displayDelays, done) { diff --git a/bin/ucoind b/bin/ucoind index 6ed96791ebe5cbd7bf90b2b94b3581317c4fe517..64757b99e8d8e1f38294a991c5894391ca3337c3 100755 --- a/bin/ucoind +++ b/bin/ucoind @@ -125,7 +125,7 @@ program err && console.error(err.stack || err.message); }) .then(function(){ - return server.disconnect().catch(() => null); + return ((server && server.disconnect()) || Q()).catch(() => null); }) .then(function(){ process.exit(); @@ -166,7 +166,7 @@ program return selfPeer; }) .catch(function(){ - return Q.nfcall(server.PeeringService.generateSelfPeer, server.conf) + return Q.nfcall(server.PeeringService.generateSelfPeer, server.conf, 0) .then(function(){ return server.dal.getPeer(server.PeeringService.pubkey); }); @@ -267,7 +267,7 @@ program logger.error('Error during revert:', err); } // Save DB - server.disconnect() + ((server && server.disconnect()) || Q()) .catch(() => null) .then(function(){ process.exit(); @@ -461,51 +461,38 @@ function serverStart(server, conf) { constants.setUDID2Format(); } - async.waterfall([ - function (next){ - // Launching server - server.start() - .then(function(){ - // Enabling UPnP - if (conf.upnp) { - return upnp(server.conf.port, server.conf.remoteport) - .catch(function(err){ - logger.warn(err); - }); - } - }) - .then(function(){ - // Enabling HTTP - return bma(server, null, conf.httplogs); - }) - .then(function(){ - // Enabling Routing - if (server.conf.routing) { - server - // The router asks for multicasting of documents - .pipe(server.router()) - // The documents get sent to peers - .pipe(multicaster(server.conf.isolate)) - // The multicaster may answer 'unreachable peer' - .pipe(server.router()); - } - next(); - }) - .then(function(){ - if (conf.participate) { - return server.startBlockComputation(); - } - }) - .catch(next); + return co(function *() { + // Enabling HTTP + yield bma(server, null, conf.httplogs); + // Enabling Routing + if (server.conf.routing) { + server + // The router asks for multicasting of documents + .pipe(server.router()) + // The documents get sent to peers + .pipe(multicaster(server.conf.isolate)) + // The multicaster may answer 'unreachable peer' + .pipe(server.router()); + } + // Enabling UPnP + if (conf.upnp) { + yield upnp(server.conf.port, server.conf.remoteport) + .catch(function(err){ + logger.warn(err); + }); + } + if (conf.participate) { + server.startBlockComputation(); } - ], function (err) { - if(err){ + // Launching server + server.start(); + logger.info('Server ready!'); + }) + .catch((err) => { logger.error(err); server.disconnect(); process.exit(); - } - logger.debug('Server ready!'); - }); + }); } function startWizard(step, server, conf, done) { @@ -768,7 +755,7 @@ function connect(callback, forConf, useDefaultConf) { process.on('SIGINT', function() { if (!isSaving) { isSaving = true; - if (server.dal.needsSave()) { + if (server.dal && server.dal.needsSave()) { logger.info('Saving DB and close...'); } // Save DB @@ -808,11 +795,11 @@ function service(callback) { process.on('SIGINT', function() { if (!isSaving) { isSaving = true; - if (server.dal.needsSave()) { + if (server.dal && server.dal.needsSave()) { logger.info('Saving DB and close...'); } // Save DB - server.disconnect() + ((server && server.disconnect()) || Q()) .catch(() => null) .then(function(){ process.exit(); diff --git a/server.js b/server.js index d5a6ff34855548b480851974891e5e226e34368c..6928db4587bcef9ecb602ef6e362ce8fd664c4cf 100644 --- a/server.js +++ b/server.js @@ -93,6 +93,8 @@ function Server (dbConf, overrideConf) { }); }; + this.submitP = (obj, isInnerWrite) => Q.nbind(this.submit, this)(obj, isInnerWrite); + this.connect = function (forConf, useDefaultConf) { // Init connection if (that.dal) { @@ -148,7 +150,7 @@ function Server (dbConf, overrideConf) { }; this.recomputeSelfPeer = function() { - return Q.nbind(that.PeeringService.generateSelfPeer, that.PeeringService)(that.conf); + return Q.nbind(that.PeeringService.generateSelfPeer, that.PeeringService)(that.conf, 0); }; this.initPeer = function (done) { @@ -157,7 +159,6 @@ function Server (dbConf, overrideConf) { that.checkConfig().then(next).catch(next); }, function (next){ - logger.info('Starting core: %s', that.dal.name); logger.info('Storing self peer...'); that.PeeringService.regularPeerSignal(next); }, @@ -257,7 +258,7 @@ function Server (dbConf, overrideConf) { }; this.disconnect = function() { - return that.dal.close(); + return that.dal && that.dal.close(); }; this.initServices = function() {