Skip to content
Snippets Groups Projects
Commit 2cfc3b49 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] #1147 Prevent double-sending of a same document + ban on 5 errors

parent 6b0fc1cb
Branches
Tags
No related merge requests found
......@@ -3,7 +3,8 @@ import {WS2PConnection, WS2PPubkeyLocalAuth, WS2PPubkeyRemoteAuth} from "./WS2PC
import {Key} from "../../../lib/common-libs/crypto/keyring"
import {WS2PMessageHandler} from "./impl/WS2PMessageHandler"
import {WS2PConstants} from "./constants"
import { WS2PStreamer } from "./WS2PStreamer";
import {WS2PStreamer} from "./WS2PStreamer"
import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream"
export class WS2PClient {
......@@ -22,17 +23,22 @@ export class WS2PClient {
},
expectedPub
)
const singleWriteProtection = new WS2PSingleWriteStream()
const streamer = new WS2PStreamer(c)
c.connected
.then(() => {
// Streaming
server.pipe(streamer)
server
.pipe(singleWriteProtection)
.pipe(streamer)
})
.catch(() => {
server.unpipe(streamer)
server.unpipe(singleWriteProtection)
singleWriteProtection.unpipe(streamer)
})
c.closed.then(() => {
server.unpipe(streamer)
server.unpipe(singleWriteProtection)
singleWriteProtection.unpipe(streamer)
})
// Connecting
......
......@@ -6,6 +6,7 @@ import * as events from "events"
import {WS2PConstants} from "./constants"
import {WS2PMessageHandler} from "./impl/WS2PMessageHandler"
import {WS2PStreamer} from "./WS2PStreamer"
import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream"
const WebSocketServer = require('ws').Server
......@@ -87,15 +88,19 @@ export class WS2PServer extends events.EventEmitter {
this.server.logger.info('WS2P: established incoming connection from %s:%s', host, port)
// Broadcasting
const singleWriteProtection = new WS2PSingleWriteStream()
const ws2pStreamer = new WS2PStreamer(c)
this.server.pipe(ws2pStreamer)
this.server
.pipe(singleWriteProtection)
.pipe(ws2pStreamer)
ws.on('error', (e:any) => {
this.server.logger.error(e)
})
ws.on('close', () => {
this.server.unpipe(ws2pStreamer)
this.server.unpipe(singleWriteProtection)
singleWriteProtection.unpipe(ws2pStreamer)
this.removeConnection(c)
this.server.push({
ws2p: 'disconnected',
......
import * as stream from "stream"
import {NewLogger} from "../../../lib/logger"
import {CertificationDTO} from "../../../lib/dto/CertificationDTO"
import {IdentityDTO} from "../../../lib/dto/IdentityDTO"
import {BlockDTO} from "../../../lib/dto/BlockDTO"
import {MembershipDTO} from "../../../lib/dto/MembershipDTO"
import {TransactionDTO} from "../../../lib/dto/TransactionDTO"
import {PeerDTO} from "../../../lib/dto/PeerDTO"
import {WS2PConstants} from "./constants"
const logger = NewLogger()
export class WS2PSingleWriteStream extends stream.Transform {
private detections:{
[k:string]: number
} = {}
constructor(private protectionDuration = 1000 * WS2PConstants.SINGLE_RECORD_PROTECTION_IN_SECONDS) {
super({ objectMode: true })
}
getNbProtectionsCurrently() {
return Object.keys(this.detections).length
}
async _write(obj:any, enc:any, done:any) {
let documentHash = ''
let doStream = false
try {
if (obj.joiners) {
const dto = BlockDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
else if (obj.pubkey && obj.uid) {
const dto = IdentityDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
else if (obj.idty_uid) {
const dto = CertificationDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
else if (obj.userid) {
const dto = MembershipDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
else if (obj.issuers) {
const dto = TransactionDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
else if (obj.endpoints) {
const dto = PeerDTO.fromJSONObject(obj)
documentHash = dto.getHash()
}
if (documentHash) {
if (!this.detections[documentHash]) {
doStream = true
this.detections[documentHash] = 1
} else {
this.detections[documentHash]++
logger.warn('WS2P OUT => Document detected %s times: %s', this.detections[documentHash], JSON.stringify(obj))
}
setTimeout(() => {
delete this.detections[documentHash]
}, this.protectionDuration)
}
if (doStream) {
this.push(obj)
}
} catch (e) {
logger.warn('WS2P >> SingleWrite >>', e)
}
done && done()
}
}
......@@ -18,7 +18,9 @@ export const WS2PConstants = {
CONNECTIONS_LOW_LEVEL: 3,
BAN_DURATION_IN_SECONDS: 120,
BAN_ON_REPEAT_THRESHOLD: 5,
ERROR_RECALL_DURATION_IN_SECONDS: 60,
SINGLE_RECORD_PROTECTION_IN_SECONDS: 60,
HEAD_V0_REGEXP: new RegExp('^WS2P:HEAD:'
+ CommonConstants.FORMATS.PUBKEY + ':'
......
......@@ -26,7 +26,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
[k:string]: {
createdOn: number,
pubkeys: {
[p:string]: boolean
[p:string]: any[]
}
}
} = {}
......@@ -88,7 +88,20 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
&& this.errors[documentHash]
&& this.errors[documentHash].pubkeys[c.pubkey] !== undefined
&& this.server.conf.pair.pub !== c.pubkey) { // We do not want to ban ourselves
this.cluster.banConnection(c, "Peer " + (c.pubkey || '--unknown--') + " sending again a wrong document")
this.errors[documentHash].pubkeys[c.pubkey].push(json.body)
if (this.errors[documentHash].pubkeys[c.pubkey].length >= WS2PConstants.BAN_ON_REPEAT_THRESHOLD) {
let message = "peer " + (c.pubkey || '--unknown--') + " sent " + WS2PConstants.BAN_ON_REPEAT_THRESHOLD + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e))
this.cluster.banConnection(c, message)
for (const body of this.errors[documentHash].pubkeys[c.pubkey]) {
message += '\n => ' + JSON.stringify(body)
}
} else {
let message = "WS2P IN => " + (c.pubkey || '--unknown--') + " sent " + this.errors[documentHash].pubkeys[c.pubkey].length + " times a same wrong document: " + (e && (e.message || (e.uerr && e.uerr.message)) || JSON.stringify(e))
for (const body of this.errors[documentHash].pubkeys[c.pubkey]) {
message += '\n => ' + JSON.stringify(body)
}
this.server.logger.warn(message)
}
} else {
// Remember the error for some time
if (!this.errors[documentHash]) {
......@@ -97,7 +110,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
pubkeys: {}
}
}
this.errors[documentHash].pubkeys[c.pubkey] = true
this.errors[documentHash].pubkeys[c.pubkey] = [json.body]
setTimeout(() => {
delete this.errors[documentHash]
}, 1000 * WS2PConstants.ERROR_RECALL_DURATION_IN_SECONDS)
......
import * as stream from "stream"
import * as assert from "assert"
import {WS2PSingleWriteStream} from "../../../../app/modules/ws2p/lib/WS2PSingleWriteStream"
const es = require('event-stream')
describe('WS2P Single Write limiter', () => {
const PROTECTION_DURATION = 100
it('should detect double writings', async () => {
const source = new Readable()
const protection = new WS2PSingleWriteStream(PROTECTION_DURATION)
let nbDocs = 0
await new Promise(res => {
source
.pipe(protection)
.pipe(es.mapSync(() => {
nbDocs++
if (nbDocs >= 2) {
res()
}
}))
// Writing
source.push({ joiners: [] }) // A block
source.push({ joiners: [] }) // A block
source.push({ endpoints: [] }) // A peer
})
assert.equal(nbDocs, 2)
assert.equal(protection.getNbProtectionsCurrently(), 2)
await new Promise(res => setTimeout(res, PROTECTION_DURATION + 100))
assert.equal(protection.getNbProtectionsCurrently(), 0)
})
})
class Readable extends stream.Readable {
constructor() {
super({ objectMode: true })
}
async _read() {
}
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment