Commit 76a34a5a authored by Cédric Moreau's avatar Cédric Moreau

[enh] #1084 WS2P: implement block pulling

parent f794ae96
......@@ -19,6 +19,7 @@ app/lib/system/*.js
app/lib/streams/*.js
app/lib/helpers/*.js
app/lib/ws2p/*.js
app/lib/ws2p/*/*.js
app/lib/*.js
app/modules/wizard.js
app/modules/router.js
......
export const randomPick = <T>(elements:T[], max:number) => {
const chosen:T[] = []
const nbElements = elements.length
for (let i = 0; i < Math.min(nbElements, max); i++) {
const randIndex = Math.max(Math.floor(Math.random() * 10) - (10 - nbElements) - i, 0)
chosen.push(elements[randIndex])
elements.splice(randIndex, 1)
}
return chosen
}
\ No newline at end of file
import {BlockDTO} from "../dto/BlockDTO"
import {AbstractDAO} from "../../modules/crawler/lib/pulling"
import {Server} from "../../../server"
import {DBBlock} from "../db/DBBlock"
import {PeerDTO} from "../dto/PeerDTO"
import {CrawlerConstants} from "../../modules/crawler/lib/constants"
import {tx_cleaner} from "../../modules/crawler/lib/tx_cleaner"
import {WS2PConnection} from "./WS2PConnection"
import {WS2PRequester} from "./WS2PRequester"
export class WS2PBlockPuller {
constructor(
private server:Server,
private connection:WS2PConnection
) {}
async pull() {
const requester = WS2PRequester.fromConnection(this.connection)
// node.pubkey = p.pubkey;
let dao = new WS2PDao(this.server, requester)
await dao.pull(this.server.conf, this.server.logger)
}
}
interface RemoteNode {
getCurrent: () => Promise<BlockDTO>
getBlock: (number:number) => Promise<BlockDTO>
getBlocks: (count:number, fromNumber:number) => Promise<BlockDTO[]>
}
class WS2PDao extends AbstractDAO {
private node:RemoteNode
private lastDownloaded:BlockDTO|null
private nodeCurrent:BlockDTO|null = null
public newCurrent:BlockDTO|null = null
constructor(
private server:Server,
private requester:WS2PRequester
) {
super()
this.node = {
getCurrent: async () => {
return this.requester.getCurrent()
},
getBlock: async (number:number) => {
return this.requester.getBlock(number)
},
getBlocks: async (count:number, fromNumber:number) => {
return this.requester.getBlocks(count, fromNumber)
}
}
}
async localCurrent(): Promise<DBBlock | null> {
return this.server.dal.getCurrentBlockOrNull()
}
async remoteCurrent(source: RemoteNode): Promise<BlockDTO | null> {
this.nodeCurrent = await source.getCurrent()
return this.nodeCurrent
}
async remotePeers(source?: any): Promise<PeerDTO[]> {
const peer:any = this.node
return Promise.resolve([peer])
}
async getLocalBlock(number: number): Promise<DBBlock> {
return this.server.dal.getBlock(number)
}
async getRemoteBlock(thePeer: any, number: number): Promise<BlockDTO> {
let block = null;
try {
block = await thePeer.getBlock(number);
tx_cleaner(block.transactions);
} catch (e) {
if (e.httpCode != 404) {
throw e;
}
}
return block;
}
async applyMainBranch(block: BlockDTO): Promise<boolean> {
const existing = await this.server.dal.getAbsoluteBlockByNumberAndHash(block.number, block.hash)
if (!existing) {
let addedBlock = await this.server.writeBlock(block, false, true)
if (!this.lastDownloaded) {
this.lastDownloaded = await this.remoteCurrent(this.node)
}
this.server.pullingEvent('applying', {number: block.number, last: this.lastDownloaded && this.lastDownloaded.number})
if (addedBlock) {
this.newCurrent = addedBlock
// Emit block events (for sharing with the network) only in forkWindowSize
if (this.nodeCurrent && this.nodeCurrent.number - addedBlock.number < this.server.conf.forksize) {
this.server.streamPush(addedBlock);
}
}
}
return true
}
async removeForks(): Promise<boolean> {
return true
}
async isMemberPeer(thePeer: PeerDTO): Promise<boolean> {
return true
}
async downloadBlocks(thePeer: any, fromNumber: number, count?: number | undefined): Promise<BlockDTO[]> {
if (!count) {
count = CrawlerConstants.CRAWL_BLOCK_CHUNK
}
let blocks = await thePeer.getBlocks(count, fromNumber);
// Fix for #734
for (const block of blocks) {
for (const tx of block.transactions) {
tx.version = CrawlerConstants.TRANSACTION_VERSION;
}
}
return blocks;
}
}
......@@ -6,6 +6,8 @@ import {Key} from "../common-libs/crypto/keyring"
export class WS2PClient {
private constructor(public connection:WS2PConnection) {}
static async connectTo(server:Server, host:string, port:number) {
const k2 = new Key(server.conf.pair.pub, server.conf.pair.sec)
const c = WS2PConnection.newConnectionToAddress(
......@@ -23,6 +25,6 @@ export class WS2PClient {
// Connecting
await c.connect()
return c
return new WS2PClient(c)
}
}
\ No newline at end of file
......@@ -2,6 +2,9 @@ import {WS2PServer} from "./WS2PServer"
import {Server} from "../../../server"
import {WS2PClient} from "./WS2PClient"
import {WS2PConnection} from "./WS2PConnection"
import {randomPick} from "../common-libs/randomPick"
import {CrawlerConstants} from "../../modules/crawler/lib/constants"
import {WS2PBlockPuller} from "./WS2PBlockPuller"
const nuuid = require('node-uuid')
......@@ -28,9 +31,37 @@ export class WS2PCluster {
const uuid = nuuid.v4()
const ws2pc = await WS2PClient.connectTo(this.server, host, port)
this.ws2pClients[uuid] = ws2pc
ws2pc.closed.then(() => {
ws2pc.connection.closed.then(() => {
delete this.ws2pClients[uuid]
})
return ws2pc
return ws2pc.connection
}
async getAllConnections() {
const all:WS2PConnection[] = this.ws2pServer ? this.ws2pServer.getConnexions() : []
for (const uuid of Object.keys(this.ws2pClients)) {
all.push(this.ws2pClients[uuid].connection)
}
return all
}
async pullBlocks() {
const connections = await this.getAllConnections()
const chosen = randomPick(connections, CrawlerConstants.CRAWL_PEERS_COUNT)
await Promise.all(chosen.map(async (conn) => {
const puller = new WS2PBlockPuller(this.server, conn)
await puller.pull()
}))
await this.server.BlockchainService.pushFIFO("WS2PCrawlerResolution", async () => {
await this.server.BlockchainService.blockResolution()
await this.server.BlockchainService.forkResolution()
})
const current = await this.server.dal.getCurrentBlockOrNull()
if (current) {
this.server.pullingEvent('end', current.number)
}
}
}
\ No newline at end of file
......@@ -200,7 +200,8 @@ export class WS2PPubkeyLocalAuth implements WS2PLocalAuth {
}
export interface WS2PRequest {
name:string
name:string,
params?:any
}
/**
......@@ -214,7 +215,7 @@ export class WS2PConnection {
private connectp:Promise<any>|undefined
private connectedp:Promise<string>
private connectedResolve:(pub:string)=>void
private connectedReject:()=>void
private connectedReject:(e:any)=>void
private nbErrors = 0
private nbRequestsCount = 0
private nbResponsesCount = 0
......@@ -442,8 +443,12 @@ export class WS2PConnection {
// Request message
else if (data.reqId && typeof data.reqId === "string") {
const answer = await this.messageHandler.answerToRequest(data.body)
this.ws.send(JSON.stringify({ resId: data.reqId, body: answer }))
try {
const answer = await this.messageHandler.answerToRequest(data.body)
this.ws.send(JSON.stringify({ resId: data.reqId, body: answer }))
} catch (e) {
this.ws.send(JSON.stringify({ resId: data.reqId, err: e }))
}
}
// Answer message
......@@ -470,7 +475,8 @@ export class WS2PConnection {
this.connectedResolve(this.remoteAuth.getPubkey())
} catch (e) {
this.connectedReject()
this.connectedReject(e)
throw e
}
})()
}
......
import {WS2PConnection} from "./WS2PConnection"
import {BlockDTO} from "../dto/BlockDTO"
enum WS2P_REQ {
export enum WS2P_REQ {
BLOCKS_CHUNK,
BLOCK_BY_NUMBER,
CURRENT
}
......@@ -13,13 +16,22 @@ export class WS2PRequester {
return new WS2PRequester(ws2pc)
}
getCurrent() {
getCurrent(): Promise<BlockDTO> {
return this.query(WS2P_REQ.CURRENT)
}
private query(req:WS2P_REQ) {
getBlock(number:number): Promise<BlockDTO> {
return this.query(WS2P_REQ.BLOCK_BY_NUMBER, { number })
}
getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]> {
return this.query(WS2P_REQ.BLOCKS_CHUNK, { count, fromNumber })
}
private query(req:WS2P_REQ, params:any = {}): Promise<any> {
return this.ws2pc.request({
name: WS2P_REQ[req]
name: WS2P_REQ[req],
params: params
})
}
}
\ No newline at end of file
import {Server} from "../../../../server"
import {WS2PReqMapper} from "../interface/WS2PReqMapper"
import {BlockDTO} from "../../dto/BlockDTO"
export class WS2PReqMapperByServer implements WS2PReqMapper {
......@@ -8,4 +9,20 @@ export class WS2PReqMapperByServer implements WS2PReqMapper {
async getCurrent() {
return this.server.BlockchainService.current()
}
getBlock(number: number): Promise<BlockDTO[]> {
return this.server.dal.getBlock(number)
}
async getBlocks(count: number, from: number): Promise<BlockDTO[]> {
if (count > 5000) {
throw 'Count is too high'
}
const current = await this.server.dal.getCurrentBlockOrNull()
count = Math.min(current.number - from + 1, count)
if (!current || current.number < from) {
return []
}
return this.server.dal.getBlocksBetween(from, from + count - 1)
}
}
\ No newline at end of file
......@@ -3,4 +3,6 @@ import {BlockDTO} from "../../dto/BlockDTO"
export interface WS2PReqMapper {
getCurrent(): Promise<BlockDTO>
getBlock(number:number): Promise<BlockDTO[]>
getBlocks(count:number, fromNumber:number): Promise<BlockDTO[]>
}
\ No newline at end of file
......@@ -9,10 +9,7 @@ import {CertificationDTO} from "../../dto/CertificationDTO"
import {MembershipDTO} from "../../dto/MembershipDTO"
import {TransactionDTO} from "../../dto/TransactionDTO"
import {PeerDTO} from "../../dto/PeerDTO"
enum WS2P_REQ {
CURRENT
}
import {WS2P_REQ} from "../WS2PRequester"
export enum WS2P_REQERROR {
UNKNOWN_REQUEST
......@@ -78,6 +75,24 @@ export class WS2PServerMessageHandler implements WS2PMessageHandler {
case WS2P_REQ[WS2P_REQ.CURRENT]:
body = await this.mapper.getCurrent()
break;
case WS2P_REQ[WS2P_REQ.BLOCK_BY_NUMBER]:
if (isNaN(data.params.number)) {
throw "Wrong param `number`"
}
const number:number = data.params.number
body = await this.mapper.getBlock(number)
break;
case WS2P_REQ[WS2P_REQ.BLOCKS_CHUNK]:
if (isNaN(data.params.count)) {
throw "Wrong param `count`"
}
if (isNaN(data.params.fromNumber)) {
throw "Wrong param `fromNumber`"
}
const count:number = data.params.count
const fromNumber:number = data.params.fromNumber
body = await this.mapper.getBlocks(count, fromNumber)
break;
default:
throw Error(WS2P_REQERROR[WS2P_REQERROR.UNKNOWN_REQUEST])
}
......
......@@ -10,6 +10,8 @@ export const CrawlerConstants = {
FORK_ALLOWED: true,
MAX_NUMBER_OF_PEERS_FOR_PULLING: 4,
PULLING_MINIMAL_DELAY: 20,
CRAWL_BLOCK_CHUNK: 50, // During a crawl, the quantity of blocks to download
CRAWL_PEERS_COUNT: 4,
PULLING_INTERVAL_TARGET: 240,
COUNT_FOR_ENOUGH_PEERS: 4,
SANDBOX_FIRST_PULL_DELAY: 1000 * 60 * 10, // milliseconds
......
......@@ -35,7 +35,7 @@ export class Crawler extends stream.Transform implements DuniterService {
this.peerCrawler = new PeerCrawler(server, conf, logger)
this.peerTester = new PeerTester(server, conf, logger)
this.blockCrawler = new BlockCrawler(server, logger, this)
this.blockCrawler = new BlockCrawler(server, logger)
this.sandboxCrawler = new SandboxCrawler(server, conf, logger)
}
......@@ -303,7 +303,7 @@ export class PeerTester implements DuniterService {
export class BlockCrawler {
private CONST_BLOCKS_CHUNK = 50
private CONST_BLOCKS_CHUNK = CrawlerConstants.CRAWL_BLOCK_CHUNK
private pullingActualIntervalDuration = CrawlerConstants.PULLING_MINIMAL_DELAY
private programStart = Date.now()
private syncBlockFifo = async.queue((task:any, callback:any) => task(callback), 1)
......@@ -311,8 +311,7 @@ export class BlockCrawler {
constructor(
private server:Server,
private logger:any,
private PROCESS:stream.Transform) {
private logger:any) {
}
async startService() {
......@@ -470,17 +469,7 @@ export class BlockCrawler {
}
private pullingEvent(server:Server, type:string, number:any = null) {
server.push({
pulling: {
type: type,
data: number
}
});
if (type !== 'end') {
this.PROCESS.push({ pulling: 'processing' });
} else {
this.PROCESS.push({ pulling: 'finished' });
}
server.pullingEvent(type, number)
}
private isConnectionError(err:any) {
......
......@@ -473,6 +473,20 @@ export class Server extends stream.Duplex implements HookableServer {
}
}
pullingEvent(type:string, number:any = null) {
this.push({
pulling: {
type: type,
data: number
}
})
if (type !== 'end') {
this.push({ pulling: 'processing' })
} else {
this.push({ pulling: 'finished' })
}
}
async reapplyTo(number:number) {
const current = await this.BlockchainService.current();
if (current.number == number) {
......
......@@ -354,6 +354,10 @@ export class TestingServer {
async writePeer(obj:any) {
return this.server.writePeer(obj)
}
async pullingEvent(type:string, number:number) {
this.server.pullingEvent(type, number)
}
exportAllDataAsZIP() {
return this.server.exportAllDataAsZIP()
......@@ -609,7 +613,7 @@ export async function newWS2PBidirectionnalConnection(k1:Key, k2:Key, serverHand
})
}
export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promise<{ w1: WS2PConnection; ws2pc: WS2PConnection; wss: WS2PServer }> = async (s1: TestingServer, s2: TestingServer) => {
export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promise<{ w1: WS2PConnection; ws2pc: WS2PConnection; wss: WS2PServer, cluster1:WS2PCluster, cluster2:WS2PCluster }> = async (s1: TestingServer, s2: TestingServer) => {
let port = PORT++
const clientPub = s2.conf.pair.pub
let w1: WS2PConnection | null
......@@ -627,7 +631,9 @@ export const simpleWS2PNetwork: (s1: TestingServer, s2: TestingServer) => Promis
return {
w1,
ws2pc,
wss: ws2ps
wss: ws2ps,
cluster1,
cluster2
}
}
......
......@@ -406,6 +406,10 @@ class WS2PNoLocalAuth implements WS2PLocalAuth {
class WS2PNoRemoteAuth implements WS2PRemoteAuth {
getPubkey(): string {
return ""
}
async sendACK(ws: any): Promise<void> {
}
......@@ -435,6 +439,6 @@ class WS2PMutedHandler implements WS2PMessageHandler {
}
async answerToRequest(json: any): Promise<WS2PResponse> {
return {}
throw "Does not answer"
}
}
import {simpleTestingConf, simpleTestingServer, simpleUser, simpleWS2PNetwork, TestingServer} from "./tools/toolbox"
import {WS2PCluster} from "../../app/lib/ws2p/WS2PCluster"
const assert = require('assert')
describe("WS2P block pulling", function() {
const now = 1500000000
let s1:TestingServer, s2:TestingServer, wss:any
let cluster2:WS2PCluster
let cat:any, tac:any
const catKeyring = { pub: 'HgTTJLAQ5sqfknMq7yLPZbehtuLSsKj9CxWN7k8QvYJd', sec: '51w4fEShBk1jCMauWu4mLpmDVfHksKmWcygpxriqCEZizbtERA6de4STKRkQBpxmMUwsKXRjSzuQ8ECwmqN1u2DP'}
const tacKeyring = { pub: '2LvDg21dVXvetTD9GdkPLURavLYEqP3whauvPWX4c2qc', sec: '2HuRLWgKgED1bVio1tdpeXrf7zuUszv1yPHDsDj7kcMC4rVSN9RC58ogjtKNfTbH1eFz7rn38U1PywNs3m6Q7UxE'}
let b0, b1, b2
before(async () => {
const conf1 = simpleTestingConf(now, catKeyring)
const conf2 = simpleTestingConf(now, tacKeyring)
s1 = simpleTestingServer(conf1)
s2 = simpleTestingServer(conf2)
cat = simpleUser('cat', catKeyring, s1)
tac = simpleUser('tac', tacKeyring, s1)
await s1.initDalBmaConnections()
await s2.initDalBmaConnections()
await cat.createIdentity();
await tac.createIdentity();
await cat.cert(tac);
await tac.cert(cat);
await cat.join();
await tac.join();
b0 = await s1.commit({ time: now })
b1 = await s1.commit({ time: now })
b2 = await s1.commit({ time: now })
await s1.commit({ time: now })
await s1.commit({ time: now })
await s1.commit({ time: now })
await s1.commit({ time: now }) // b6
await s2.writeBlock(b0)
await s2.writeBlock(b1)
await s2.writeBlock(b2)
await s2.waitToHaveBlock(2)
const network = await simpleWS2PNetwork(s1, s2)
wss = network.wss
cluster2 = network.cluster2
})
after(() => wss.close())
it('should have b#6 on s1, b#2 on s2', async () => {
const currentS1 = await s1.BlockchainService.current()
const currentS2 = await s2.BlockchainService.current()
assert.equal(currentS1.number, 6)
assert.equal(currentS2.number, 2)
})
it('should be able to pull and have the same current block as a result', async () => {
await cluster2.pullBlocks()
const currentS1 = await s1.BlockchainService.current()
const currentS2 = await s2.BlockchainService.current()
assert.equal(currentS1.number, 6)
assert.equal(currentS2.number, 6)
})
})
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment