Mise à jour de GitLab prévue ce samedi 23 octobre 2021 à partir de 9h00 CET

Commit 7c61bed3 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[fix] Add a ban mechanism for WS2P

parent 2a85bfb5
......@@ -35,6 +35,9 @@ export class WS2PCluster {
private maxLevel1Size = WS2PConstants.MAX_LEVEL_1_PEERS
private messageHandler: WS2PServerMessageHandler
// A cache to remember the banned keys
private banned:{ [k:string]: string } = {}
// A cache to know if a block exists or not in the DB
private blockstampsCache:{ [k:string]: number } = {}
......@@ -83,42 +86,43 @@ export class WS2PCluster {
await Promise.all(heads.map(async (h:{ message:string, sig:string }) => {
const message = h.message
const sig = h.sig
try {
if (message && message.match(WS2PConstants.HEAD_REGEXP)) {
const [,, pub, blockstamp]:string[] = message.split(':')
const sigOK = verify(message, sig, pub)
if (sigOK) {
// Already known?
if (!this.headsCache[pub] || this.headsCache[pub].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[pub] || parseInt(this.headsCache[pub].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const memberKey = await this.isMemberKey(pub)
if (memberKey) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[pub] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
if (!message) {
throw "EMPTY_MESSAGE_FOR_HEAD"
}
if (message.match(WS2PConstants.HEAD_REGEXP)) {
const [,, pub, blockstamp]:string[] = message.split(':')
const sigOK = verify(message, sig, pub)
if (sigOK) {
// Already known?
if (!this.headsCache[pub] || this.headsCache[pub].blockstamp !== blockstamp) {
// More recent?
if (!this.headsCache[pub] || parseInt(this.headsCache[pub].blockstamp) < parseInt(blockstamp)) {
// Check that issuer is a member and that the block exists
const memberKey = await this.isMemberKey(pub)
if (memberKey) {
const exists = await this.existsBlock(blockstamp)
if (exists) {
this.headsCache[pub] = { blockstamp, message, sig }
this.newHeads.push({message, sig})
added.push({message, sig})
// Cancel a pending "heads" to be spread
if (this.headsTimeout) {
clearTimeout(this.headsTimeout)
}
// Reprogram it a few moments later
this.headsTimeout = setTimeout(async () => {
const heads = this.newHeads.splice(0, this.newHeads.length)
if (heads.length) {
await this.spreadNewHeads(heads)
}
}, WS2PConstants.HEADS_SPREAD_TIMEOUT)
}
}
}
}
} else {
throw "HEAD_MESSAGE_WRONGLY_SIGNED"
}
} catch (e) {
this.server.logger.trace('Rejected message %s:', message, e)
}
}))
this.server.push({
......@@ -195,7 +199,7 @@ export class WS2PCluster {
if (this.ws2pServer) {
await this.ws2pServer.close()
}
const connections = await this.getAllConnections()
const connections = this.getAllConnections()
await Promise.all(connections.map(c => c.close()))
}
......@@ -322,7 +326,7 @@ export class WS2PCluster {
}
private async spreadNewHeads(heads:{ message:string, sig:string }[]) {
const connexions = await this.getAllConnections()
const connexions = this.getAllConnections()
return Promise.all(connexions.map(async (c) => {
try {
await c.pushHeads(heads)
......@@ -438,6 +442,11 @@ export class WS2PCluster {
priorityKeys:string[],
targetWS2PUID = ""
) {
// We do not accept banned keys
if (this.banned[pub]) {
this.server.logger.warn('Connection to %s refused, reason: %s', pub.slice(0, 8), this.banned[pub])
return false
}
let accept = priorityKeys.indexOf(pub) !== -1
if (!accept && connectedPubkeys.indexOf(pub) === -1) {
// Do we have room?
......@@ -504,7 +513,7 @@ export class WS2PCluster {
return this.ws2pServer ? this.ws2pServer.getConnexions() : []
}
async getAllConnections() {
getAllConnections() {
const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : []
for (const uuid of Object.keys(this.ws2pClients)) {
all.push(this.ws2pClients[uuid].connection)
......@@ -562,7 +571,7 @@ export class WS2PCluster {
}
private async makeApullShot() {
const connections = await this.getAllConnections()
const connections = this.getAllConnections()
const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT)
await Promise.all(chosen.map(async (conn) => {
......@@ -581,7 +590,7 @@ export class WS2PCluster {
}
async pullDocpool() {
const connections = await this.getAllConnections()
const connections = this.getAllConnections()
const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT)
await Promise.all(chosen.map(async (conn) => {
const puller = new WS2PDocpoolPuller(this.server, conn)
......@@ -594,4 +603,20 @@ export class WS2PCluster {
const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.pubkey) : []
return clients.concat(served)
}
banConnection(c:WS2PConnection, reason:string) {
this.server.logger.warn('Banning connections of %s for %ss, reason: %s', c.pubkey.slice(0, 8), WS2PConstants.BAN_DURATION_IN_SECONDS, reason)
if (c.pubkey) {
this.banned[c.pubkey] = reason
setTimeout(() => {
delete this.banned[c.pubkey]
}, 1000 * WS2PConstants.BAN_DURATION_IN_SECONDS)
const connections = this.getAllConnections()
for (const connection of connections) {
if (c.pubkey == connection.pubkey) {
connection.close()
}
}
}
}
}
\ No newline at end of file
......@@ -476,7 +476,7 @@ export class WS2PConnection {
// Push message
else {
this.nbPushsByRemoteCount++
await this.messageHandler.handlePushMessage(data)
await this.messageHandler.handlePushMessage(data, this)
}
}
}
......
......@@ -16,6 +16,9 @@ export const WS2PConstants = {
MAX_LEVEL_1_PEERS: 10,
MAX_LEVEL_2_PEERS: 10,
BAN_DURATION_IN_SECONDS: 120,
ERROR_RECALL_DURATION_IN_SECONDS: 60,
HEAD_REGEXP: new RegExp('^WS2P:HEAD:' + CommonConstants.FORMATS.PUBKEY + ':' + CommonConstants.FORMATS.BLOCKSTAMP + '$'),
HEADS_SPREAD_TIMEOUT: 100 // Wait 100ms before sending a bunch of signed heads
......
import {WS2PResponse} from "./WS2PResponse"
import {WS2PConnection} from "../WS2PConnection"
export interface WS2PMessageHandler {
handlePushMessage(json:any): Promise<void>
handlePushMessage(json:any, c:WS2PConnection): Promise<void>
answerToRequest(json:any): Promise<WS2PResponse>
}
\ No newline at end of file
......@@ -11,6 +11,8 @@ import {TransactionDTO} from "../../../../lib/dto/TransactionDTO"
import {PeerDTO} from "../../../../lib/dto/PeerDTO"
import {WS2P_REQ} from "../WS2PRequester"
import {WS2PCluster} from "../WS2PCluster"
import {WS2PConnection} from "../WS2PConnection"
import {WS2PConstants} from "../constants"
export enum WS2P_REQERROR {
UNKNOWN_REQUEST
......@@ -19,49 +21,86 @@ export enum WS2P_REQERROR {
export class WS2PServerMessageHandler implements WS2PMessageHandler {
protected mapper:WS2PReqMapper
private errors:{
[k:string]: {
createdOn: number,
pubkeys: {
[p:string]: boolean
}
}
} = {}
constructor(protected server:Server, protected cluster:WS2PCluster) {
this.mapper = new WS2PReqMapperByServer(server)
}
async handlePushMessage(json: any): Promise<void> {
async handlePushMessage(json: any, c:WS2PConnection): Promise<void> {
let documentHash = ''
try {
if (json.body) {
if (json.body.block) {
const dto = BlockDTO.fromJSONObject(json.body.block)
const raw = dto.getRawSigned()
documentHash = dto.getHash()
await this.server.writeRawBlock(raw)
}
else if (json.body.identity) {
const dto = IdentityDTO.fromJSONObject(json.body.identity)
const raw = dto.getRawSigned()
documentHash = dto.getHash()
await this.server.writeRawIdentity(raw)
}
else if (json.body.certification) {
const dto = CertificationDTO.fromJSONObject(json.body.certification)
const raw = dto.getRawSigned()
documentHash = dto.getHash()
await this.server.writeRawCertification(raw)
}
else if (json.body.membership) {
const dto = MembershipDTO.fromJSONObject(json.body.membership)
const raw = dto.getRawSigned()
documentHash = dto.getHash()
await this.server.writeRawMembership(raw)
}
else if (json.body.transaction) {
const dto = TransactionDTO.fromJSONObject(json.body.transaction)
const raw = dto.getRaw()
documentHash = dto.getHash()
await this.server.writeRawTransaction(raw)
}
else if (json.body.peer) {
const dto = PeerDTO.fromJSONObject(json.body.peer)
const raw = dto.getRawSigned()
documentHash = dto.getHash()
await this.server.writeRawPeer(raw)
}
else if (json.body.heads && typeof json.body.heads === "object" && json.body.heads.length !== undefined) {
if (!json.body.heads.length) {
documentHash = 'HEADs'
throw "Heads empty HEADs received"
}
await this.cluster.headsReceived(json.body.heads || [])
}
}
} catch(e) {
if (documentHash
&& this.errors[documentHash]
&& this.errors[documentHash].pubkeys[c.pubkey] !== undefined
&& this.server.conf.pair.pub !== c.pubkey) { // We do not want to ban ourselves
this.cluster.banConnection(c, "Peer " + (c.pubkey || '--unknown--') + " sending again a wrong document")
} else {
// Remember the error for some time
if (!this.errors[documentHash]) {
this.errors[documentHash] = {
createdOn: Date.now(),
pubkeys: {}
}
}
this.errors[documentHash].pubkeys[c.pubkey] = true
setTimeout(() => {
delete this.errors[documentHash]
}, WS2PConstants.ERROR_RECALL_DURATION_IN_SECONDS)
}
this.server.logger.warn(e)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment