Mise à jour de GitLab prévue ce samedi 23 octobre 2021 à partir de 9h00 CET

Commit c6123e46 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] add WS2P SYNC feature

parent 5b2dc4f6
export enum DataErrors {
WS2P_SYNC_PERIMETER_IS_LIMITED,
PEER_REJECTED,
TOO_OLD_PEER,
LOKI_DIVIDEND_GET_WRITTEN_ON_SHOULD_NOT_BE_USED,
......
......@@ -959,7 +959,7 @@ export class FileDAL {
return this.peerDAL.removePeerByPubkey(pubkey)
}
async findAllPeersNEWUPBut(pubkeys:string[]) {
async findAllPeersBut(pubkeys:string[]) {
const peers = await this.listAllPeers();
return peers.filter((peer:DBPeer) => pubkeys.indexOf(peer.pubkey) == -1
&& ['UP'].indexOf(peer.status) !== -1);
......
......@@ -98,6 +98,7 @@ export interface WS2PConfDTO {
preferedOnly: boolean
privilegedNodes?: string[]
privilegedOnly: boolean
syncLimit?: number
}
}
......@@ -178,6 +179,7 @@ export class ConfDTO implements CurrencyConfDTO, KeypairConfDTO, NetworkConfDTO,
privilegedOnly: boolean
maxPublic?:number
maxPrivate?:number
syncLimit?:number
},
public powNoSecurity = false
) {}
......
......@@ -367,7 +367,7 @@ export class BlockCrawler {
if (current) {
this.pullingEvent(server, 'start', current.number);
this.logger && this.logger.info("Pulling blocks from the network...");
let peers = await server.dal.findAllPeersNEWUPBut([server.conf.pair.pub]);
let peers = await server.dal.findAllPeersBut([server.conf.pair.pub]);
peers = Underscore.shuffle(peers);
if (pubkey) {
peers = Underscore.filter(peers, (p:any) => p.pubkey == pubkey)
......
......@@ -84,6 +84,9 @@ export class WS2PCluster {
// A cache to remember the banned keys
private banned:{ [k:string]: string } = {}
// A cache to remember the banned keys for synchronization
private banned4Sync:{ [k:string]: string } = {}
// A cache to know if a block exists or not in the DB
private blockstampsCache:{ [k:string]: number } = {}
......@@ -285,8 +288,8 @@ export class WS2PCluster {
if (this.ws2pServer) {
await this.ws2pServer.close()
}
this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, connectedPubkeys:string[]) => {
return this.acceptPubkey(pubkey, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, this.privilegedNodes(), (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly))
this.ws2pServer = await WS2PServer.bindOn(this.server, host, port, this.fifo, (pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[]) => {
return this.acceptPubkey(pubkey, isSync, syncConnectedPubkeys, connectedPubkeys, [], () => this.servedCount(), this.maxLevel2Peers, this.privilegedNodes(), (this.server.conf.ws2p !== undefined && this.server.conf.ws2p.privilegedOnly))
}, this.keyPriorityLevel, this.messageHandler)
this.host = host
this.port = port
......@@ -343,10 +346,11 @@ export class WS2PCluster {
try {
const fullEndpointAddress = WS2PCluster.getFullAddress(host, port, path)
const ws2pc = await WS2PClient.connectTo(this.server, fullEndpointAddress, endpointVersion, ws2pEndpointUUID, messageHandler, expectedPub, (pub:string) => {
const syncPubkeys: string[] = [] // The connection won't be considered as a SYNC connection, so there is no check to do
const connectedPubkeys = this.getConnectedPubkeys()
const connectedWS2PUID = this.getConnectedWS2PUID()
const preferedNodes = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
return this.acceptPubkey(expectedPub, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID)
return this.acceptPubkey(expectedPub, false, syncPubkeys, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedNodes, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEndpointUUID)
})
this.ws2pClients[uuid] = ws2pc
pub = ws2pc.connection.pubkey
......@@ -518,10 +522,11 @@ export class WS2PCluster {
const ws2pEnpoint = peer.getOnceWS2PEndpoint(ProxiesConf.canReachTorEndpoint(this.server.conf.proxiesConf), ProxiesConf.canReachClearEndpoint(this.server.conf.proxiesConf))
if (ws2pEnpoint) {
// Check if already connected to the pubkey (in any way: server or client)
const syncPubkeys: string[] = [] // The connection won't be considered as a SYNC connection, so there is no check to do
const connectedPubkeys = this.getConnectedPubkeys()
const connectedWS2PUID = this.getConnectedWS2PUID()
const preferedKeys = (this.server.conf.ws2p && this.server.conf.ws2p.preferedNodes) ? this.server.conf.ws2p.preferedNodes:[]
const shouldAccept = await this.acceptPubkey(peer.pubkey, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedKeys, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
const shouldAccept = await this.acceptPubkey(peer.pubkey, false, syncPubkeys, connectedPubkeys, connectedWS2PUID, () => this.clientsCount(), this.maxLevel1Size, preferedKeys, (this.server.conf.ws2p && this.server.conf.ws2p.preferedOnly) || false, ws2pEnpoint.uuid)
if (shouldAccept && (!this.server.conf.ws2p || ws2pEnpoint.uuid !== this.server.conf.ws2p.uuid || peer.pubkey !== this.server.conf.pair.pub)) {
await this.connectToRemoteWS(ws2pEnpoint.version, ws2pEnpoint.host, ws2pEnpoint.port, ws2pEnpoint.path, this.messageHandler, peer.pubkey, ws2pEnpoint.uuid)
await this.removeLowPriorityConnections(preferedKeys)
......@@ -749,6 +754,8 @@ export class WS2PCluster {
protected async acceptPubkey(
pub:string,
isSync: boolean,
syncConnectedPubkeys:string[],
connectedPubkeys:string[],
connectedWS2PUID:string[],
getConcurrentConnexionsCount:()=>number,
......@@ -757,6 +764,25 @@ export class WS2PCluster {
priorityKeysOnly:boolean,
targetWS2PUID = ""
) {
// Sync case is specific
if (isSync) {
if (this.banned4Sync[pub]) {
return false
}
// Already connected
if (syncConnectedPubkeys.indexOf(pub) !== -1) {
return false
}
const limit = (this.server.conf.ws2p && this.server.conf.ws2p.syncLimit) || WS2PConstants.WS2P_SYNC_LIMIT
const ok = syncConnectedPubkeys.length < limit
if (ok) {
// The connection will OK: we prepare the ban right now to give room for future users
this.banSyncConnection(pub)
}
return ok
}
if (this.server.conf.pair.pub === pub) {
// We do not accept oneself connetion
if (this.server.conf.ws2p && this.server.conf.ws2p.uuid === targetWS2PUID || targetWS2PUID === '11111111') {
......@@ -838,7 +864,7 @@ export class WS2PCluster {
}
getAllConnections() {
const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : []
const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions().concat(this.ws2pServer.getConnexionsForSync()) : []
for (const uuid of Object.keys(this.ws2pClients)) {
all.push(this.ws2pClients[uuid].connection)
}
......@@ -955,4 +981,12 @@ export class WS2PCluster {
}
}
}
banSyncConnection(pub: string) {
this.server.logger.warn('Banning SYNC connection of %s for %ss (for room)', pub.slice(0, 8), WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS)
this.banned4Sync[pub] = 'sync'
setTimeout(() => {
delete this.banned4Sync[pub]
}, 1000 * WS2PConstants.SYNC_BAN_DURATION_IN_SECONDS)
}
}
......@@ -62,12 +62,13 @@ export interface WS2PAuth {
}
export interface WS2PRemoteAuth extends WS2PAuth {
registerCONNECT(ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean>
registerCONNECT(type: 'CONNECT'|'SYNC', ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string): Promise<boolean>
sendACK(ws:any): Promise<void>
registerOK(sig: string): Promise<boolean>
isAuthenticatedByRemote(): boolean
getPubkey(): string
getVersion(): number
isSync(): boolean
}
export interface WS2PLocalAuth extends WS2PAuth {
......@@ -90,11 +91,12 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth {
protected serverAuth:Promise<void>
protected serverAuthResolve:()=>void
protected serverAuthReject:(err:any)=>void
protected isSyncConnection = false
constructor(
protected currency:string,
protected pair:Key,
protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true)
protected tellIsAuthorizedPubkey:(pub: string, isSync: boolean) => Promise<boolean> = () => Promise.resolve(true)
) {
this.challenge = nuuid.v4() + nuuid.v4()
this.serverAuth = new Promise((resolve, reject) => {
......@@ -111,6 +113,10 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth {
return this.remotePub
}
isSync() {
return this.isSyncConnection
}
async sendACK(ws: any): Promise<void> {
const challengeMessage = `WS2P:ACK:${this.currency}:${this.pair.pub}:${this.challenge}`
Logger.log('sendACK >>> ' + challengeMessage)
......@@ -122,12 +128,13 @@ export class WS2PPubkeyRemoteAuth implements WS2PRemoteAuth {
}))
}
async registerCONNECT(ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string = ""): Promise<boolean> {
const allow = await this.tellIsAuthorizedPubkey(pub)
async registerCONNECT(type: 'CONNECT'|'SYNC', ws2pVersion:number, challenge:string, sig: string, pub: string, ws2pId:string = ""): Promise<boolean> {
this.isSyncConnection = type === 'SYNC'
const allow = await this.tellIsAuthorizedPubkey(pub, this.isSyncConnection)
if (!allow) {
return false
}
const challengeMessage = (ws2pVersion > 1) ? `WS2P:CONNECT:${this.currency}:${pub}:${ws2pId}:${challenge}`:`WS2P:CONNECT:${this.currency}:${pub}:${challenge}`
const challengeMessage = (ws2pVersion > 1) ? `WS2P:${type}:${this.currency}:${pub}:${ws2pId}:${challenge}`:`WS2P:${type}:${this.currency}:${pub}:${challenge}`
Logger.log('registerCONNECT >>> ' + challengeMessage)
const verified = verify(challengeMessage, sig, pub)
if (verified) {
......@@ -170,6 +177,7 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth {
protected serverAuth:Promise<void>
protected serverAuthResolve:()=>void
protected serverAuthReject:(err:any)=>void
protected isSync: boolean
constructor(
protected currency:string,
......@@ -182,15 +190,17 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth {
this.serverAuthResolve = resolve
this.serverAuthReject = reject
})
this.isSync = false
}
async sendCONNECT(ws:any, ws2pVersion:number): Promise<void> {
const connectWord = this.isSync ? 'SYNC' : 'CONNECT'
if (ws2pVersion > 1) {
const challengeMessage = `WS2P:${ws2pVersion}:CONNECT:${this.currency}:${this.pair.pub}:${this.ws2pId}:${this.challenge}`
const challengeMessage = `WS2P:${ws2pVersion}:${connectWord}:${this.currency}:${this.pair.pub}:${this.ws2pId}:${this.challenge}`
Logger.log('sendCONNECT >>> ' + challengeMessage)
const sig = this.pair.signSync(challengeMessage)
await ws.send(JSON.stringify({
auth: 'CONNECT',
auth: `${connectWord}`,
version: ws2pVersion,
pub: this.pair.pub,
ws2pid: this.ws2pId,
......@@ -199,11 +209,11 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth {
}))
return this.serverAuth
} else if (ws2pVersion == 1) {
const challengeMessage = `WS2P:CONNECT:${this.currency}:${this.pair.pub}:${this.challenge}`
const challengeMessage = `WS2P:${connectWord}:${this.currency}:${this.pair.pub}:${this.challenge}`
Logger.log('sendCONNECT >>> ' + challengeMessage)
const sig = this.pair.signSync(challengeMessage)
await ws.send(JSON.stringify({
auth: 'CONNECT',
auth: `${connectWord}`,
pub: this.pair.pub,
challenge: this.challenge,
sig
......@@ -248,6 +258,19 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth {
}
}
export class WS2PPubkeySyncLocalAuth extends WS2PPubkeyLocalAuth {
constructor(
protected currency:string,
protected pair:Key,
protected ws2pId:string,
protected tellIsAuthorizedPubkey:(pub: string) => Promise<boolean> = () => Promise.resolve(true)
) {
super(currency, pair, ws2pId, tellIsAuthorizedPubkey)
this.isSync = true
}
}
export interface WS2PRequest {
name:string,
params?:any
......@@ -377,6 +400,10 @@ export class WS2PConnection {
return this.expectedWS2PUID
}
get isSync() {
return this.remoteAuth.isSync()
}
get nbRequests() {
return this.nbRequestsCount
}
......@@ -405,7 +432,12 @@ export class WS2PConnection {
return this.ws.close()
}
async connect() {
async connectAsInitiator() {
return this.connect(true)
}
async connect(initiator = false) {
const whoIs = initiator ? 'INITIATOR' : 'SERVER'
if (!this.connectp) {
this.connectp = (async () => {
const connectionTimeout = new Promise((res, rej) => {
......@@ -448,7 +480,7 @@ export class WS2PConnection {
if (data.auth && typeof data.auth === "string") {
if (data.auth === "CONNECT") {
if (data.auth === "CONNECT" || data.auth === "SYNC") {
if (data.version) {
if (typeof data.version !== "number") {
await this.errorDetected(WS2P_ERR.AUTH_INVALID_ASK_FIELDS)
......@@ -466,7 +498,7 @@ export class WS2PConnection {
if (this.expectedPub && data.pub !== this.expectedPub) {
await this.errorDetected(WS2P_ERR.INCORRECT_PUBKEY_FOR_REMOTE)
} else {
const valid = await this.remoteAuth.registerCONNECT(this.ws2pVersion, data.challenge, data.sig, data.pub, (this.ws2pVersion > 1) ? data.ws2pID:"")
const valid = await this.remoteAuth.registerCONNECT(data.auth, this.ws2pVersion, data.challenge, data.sig, data.pub, (this.ws2pVersion > 1) ? data.ws2pID:"")
if (valid) {
await this.remoteAuth.sendACK(this.ws)
} else {
......@@ -527,7 +559,7 @@ export class WS2PConnection {
// Request message
else if (data.reqId && typeof data.reqId === "string") {
try {
const answer = await this.messageHandler.answerToRequest(data.body)
const answer = await this.messageHandler.answerToRequest(data.body, this)
this.ws.send(JSON.stringify({ resId: data.reqId, body: answer }))
} catch (e) {
this.ws.send(JSON.stringify({ resId: data.reqId, err: e }))
......@@ -567,7 +599,7 @@ export class WS2PConnection {
}
async request(body:WS2PRequest) {
await this.connect()
await this.connectAsInitiator()
const uuid = nuuid.v4()
return new Promise((resolve, reject) => {
this.nbRequestsCount++
......@@ -643,7 +675,7 @@ export class WS2PConnection {
}
async pushData(type:WS2P_PUSH, key:string, data:any) {
await this.connect()
await this.connectAsInitiator()
return new Promise((resolve, reject) => {
this.nbPushsToRemoteCount++
try {
......
......@@ -15,6 +15,8 @@ import {WS2PConnection} from "./WS2PConnection"
import {BlockDTO} from "../../../lib/dto/BlockDTO"
export enum WS2P_REQ {
KNOWN_PEERS,
PEER_DOCUMENT,
WOT_REQUIREMENTS_OF_PENDING,
BLOCKS_CHUNK,
BLOCK_BY_NUMBER,
......
......@@ -26,13 +26,14 @@ export class WS2PServer extends events.EventEmitter {
private wss:any
private connections:WS2PConnection[] = []
private synConnections:WS2PConnection[] = []
private constructor(
private server:Server,
private host:string,
private port:number,
private fifo:GlobalFifoPromise,
private shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>,
private shouldAcceptConnection:(pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[])=>Promise<boolean>,
public keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>) {
super()
}
......@@ -48,6 +49,10 @@ export class WS2PServer extends events.EventEmitter {
return this.connections.slice()
}
getConnexionsForSync() {
return this.synConnections.slice()
}
countConnexions() {
const connections = this.getConnexions()
let count = 0
......@@ -71,7 +76,7 @@ export class WS2PServer extends events.EventEmitter {
******************/
let saidPubkey:string = ""
const acceptPubkey = async (pub:string) => {
const acceptPubkey = async (pub:string, isSync: boolean) => {
if (!saidPubkey) {
saidPubkey = pub
}
......@@ -79,7 +84,7 @@ export class WS2PServer extends events.EventEmitter {
// The key must be identical
return false
}
return await this.shouldAcceptConnection(pub, this.getConnexions().map(c => c.pubkey))
return await this.shouldAcceptConnection(pub, isSync, this.getConnexionsForSync().map(c => c.pubkey), this.getConnexions().map(c => c.pubkey))
}
let timeout = {
connectionTimeout: WS2PConstants.CONNEXION_TIMEOUT,
......@@ -95,13 +100,40 @@ export class WS2PServer extends events.EventEmitter {
const c = WS2PConnection.newConnectionFromWebSocketServer(
ws,
messageHandler,
new WS2PPubkeyLocalAuth(this.server.conf.currency, key, myWs2pId, acceptPubkey),
new WS2PPubkeyLocalAuth(this.server.conf.currency, key, myWs2pId, pub => acceptPubkey(pub, false)),
new WS2PPubkeyRemoteAuth(this.server.conf.currency, key, acceptPubkey),
timeout
)
try {
await c.connect()
/**
* Sync is a particular case:
* - we remember the connection
* - we allow it to run for a limited period of time
* - we don't broadcast any data to it
* - we only allow blocks+peering fetching, any other request is forbidden and closes the connection
*/
if (c.isSync) {
// We remember it
this.synConnections.push(c)
// When the connection closes:
ws.on('close', () => {
// Remove the connection
const index = this.synConnections.indexOf(c)
if (index !== -1) {
// Remove the connection
this.synConnections.splice(index, 1)
c.close()
}
})
// We close the connection after a given delay
setTimeout(() => c.close(), WS2PConstants.SYNC_CONNECTION_DURATION_IN_SECONDS)
// We don't broadcast or pipe data
return
}
const host = ws._sender._socket._handle.owner.remoteAddress
const port = ws._sender._socket._handle.owner.remotePort
this.server.push({
......@@ -217,7 +249,7 @@ export class WS2PServer extends events.EventEmitter {
}))
}
static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, connectedPubkeys:string[])=>Promise<boolean>, keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>, messageHandler:WS2PMessageHandler) {
static async bindOn(server:Server, host:string, port:number, fifo:GlobalFifoPromise, shouldAcceptConnection:(pubkey:string, isSync: boolean, syncConnectedPubkeys:string[], connectedPubkeys:string[])=>Promise<boolean>, keyPriorityLevel:(pubkey:string, privilegedKeys:string[])=>Promise<number>, messageHandler:WS2PMessageHandler) {
const ws2ps = new WS2PServer(server, host, port, fifo, shouldAcceptConnection, keyPriorityLevel)
await ws2ps.listenToWebSocketConnections(messageHandler)
server.logger.info('WS2P server %s listening on %s:%s', server.conf.pair.pub, host, port)
......
......@@ -12,6 +12,7 @@
// GNU Affero General Public License for more details.
import {CommonConstants} from "../../../lib/common-libs/constants"
export const WS2PConstants = {
NETWORK: {
......@@ -56,6 +57,7 @@ export const WS2PConstants = {
},
BAN_DURATION_IN_SECONDS: 120,
SYNC_BAN_DURATION_IN_SECONDS: 240,
BAN_ON_REPEAT_THRESHOLD: 5,
ERROR_RECALL_DURATION_IN_SECONDS: 60,
SINGLE_RECORD_PROTECTION_IN_SECONDS: 60,
......@@ -93,5 +95,8 @@ export const WS2PConstants = {
INITIAL_CONNECTION_PEERS_BUNDLE_SIZE: 5,
HEADS_SPREAD_TIMEOUT: 100 // Wait 100ms before sending a bunch of signed heads
HEADS_SPREAD_TIMEOUT: 100, // Wait 100ms before sending a bunch of signed heads
WS2P_SYNC_LIMIT: 15, // Number of concurrent peers for sync
SYNC_CONNECTION_DURATION_IN_SECONDS: 120, // Duration of the SYNC connection
}
\ No newline at end of file
......@@ -13,8 +13,9 @@
import {WS2PResponse} from "./WS2PResponse"
import {WS2PConnection} from "../WS2PConnection"
export interface WS2PMessageHandler {
handlePushMessage(json:any, c:WS2PConnection): Promise<void>
answerToRequest(json:any): Promise<WS2PResponse>
answerToRequest(json:any, c:WS2PConnection): Promise<WS2PResponse>
}
\ No newline at end of file
......@@ -11,11 +11,12 @@
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
import {IdentityForRequirements} from './../../../../service/BlockchainService';
import {IdentityForRequirements} from '../../../../service/BlockchainService';
import {Server} from "../../../../../server"
import {WS2PReqMapper} from "../interface/WS2PReqMapper"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {DBBlock} from "../../../../lib/db/DBBlock"
import {PeerDTO} from "../../../../lib/dto/PeerDTO"
export class WS2PReqMapperByServer implements WS2PReqMapper {
......@@ -68,4 +69,12 @@ export class WS2PReqMapperByServer implements WS2PReqMapper {
identities: all
}
}
async getPeer(): Promise<PeerDTO> {
return this.server.PeeringService.peer()
}
async getKnownPeers(): Promise<PeerDTO[]> {
return (await this.server.dal.findAllPeersBut([])).map(p => PeerDTO.fromDBPeer(p))
}
}
\ No newline at end of file
......@@ -13,6 +13,7 @@
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {DBBlock} from "../../../../lib/db/DBBlock"
import {PeerDTO} from "../../../../lib/dto/PeerDTO"
export interface WS2PReqMapper {
......@@ -20,4 +21,6 @@ export interface WS2PReqMapper {
getBlock(number:number): Promise<BlockDTO>
getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]>
getRequirementsOfPending(minCert:number): Promise<any>
getPeer(): Promise<PeerDTO>
getKnownPeers(): Promise<PeerDTO[]>
}
\ No newline at end of file
<
......@@ -27,6 +27,7 @@ import {WS2PCluster} from "../WS2PCluster"
import {WS2PConnection} from "../WS2PConnection"
import {WS2PConstants} from "../constants"
import {CommonConstants} from "../../../../lib/common-libs/constants"
import {DataErrors} from "../../../../lib/common-libs/errors"
export enum WS2P_REQERROR {
UNKNOWN_REQUEST
......@@ -49,6 +50,13 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
}
async handlePushMessage(json: any, c:WS2PConnection): Promise<void> {
if (c.isSync) {
// Push messages are forbidden on sync connection
c.close()
return
}
let documentHash = ''
try {
if (json.body) {
......@@ -144,7 +152,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
}
}
async answerToRequest(data: any): Promise<WS2PResponse> {
async answerToRequest(data: any, c:WS2PConnection): Promise<WS2PResponse> {
/**********
* REQUEST
......@@ -152,11 +160,20 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
let body:any = {}
const forbiddenRequestsForSync: string[] = [] // For now, no WS2P requests are forbidden
if (c.isSync && (!data || !data.name || forbiddenRequestsForSync.indexOf(data.name) !== -1)) {
// Some messages are forbidden on sync connection
c.close()
throw Error(DataErrors[DataErrors.WS2P_SYNC_PERIMETER_IS_LIMITED])
}
if (data && data.name) {
switch (data.name) {
case WS2P_REQ[WS2P_REQ.CURRENT]:
body = await this.mapper.getCurrent()
break;
case WS2P_REQ[WS2P_REQ.BLOCK_BY_NUMBER]:
if (isNaN(data.params.number)) {
throw "Wrong param `number`"
......@@ -164,6 +181,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
const number:number = data.params.number
body = await this.mapper.getBlock(number)
break;
case WS2P_REQ[WS2P_REQ.BLOCKS_CHUNK]:
if (isNaN(data.params.count)) {
throw "Wrong param `count`"
......@@ -175,6 +193,7 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
const fromNumber:number = data.params.fromNumber
body = await this.mapper.getBlocks(count, fromNumber)
break;
case WS2P_REQ[WS2P_REQ.WOT_REQUIREMENTS_OF_PENDING]:
if (isNaN(data.params.minCert)) {