Skip to content
Snippets Groups Projects
Commit 3d8e54ec authored by Cédric Moreau's avatar Cédric Moreau
Browse files

Fix #309 Websockets using a lower level implementation than socket.io

parent b791efcd
No related branches found
No related tags found
No related merge requests found
"use strict";
var _ = require('underscore'); var _ = require('underscore');
var http = require('http'); var http = require('http');
var express = require('express'); var express = require('express');
var url = require('url');
var co = require('co'); var co = require('co');
var Q = require('q'); var Q = require('q');
var cors = require('express-cors'); var cors = require('express-cors');
var es = require('event-stream'); var es = require('event-stream');
var morgan = require('morgan'); var morgan = require('morgan');
var errorhandler = require('errorhandler');
var bodyParser = require('body-parser');
var constants = require('../../lib/constants'); var constants = require('../../lib/constants');
var dtos = require('../../lib/streams/dtos'); var dtos = require('../../lib/streams/dtos');
var sanitize = require('../../lib/sanitize'); var sanitize = require('../../lib/sanitize');
var logger = require('../../lib/logger')('bma'); var logger = require('../../lib/logger')('bma');
module.exports = function(server, interfaces, httpLogs) { let WebSocketServer = require('ws').Server;
"use strict"; module.exports = function(server, interfaces, httpLogs) {
var app = express(); var app = express();
...@@ -48,15 +53,14 @@ module.exports = function(server, interfaces, httpLogs) { ...@@ -48,15 +53,14 @@ module.exports = function(server, interfaces, httpLogs) {
})); }));
app.use(express.urlencoded()); app.use(bodyParser.urlencoded({
app.use(express.json()); extended: true
}));
// Routing app.use(bodyParser.json());
app.use(app.router);
// development only // development only
if (app.get('env') == 'development') { if (app.get('env') == 'development') {
app.use(express.errorHandler()); app.use(errorhandler());
} }
var node = require('../../controllers/node')(server); var node = require('../../controllers/node')(server);
...@@ -125,11 +129,11 @@ module.exports = function(server, interfaces, httpLogs) { ...@@ -125,11 +129,11 @@ module.exports = function(server, interfaces, httpLogs) {
// Ensure of the answer format // Ensure of the answer format
result = sanitize(result, dtoContract); result = sanitize(result, dtoContract);
// HTTP answer // HTTP answer
res.send(200, JSON.stringify(result, null, " ")); res.status(200).send(JSON.stringify(result, null, " "));
} catch (e) { } catch (e) {
let error = getResultingError(e); let error = getResultingError(e);
// HTTP error // HTTP error
res.send(error.httpCode, JSON.stringify(error.uerr, null, " ")); res.status(error.httpCode).send(JSON.stringify(error.uerr, null, " "));
} }
}); });
}); });
...@@ -164,6 +168,7 @@ module.exports = function(server, interfaces, httpLogs) { ...@@ -164,6 +168,7 @@ module.exports = function(server, interfaces, httpLogs) {
httpServers.push(httpServer); httpServers.push(httpServer);
logger.info('uCoin server listening on ' + netInterface.ip + ' port ' + netInterface.port); logger.info('uCoin server listening on ' + netInterface.ip + ' port ' + netInterface.port);
} catch (err) { } catch (err) {
logger.error(err.message);
logger.error('uCoin server cannot listen on ' + netInterface.ip + ' port ' + netInterface.port); logger.error('uCoin server cannot listen on ' + netInterface.ip + ' port ' + netInterface.port);
} }
} }
...@@ -199,7 +204,6 @@ module.exports = function(server, interfaces, httpLogs) { ...@@ -199,7 +204,6 @@ module.exports = function(server, interfaces, httpLogs) {
}; };
function listenInterface(app, netInterface, port) { function listenInterface(app, netInterface, port) {
"use strict";
return Q.Promise(function(resolve, reject){ return Q.Promise(function(resolve, reject){
var httpServer = http.createServer(app); var httpServer = http.createServer(app);
httpServer.on('error', reject); httpServer.on('error', reject);
...@@ -209,26 +213,40 @@ function listenInterface(app, netInterface, port) { ...@@ -209,26 +213,40 @@ function listenInterface(app, netInterface, port) {
} }
function listenWebSocket(server, httpServer) { function listenWebSocket(server, httpServer) {
"use strict";
var io = require('socket.io')(httpServer); let currentBlock = {};
var currentBlock = {}; let wssBlock = new WebSocketServer({
var blockSocket = io server: httpServer,
.of('/websocket/block') path: '/ws/block'
.on('error', (err) => logger.error(err)) });
.on('connection', function (socket) { let wssPeer = new WebSocketServer({
socket.emit('block', currentBlock); server: httpServer,
path: '/ws/peer'
});
wssBlock.on('connection', function connection(ws) {
ws.send(JSON.stringify(currentBlock));
}); });
var peerSocket = io
.of('/websocket/peer');
wssBlock.broadcast = (data) => wssBlock.clients.forEach((client) => client.send(data));
wssPeer.broadcast = (data) => wssPeer.clients.forEach((client) => client.send(data));
// Forward blocks & peers
server server
.pipe(es.mapSync(function(data) { .pipe(es.mapSync(function(data) {
// Broadcast block
if (data.joiners) { if (data.joiners) {
currentBlock = data; currentBlock = data;
blockSocket.emit('block', currentBlock); wssBlock.broadcast(JSON.stringify(currentBlock));
} }
// Broadcast peer
if (data.endpoints) { if (data.endpoints) {
peerSocket.emit('peer', data); wssPeer.broadcast(JSON.stringify(data));
} }
})); }));
return co(function *() {
currentBlock = yield server.dal.getCurrent();
wssBlock.broadcast(JSON.stringify(currentBlock));
});
} }
...@@ -48,9 +48,9 @@ ...@@ -48,9 +48,9 @@
* [history/[pubkey]/times/[from]/[to]](#txhistorypubkeytimesfromto) * [history/[pubkey]/times/[from]/[to]](#txhistorypubkeytimesfromto)
* [ud/](#ud) * [ud/](#ud)
* [history/[pubkey]](#udhistorypubkey) * [history/[pubkey]](#udhistorypubkey)
* [websocket/](#websocket) * [ws/](#ws)
* [block](#websocketblock) * [block](#wsblock)
* [peer](#websocketpeer) * [peer](#wspeer)
## Overview ## Overview
...@@ -90,7 +90,7 @@ Data is made accessible through an HTTP API mainly inspired from [OpenUDC_exchan ...@@ -90,7 +90,7 @@ Data is made accessible through an HTTP API mainly inspired from [OpenUDC_exchan
| `-- history | `-- history
|-- ud/ |-- ud/
| `-- history | `-- history
`-- websocket/ `-- ws/
|-- block |-- block
`-- peer `-- peer
...@@ -1699,9 +1699,9 @@ The universal dividend history for the given `pubkey`. ...@@ -1699,9 +1699,9 @@ The universal dividend history for the given `pubkey`.
``` ```
### websocket/* ### ws/*
#### `websocket/block` #### `ws/block`
**Goal** **Goal**
...@@ -1715,7 +1715,7 @@ A websocket entry point for receiving blocks. ...@@ -1715,7 +1715,7 @@ A websocket entry point for receiving blocks.
Websocket connection. Websocket connection.
#### `websocket/peer` #### `ws/peer`
**Goal** **Goal**
......
...@@ -35,13 +35,16 @@ ...@@ -35,13 +35,16 @@
"dependencies": { "dependencies": {
"async": "0.2.9", "async": "0.2.9",
"bindings": "1.2.1", "bindings": "1.2.1",
"body-parser": "1.14.2",
"co": "4.6.0", "co": "4.6.0",
"colors": "1.1.2", "colors": "1.1.2",
"commander": "2.1.0", "commander": "2.1.0",
"daemonize2": "0.4.2", "daemonize2": "0.4.2",
"errorhandler": "1.4.3",
"event-stream": "3.1.5", "event-stream": "3.1.5",
"express": "3.4.7", "express": "4.13.4",
"express-cors": "0.0.3", "express-cors": "0.0.3",
"express-ws": "0.2.6",
"inquirer": "0.8.5", "inquirer": "0.8.5",
"merkle": "0.1.0", "merkle": "0.1.0",
"moment": "2.6.0", "moment": "2.6.0",
...@@ -55,14 +58,14 @@ ...@@ -55,14 +58,14 @@
"request": "2.31.0", "request": "2.31.0",
"scrypt": "5.4.1", "scrypt": "5.4.1",
"sha1": "1.1.0", "sha1": "1.1.0",
"socket.io": "1.3.7",
"sqlite3": "3.1.1", "sqlite3": "3.1.1",
"superagent": "1.4.0", "superagent": "1.4.0",
"tweetnacl": "0.11.2", "tweetnacl": "0.11.2",
"underscore": "1.8.3", "underscore": "1.8.3",
"vucoin": "0.27.0", "vucoin": "0.27.0",
"winston": "2.1.1", "winston": "2.1.1",
"wotb": "0.3.1" "wotb": "0.3.1",
"ws": "1.0.1"
}, },
"devDependencies": { "devDependencies": {
"coveralls": "2.11.4", "coveralls": "2.11.4",
...@@ -75,7 +78,6 @@ ...@@ -75,7 +78,6 @@
"request-promise": "0.4.2", "request-promise": "0.4.2",
"sha1": "", "sha1": "",
"should": "", "should": "",
"socket.io-client": "1.3.6",
"supertest": "" "supertest": ""
}, },
"bin": { "bin": {
......
...@@ -16,6 +16,8 @@ var expectJSON = httpTest.expectJSON; ...@@ -16,6 +16,8 @@ var expectJSON = httpTest.expectJSON;
var expectAnswer = httpTest.expectAnswer; var expectAnswer = httpTest.expectAnswer;
var expectHttpCode = httpTest.expectHttpCode; var expectHttpCode = httpTest.expectHttpCode;
let WebSocket = require('ws');
require('../../app/lib/logger')().mute(); require('../../app/lib/logger')().mute();
var MEMORY_MODE = true; var MEMORY_MODE = true;
...@@ -230,14 +232,14 @@ describe("Branches", function() { ...@@ -230,14 +232,14 @@ describe("Branches", function() {
}); });
}); });
it('should have an open websocket on /websocket/block', function() { it('should have an open websocket on /ws/block', function() {
var socket = require('socket.io-client')('http://127.0.0.1:7778/websocket/block'); var ws = new WebSocket('ws://127.0.0.1:7778/ws/block');
return Q.Promise(function(resolve, reject){ return Q.Promise(function(resolve, reject){
socket.on('block', function(data){ ws.on('message', function(data){
should.exist(data); should.exist(data);
resolve(data); resolve(data);
}); });
socket.on('error', reject); ws.on('error', reject);
}); });
}); });
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment