Skip to content
Snippets Groups Projects
Commit 199d94b8 authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] #1084 WS2P: generate WS2P endpoint

parent 02e39f5d
Branches
Tags
No related merge requests found
Showing
with 118 additions and 55 deletions
......@@ -139,6 +139,15 @@ export class PeerDTO implements Cloneable {
return this.endpoints.reduce((found:boolean, endpoint:string) => found || endpoint == ep, false)
}
containsAllEndpoints(endpoints:string[]) {
for (const ep of endpoints) {
if (!this.containsEndpoint(ep)) {
return false
}
}
return true
}
endpointSum() {
return this.endpoints.join('_')
}
......@@ -189,4 +198,14 @@ export class PeerDTO implements Cloneable {
static endpoint2host(endpoint:string) {
return PeerDTO.fromJSONObject({ endpoints: [endpoint] }).getURL()
}
static indexOfFirst(endpoints:string[], intoEndpoints:string[]) {
for (let i = 0; i < intoEndpoints.length; i++) {
const index = endpoints.indexOf(intoEndpoints[i])
if (index !== -1) {
return index
}
}
return 0
}
}
\ No newline at end of file
......@@ -95,7 +95,7 @@ export class RouterStream extends stream.Transform {
members = RouterStream.chooseXin(members, isSelfDocument ? constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_MEMBERS_TO_FORWARD_TO);
nonmembers = RouterStream.chooseXin(nonmembers, isSelfDocument ? constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO_FOR_SELF_DOCUMENTS : constants.NETWORK.MAX_NON_MEMBERS_TO_FORWARD_TO);
let mainRoutes:any = members.map((p:any) => (p.member = true) && p).concat(nonmembers);
let mirrors = await this.peeringService.mirrorEndpoints();
let mirrors = await this.peeringService.mirrorBMAEndpoints();
const peersToRoute:DBPeer[] = mainRoutes.concat(mirrors.map((mep, index) => { return {
pubkey: 'M' + index + '_' + this.peeringService.pubkey,
endpoints: [mep]
......
......@@ -138,7 +138,7 @@ export const BmaDependency = {
service: {
input: (server:Server, conf:NetworkConfDTO, logger:any) => {
server.getMainEndpoint = () => Promise.resolve(getEndpoint(conf))
server.addEndpointsDefinitions(() => Promise.resolve(getEndpoint(conf)))
return new BMAPI(server, conf, logger)
}
},
......
......@@ -108,7 +108,7 @@ export const CrawlerDependency = {
logger.info('Applied');
let selfPeer = await server.dal.getPeer(server.PeeringService.pubkey);
if (!selfPeer) {
await server.PeeringService.generateSelfPeer(server.conf, 0)
await server.PeeringService.generateSelfPeer(server.conf)
selfPeer = await server.dal.getPeer(server.PeeringService.pubkey);
}
logger.info('Send self peering ...');
......
......@@ -46,7 +46,9 @@ export const WS2PDependency = {
service: {
input: (server:Server, conf:WS2PConfDTO, logger:any) => {
return new WS2PAPI(server, conf, logger)
const api = new WS2PAPI(server, conf, logger)
server.addEndpointsDefinitions(() => api.getEndpoint())
return api
}
}
}
......@@ -56,7 +58,7 @@ export class WS2PAPI extends stream.Transform {
// Public http interface
private cluster:WS2PCluster
private upnpAPI:WS2PUpnp
private upnpAPI:WS2PUpnp|null
constructor(
private server:Server,
......@@ -89,6 +91,7 @@ export class WS2PAPI extends stream.Transform {
this.upnpAPI = await new WS2PUpnp(this.logger)
const { host, port } = await this.upnpAPI.startRegular()
await this.cluster.listen(host, port)
await this.server.PeeringService.generateSelfPeer(this.server.conf)
} catch (e) {
this.logger.warn(e);
}
......@@ -103,4 +106,8 @@ export class WS2PAPI extends stream.Transform {
this.upnpAPI.stopRegular();
}
}
async getEndpoint() {
return this.upnpAPI ? this.upnpAPI.getRemoteEndpoint() : ''
}
}
\ No newline at end of file
......@@ -13,6 +13,8 @@ export class WS2PCluster {
private ws2pServer:WS2PServer|null = null
private ws2pClients:{[k:string]:WS2PClient} = {}
private host:string|null = null
private port:number|null = null
constructor(private server:Server) {}
......@@ -21,6 +23,8 @@ export class WS2PCluster {
await this.ws2pServer.close()
}
this.ws2pServer = await WS2PServer.bindOn(this.server, host, port)
this.host = host
this.port = port
return this.ws2pServer
}
......
......@@ -2,8 +2,15 @@ import {WS2PConstants} from "./constants"
const upnp = require('nnupnp');
const Q = require('q');
interface UPnPBinding {
remotehost:string
host:string
port:number
}
export class WS2PUpnp {
private currentConfig:UPnPBinding|null
private interval:NodeJS.Timer|null
private client = upnp.createClient()
......@@ -20,10 +27,14 @@ export class WS2PUpnp {
}
}
async getRemoteEndpoint() {
return !this.currentConfig ? '' : ['WS2P', this.currentConfig.remotehost, this.currentConfig.port].join(' ')
}
openPort() {
return Q.Promise(async (resolve:any, reject:any) => {
const upnpBinding = await WS2PUpnp.getAvailablePort(this.client)
this.logger.trace('WS2P: mapping external port %s to local %s using UPnP...', upnpBinding.host, upnpBinding.port)
this.logger.trace('WS2P: mapping external port %s to local %s using UPnP...', upnpBinding.port, [upnpBinding.host, upnpBinding.port].join(':'))
const client = upnp.createClient()
client.portMapping({
'public': upnpBinding.port,
......@@ -35,6 +46,7 @@ export class WS2PUpnp {
this.logger.warn(err)
return reject(err)
}
this.currentConfig = upnpBinding
resolve(upnpBinding)
})
})
......@@ -56,7 +68,7 @@ export class WS2PUpnp {
}
static async getLocalIP(client:any) {
return await new Promise((resolve:any, reject:any) => {
return await new Promise<string>((resolve:any, reject:any) => {
client.findGateway((err:any, res:any, localIP:any) => {
if (err) return reject(err)
resolve(localIP)
......@@ -64,8 +76,18 @@ export class WS2PUpnp {
})
}
static async getRemoteIP(client:any): Promise<string> {
return await new Promise<string>((resolve:any, reject:any) => {
client.externalIp((err:any, externalIP:string) => {
if (err) return reject(err)
resolve(externalIP)
})
})
}
static async getAvailablePort(client:any) {
const localIP = await WS2PUpnp.getLocalIP(client)
const remoteIP = await WS2PUpnp.getRemoteIP(client)
const mappings:{
private: {
host:string
......@@ -89,6 +111,7 @@ export class WS2PUpnp {
throw "No port available for UPnP"
}
return {
remotehost: remoteIP,
host: localIP,
port: availablePort
}
......
......@@ -52,14 +52,15 @@ export class PeeringService {
}
let thePeer = this.peerInstance;
if (!thePeer) {
thePeer = await this.generateSelfPeer(this.conf, 0)
thePeer = await this.generateSelfPeer(this.conf)
}
return PeerDTO.fromJSONObject(thePeer)
}
async mirrorEndpoints() {
let localPeer = await this.peer();
return this.getOtherEndpoints(localPeer.endpoints, this.conf);
async mirrorBMAEndpoints() {
const localPeer = await this.peer();
const localEndpoints = await this.server.getEndpoints()
return this.getOtherEndpoints(localPeer.endpoints, localEndpoints).filter((ep) => ep.match(/^BASIC_MERKLED_API/))
}
checkPeerSignature(p:PeerDTO) {
......@@ -149,11 +150,11 @@ export class PeeringService {
this.logger.info('✔ PEER %s', peering.pubkey.substr(0, 8))
let savedPeer = PeerDTO.fromJSONObject(peerEntity).toDBPeer()
if (peerEntity.pubkey == this.selfPubkey) {
const localEndpoint = await this.server.getMainEndpoint(this.conf);
const localNodeNotListed = !peerEntityOld.containsEndpoint(localEndpoint);
const localEndpoints = await this.server.getEndpoints()
const localNodeNotListed = !peerEntityOld.containsAllEndpoints(localEndpoints)
const current = localNodeNotListed && (await this.dal.getCurrentBlockOrNull());
if (!localNodeNotListed) {
const indexOfThisNode = peerEntity.endpoints.indexOf(localEndpoint);
const indexOfThisNode = PeerDTO.indexOfFirst(localEndpoints, peerEntity.endpoints)
if (indexOfThisNode !== -1) {
this.server.push({
nodeIndexInPeers: indexOfThisNode
......@@ -164,7 +165,7 @@ export class PeeringService {
}
if (localNodeNotListed && (!current || current.number > blockNumber)) {
// Document with pubkey of local peer, but doesn't contain local interface: we must add it
this.generateSelfPeer(this.conf, 0);
this.generateSelfPeer(this.conf);
} else {
this.peerInstance = peerEntity;
}
......@@ -182,7 +183,7 @@ export class PeeringService {
return this.server.writePeer(pretendedNewer)
}
async generateSelfPeer(theConf:ConfDTO, signalTimeInterval:number) {
async generateSelfPeer(theConf:ConfDTO, signalTimeInterval = 0) {
const current = await this.server.dal.getCurrentBlockOrNull();
const currency = theConf.currency || constants.DEFAULT_CURRENCY_NAME;
const peers = await this.dal.findPeers(this.selfPubkey);
......@@ -195,8 +196,8 @@ export class PeeringService {
if (peers.length != 0 && peers[0]) {
p1 = _(peers[0]).extend({version: constants.DOCUMENTS_VERSION, currency: currency});
}
let endpoint = await this.server.getMainEndpoint(theConf);
let otherPotentialEndpoints = this.getOtherEndpoints(p1.endpoints, theConf);
const localEndpoints = await this.server.getEndpoints()
const otherPotentialEndpoints = this.getOtherEndpoints(p1.endpoints, localEndpoints)
logger.info('Sibling endpoints:', otherPotentialEndpoints);
let reals = await Promise.all(otherPotentialEndpoints.map(async (theEndpoint:string) => {
let real = true;
......@@ -220,7 +221,7 @@ export class PeeringService {
return real;
}))
let toConserve = otherPotentialEndpoints.filter((ep, i) => reals[i]);
if (!currency || endpoint == 'BASIC_MERKLED_API') {
if (!currency) {
logger.error('It seems there is an issue with your configuration.');
logger.error('Please restart your node with:');
logger.error('$ duniter restart');
......@@ -240,7 +241,7 @@ export class PeeringService {
currency: currency,
pubkey: this.selfPubkey,
block: targetBlock ? [targetBlock.number, targetBlock.hash].join('-') : constants.PEER.SPECIAL_BLOCK,
endpoints: _.uniq([endpoint].concat(toConserve).concat(this.conf.endpoints || []))
endpoints: _.uniq(localEndpoints.concat(toConserve).concat(this.conf.endpoints || []))
};
const raw2 = dos2unix(PeerDTO.fromJSONObject(p2).getRaw());
logger.info('External access:', PeerDTO.fromJSONObject(p2).getURL())
......@@ -259,16 +260,14 @@ export class PeeringService {
// Set peer's statut to UP
await this.peer(selfPeer);
this.server.streamPush(selfPeer);
logger.info("Next peering signal in %s min", signalTimeInterval / 1000 / 60);
if (signalTimeInterval) {
logger.info("Next peering signal in %s min", signalTimeInterval / 1000 / 60)
}
return selfPeer;
}
private getOtherEndpoints(endpoints:string[], theConf:ConfDTO) {
return endpoints.filter((ep) => {
return !ep.match(constants.BMA_REGEXP) || (
!(ep.includes(' ' + theConf.remoteport) && (
ep.includes(theConf.remotehost || '') || ep.includes(theConf.remoteipv6 || '') || ep.includes(theConf.remoteipv4 || ''))));
});
private getOtherEndpoints(endpoints:string[], localEndpoints:string[]) {
return endpoints.filter((ep) => localEndpoints.indexOf(ep) === -1)
}
}
......
......@@ -3,7 +3,7 @@ import {MembershipService} from "./app/service/MembershipService"
import {PeeringService} from "./app/service/PeeringService"
import {BlockchainService} from "./app/service/BlockchainService"
import {TransactionService} from "./app/service/TransactionsService"
import {ConfDTO, NetworkConfDTO} from "./app/lib/dto/ConfDTO"
import {ConfDTO} from "./app/lib/dto/ConfDTO"
import {FileDAL} from "./app/lib/dal/fileDAL"
import {DuniterBlockchain} from "./app/lib/blockchain/DuniterBlockchain"
import {SQLBlockchain} from "./app/lib/blockchain/SqlBlockchain"
......@@ -24,7 +24,6 @@ import {PeerDTO} from "./app/lib/dto/PeerDTO"
import {OtherConstants} from "./app/lib/other_constants"
export interface HookableServer {
getMainEndpoint: (...args:any[]) => Promise<any>
generatorGetJoinData: (...args:any[]) => Promise<any>
generatorComputeNewCerts: (...args:any[]) => Promise<any>
generatorNewCertsToLinks: (...args:any[]) => Promise<any>
......@@ -47,6 +46,7 @@ const logger = require('./app/lib/logger').NewLogger('server');
export class Server extends stream.Duplex implements HookableServer {
private paramsP:Promise<any>|null
private endpointsDefinitions:(()=>Promise<string>)[] = []
conf:ConfDTO
dal:FileDAL
......@@ -316,7 +316,7 @@ export class Server extends stream.Duplex implements HookableServer {
}
recomputeSelfPeer() {
return this.PeeringService.generateSelfPeer(this.conf, 0)
return this.PeeringService.generateSelfPeer(this.conf)
}
getCountOfSelfMadePoW() {
......@@ -565,17 +565,19 @@ export class Server extends stream.Duplex implements HookableServer {
return this.dal.getLogContent(linesQuantity)
}
addEndpointsDefinitions(definition:()=>Promise<string>) {
this.endpointsDefinitions.push(definition)
}
async getEndpoints() {
const endpoints = await Promise.all(this.endpointsDefinitions.map(d => d()))
return endpoints.filter(ep => !!ep)
}
/*****************
* MODULES PLUGS
****************/
/**
* Default endpoint. To be overriden by a module to specify another endpoint value (for ex. BMA).
*/
getMainEndpoint(conf:NetworkConfDTO): Promise<any> {
return Promise.resolve('DEFAULT_ENDPOINT')
}
/**
* Default WoT incoming data for new block. To be overriden by a module.
*/
......
......@@ -69,8 +69,8 @@ describe("SelfFork", function () {
});
yield s1.initWithDAL().then(bma).then((bmapi) => bmapi.openConnections());
yield s2.initWithDAL().then(bma).then((bmapi) => bmapi.openConnections());
s1.getMainEndpoint = index_1.BmaDependency.duniter.methods.getMainEndpoint;
s2.getMainEndpoint = index_1.BmaDependency.duniter.methods.getMainEndpoint;
s1.addEndpointsDefinitions(() => index_1.BmaDependency.duniter.methods.getMainEndpoint(s1.conf));
s2.addEndpointsDefinitions(() => index_1.BmaDependency.duniter.methods.getMainEndpoint(s2.conf));
// Server 1
yield cat.createIdentity();
yield toc.createIdentity();
......
import {OtherConstants} from "../../app/lib/other_constants"
import {NewLogger} from "../../app/lib/logger"
import {BmaDependency} from "../../app/modules/bma/index";
import {BmaDependency} from "../../app/modules/bma/index"
import {CrawlerDependency} from "../../app/modules/crawler/index"
import {waitForkResolution, waitToHaveBlock} from "./tools/toolbox"
......@@ -79,8 +79,8 @@ describe("SelfFork", function() {
yield s1.initWithDAL().then(bma).then((bmapi:any) => bmapi.openConnections());
yield s2.initWithDAL().then(bma).then((bmapi:any) => bmapi.openConnections());
s1.getMainEndpoint = BmaDependency.duniter.methods.getMainEndpoint
s2.getMainEndpoint = BmaDependency.duniter.methods.getMainEndpoint
s1.addEndpointsDefinitions(() => BmaDependency.duniter.methods.getMainEndpoint(s1.conf))
s2.addEndpointsDefinitions(() => BmaDependency.duniter.methods.getMainEndpoint(s2.conf))
// Server 1
yield cat.createIdentity();
......
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const index_1 = require("../../app/modules/crawler/index");
const index_2 = require("../../app/modules/bma/index");
const co = require('co');
const _ = require('underscore');
const duniter = require('../../index');
......@@ -46,8 +47,8 @@ describe("Switch", function () {
toc = user('toc', { pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo', sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F' }, { server: s1 });
yield s1.initWithDAL().then(bma).then((bmapi) => bmapi.openConnections());
yield s2.initWithDAL().then(bma).then((bmapi) => bmapi.openConnections());
s1.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint;
s2.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint;
s1.addEndpointsDefinitions(() => index_2.BmaDependency.duniter.methods.getMainEndpoint(s1.conf));
s2.addEndpointsDefinitions(() => index_2.BmaDependency.duniter.methods.getMainEndpoint(s2.conf));
yield cat.createIdentity();
yield toc.createIdentity();
yield toc.cert(cat);
......
"use strict";
import {CrawlerDependency} from "../../app/modules/crawler/index"
import {BmaDependency} from "../../app/modules/bma/index"
const co = require('co');
const _ = require('underscore');
......@@ -61,8 +62,8 @@ describe("Switch", function() {
yield s1.initWithDAL().then(bma).then((bmapi:any) => bmapi.openConnections());
yield s2.initWithDAL().then(bma).then((bmapi:any) => bmapi.openConnections());
s1.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint
s2.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint
s1.addEndpointsDefinitions(() => BmaDependency.duniter.methods.getMainEndpoint(s1.conf))
s2.addEndpointsDefinitions(() => BmaDependency.duniter.methods.getMainEndpoint(s2.conf))
yield cat.createIdentity();
yield toc.createIdentity();
yield toc.cert(cat);
......
......@@ -87,8 +87,8 @@ describe("HTTP API", function() {
const b1 = yield commit({ time: now + 120 });
yield server2.writeBlock(b0)
yield server2.writeBlock(b1)
const p1 = yield server.PeeringService.generateSelfPeer(server.conf, 0)
yield server2.PeeringService.generateSelfPeer(server2.conf, 0)
const p1 = yield server.PeeringService.generateSelfPeer(server.conf)
yield server2.PeeringService.generateSelfPeer(server2.conf)
yield server2.writePeer(p1)
server2.writeBlock(yield commit({ time: now + 120 * 2 }))
server2.writeBlock(yield commit({ time: now + 120 * 3 }))
......@@ -293,14 +293,14 @@ describe("HTTP API", function() {
let resolve5, resolve6, resolve7
const p5 = new Promise(res => resolve5 = res)
const p6 = new Promise(res => resolve6 = res)
server.getMainEndpoint = () => "BASIC_MERKLED_API localhost 7777"
server.addEndpointsDefinitions(() => Promise.resolve("BASIC_MERKLED_API localhost 7777"))
const p1 = yield server.PeeringService.generateSelfPeer({
currency: server.conf.currency
}, 0)
client.on('message', function message(data) {
const peer = JSON.parse(data);
if (peer.block.match(/2-/)) {
server2.PeeringService.generateSelfPeer(server.conf, 0)
server2.PeeringService.generateSelfPeer(server.conf)
return resolve5(peer)
}
if (peer.block.match(/1-/) && peer.pubkey === 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo') {
......
......@@ -23,6 +23,7 @@ const s1 = node('bb33', _.extend({
ipv4: '127.0.0.1',
port: '20501',
remoteport: '20501',
ws2p: { upnp: false },
pair: {
pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd',
sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'
......@@ -34,6 +35,7 @@ const s1 = node('bb33', _.extend({
const s2 = node('bb12', _.extend({
port: '20502',
remoteport: '20502',
ws2p: { upnp: false },
pair: {
pub: 'DKpQPUL4ckzXYdnDRvCRKAm1gNvSdmAXnTrJZ7LvM5Qo',
sec: '64EYRvdPpTfLGGmaX5nijLXRqWXaVz8r1Z1GtaahXwVSJGQRn7tqkxLb288zwSYzELMEG5ZhXSBYSxsTsz1m9y8F'
......
......@@ -82,7 +82,7 @@ describe("Network", function() {
const commitS3 = commit(s3);
return [s1, s2, s3].reduce(function(p, server) {
server.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint
server.addEndpointsDefinitions(() => require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint(server.conf))
return p
.then(function(){
return server
......
......@@ -71,12 +71,12 @@ describe("Generation", function() {
let servers = [s1, s2];
for (const server of servers) {
server.getMainEndpoint = require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint
server.addEndpointsDefinitions(() => require('../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint(server.conf))
yield server.initWithDAL();
server.bma = yield bma(server);
yield server.bma.openConnections();
require('../../app/modules/router').duniter.methods.routeToNetwork(server);
yield server.PeeringService.generateSelfPeer(server.conf, 0);
yield server.PeeringService.generateSelfPeer(server.conf);
const prover = require('../../app/modules/prover').ProverDependency.duniter.methods.prover(server);
server.startBlockComputation = () => prover.startService();
server.stopBlockComputation = () => prover.stopService();
......
......@@ -230,6 +230,10 @@ export const NewTestingServer = (conf:any) => {
if (conf.sigQty === undefined) {
conf.sigQty = 1;
}
// Disable UPnP during tests
if (!conf.ws2p) {
conf.ws2p = { upnp: false }
}
const server = new Server(
'~/.config/duniter/' + (conf.homename || 'dev_unit_tests'),
conf.memory !== undefined ? conf.memory : MEMORY_MODE,
......@@ -276,7 +280,9 @@ export class TestingServer {
private port:number,
private server:Server) {
server.getMainEndpoint = require('../../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint
server.addEndpointsDefinitions(async () => {
return require('../../../app/modules/bma').BmaDependency.duniter.methods.getMainEndpoint(server.conf)
})
}
get _server() {
......
......@@ -2,7 +2,6 @@ import {WS2PConnection} from "../../app/modules/ws2p/lib/WS2PConnection"
import {Key} from "../../app/lib/common-libs/crypto/keyring"
import {newWS2PBidirectionnalConnection} from "./tools/toolbox"
import {WS2PRequester} from "../../app/modules/ws2p/lib/WS2PRequester"
import {WS2PReqMapper} from "../../app/modules/ws2p/WS2PReqMapper"
import {BlockDTO} from "../../app/lib/dto/BlockDTO"
import {WS2PMessageHandler} from "../../app/modules/ws2p/lib/impl/WS2PMessageHandler"
import {WS2PResponse} from "../../app/modules/ws2p/lib/impl/WS2PResponse"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment