Unverified Commit 4a173cfc authored by Éloïs's avatar Éloïs
Browse files

[enh] prepare the management of several ws2p versions + continuation of...

[enh] prepare the management of several ws2p versions + continuation of refactoring ws2p connetions priority
parent 1d251881
......@@ -33,7 +33,9 @@ const CONDITIONS = "(&&|\\|\\|| |[()]|(SIG\\(" + PUBKEY + "\\)|(XHX\\([A-F0-9]
const BMA_REGEXP = /^BASIC_MERKLED_API( ([a-z_][a-z0-9-_.]*))?( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/
const BMATOR_REGEXP = /^BMATOR( ([a-z0-9]{16})\.onion)( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/
const WS2P_REGEXP = /^WS2P ([a-f0-9]{8}) ([a-z_][a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+) ([0-9]+)(?: (.+))?$/
const WS2P_V2_REGEXP = /^WS2P ([0-9]+) ([a-f0-9]{8}) ([a-z_][a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+) ([0-9]+)(?: (.+))?$/
const WS2PTOR_REGEXP = /^WS2PTOR ([a-f0-9]{8}) ([a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+.onion) ([0-9]+)(?: (.+))?$/
const WS2PTOR_V2_REGEXP = /^WS2PTOR ([0-9]+) ([a-f0-9]{8}) ([a-z0-9-_.]*|[0-9.]+|[0-9a-f:]+.onion) ([0-9]+)(?: (.+))?$/
const WS_FULL_ADDRESS_ONION_REGEX = /^(?:wss?:\/\/)(?:www\.)?([0-9a-z]{16}\.onion)(:[0-9]+)?$/
const IPV4_REGEXP = /^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$/;
const IPV6_REGEXP = /^((([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}:[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){5}:([0-9A-Fa-f]{1,4}:)?[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){4}:([0-9A-Fa-f]{1,4}:){0,2}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){3}:([0-9A-Fa-f]{1,4}:){0,3}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){2}:([0-9A-Fa-f]{1,4}:){0,4}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(([0-9A-Fa-f]{1,4}:){0,5}:((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(::([0-9A-Fa-f]{1,4}:){0,5}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|([0-9A-Fa-f]{1,4}::([0-9A-Fa-f]{1,4}:){0,5}[0-9A-Fa-f]{1,4})|(::([0-9A-Fa-f]{1,4}:){0,6}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){1,7}:))$/;
......@@ -95,7 +97,9 @@ export const CommonConstants = {
BMA_REGEXP,
BMATOR_REGEXP,
WS2P_REGEXP,
WS2P_V2_REGEXP,
WS2PTOR_REGEXP,
WS2PTOR_V2_REGEXP,
WS_FULL_ADDRESS_ONION_REGEX,
IPV4_REGEXP,
IPV6_REGEXP,
......
......@@ -2,6 +2,7 @@ import {DBPeer} from "../dal/sqliteDAL/PeerDAL"
import {hashf} from "../common"
import {CommonConstants} from "../common-libs/constants"
import {Cloneable} from "./Cloneable"
import { WS2PConstants } from '../../modules/ws2p/lib/constants';
export interface WS2PEndpoint {
uuid:string
......@@ -94,29 +95,59 @@ export class PeerDTO implements Cloneable {
return bma || {};
}
getWS2P(canReachTorEp:boolean, canReachClearEp:boolean) {
let api:{ uuid:string, host:string, port:number, path:string }|null = null
const endpointRegexp = (canReachTorEp) ? CommonConstants.WS2PTOR_REGEXP:CommonConstants.WS2P_REGEXP
getOnceWS2PEndpoint(canReachTorEp:boolean, canReachClearEp:boolean, uuidExcluded:string[] = []) {
let api:{ version:number, uuid:string, host:string, port:number, path:string }|null = null
let bestWS2PVersionAvailable:number = 0
let bestWS2PTORVersionAvailable:number = 0
for (const ep of this.endpoints) {
if (canReachTorEp) {
const matches:any = ep.match(CommonConstants.WS2PTOR_REGEXP)
if (matches) {
return {
uuid: matches[1],
host: matches[2] || '',
port: parseInt(matches[3]) || 0,
path: matches[4]
let matches:any = ep.match(CommonConstants.WS2PTOR_V2_REGEXP)
if (matches && parseInt(matches[1]) > bestWS2PTORVersionAvailable && (uuidExcluded.indexOf(matches[2]) === -1)) {
bestWS2PTORVersionAvailable = matches[1]
api = {
version: parseInt(matches[1]),
uuid: matches[2],
host: matches[3] || '',
port: parseInt(matches[4]) || 0,
path: matches[5]
}
} else {
matches = ep.match(CommonConstants.WS2PTOR_REGEXP)
if (matches && bestWS2PTORVersionAvailable == 0 && (uuidExcluded.indexOf(matches[1]) === -1)) {
bestWS2PTORVersionAvailable = 1
api = {
version: 1,
uuid: matches[1],
host: matches[2] || '',
port: parseInt(matches[3]) || 0,
path: matches[4]
}
}
}
}
if (canReachClearEp) {
const matches:any = !api && ep.match(CommonConstants.WS2P_REGEXP)
if (matches) {
// If can reach clear endpoint and not found tor endpoint
if (canReachClearEp && bestWS2PTORVersionAvailable == 0) {
let matches:any = ep.match(CommonConstants.WS2P_V2_REGEXP)
if (matches && parseInt(matches[1]) > bestWS2PVersionAvailable && (uuidExcluded.indexOf(matches[2]) === -1)) {
bestWS2PVersionAvailable = parseInt(matches[1])
api = {
uuid: matches[1],
host: matches[2] || '',
port: parseInt(matches[3]) || 0,
path: matches[4]
version: parseInt(matches[1]),
uuid: matches[2],
host: matches[3] || '',
port: parseInt(matches[4]) || 0,
path: matches[5]
}
} else {
matches = ep.match(CommonConstants.WS2P_REGEXP)
if (matches && bestWS2PVersionAvailable == 0 && (uuidExcluded.indexOf(matches[1]) === -1)) {
bestWS2PVersionAvailable = 1
api = {
version: 1,
uuid: matches[1],
host: matches[2] || '',
port: parseInt(matches[3]) || 0,
path: matches[4]
}
}
}
}
......@@ -124,6 +155,18 @@ export class PeerDTO implements Cloneable {
return api || null
}
getAllWS2PEndpoints(canReachTorEp:boolean, canReachClearEp:boolean, myUUID:string) {
let apis:{ uuid:string, host:string, port:number, path:string }[] = []
let uuidExcluded:string[] = [myUUID]
let api = this.getOnceWS2PEndpoint(canReachTorEp, canReachClearEp, uuidExcluded)
while (api !== null) {
uuidExcluded.push(api.uuid)
apis.push(api)
api = this.getOnceWS2PEndpoint(canReachTorEp, canReachClearEp, uuidExcluded)
}
return apis
}
getDns() {
const bma = this.getBMA();
return bma.dns ? bma.dns : null;
......
......@@ -17,6 +17,7 @@ import { CommonConstants } from '../../../lib/common-libs/constants';
import { Package } from "../../../lib/common/package";
import { Constants } from "../../prover/lib/constants";
import { ProxiesConf } from '../../../lib/proxy';
import { Keypair } from '../../../lib/dto/ConfDTO';
const es = require('event-stream')
const nuuid = require('node-uuid')
......@@ -268,7 +269,7 @@ export class WS2PCluster {
this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => {
const privilegedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.privilegedNodes) ? this.server.conf.ws2p.privilegedNodes:[]
return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, privilegedNodes, (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly))
}, this.messageHandler)
}, this.keyPriorityLevel, this.messageHandler)
this.host = host
this.port = port
return this.ws2pServer
......@@ -330,13 +331,20 @@ export class WS2PCluster {
}
async connectToWS2Peers() {
const myUUID = (this.server.conf.ws2p && this.server.conf.ws2p.uuid) ? this.server.conf.ws2p.uuid:""
const potentials = await this.server.dal.getWS2Peers()
const peers:PeerDTO[] = potentials.map((p:any) => PeerDTO.fromJSONObject(p))
const prefered = ((this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) || []).slice() // Copy
// Our key is also a prefered one, so we connect to our siblings
prefered.push(this.server.conf.pair.pub)
const canReachTorEndpoint = ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf)
peers.sort((a, b) => {
// Top priority at our own nodes
if (a.pubkey === this.server.conf.pair.pub && b.pubkey !== this.server.conf.pair.pub) {
return -1
} else if (a.pubkey !== this.server.conf.pair.pub && b.pubkey === this.server.conf.pair.pub) {
return 1
}
const aIsPrefered = prefered.indexOf(a.pubkey) !== -1
const bIsPrefered = prefered.indexOf(b.pubkey) !== -1
......@@ -373,20 +381,34 @@ export class WS2PCluster {
const canReachClearEndpoint = ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf)
while (i < peers.length && this.clientsCount() < this.maxLevel1Size) {
const p = peers[i]
const api = p.getWS2P(canReachTorEndpoint, canReachClearEndpoint)
if (api) {
try {
// We do not connect to local host
if (!this.server.conf.ws2p || api.uuid !== this.server.conf.ws2p.uuid || p.pubkey !== this.server.conf.pair.pub || api.uuid === '11111111') {
await this.connectToRemoteWS(api.host, api.port, api.path, this.messageHandler, p.pubkey, api.uuid)
if (p.pubkey === this.server.conf.pair.pub) {
const apis = p.getAllWS2PEndpoints(canReachTorEndpoint, canReachClearEndpoint, myUUID)
for (const api of apis) {
try {
// We do not connect to local host
if (api.uuid !== myUUID || api.uuid === '11111111') {
await this.connectToRemoteWS(api.host, api.port, api.path, this.messageHandler, p.pubkey, api.uuid)
}
} catch (e) {
this.server.logger.debug('WS2P: init: failed connection')
}
}
} else {
const api = p.getOnceWS2PEndpoint(canReachTorEndpoint, canReachClearEndpoint)
if (api) {
try {
// We do not connect to local host
if (api.uuid !== myUUID || api.uuid === '11111111') {
await this.connectToRemoteWS(api.host, api.port, api.path, this.messageHandler, p.pubkey, api.uuid)
}
} catch (e) {
this.server.logger.debug('WS2P: init: failed connection')
}
} catch (e) {
this.server.logger.debug('WS2P: init: failed connection')
}
}
i++
// Trim the eventual extra connections
setTimeout(() => this.trimClientConnections(), WS2PConstants.CONNEXION_TIMEOUT)
setTimeout(() => this.trimClientConnections(prefered), WS2PConstants.CONNEXION_TIMEOUT)
}
}
......@@ -400,16 +422,16 @@ export class WS2PCluster {
// New peer
if (data.endpoints) {
const peer = PeerDTO.fromJSONObject(data)
const ws2pEnpoint = peer.getWS2P(ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf), ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf))
const ws2pEnpoint = peer.getOnceWS2PEndpoint(ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf), ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf))
if (ws2pEnpoint) {
// Check if already connected to the pubkey (in any way: server or client)
const connectedPubkeys = this.getConnectedPubkeys()
const connectedWS2PUID = this.getConnectedWS2PUID()
const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
const preferedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedKeys, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub || ws2pEnpoint.uuid === '11111111')) {
await this.connectToRemoteWS(ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid)
await this.trimClientConnections()
await this.trimClientConnections(preferedKeys)
}
}
}
......@@ -480,7 +502,60 @@ export class WS2PCluster {
return { sig, message, pub }
}
async trimClientConnections() {
async removeLowPriorityConnections(privilegedKeys:string[]) {
let serverPubkeys:string[] = []
if (this.ws2pServer) {
serverPubkeys = this.ws2pServer.getConnexions().map(c => c.pubkey)
}
let disconnectedOne = true
// Disconnect Private connexions already present under Public
while (disconnectedOne) {
disconnectedOne = false
let uuids = Object.keys(this.ws2pClients)
uuids = _.shuffle(uuids)
for (const uuid of uuids) {
const client = this.ws2pClients[uuid]
const pub = client.connection.pubkey
const isNotOurself = pub !== this.server.conf.pair.pub
const isAlreadyInPublic = serverPubkeys.indexOf(pub) !== -1
if (isNotOurself && isAlreadyInPublic) {
client.connection.close()
await client.connection.closed
disconnectedOne = true
if (this.ws2pClients[uuid]) {
delete this.ws2pClients[uuid]
}
}
}
}
// Disconnect Private connexions until the maximum size is respected
while (disconnectedOne && this.clientsCount() > this.maxLevel1Size) {
let uuids = Object.keys(this.ws2pClients)
uuids = _.shuffle(uuids)
let lowPriorityConnectionUUID:string = uuids[0]
let minPriorityLevel = this.keyPriorityLevel(this.ws2pClients[lowPriorityConnectionUUID].connection.pubkey, privilegedKeys)
for (const uuid of uuids) {
const client = this.ws2pClients[uuid]
if (uuid !== lowPriorityConnectionUUID) {
let uuidPriorityLevel = this.keyPriorityLevel(client.connection.pubkey, privilegedKeys)
if (uuidPriorityLevel < minPriorityLevel) {
lowPriorityConnectionUUID = uuid
minPriorityLevel = uuidPriorityLevel
}
}
delete this.ws2pClients[lowPriorityConnectionUUID]
}
}
}
keyPriorityLevel(pubkey:string, preferedOrPrivilegedKeys:string[]) {
let priorityLevel = (this.server.dal.isMember(pubkey)) ? WS2PConstants.CONNECTIONS_PRIORITY.MEMBER_KEY_LEVEL:0
priorityLevel += (preferedOrPrivilegedKeys.indexOf(pubkey) !== -1) ? WS2PConstants.CONNECTIONS_PRIORITY.PREFERED_PRIVILEGED_KEY_LEVEL:0
priorityLevel += (this.server.conf.pair.pub === pubkey) ? WS2PConstants.CONNECTIONS_PRIORITY.SELF_KEY_LEVEL:0
return priorityLevel
}
async trimClientConnections(preferedKeys:string[]) {
let serverPubkeys:string[] = []
if (this.ws2pServer) {
serverPubkeys = this.ws2pServer.getConnexions().map(c => c.pubkey)
......@@ -587,9 +662,19 @@ export class WS2PCluster {
return false
}
// We do not accept oneself connetion
if (this.server.conf.pair.pub === pub && this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) {
return false
if (this.server.conf.pair.pub === pub) {
// We do not accept oneself connetion
if (this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID) {
return false
} else {
// We always accept self nodes, and they have a supreme priority (these are siblings)
if (targetWS2PUID === "" || this.isNewSiblingNode(pub, targetWS2PUID, connectedWS2PUID) ) {
return true
} else {
// We are already connected to this self node (same WS2PUID)
return false
}
}
}
// We do not accept banned keys
......@@ -613,17 +698,10 @@ export class WS2PCluster {
if (getConcurrentConnexionsCount() < maxConcurrentConnexionsSize) {
// Yes: just connect to it
return true
} else if (this.server.conf.pair.pub === pub) {
// We always accept self nodes, and they have a supreme priority (these are siblings)
if (targetWS2PUID !== "") {
if (this.isNewSiblingNode(pub, targetWS2PUID, connectedWS2PUID)) {
return true
}
}
}
else if (connectedPubkeys.indexOf(pub) === -1)
{
let minPriorityLevel = WS2PConstants.MAX_PRIORITY_LEVEL
let minPriorityLevel = WS2PConstants.CONNECTIONS_PRIORITY.MAX_PRIORITY_LEVEL
for (const connectedPubkey of connectedPubkeys) {
let connectedPubkeyPriorityLevel = this.ws2pServer.keyPriorityLevel(connectedPubkey, priorityKeys)
if (connectedPubkeyPriorityLevel < minPriorityLevel) {
......
......@@ -7,6 +7,7 @@ import {WS2PConstants} from "./constants"
import {WS2PMessageHandler} from "./impl/WS2PMessageHandler"
import {WS2PStreamer} from "./WS2PStreamer"
import {WS2PSingleWriteStream} from "./WS2PSingleWriteStream"
import { WS2PCluster } from './WS2PCluster';
const WebSocketServer = require('ws').Server
......@@ -21,7 +22,8 @@ export class WS2PServer extends events.EventEmitter {
private host:string,
private port:number,
private fifo:GlobalFifoPromise,
private shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>) {
private shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>,
public keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>number) {
super()
// Conf: max public connections
if (this.server.conf.ws2p && this.server.conf.ws2p.maxPublic !== undefined) {
......@@ -143,13 +145,6 @@ export class WS2PServer extends events.EventEmitter {
this.removeConnection(lowPriorityConnection)
}
keyPriorityLevel(pubkey:string, privilegedKeys:string[]) {
let priorityLevel = (this.server.dal.isMember(pubkey)) ? 1:0
priorityLevel += (privilegedKeys.indexOf(pubkey) !== -1) ? 2:0
priorityLevel += (this.server.conf.pair.pub === pubkey) ? 4:0
return priorityLevel
}
private removeConnection(c:WS2PConnection) {
const index = this.connections.indexOf(c)
if (index !== -1) {
......@@ -183,8 +178,8 @@ export class WS2PServer extends events.EventEmitter {
}))
}
static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>, messageHandler:WS2PMessageHandler) {
const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection)
static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>, keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>number, messageHandler:WS2PMessageHandler) {
const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection, keyPriorityLevel)
await ws2ps.listenToWebSocketConnections(messageHandler)
server.logger.info('WS2P server %s listening on %s:%s', server.conf.pair.pub, host, port)
return ws2ps
......
import {CommonConstants} from "../../../lib/common-libs/constants"
export const WS2PConstants = {
WS2P_VERSION: 2,
WS2P_UPNP_TTL: 600,
WS2P_PORTS_START: 20900,
WS2P_PORTS_END: 20999,
......@@ -19,7 +21,13 @@ export const WS2PConstants = {
MAX_LEVEL_1_PEERS: 10,
MAX_LEVEL_2_PEERS: 10,
CONNECTIONS_LOW_LEVEL: 3,
MAX_PRIORITY_LEVEL: 7,
CONNECTIONS_PRIORITY: {
MEMBER_KEY_LEVEL: 1,
PREFERED_PRIVILEGED_KEY_LEVEL: 2,
SELF_KEY_LEVEL: 4,
MAX_PRIORITY_LEVEL: 7,
},
BAN_DURATION_IN_SECONDS: 120,
BAN_ON_REPEAT_THRESHOLD: 5,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment