Skip to content
Snippets Groups Projects
Commit c980570d authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] WS2P document sharing must not close the streams

parent 8f98d936
No related branches found
No related tags found
No related merge requests found
import * as stream from "stream" import * as stream from "stream"
import {WS2PConnection} from "../../modules/ws2p/lib/WS2PConnection" import {WS2PConnection} from "../../modules/ws2p/lib/WS2PConnection"
import {NewLogger} from "../logger"
const logger = NewLogger()
export class WS2PStreamer extends stream.Transform { export class WS2PStreamer extends stream.Transform {
...@@ -8,23 +11,27 @@ export class WS2PStreamer extends stream.Transform { ...@@ -8,23 +11,27 @@ export class WS2PStreamer extends stream.Transform {
} }
async _write(obj:any, enc:any, done:any) { async _write(obj:any, enc:any, done:any) {
if (obj.joiners) { try {
await this.ws2pc.pushBlock(obj) if (obj.joiners) {
} await this.ws2pc.pushBlock(obj)
else if (obj.pubkey && obj.uid) { }
await this.ws2pc.pushIdentity(obj) else if (obj.pubkey && obj.uid) {
} await this.ws2pc.pushIdentity(obj)
else if (obj.idty_uid) { }
await this.ws2pc.pushCertification(obj) else if (obj.idty_uid) {
} await this.ws2pc.pushCertification(obj)
else if (obj.userid) { }
await this.ws2pc.pushMembership(obj) else if (obj.userid) {
} await this.ws2pc.pushMembership(obj)
else if (obj.issuers) { }
await this.ws2pc.pushTransaction(obj) else if (obj.issuers) {
} await this.ws2pc.pushTransaction(obj)
else if (obj.endpoints) { }
await this.ws2pc.pushPeer(obj) else if (obj.endpoints) {
await this.ws2pc.pushPeer(obj)
}
} catch (e) {
logger.warn(e)
} }
done && done(); done && done();
} }
......
...@@ -252,40 +252,44 @@ export class WS2PCluster { ...@@ -252,40 +252,44 @@ export class WS2PCluster {
} }
// Also listen for network updates, and connect to new nodes // Also listen for network updates, and connect to new nodes
this.server.pipe(es.mapSync(async (data:any) => { this.server.pipe(es.mapSync((data:any) => {
// New peer (async () => {
if (data.endpoints) { // New peer
const peer = PeerDTO.fromJSONObject(data) if (data.endpoints) {
const ws2pEnpoint = peer.getWS2P() const peer = PeerDTO.fromJSONObject(data)
if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) { const ws2pEnpoint = peer.getWS2P()
// Check if already connected to the pubkey (in any way: server or client) if (ws2pEnpoint && peer.pubkey !== this.server.conf.pair.pub) {
const connectedPubkeys = this.getConnectedPubkeys() // Check if already connected to the pubkey (in any way: server or client)
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || [])) const connectedPubkeys = this.getConnectedPubkeys()
if (shouldAccept) { const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes || []))
await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler) if (shouldAccept) {
// Trim the eventual extra connections await this.connect(ws2pEnpoint.host, ws2pEnpoint.port, this.messageHandler)
await this.trimClientConnections() // Trim the eventual extra connections
await this.trimClientConnections()
}
} }
} }
}
// Block received // Block received
else if (data.joiners) { else if (data.joiners) {
// Update the cache // Update the cache
this.blockstampsCache[[data.number, data.hash].join('-')] = Date.now() this.blockstampsCache[[data.number, data.hash].join('-')] = Date.now()
} }
// HEAD changed // HEAD changed
else if (data.bcEvent === OtherConstants.BC_EVENT.HEAD_CHANGED || data.bcEvent === OtherConstants.BC_EVENT.SWITCHED) { else if (data.bcEvent === OtherConstants.BC_EVENT.HEAD_CHANGED || data.bcEvent === OtherConstants.BC_EVENT.SWITCHED) {
// Propagate this change to the network // Propagate this change to the network
const { sig, message } = this.sayHeadChangedTo(data.block.number, data.block.hash) const { sig, message } = this.sayHeadChangedTo(data.block.number, data.block.hash)
try { try {
await this.broadcastHead(message, sig) await this.broadcastHead(message, sig)
} catch (e) { } catch (e) {
this.server.logger.warn(e) this.server.logger.warn(e)
}
} }
} })()
return data
})) }))
} }
...@@ -296,7 +300,13 @@ export class WS2PCluster { ...@@ -296,7 +300,13 @@ export class WS2PCluster {
private async spreadNewHeads(heads:{ message:string, sig:string }[]) { private async spreadNewHeads(heads:{ message:string, sig:string }[]) {
const connexions = await this.getAllConnections() const connexions = await this.getAllConnections()
return Promise.all(connexions.map(c => c.pushHeads(heads))) return Promise.all(connexions.map(async (c) => {
try {
await c.pushHeads(heads)
} catch (e) {
this.server.logger.warn('Could not spread new HEAD info to %s WS2P %s %s', c.pubkey)
}
}))
} }
private sayHeadChangedTo(number:number, hash:string) { private sayHeadChangedTo(number:number, hash:string) {
......
...@@ -159,7 +159,7 @@ export class BlockchainService extends FIFOService { ...@@ -159,7 +159,7 @@ export class BlockchainService extends FIFOService {
await this.blockResolution() await this.blockResolution()
// Resolve the potential forks // Resolve the potential forks
await this.forkResolution() await this.forkResolution()
const current = this.current() const current = await this.current()
this.push({ this.push({
bcEvent: OtherConstants.BC_EVENT.RESOLUTION_DONE, bcEvent: OtherConstants.BC_EVENT.RESOLUTION_DONE,
block: current block: current
......
...@@ -190,6 +190,7 @@ export class Server extends stream.Duplex implements HookableServer { ...@@ -190,6 +190,7 @@ export class Server extends stream.Duplex implements HookableServer {
this.emit('bcEvent', e) this.emit('bcEvent', e)
} }
this.streamPush(e) this.streamPush(e)
return e
})) }))
return this.conf; return this.conf;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment