Skip to content
Snippets Groups Projects
Commit 0345d7e5 authored by Éloïs's avatar Éloïs
Browse files

Revert "Revert "[mod] refactoring ws2p connetion priorities #1184 #1185""

This reverts commit a36d2892.
parent 4702dcec
No related branches found
No related tags found
3 merge requests!1222Add startup scripts,!1220Stable/ws2p v1.1 trymerge,!1208Stable/ws2p v1.1
......@@ -13,7 +13,7 @@ export class WS2PClient {
private constructor(public connection:WS2PConnection) {}
static async connectTo(server:Server, fullEndpointAddress:string, uuid:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) {
static async connectTo(server:Server, fullEndpointAddress:string, expectedWS2PUID:string, messageHandler:WS2PMessageHandler, expectedPub:string, allowKey:(pub:string)=>Promise<boolean> ) {
const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec)
const c = WS2PConnection.newConnectionToAddress(
fullEndpointAddress,
......@@ -25,7 +25,8 @@ export class WS2PClient {
connectionTimeout: WS2PConstants.REQUEST_TIMEOUT,
requestTimeout: WS2PConstants.REQUEST_TIMEOUT
},
expectedPub
expectedPub,
expectedWS2PUID
)
const singleWriteProtection = new WS2PSingleWriteStream()
const streamer = new WS2PStreamer(c)
......
......@@ -267,7 +267,7 @@ export class WS2PCluster {
}
this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => {
const privilegedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[]
return this.acceptPubkey(pubkey, connectedPubkeys, () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.privilegedOnly || false))
return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly))
}, this.messageHandler)
this.host = host
this.port = port
......@@ -298,8 +298,9 @@ export class WS2PCluster {
const fullEndpointAddress = WS2PCluster.getFullAddress(host, port, path)
const ws2pc = await WS2PClient.connectTo(this.server, fullEndpointAddress, ws2pEndpointUUID, messageHandler, expectedPub, (pub:string) => {
const connectedPubkeys = this.getConnectedPubkeys()
const connectedWS2PUID = this.getConnectedWS2PUID()
const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
return this.acceptPubkey(expectedPub, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID)
return this.acceptPubkey(expectedPub, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID)
})
this.ws2pClients[uuid] = ws2pc
pub = ws2pc.connection.pubkey
......@@ -403,10 +404,11 @@ export class WS2PCluster {
if (ws2pEnpoint) {
// Check if already connected to the pubkey (in any way: server or client)
const connectedPubkeys = this.getConnectedPubkeys()
const connectedWS2PUID = this.getConnectedWS2PUID()
const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub || ws2pEnpoint.uuid === '11111111')) {
await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey)
await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid)
await this.trimClientConnections()
}
}
......@@ -573,64 +575,75 @@ export class WS2PCluster {
protected async acceptPubkey(
pub:string,
connectedPubkeys:string[],
connectedWS2PUID:string[],
getConcurrentConnexionsCount:()=>number,
maxConcurrentConnexionsSize:number,
priorityKeys:string[],
priorityKeysOnly:boolean,
targetWS2PUID = ""
) {
// We need ws2pServer instance
if (!this.ws2pServer) {
return false
}
// We do not accept oneself connetion
if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) {
return false
}
// We do not accept banned keys
if (this.banned[pub]) {
this.server.logger.warn('Connection to %s refused, reason: %s', pub.slice(0, 8), this.banned[pub])
return false
}
let accept = priorityKeys.indexOf(pub) !== -1
if (!accept && !priorityKeysOnly && connectedPubkeys.indexOf(pub) === -1) {
// Do we have room?
if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) {
accept = false
// Is priority key ?
let isPriorityKey = priorityKeys.indexOf(pub) !== -1
// We do not accept forbidden keys
if (priorityKeysOnly && !isPriorityKey && this.server.conf.pair.pub !== pub) {
return false
}
else if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) {
// Is member key ?
const isMemberPeer = await this.server.dal.isMember(pub)
// Do we have room?
if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) {
// Yes: just connect to it
accept = true
return true
} else if (this.server.conf.pair.pub === pub) {
// We always accept self nodes, and they have a supreme priority (these are siblings)
if (targetWS2PUID !== "") {
if (this.isNewSiblingNode(pub, targetWS2PUID, connectedWS2PUID)) {
return true
}
else {
// No: let's verify some peer has a lower priority
if (connectedPubkeys.indexOf(this.server.conf.pair.pub) !== -1) {
// Yes, we are connected to ourself. Let's replace this connexion
accept = true
}
else {
// Does this node have the priority over at least one node?
const isMemberPeer = await this.server.dal.isMember(pub)
if (isMemberPeer) {
// The node may have the priority over at least 1 other node
let i = 0, existsOneNonMemberNode = false
while (!existsOneNonMemberNode && i < connectedPubkeys.length) {
const isAlsoAMemberPeer = await this.server.dal.isMember(connectedPubkeys[i])
existsOneNonMemberNode = !isAlsoAMemberPeer
i++
}
if (existsOneNonMemberNode) {
// The node has the priority over a non-member peer: try to connect
accept = true
}
else if (connectedPubkeys.indexOf(pub) === -1)
{
let minPriorityLevel = WS2PConstants.MAX_PRIORITY_LEVEL
for (const connectedPubkey of connectedPubkeys) {
let connectedPubkeyPriorityLevel = this.ws2pServer.keyPriorityLevel(connectedPubkey, priorityKeys)
if (connectedPubkeyPriorityLevel < minPriorityLevel) {
minPriorityLevel = connectedPubkeyPriorityLevel
}
}
if (this.ws2pServer.keyPriorityLevel(pub, priorityKeys) > minPriorityLevel) {
return true
}
} else {
// The pubkey is already connected: we accept only self nodes, and they have a supreme priority (these are siblings)
if (targetWS2PUID) {
if (this.isSiblingNode(pub, targetWS2PUID)) {
accept = true
}
return false
}
isNewSiblingNode(pub:string, targetWS2PUID:string, connectedWS2PUID:string[]) {
for (const uuid of connectedWS2PUID) {
if (uuid === targetWS2PUID) {
return false
}
return accept
}
isSiblingNode(pub:string, uuid:string) {
return !!(this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid !== uuid)
return true
}
async getLevel1Connections() {
......@@ -742,6 +755,12 @@ export class WS2PCluster {
return clients.concat(served)
}
getConnectedWS2PUID() {
const clients = Object.keys(this.ws2pClients).map(k => this.ws2pClients[k].connection.uuid)
const served = this.ws2pServer ? this.ws2pServer.getConnexions().map(c => c.uuid) : []
return clients.concat(served)
}
banConnection(c:WS2PConnection, reason:string) {
this.server.logger.warn('Banning connections of %s for %ss, reason: %s', c.pubkey.slice(0, 8), WS2PConstants.BAN_DURATION_IN_SECONDS, reason)
if (c.pubkey) {
......
......@@ -256,7 +256,8 @@ export class WS2PConnection {
connectionTimeout: REQUEST_TIMEOUT_VALUE,
requestTimeout: REQUEST_TIMEOUT_VALUE
},
private expectedPub:string = ""
private expectedPub:string = "",
private expectedWS2PUID:string = ""
) {
this.connectedp = new Promise((resolve, reject) => {
this.connectedResolve = resolve
......@@ -277,7 +278,8 @@ export class WS2PConnection {
connectionTimeout: REQUEST_TIMEOUT_VALUE,
requestTimeout: REQUEST_TIMEOUT_VALUE
},
expectedPub:string = "") {
expectedPub:string = "",
expectedWS2PUID:string = "") {
if (address.match(WS2PConstants.FULL_ADDRESS_ONION_REGEX)) {
options = {
connectionTimeout: WS2PConstants.CONNEXION_TOR_TIMEOUT,
......@@ -292,7 +294,7 @@ export class WS2PConnection {
websocket.on('close', () => res())
})
websocket.on('error', () => websocket.close())
return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub)
return new WS2PConnection(websocket, onWsOpened, onWsClosed, messageHandler, localAuth, remoteAuth, options, expectedPub, expectedWS2PUID)
}
static newConnectionFromWebSocketServer(
......@@ -319,6 +321,10 @@ export class WS2PConnection {
return this.remoteAuth.getPubkey()
}
get uuid() {
return this.expectedWS2PUID
}
get nbRequests() {
return this.nbRequestsCount
}
......
......@@ -116,8 +116,8 @@ export class WS2PServer extends events.EventEmitter {
}
})
})
await this.trimConnections()
let privilegedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[]
await this.removeLowPriorityConnection(privilegedKeys)
await this.server.dal.setPeerUP(c.pubkey)
......@@ -128,44 +128,26 @@ export class WS2PServer extends events.EventEmitter {
})
}
async trimConnections() {
/*** OVERFLOW TRIMMING ***/
let disconnectedOne = true
// Disconnect non-members
while (disconnectedOne && this.connections.length > this.maxLevel2Size) {
disconnectedOne = false
for (const c of this.connections) {
const isMember = await this.server.dal.isMember(c.pubkey)
if (!isMember && !disconnectedOne) {
c.close()
this.removeConnection(c)
disconnectedOne = true
}
}
}
// Disconnect members
while (this.connections.length > this.maxLevel2Size) {
async removeLowPriorityConnection(privilegedKeys:string[]) {
let lowPriorityConnection:WS2PConnection = this.connections[0]
let minPriorityLevel = this.keyPriorityLevel(lowPriorityConnection.pubkey, privilegedKeys)
for (const c of this.connections) {
c.close()
this.removeConnection(c)
}
}
/*** DUPLICATES TRIMMING ***/
disconnectedOne = true
while (disconnectedOne) {
disconnectedOne = false
const pubkeysFound = []
for (const c of this.connections) {
if (pubkeysFound.indexOf(c.pubkey) !== -1) {
c.close()
this.removeConnection(c)
disconnectedOne = true
if (c !== lowPriorityConnection) {
let cPriorityLevel = this.keyPriorityLevel(c.pubkey, privilegedKeys)
if (cPriorityLevel < minPriorityLevel) {
lowPriorityConnection = c
minPriorityLevel = cPriorityLevel
}
else if (c.pubkey !== this.server.conf.pair.pub) {
pubkeysFound.push(c.pubkey)
}
}
this.removeConnection(lowPriorityConnection)
}
keyPriorityLevel(pubkey:string, privilegedKeys:string[]) {
let priorityLevel = (this.server.dal.isMember(pubkey)) ? 1:0
priorityLevel += (privilegedKeys.indexOf(pubkey) !== -1) ? 2:0
priorityLevel += (this.server.conf.pair.pub === pubkey) ? 4:0
return priorityLevel
}
private removeConnection(c:WS2PConnection) {
......
......@@ -19,6 +19,7 @@ export const WS2PConstants = {
MAX_LEVEL_1_PEERS: 10,
MAX_LEVEL_2_PEERS: 10,
CONNECTIONS_LOW_LEVEL: 3,
MAX_PRIORITY_LEVEL: 7,
BAN_DURATION_IN_SECONDS: 120,
BAN_ON_REPEAT_THRESHOLD: 5,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment