diff --git a/app/lib/streams/WS2PStreamer.ts b/app/lib/streams/WS2PStreamer.ts new file mode 100644 index 0000000000000000000000000000000000000000..e324bbbe291f10fd92940925787b085fff4cd71b --- /dev/null +++ b/app/lib/streams/WS2PStreamer.ts @@ -0,0 +1,31 @@ +import * as stream from "stream" +import {WS2PConnection} from "../ws2p/WS2PConnection" + +export class WS2PStreamer extends stream.Transform { + + constructor(private ws2pc:WS2PConnection) { + super({ objectMode: true }) + } + + async _write(obj:any, enc:any, done:any) { + if (obj.joiners) { + await this.ws2pc.pushBlock(obj) + } + else if (obj.pubkey && obj.uid) { + await this.ws2pc.pushIdentity(obj) + } + else if (obj.idty_uid) { + await this.ws2pc.pushCertification(obj) + } + else if (obj.userid) { + await this.ws2pc.pushMembership(obj) + } + else if (obj.issuers) { + await this.ws2pc.pushTransaction(obj) + } + else if (obj.endpoints) { + await this.ws2pc.pushPeer(obj) + } + done && done(); + } +} diff --git a/app/lib/ws2p/WS2PClient.ts b/app/lib/ws2p/WS2PClient.ts new file mode 100644 index 0000000000000000000000000000000000000000..ca5104817338d7ef134fad538d52abe435347133 --- /dev/null +++ b/app/lib/ws2p/WS2PClient.ts @@ -0,0 +1,28 @@ +import {Server} from "../../../server" +import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PConnection" +import {WS2PServerMessageHandler} from "./interface/WS2PServerMessageHandler" +import {WS2PStreamer} from "../streams/WS2PStreamer" +import {Key} from "../common-libs/crypto/keyring" + +export class WS2PClient { + + static async connectTo(server:Server, host:string, port:number) { + const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec) + const c = WS2PConnection.newConnectionToAddress( + [host, port].join(':'), + new WS2PServerMessageHandler(server), + new WS2PPubkeyLocalAuth(k2), + new WS2PPubkeyRemoteAuth(k2) + ) + // Streaming + const streamer = new WS2PStreamer(c) + server.pipe(streamer) + c.closed.then(() => { + server.unpipe(streamer) + }) + + // Connecting + await c.connect() + return c + } +} \ No newline at end of file diff --git a/app/lib/ws2p/WS2PConnection.ts b/app/lib/ws2p/WS2PConnection.ts index 638af812274bb3e02cf4bfde5ef881fa67f34c47..aac8b5f1a042e88f9fed7ee4fdba9ae70a1855f8 100644 --- a/app/lib/ws2p/WS2PConnection.ts +++ b/app/lib/ws2p/WS2PConnection.ts @@ -1,5 +1,11 @@ import {Key, verify} from "../common-libs/crypto/keyring" import {WS2PMessageHandler} from "./impl/WS2PMessageHandler" +import {BlockDTO} from "../dto/BlockDTO" +import {IdentityDTO} from "../dto/IdentityDTO" +import {CertificationDTO} from "../dto/CertificationDTO" +import {MembershipDTO} from "../dto/MembershipDTO" +import {TransactionDTO} from "../dto/TransactionDTO" +import {PeerDTO} from "../dto/PeerDTO" const ws = require('ws') const nuuid = require('node-uuid'); @@ -23,6 +29,15 @@ enum WS2P_ERR { ANSWER_TO_UNDEFINED_REQUEST } +export enum WS2P_PUSH { + PEER, + TRANSACTION, + MEMBERSHIP, + CERTIFICATION, + IDENTITY, + BLOCK +} + export interface WS2PAuth { isAuthorizedPubkey(pub:string): Promise<boolean> authenticationIsDone(): Promise<void> @@ -33,6 +48,7 @@ export interface WS2PRemoteAuth extends WS2PAuth { sendACK(ws:any): Promise<void> registerOK(sig: string): Promise<boolean> isAuthenticatedByRemote(): boolean + getPubkey(): string } export interface WS2PLocalAuth extends WS2PAuth { @@ -62,6 +78,10 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth { }) } + getPubkey() { + return this.remotePub + } + async sendACK(ws: any): Promise<void> { const challengeMessage = `WS2P:ACK:${this.pair.pub}:${this.challenge}` const sig = this.pair.signSync(challengeMessage) @@ -192,6 +212,9 @@ export interface WS2PRequest { export class WS2PConnection { private connectp:Promise<any>|undefined + private connectedp:Promise<string> + private connectedResolve:(pub:string)=>void + private connectedReject:()=>void private nbErrors = 0 private nbRequestsCount = 0 private nbResponsesCount = 0 @@ -210,6 +233,7 @@ export class WS2PConnection { constructor( private ws:any, private onWsOpened:Promise<void>, + private onWsClosed:Promise<void>, private messageHandler:WS2PMessageHandler, private localAuth:WS2PLocalAuth, private remoteAuth:WS2PRemoteAuth, @@ -221,7 +245,12 @@ export class WS2PConnection { requestTimeout: REQUEST_TIMEOUT_VALUE }, private expectedPub:string = "" - ) {} + ) { + this.connectedp = new Promise((resolve, reject) => { + this.connectedResolve = resolve + this.connectedReject = reject + }) + } static newConnectionToAddress( address:string, @@ -241,7 +270,10 @@ export class WS2PConnection { const onWsOpened:Promise<void> = new Promise(res => { websocket.on('open', () => res()) }) - return new WS2PConnection(websocket, onWsOpened, messageHandler, localAuth, remoteAuth, options, expectedPub) + const onWsClosed:Promise<void> = new Promise(res => { + websocket.on('close', () => res()) + }) + return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub) } static newConnectionFromWebSocketServer( @@ -258,7 +290,14 @@ export class WS2PConnection { }, expectedPub:string = "") { const onWsOpened = Promise.resolve() - return new WS2PConnection(websocket, onWsOpened, messageHandler, localAuth, remoteAuth, options, expectedPub) + const onWsClosed:Promise<void> = new Promise(res => { + websocket.on('close', () => res()) + }) + return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub) + } + + get pubkey() { + return this.remoteAuth.getPubkey() } get nbRequests() { @@ -277,6 +316,14 @@ export class WS2PConnection { return this.nbPushsByRemoteCount } + get connected() { + return this.connectedp + } + + get closed() { + return this.onWsClosed + } + async connect() { if (!this.connectp) { this.connectp = (async () => { @@ -285,136 +332,142 @@ export class WS2PConnection { rej("WS2P connection timeout") }, this.options.connectionTimeout) }) - return Promise.race([connectionTimeout, new Promise((resolve, reject) => { - - (async () => { - await this.onWsOpened - try { - await this.localAuth.sendCONNECT(this.ws) - await Promise.all([ - this.localAuth.authenticationIsDone(), - this.remoteAuth.authenticationIsDone() - ]) - resolve() - } catch (e) { - reject(e) - } - })() - - this.ws.on('message', async (msg:string) => { - const data = JSON.parse(msg) - - // Incorrect data - if (typeof data !== 'object') { - // We only accept JSON objects - await this.errorDetected(WS2P_ERR.MESSAGE_MUST_BE_AN_OBJECT) - } - - // OK: JSON object - else { - - /************************ - * CONNECTION STUFF - ************************/ - - if (data.auth && typeof data.auth === "string") { - - if (data.auth === "CONNECT") { - if (this.remoteAuth.isAuthenticatedByRemote()) { - return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_BY_REMOTE) - } - else if ( - typeof data.pub !== "string" || typeof data.sig !== "string" || typeof data.challenge !== "string") { - await this.errorDetected(WS2P_ERR.AUTH_INVALID_ASK_FIELDS) - } else { - if (this.expectedPub && data.pub !== this.expectedPub) { - await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE) + try { + await Promise.race([connectionTimeout, new Promise((resolve, reject) => { + + (async () => { + await this.onWsOpened + try { + await this.localAuth.sendCONNECT(this.ws) + await Promise.all([ + this.localAuth.authenticationIsDone(), + this.remoteAuth.authenticationIsDone() + ]) + resolve() + } catch (e) { + reject(e) + } + })() + + this.ws.on('message', async (msg:string) => { + const data = JSON.parse(msg) + + // Incorrect data + if (typeof data !== 'object') { + // We only accept JSON objects + await this.errorDetected(WS2P_ERR.MESSAGE_MUST_BE_AN_OBJECT) + } + + // OK: JSON object + else { + + /************************ + * CONNECTION STUFF + ************************/ + + if (data.auth && typeof data.auth === "string") { + + if (data.auth === "CONNECT") { + if (this.remoteAuth.isAuthenticatedByRemote()) { + return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_BY_REMOTE) + } + else if ( + typeof data.pub !== "string" || typeof data.sig !== "string" || typeof data.challenge !== "string") { + await this.errorDetected(WS2P_ERR.AUTH_INVALID_ASK_FIELDS) } else { - const valid = await this.remoteAuth.registerCONNECT(data.challenge, data.sig, data.pub) - if (valid) { - await this.remoteAuth.sendACK(this.ws) + if (this.expectedPub && data.pub !== this.expectedPub) { + await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE) } else { - await this.errorDetected(WS2P_ERR.INCORRECT_ASK_SIGNATURE_FROM_REMOTE) + const valid = await this.remoteAuth.registerCONNECT(data.challenge, data.sig, data.pub) + if (valid) { + await this.remoteAuth.sendACK(this.ws) + } else { + await this.errorDetected(WS2P_ERR.INCORRECT_ASK_SIGNATURE_FROM_REMOTE) + } } } } - } - else if (data.auth === "ACK") { - if (this.localAuth.isRemoteAuthenticated()) { - return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_REMOTE) - } - if (typeof data.pub !== "string" || typeof data.sig !== "string") { - await this.errorDetected(WS2P_ERR.AUTH_INVALID_ACK_FIELDS) - } else { - if (this.expectedPub && data.pub !== this.expectedPub) { - await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE) + else if (data.auth === "ACK") { + if (this.localAuth.isRemoteAuthenticated()) { + return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_REMOTE) + } + if (typeof data.pub !== "string" || typeof data.sig !== "string") { + await this.errorDetected(WS2P_ERR.AUTH_INVALID_ACK_FIELDS) } else { - try { - const valid = await this.localAuth.registerACK(data.sig, data.pub) - if (valid) { - await this.localAuth.sendOK(this.ws) + if (this.expectedPub && data.pub !== this.expectedPub) { + await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE) + } else { + try { + const valid = await this.localAuth.registerACK(data.sig, data.pub) + if (valid) { + await this.localAuth.sendOK(this.ws) + } + } catch (e) { + await this.errorDetected(WS2P_ERR.INCORRECT_ACK_SIGNATURE_FROM_REMOTE) } - } catch (e) { - await this.errorDetected(WS2P_ERR.INCORRECT_ACK_SIGNATURE_FROM_REMOTE) } } } - } - else if (data.auth === "OK") { - if (this.remoteAuth.isAuthenticatedByRemote()) { - return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_AND_CONFIRMED_BY_REMOTE) - } - if (typeof data.sig !== "string") { - await this.errorDetected(WS2P_ERR.AUTH_INVALID_OK_FIELDS) - } else { - await this.remoteAuth.registerOK(data.sig) + else if (data.auth === "OK") { + if (this.remoteAuth.isAuthenticatedByRemote()) { + return this.errorDetected(WS2P_ERR.ALREADY_AUTHENTICATED_AND_CONFIRMED_BY_REMOTE) + } + if (typeof data.sig !== "string") { + await this.errorDetected(WS2P_ERR.AUTH_INVALID_OK_FIELDS) + } else { + await this.remoteAuth.registerOK(data.sig) + } } - } - else { - await this.errorDetected(WS2P_ERR.UNKNOWN_AUTH_MESSAGE) + else { + await this.errorDetected(WS2P_ERR.UNKNOWN_AUTH_MESSAGE) + } } - } - /************************ - * APPLICATION STUFF - ************************/ + /************************ + * APPLICATION STUFF + ************************/ - else { + else { - if (!this.localAuth.isRemoteAuthenticated()) { - await this.errorDetected(WS2P_ERR.MUST_BE_AUTHENTICATED_FIRST) - } + if (!this.localAuth.isRemoteAuthenticated()) { + await this.errorDetected(WS2P_ERR.MUST_BE_AUTHENTICATED_FIRST) + } - // Request message - else if (data.reqId && typeof data.reqId === "string") { - const body = await this.messageHandler.handleRequestMessage(data) - this.ws.send(JSON.stringify({ resId: data.reqId, body })) - } + // Request message + else if (data.reqId && typeof data.reqId === "string") { + const answer = await this.messageHandler.answerToRequest(data.body) + this.ws.send(JSON.stringify({ resId: data.reqId, body: answer })) + } - // Answer message - else if (data.resId && typeof data.resId === "string") { - // An answer - const request = this.exchanges[data.resId] - this.nbResponsesCount++ - if (request !== undefined) { - request.extras.resolve(data.body) - } else { - await this.errorDetected(WS2P_ERR.ANSWER_TO_UNDEFINED_REQUEST) + // Answer message + else if (data.resId && typeof data.resId === "string") { + // An answer + const request = this.exchanges[data.resId] + this.nbResponsesCount++ + if (request !== undefined) { + request.extras.resolve(data.body) + } else { + await this.errorDetected(WS2P_ERR.ANSWER_TO_UNDEFINED_REQUEST) + } } - } - // Push message - else { - this.nbPushsByRemoteCount++ - await this.messageHandler.handlePushMessage(data) + // Push message + else { + this.nbPushsByRemoteCount++ + await this.messageHandler.handlePushMessage(data) + } } } - } - }) - })]) + }) + })]) + + this.connectedResolve(this.remoteAuth.getPubkey()) + } catch (e) { + this.connectedReject() + } })() } return this.connectp @@ -466,12 +519,39 @@ export class WS2PConnection { }) } - async pushData(body:WS2PRequest) { + async pushBlock(block:BlockDTO) { + return this.pushData(WS2P_PUSH.BLOCK, 'block', block) + } + + async pushIdentity(idty:IdentityDTO) { + return this.pushData(WS2P_PUSH.IDENTITY, 'identity', idty) + } + + async pushCertification(cert:CertificationDTO) { + return this.pushData(WS2P_PUSH.CERTIFICATION, 'certification', cert) + } + + async pushMembership(ms:MembershipDTO) { + return this.pushData(WS2P_PUSH.MEMBERSHIP, 'membership', ms) + } + + async pushTransaction(tx:TransactionDTO) { + return this.pushData(WS2P_PUSH.TRANSACTION, 'transaction', tx) + } + + async pushPeer(peer:PeerDTO) { + return this.pushData(WS2P_PUSH.PEER, 'peer', peer) + } + + async pushData(type:WS2P_PUSH, key:string, data:any) { await this.connect() return new Promise((resolve, reject) => { this.nbPushsToRemoteCount++ this.ws.send(JSON.stringify({ - body + body: { + name: WS2P_PUSH[type], + [key]: data + } }), async (err:any) => { if (err) { return reject(err) diff --git a/app/lib/ws2p/WS2PResponder.ts b/app/lib/ws2p/WS2PResponder.ts deleted file mode 100644 index 8973e2ade1041862831d48f921f95145077c54df..0000000000000000000000000000000000000000 --- a/app/lib/ws2p/WS2PResponder.ts +++ /dev/null @@ -1,39 +0,0 @@ -import {WS2PReqMapper} from "./WS2PReqMapper" - -enum WS2P_REQ { - CURRENT -} - -export enum WS2P_REQERROR { - UNKNOWN_REQUEST -} - -export async function WS2PResponder(data:any, handler:WS2PReqMapper) { - - /********** - * REQUEST - *********/ - if (data.reqId && typeof data.reqId === "string") { - - let body:any = {} - - if (data.body && data.body.name) { - switch (data.body.name) { - case WS2P_REQ[WS2P_REQ.CURRENT]: - body = await handler.getCurrent() - break; - default: - throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) - } - } - - return body - } - - /********** - * PUSH - *********/ - else { - - } -} \ No newline at end of file diff --git a/app/lib/ws2p/WS2PServer.ts b/app/lib/ws2p/WS2PServer.ts new file mode 100644 index 0000000000000000000000000000000000000000..79861876f6662ac61da62f228592dc98720b1837 --- /dev/null +++ b/app/lib/ws2p/WS2PServer.ts @@ -0,0 +1,79 @@ +import {Server} from "../../../server" +import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PConnection" +import {WS2PServerMessageHandler} from "./interface/WS2PServerMessageHandler" +import {WS2PStreamer} from "../streams/WS2PStreamer" +import {Key} from "../common-libs/crypto/keyring" + +const WebSocketServer = require('ws').Server + +export class WS2PServer { + + private wss:any + private connections:WS2PConnection[] = [] + + private constructor( + private server:Server, + private host:string, + private port:number) { + } + + private listenToWebSocketConnections() { + const key = new Key(this.server.conf.pair.pub, this.server.conf.pair.sec) + this.wss = new WebSocketServer({ host: this.host, port: this.port }) + this.wss.on('connection', async (ws:any) => { + + /****************** + * A NEW CONNECTION + ******************/ + const c = WS2PConnection.newConnectionFromWebSocketServer( + ws, + new WS2PServerMessageHandler(this.server), + new WS2PPubkeyLocalAuth(key), + new WS2PPubkeyRemoteAuth(key), { + connectionTimeout: 100, + requestTimeout: 100 + }) + + this.connections.push(c) + + c.connect().catch((e:any) => console.error('WS2P: cannot connect to incoming WebSocket connection: %s', e)) + + // Broadcasting + const ws2pStreamer = new WS2PStreamer(c) + this.server.pipe(ws2pStreamer) + + ws.on('error', (e:any) => { + this.server.logger.error(e) + }) + + ws.on('close', () => { + this.server.unpipe(ws2pStreamer) + }) + }) + } + + close() { + return this.wss.close() + } + + async getConnection(pubkeyOfConnection:string) { + if (this.connections.length === 0) { + throw "No connections to look into." + } + return Promise.race(this.connections.map(async (c) => { + await c.connected + if (c.pubkey === pubkeyOfConnection) { + return c + } else { + await new Promise(resolve => setTimeout(resolve, 5000)) + throw "Pubkey not matching or too long to be obtained" + } + })) + } + + static async bindOn(server:Server, host:string, port:number) { + const ws2ps = new WS2PServer(server, host, port) + await ws2ps.listenToWebSocketConnections() + return ws2ps + } +} \ No newline at end of file diff --git a/app/lib/ws2p/impl/WS2PMessageHandler.ts b/app/lib/ws2p/impl/WS2PMessageHandler.ts index 8fdfdda6d75ffd30ac15adbcfbdc5b13e0c88f47..650e9f4e90c7de3dccc037af80489f5407c3a3ed 100644 --- a/app/lib/ws2p/impl/WS2PMessageHandler.ts +++ b/app/lib/ws2p/impl/WS2PMessageHandler.ts @@ -2,5 +2,5 @@ import {WS2PResponse} from "./WS2PResponse" export interface WS2PMessageHandler { handlePushMessage(json:any): Promise<void> - handleRequestMessage(json:any): Promise<WS2PResponse> + answerToRequest(json:any): Promise<WS2PResponse> } \ No newline at end of file diff --git a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts index b30496a4fdba04cc42f1b15378d09e7ddb43edd1..9f0101a4d446764ef0a08626a6a7ed33b761591b 100644 --- a/app/lib/ws2p/impl/WS2PReqMapperByServer.ts +++ b/app/lib/ws2p/impl/WS2PReqMapperByServer.ts @@ -3,7 +3,7 @@ import {WS2PReqMapper} from "../interface/WS2PReqMapper" export class WS2PReqMapperByServer implements WS2PReqMapper { - private constructor(protected server:Server) {} + constructor(protected server:Server) {} async getCurrent() { return this.server.BlockchainService.current() diff --git a/app/lib/ws2p/interface/WS2PServerMessageHandler.ts b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts new file mode 100644 index 0000000000000000000000000000000000000000..8ed051fcf2da05c87d03952e116080ef62df0692 --- /dev/null +++ b/app/lib/ws2p/interface/WS2PServerMessageHandler.ts @@ -0,0 +1,88 @@ +import {WS2PMessageHandler} from "../impl/WS2PMessageHandler" +import {WS2PResponse} from "../impl/WS2PResponse" +import {Server} from "../../../../server" +import {WS2PReqMapperByServer} from "../impl/WS2PReqMapperByServer" +import {WS2PReqMapper} from "./WS2PReqMapper" +import {BlockDTO} from "../../dto/BlockDTO" +import {IdentityDTO} from "../../dto/IdentityDTO" +import {CertificationDTO} from "../../dto/CertificationDTO" +import {MembershipDTO} from "../../dto/MembershipDTO" +import {TransactionDTO} from "../../dto/TransactionDTO" +import {PeerDTO} from "../../dto/PeerDTO" + +enum WS2P_REQ { + CURRENT +} + +export enum WS2P_REQERROR { + UNKNOWN_REQUEST +} + +export class WS2PServerMessageHandler implements WS2PMessageHandler { + + protected mapper:WS2PReqMapper + + constructor(protected server:Server) { + this.mapper = new WS2PReqMapperByServer(server) + } + + async handlePushMessage(json: any): Promise<void> { + try { + if (json.body) { + if (json.body.block) { + const dto = BlockDTO.fromJSONObject(json.body.block) + const raw = dto.getRawSigned() + await this.server.writeRawBlock(raw) + } + else if (json.body.identity) { + const dto = IdentityDTO.fromJSONObject(json.body.identity) + const raw = dto.getRawSigned() + await this.server.writeRawIdentity(raw) + } + else if (json.body.certification) { + const dto = CertificationDTO.fromJSONObject(json.body.certification) + const raw = dto.getRawSigned() + await this.server.writeRawCertification(raw) + } + else if (json.body.membership) { + const dto = MembershipDTO.fromJSONObject(json.body.membership) + const raw = dto.getRawSigned() + await this.server.writeRawMembership(raw) + } + else if (json.body.transaction) { + const dto = TransactionDTO.fromJSONObject(json.body.transaction) + const raw = dto.getRaw() + await this.server.writeRawTransaction(raw) + } + else if (json.body.peer) { + const dto = PeerDTO.fromJSONObject(json.body.peer) + const raw = dto.getRawSigned() + await this.server.writeRawPeer(raw) + } + } + } catch(e) { + this.server.logger.warn(e) + } + } + + async answerToRequest(data: any): Promise<WS2PResponse> { + + /********** + * REQUEST + *********/ + + let body:any = {} + + if (data && data.name) { + switch (data.name) { + case WS2P_REQ[WS2P_REQ.CURRENT]: + body = await this.mapper.getCurrent() + break; + default: + throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST]) + } + } + + return body + } +} \ No newline at end of file diff --git a/test/integration/tools/toolbox.ts b/test/integration/tools/toolbox.ts index a732ba052e629acc1151512498630723bfbabaa4..55d99ab3cc4a2b2ed4fc1af3d9fe7b815cae1c83 100644 --- a/test/integration/tools/toolbox.ts +++ b/test/integration/tools/toolbox.ts @@ -19,6 +19,8 @@ import {Key} from "../../../app/lib/common-libs/crypto/keyring" import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "../../../app/lib/ws2p/WS2PConnection" import {WS2PResponse} from "../../../app/lib/ws2p/impl/WS2PResponse" import {WS2PMessageHandler} from "../../../app/lib/ws2p/impl/WS2PMessageHandler" +import {WS2PServer} from "../../../app/lib/ws2p/WS2PServer" +import {WS2PClient} from "../../../app/lib/ws2p/WS2PClient" const assert = require('assert'); const _ = require('underscore'); @@ -65,6 +67,10 @@ export const assertThrows = async (promise:Promise<any>, message:string|null = n } } +export const simpleUser = (uid:string, keyring:{ pub:string, sec:string }, server:TestingServer) => { + return user(uid, keyring, { server }); +} + export const simpleNetworkOf2NodesAnd2Users = async (options:any) => { const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}; const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}; @@ -209,6 +215,7 @@ export const fakeSyncServer = async (readBlocksMethod:any, readParticularBlockMe * @param conf */ export const server = (conf:any) => NewTestingServer(conf) +export const simpleTestingServer = (conf:any) => NewTestingServer(conf) export const NewTestingServer = (conf:any) => { const port = PORT++; @@ -272,6 +279,10 @@ export class TestingServer { server.getMainEndpoint = require('../../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint } + get _server() { + return this.server + } + get BlockchainService(): BlockchainService { return this.server.BlockchainService } @@ -315,6 +326,10 @@ export class TestingServer { async writeBlock(obj:any) { return this.server.writeBlock(obj) } + + async writeRawBlock(raw:string) { + return this.server.writeRawBlock(raw) + } async writeIdentity(obj:any): Promise<DBIdentity> { return this.server.writeIdentity(obj) @@ -587,9 +602,40 @@ export async function newWS2PBidirectionnalConnection(k1:Key, k2:Key, serverHand c1 = WS2PConnection.newConnectionToAddress('localhost:' + port, new (class EmptyHandler implements WS2PMessageHandler { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { + async answerToRequest(json: any): Promise<WS2PResponse> { return {} } }), new WS2PPubkeyLocalAuth(k2), new WS2PPubkeyRemoteAuth(k2)) }) +} + +export const simpleWS2PNetwork = async (s1:TestingServer, s2:TestingServer) => { + let port = PORT++ + const clientPub = s2.conf.pair.pub + let w1:WS2PConnection|null + + const ws2ps = await WS2PServer.bindOn(s1._server, 'localhost', port) + const ws2pc = await WS2PClient.connectTo(s2._server, 'localhost', port) + + w1 = await ws2ps.getConnection(clientPub) + if (!w1) { + throw "Connection coming from " + clientPub + " was not found" + } + + return { + w1, + ws2pc, + wss: ws2ps + } +} + +export function simpleTestingConf(now = 1500000000, pair:{ pub:string, sec:string }) { + return { + pair, + nbCores: 1, + udTime0: now, + udReevalTime0: now, + sigQty: 1, + medianTimeBlocks: 1 // The medianTime always equals previous block's medianTime + } } \ No newline at end of file diff --git a/test/integration/tools/user.js b/test/integration/tools/user.js index 4bde2687d44e91d088084c36ac45c3e2ae297313..9e82326e7b98ed4aac34eded9bed0267ce337ebc 100644 --- a/test/integration/tools/user.js +++ b/test/integration/tools/user.js @@ -170,6 +170,13 @@ function User (uid, options, node) { }; }; + this.sendMoney = function (amount, recipient, comment) { + return co(function *() { + let raw = yield that.prepareITX(amount, recipient, comment); + yield that.sendTX(raw); + }) + }; + this.sendTX = (rawTX) => co(function *() { let http = yield getContacter(); return http.processTransaction(rawTX); diff --git a/test/integration/ws2p_connection.ts b/test/integration/ws2p_connection.ts index c74fee3d917a78ab61d9a23efa6422d7e5e80d75..25185c66e1603064a5ef904c625b26d0c531b039 100644 --- a/test/integration/ws2p_connection.ts +++ b/test/integration/ws2p_connection.ts @@ -143,7 +143,7 @@ describe('WS2P', () => { s1 = WS2PConnection.newConnectionFromWebSocketServer(ws, new (class TmpHandler implements WS2PMessageHandler { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { + async answerToRequest(json: any): Promise<WS2PResponse> { return { answer: 'world' } } }), new WS2PNoLocalAuth(), new WS2PNoRemoteAuth()) @@ -154,7 +154,7 @@ describe('WS2P', () => { s2 = WS2PConnection.newConnectionFromWebSocketServer(ws, new (class TmpHandler implements WS2PMessageHandler { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { + async answerToRequest(json: any): Promise<WS2PResponse> { return { answer: 'this is s2![j = ' + (j++) + ']' } } }), new WS2PNoLocalAuth(), new WS2PNoRemoteAuth()) @@ -350,7 +350,7 @@ describe('WS2P', () => { const c5 = WS2PConnection.newConnectionToAddress('localhost:20903', new (class TmpHandler implements WS2PMessageHandler { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { + async answerToRequest(json: any): Promise<WS2PResponse> { return { answer: 'success!' } } }), new WS2PPubkeyLocalAuth(keypair), new WS2PPubkeyRemoteAuth(keypair)) @@ -434,7 +434,7 @@ class WS2PMutedHandler implements WS2PMessageHandler { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { + async answerToRequest(json: any): Promise<WS2PResponse> { return {} } } diff --git a/test/integration/ws2p_doc_sharing.ts b/test/integration/ws2p_doc_sharing.ts new file mode 100644 index 0000000000000000000000000000000000000000..68d43e13f8f3d67b2956d827f1bb5c56d14896bf --- /dev/null +++ b/test/integration/ws2p_doc_sharing.ts @@ -0,0 +1,108 @@ +import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox" + +const assert = require('assert') + +describe("WS2P doc sharing", function() { + + const now = 1500000000 + let s1:TestingServer, s2:TestingServer, wss:any + let cat:any, tac:any, toc:any + const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'} + const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'} + const tocKeyring = { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'} + + before(async () => { + const conf1 = simpleTestingConf(now, catKeyring) + const conf2 = simpleTestingConf(now, tacKeyring) + s1 = simpleTestingServer(conf1) + s2 = simpleTestingServer(conf2) + cat = simpleUser('cat', catKeyring, s1) + tac = simpleUser('tac', tacKeyring, s1) + toc = simpleUser('toc', tocKeyring, s1) + await s1.initDalBmaConnections() + await s2.initDalBmaConnections() + + const network = await simpleWS2PNetwork(s1, s2) + + await cat.createIdentity(); + await tac.createIdentity(); + await cat.cert(tac); + await tac.cert(cat); + await cat.join(); + await tac.join(); + + wss = network.wss + }) + + after(() => wss.close()) + + it('should see the identity and certs of initial members in the docpool', async () => { + await s2.expect('/wot/lookup/cat', (res:any) => { + assert.equal(res.results.length, 1) + assert.equal(res.results[0].uids[0].others.length, 1) + }) + await s2.expect('/wot/lookup/tac', (res:any) => { + assert.equal(res.results.length, 1) + assert.equal(res.results[0].uids[0].others.length, 1) + }) + }) + + it('should have the same block#0 if we commit', async () => { + await s1.commit({ time: now }) + await s1.commit({ time: now }) + await s1.waitToHaveBlock(1) + await s2.waitToHaveBlock(1) + const b1s1 = await s1.BlockchainService.current() + const b1s2 = await s2.BlockchainService.current() + assert.equal(b1s1.number, 1) + assert.equal(b1s2.number, 1) + assert.equal(b1s1.hash, b1s2.hash) + }) + + it('should see the identity, certs and memberships in the docpool', async () => { + await toc.createIdentity(); + await cat.cert(toc); + await toc.join(); + await s2.expect('/wot/lookup/toc', (res:any) => { + assert.equal(res.results.length, 1) + assert.equal(res.results[0].uids[0].others.length, 1) + }) + await s2.commit({ time: now }) + await s1.waitToHaveBlock(2) + await s2.waitToHaveBlock(2) + const b2s1 = await s1.BlockchainService.current() + const b2s2 = await s2.BlockchainService.current() + assert.equal(b2s1.number, 2) + assert.equal(b2s2.number, 2) + assert.equal(b2s1.hash, b2s2.hash) + assert.equal(b2s2.joiners.length, 1) + }) + + it('should see the transactions pending', async () => { + await cat.sendMoney(54, toc) + await s2.until('transaction', 1) + await s2.expect('/tx/history/' + catKeyring.pub, (res:any) => { + assert.equal(res.history.sending.length, 1) + }) + await s2.expect('/tx/history/' + tocKeyring.pub, (res:any) => { + assert.equal(res.history.pending.length, 1) + }) + await s2.commit({ time: now }) + await s1.waitToHaveBlock(3) + await s2.waitToHaveBlock(3) + const b3s1 = await s1.BlockchainService.current() + const b3s2 = await s2.BlockchainService.current() + assert.equal(b3s1.number, 3) + assert.equal(b3s2.number, 3) + assert.equal(b3s1.hash, b3s2.hash) + assert.equal(b3s2.transactions.length, 1) + }) + + it('should see the peer documents', async () => { + await s1.getPeer() + await s2.until('peer', 1) + await s2.expect('/network/peers', (res:any) => { + assert.equal(res.peers.length, 1) + }) + }) +}) diff --git a/test/integration/ws2p_exchange.ts b/test/integration/ws2p_exchange.ts index c80a4f4c37c55c68a01c566588a2bdefa445c3a4..6a52b330cd17e03634bd74915771e9a9f685066d 100644 --- a/test/integration/ws2p_exchange.ts +++ b/test/integration/ws2p_exchange.ts @@ -2,7 +2,6 @@ import {WS2PConnection} from "../../app/lib/ws2p/WS2PConnection" import {Key} from "../../app/lib/common-libs/crypto/keyring" import {newWS2PBidirectionnalConnection} from "./tools/toolbox" import {WS2PRequester} from "../../app/lib/ws2p/WS2PRequester" -import {WS2PResponder} from "../../app/lib/ws2p/WS2PResponder" import {WS2PReqMapper} from "../../app/lib/ws2p/WS2PReqMapper" import {BlockDTO} from "../../app/lib/dto/BlockDTO" import {WS2PMessageHandler} from "../../app/lib/ws2p/impl/WS2PMessageHandler" @@ -22,12 +21,8 @@ describe('WS2P exchange', () => { async handlePushMessage(json: any): Promise<void> { } - async handleRequestMessage(json: any): Promise<WS2PResponse> { - return await WS2PResponder(json, new (class TestingMapper implements WS2PReqMapper { - async getCurrent(): Promise<BlockDTO> { - return BlockDTO.fromJSONObject({ number: 1, hash: 'A' }) - } - })) + async answerToRequest(json: any): Promise<WS2PResponse> { + return BlockDTO.fromJSONObject({ number: 1, hash: 'A' }) } })) s1 = res.p1