diff --git a/app/lib/streams/bma.js b/app/lib/streams/bma.js index 4293aeae69aad425260fdd78c1ecba87000d0da1..0228291b8653466c58f8f6089f3ce3b025251808 100644 --- a/app/lib/streams/bma.js +++ b/app/lib/streams/bma.js @@ -1,19 +1,24 @@ +"use strict"; + var _ = require('underscore'); var http = require('http'); var express = require('express'); +var url = require('url'); var co = require('co'); var Q = require('q'); var cors = require('express-cors'); var es = require('event-stream'); var morgan = require('morgan'); +var errorhandler = require('errorhandler'); +var bodyParser = require('body-parser'); var constants = require('../../lib/constants'); var dtos = require('../../lib/streams/dtos'); var sanitize = require('../../lib/sanitize'); var logger = require('../../lib/logger')('bma'); -module.exports = function(server, interfaces, httpLogs) { +let WebSocketServer = require('ws').Server; - "use strict"; +module.exports = function(server, interfaces, httpLogs) { var app = express(); @@ -48,15 +53,14 @@ module.exports = function(server, interfaces, httpLogs) { })); - app.use(express.urlencoded()); - app.use(express.json()); - - // Routing - app.use(app.router); + app.use(bodyParser.urlencoded({ + extended: true + })); + app.use(bodyParser.json()); // development only if (app.get('env') == 'development') { - app.use(express.errorHandler()); + app.use(errorhandler()); } var node = require('../../controllers/node')(server); @@ -125,11 +129,11 @@ module.exports = function(server, interfaces, httpLogs) { // Ensure of the answer format result = sanitize(result, dtoContract); // HTTP answer - res.send(200, JSON.stringify(result, null, " ")); + res.status(200).send(JSON.stringify(result, null, " ")); } catch (e) { let error = getResultingError(e); // HTTP error - res.send(error.httpCode, JSON.stringify(error.uerr, null, " ")); + res.status(error.httpCode).send(JSON.stringify(error.uerr, null, " ")); } }); }); @@ -164,6 +168,7 @@ module.exports = function(server, interfaces, httpLogs) { httpServers.push(httpServer); logger.info('uCoin server listening on ' + netInterface.ip + ' port ' + netInterface.port); } catch (err) { + logger.error(err.message); logger.error('uCoin server cannot listen on ' + netInterface.ip + ' port ' + netInterface.port); } } @@ -199,7 +204,6 @@ module.exports = function(server, interfaces, httpLogs) { }; function listenInterface(app, netInterface, port) { - "use strict"; return Q.Promise(function(resolve, reject){ var httpServer = http.createServer(app); httpServer.on('error', reject); @@ -209,26 +213,40 @@ function listenInterface(app, netInterface, port) { } function listenWebSocket(server, httpServer) { - "use strict"; - var io = require('socket.io')(httpServer); - var currentBlock = {}; - var blockSocket = io - .of('/websocket/block') - .on('error', (err) => logger.error(err)) - .on('connection', function (socket) { - socket.emit('block', currentBlock); - }); - var peerSocket = io - .of('/websocket/peer'); + let currentBlock = {}; + let wssBlock = new WebSocketServer({ + server: httpServer, + path: '/ws/block' + }); + let wssPeer = new WebSocketServer({ + server: httpServer, + path: '/ws/peer' + }); + + wssBlock.on('connection', function connection(ws) { + ws.send(JSON.stringify(currentBlock)); + }); + + wssBlock.broadcast = (data) => wssBlock.clients.forEach((client) => client.send(data)); + wssPeer.broadcast = (data) => wssPeer.clients.forEach((client) => client.send(data)); + + // Forward blocks & peers server .pipe(es.mapSync(function(data) { + // Broadcast block if (data.joiners) { currentBlock = data; - blockSocket.emit('block', currentBlock); + wssBlock.broadcast(JSON.stringify(currentBlock)); } + // Broadcast peer if (data.endpoints) { - peerSocket.emit('peer', data); + wssPeer.broadcast(JSON.stringify(data)); } })); + + return co(function *() { + currentBlock = yield server.dal.getCurrent(); + wssBlock.broadcast(JSON.stringify(currentBlock)); + }); } diff --git a/doc/HTTP_API.md b/doc/HTTP_API.md index 0bea60697f9fa237e6dfa6ae323abecb48504b6f..319c593dcb30f1e7c17d6c383640579bb1e51009 100644 --- a/doc/HTTP_API.md +++ b/doc/HTTP_API.md @@ -48,9 +48,9 @@ * [history/[pubkey]/times/[from]/[to]](#txhistorypubkeytimesfromto) * [ud/](#ud) * [history/[pubkey]](#udhistorypubkey) - * [websocket/](#websocket) - * [block](#websocketblock) - * [peer](#websocketpeer) + * [ws/](#ws) + * [block](#wsblock) + * [peer](#wspeer) ## Overview @@ -90,7 +90,7 @@ Data is made accessible through an HTTP API mainly inspired from [OpenUDC_exchan | `-- history |-- ud/ | `-- history - `-- websocket/ + `-- ws/ |-- block `-- peer @@ -1699,9 +1699,9 @@ The universal dividend history for the given `pubkey`. ``` -### websocket/* +### ws/* -#### `websocket/block` +#### `ws/block` **Goal** @@ -1715,7 +1715,7 @@ A websocket entry point for receiving blocks. Websocket connection. -#### `websocket/peer` +#### `ws/peer` **Goal** diff --git a/package.json b/package.json index 91f8e95b044824a6f66144e4dede27b9ab2e8ff5..08e7448b2d922bc0cc6d7f9a1b2605e61ba2ed16 100644 --- a/package.json +++ b/package.json @@ -35,13 +35,16 @@ "dependencies": { "async": "0.2.9", "bindings": "1.2.1", + "body-parser": "1.14.2", "co": "4.6.0", "colors": "1.1.2", "commander": "2.1.0", "daemonize2": "0.4.2", + "errorhandler": "1.4.3", "event-stream": "3.1.5", - "express": "3.4.7", + "express": "4.13.4", "express-cors": "0.0.3", + "express-ws": "0.2.6", "inquirer": "0.8.5", "merkle": "0.1.0", "moment": "2.6.0", @@ -55,14 +58,14 @@ "request": "2.31.0", "scrypt": "5.4.1", "sha1": "1.1.0", - "socket.io": "1.3.7", "sqlite3": "3.1.1", "superagent": "1.4.0", "tweetnacl": "0.11.2", "underscore": "1.8.3", "vucoin": "0.27.0", "winston": "2.1.1", - "wotb": "0.3.1" + "wotb": "0.3.1", + "ws": "1.0.1" }, "devDependencies": { "coveralls": "2.11.4", @@ -75,7 +78,6 @@ "request-promise": "0.4.2", "sha1": "", "should": "", - "socket.io-client": "1.3.6", "supertest": "" }, "bin": { diff --git a/test/integration/branches.js b/test/integration/branches.js index 6079fdbc2145d48e09ca85b2cacdb5bdae669386..89b13ac61150d6f8061ed6706233619cf677695a 100644 --- a/test/integration/branches.js +++ b/test/integration/branches.js @@ -16,6 +16,8 @@ var expectJSON = httpTest.expectJSON; var expectAnswer = httpTest.expectAnswer; var expectHttpCode = httpTest.expectHttpCode; +let WebSocket = require('ws'); + require('../../app/lib/logger')().mute(); var MEMORY_MODE = true; @@ -230,14 +232,14 @@ describe("Branches", function() { }); }); - it('should have an open websocket on /websocket/block', function() { - var socket = require('socket.io-client')('http://127.0.0.1:7778/websocket/block'); + it('should have an open websocket on /ws/block', function() { + var ws = new WebSocket('ws://127.0.0.1:7778/ws/block'); return Q.Promise(function(resolve, reject){ - socket.on('block', function(data){ + ws.on('message', function(data){ should.exist(data); resolve(data); }); - socket.on('error', reject); + ws.on('error', reject); }); });