Commit fdd6338c authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] sync: enhance milestones downloading speed

parent 797a9b26
......@@ -309,6 +309,7 @@ export const CommonConstants = {
ARCHIVES_BLOCKS_CHUNK: 250,
SYNC_BLOCKS_CHUNK: 250,
MILESTONES_PER_PAGE: 50,
CHUNK_PREFIX: 'chunk_',
BLOCKS_IN_MEMORY_MAX: 288 * 60, // 288 = 1 day
......
export enum DataErrors {
INVALID_LEVELDB_IINDEX_DATA_WAS_KICKED,
INVALID_LEVELDB_IINDEX_DATA_TO_BE_KICKED,
......@@ -26,5 +25,6 @@ export enum DataErrors {
CANNOT_REAPPLY_NO_CURRENT_BLOCK,
CANNOT_REVERT_NO_CURRENT_BLOCK,
BLOCK_TO_REVERT_NOT_FOUND,
MEMBER_NOT_FOUND
MEMBER_NOT_FOUND,
MILESTONE_BLOCK_NOT_FOUND
}
......@@ -149,19 +149,6 @@ export const BmaDependency = {
if (program.upnp === true) {
conf.upnp = true;
}
// Configuration errors
if (!conf.nobma) {
if(!conf.ipv4 && !conf.ipv6){
throw new Error("No interface to listen to.");
}
if(!conf.remoteipv4 && !conf.remoteipv6 && !conf.remotehost){
throw new Error('No interface for remote contact.');
}
if (!conf.remoteport) {
throw new Error('No port for remote contact.');
}
}
},
beforeSave: async (conf:NetworkConfDTO, program:any) => {
......@@ -175,6 +162,18 @@ export const BmaDependency = {
service: {
input: (server:Server, conf:NetworkConfDTO, logger:any) => {
// Configuration errors
if (!conf.nobma) {
if(!conf.ipv4 && !conf.ipv6){
throw new Error("BMA: no interface to listen to.");
}
if(!conf.remoteipv4 && !conf.remoteipv6 && !conf.remotehost){
throw new Error('BMA: no interface for remote contact.');
}
if (!conf.remoteport) {
throw new Error('BMA: no port for remote contact.');
}
}
if (!conf.nobma) {
server.addEndpointsDefinitions(() => Promise.resolve(getEndpoint(conf)))
server.addWrongEndpointFilter((endpoints:string[]) => getWrongEndpoints(endpoints, server.conf.pair.pub))
......
......@@ -65,6 +65,8 @@ export const bma = function(server:Server, interfaces:NetworkInterface[]|null, h
httpMethods.httpPOST( '/blockchain/block', (req:any) => blockchain.parseBlock(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/block/:number', (req:any) => blockchain.promoted(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/blocks/:count/:from', (req:any) => blockchain.blocks(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/milestones', (req:any) => blockchain.milestones(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/milestones/:page', (req:any) => blockchain.milestones(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/current', (req:any) => blockchain.current(), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/hardship/:search', (req:any) => blockchain.hardship(req), BMALimitation.limitAsHighUsage());
httpMethods.httpGET( '/blockchain/difficulties', (req:any) => blockchain.difficulties(), BMALimitation.limitAsHighUsage());
......
......@@ -52,7 +52,8 @@ export const BMAConstants = {
NO_CURRENT_BLOCK: { httpCode: 404, uerr: { ucode: 2010, message: "No current block" }},
PEER_NOT_FOUND: { httpCode: 404, uerr: { ucode: 2012, message: "Peer not found" }},
NO_IDTY_MATCHING_PUB_OR_UID: { httpCode: 404, uerr: { ucode: 2021, message: "No identity matching this pubkey or uid" }},
TX_NOT_FOUND: { httpCode: 400, uerr: { ucode: 2034, message: 'Transaction not found' }}
TX_NOT_FOUND: { httpCode: 400, uerr: { ucode: 2034, message: 'Transaction not found' }},
INCORRECT_PAGE_NUMBER: { httpCode: 400, uerr: { ucode: 2035, message: 'Incorrect page number' }}
// New errors: range 3000-4000
}
......
......@@ -24,15 +24,16 @@ import {
HttpHardship,
HttpMembership,
HttpMemberships,
HttpMilestonePage,
HttpParameters,
HttpStat
} from "../dtos"
import {TransactionDTO} from "../../../../lib/dto/TransactionDTO"
import {DataErrors} from "../../../../lib/common-libs/errors"
import {Underscore} from "../../../../lib/common-libs/underscore"
import * as toJson from "../tojson"
const http2raw = require('../http2raw');
const toJson = require('../tojson');
export class BlockchainBinding extends AbstractController {
......@@ -168,6 +169,11 @@ export class BlockchainBinding extends AbstractController {
}))
}
async milestones(req: any): Promise<HttpMilestonePage> {
const page = ParametersService.getPage(req)
return this.server.milestones(page)
}
async current(): Promise<HttpBlock> {
const current = await this.server.dal.getCurrentBlockOrNull();
if (!current) throw BMAConstants.ERRORS.NO_CURRENT_BLOCK;
......
......@@ -969,3 +969,25 @@ export interface HttpSandboxes {
export const LogLink = {
link: String
};
export interface HttpMilestonePage {
totalPages: number
chunkSize: number
milestonesPerPage: number
currentPage?: number
blocks?: HttpBlock[]
}
export const Milestones = {
totalPages: Number,
chunkSize: Number,
milestonesPerPage: Number,
currentPage: Number,
"blocks": [Block]
}
export const MilestonesPage = {
totalPages: Number,
chunkSize: Number,
milestonesPerPage: Number,
}
......@@ -73,6 +73,17 @@ export class ParametersService {
return parseInt(req.params.minsig)
}
static getPage(req:any): number|undefined {
if(!req.params.page){
return undefined
}
const matches = req.params.page.match(/\d+/)
if(!matches){
throw Error("`page` format is incorrect, must be an integer")
}
return parseInt(req.params.page)
}
static getPubkey = function (req:any, callback:any){
if(!req.params.pubkey){
callback('Parameter `pubkey` is required');
......
......@@ -57,6 +57,14 @@ export class Contacter {
getCurrent() {
return this.get('/blockchain/current', dtos.Block)
}
getMilestonesPage() {
return this.get('/blockchain/milestones', dtos.MilestonesPage)
}
getMilestones(page: number) {
return this.get('/blockchain/milestones/' + page, dtos.Milestones)
}
getPeer() {
return this.get('/network/peering', dtos.Peer)
......
......@@ -28,6 +28,7 @@ export abstract class AbstractSynchronizer {
abstract initWithKnownLocalAndToAndCurrency(to: number, localNumber: number, currency: string): Promise<void>
abstract getCurrent(): Promise<BlockDTO|null>
abstract getBlock(number: number): Promise<BlockDTO|null>
abstract getMilestone(number: number): Promise<BlockDTO|null>
abstract p2pDownloader(): ISyncDownloader
abstract fsDownloader(): ISyncDownloader
abstract syncPeers(fullSync:boolean, to?:number): Promise<void>
......
......@@ -37,6 +37,14 @@ export class BMARemoteContacter implements IRemoteContacter {
return this.contacter.getBlocks(count, from)
}
getMilestones(page: number): Promise<{ chunkSize: number; totalPages: number; currentPage: number; milestonesPerPage: number; blocks: BlockDTO[] }> {
return this.contacter.getMilestones(page)
}
getMilestonesPage(): Promise<{ chunkSize: number; totalPages: number; milestonesPerPage: number }> {
return this.contacter.getMilestonesPage()
}
async getPeers(): Promise<(JSONDBPeer|null)[]> {
return (await this.contacter.getPeersArray()).peers
}
......
......@@ -25,6 +25,10 @@ export interface IRemoteContacter {
getBlock(number: number): Promise<BlockDTO|null>
getMilestonesPage(): Promise<{ chunkSize: number, totalPages: number, milestonesPerPage: number }>
getMilestones(page: number): Promise<{ chunkSize: number, totalPages: number, currentPage: number, milestonesPerPage: number, blocks: BlockDTO[] }>
getBlocks(count: number, from: number): Promise<BlockDTO[]>
getRequirementsPending(number: number): Promise<HttpRequirements>
......
......@@ -105,6 +105,10 @@ export class LocalPathSynchronizer extends AbstractSynchronizer {
return chunk[position]
}
getMilestone(number: number) {
return this.getBlock(number)
}
async syncPeers(fullSync: boolean, to?: number): Promise<void> {
// Does nothing on LocalPathSynchronizer
}
......
......@@ -56,7 +56,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
return this.p2pCandidates.filter(p => p.hasAvailableApi()).length
}
private async waitForAvailableNodes(needed = 1): Promise<P2pCandidate[]> {
private async waitForAvailableNodesAndReserve(needed = 1): Promise<P2pCandidate[]> {
let nodesToWaitFor = this.p2pCandidates.slice()
let nodesAvailable: P2pCandidate[] = []
let i = 0
......@@ -67,7 +67,10 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
nodesAvailable = nodesAvailable.concat(readyNodes)
i++
}
return nodesAvailable
return nodesAvailable.slice(0, needed).map(n => {
n.reserve()
return n
})
}
/**
......@@ -78,7 +81,7 @@ export class P2PSyncDownloader extends ASyncDownloader implements ISyncDownloade
private async getP2Pcandidates(chunkIndex: number): Promise<P2pCandidate[]> {
return this.fifoPromise.pushFIFOPromise('getP2Pcandidates_' + getNanosecondsTime(), async () => {
// We wait a bit to have some available nodes
const readyNodes = await this.waitForAvailableNodes()
const readyNodes = await this.waitForAvailableNodesAndReserve()
// We remove the nodes impossible to reach (timeout)
let byAvgAnswerTime = Underscore.sortBy(readyNodes, p => p.avgResponseTime())
const parallelMax = Math.min(this.PARALLEL_PER_CHUNK, byAvgAnswerTime.length)
......
......@@ -53,6 +53,10 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
private localNumber: number
private watcher: Watcher
private endpoint: string = ""
private hasMilestonesPages: boolean|undefined
private milestones: { [k: number]: BlockDTO } = {}
private milestonesPerPage = 1
private maxPage = 0
constructor(
private host: string,
......@@ -247,6 +251,46 @@ export class RemoteSynchronizer extends AbstractSynchronizer {
return this.node.getBlock(number)
}
async getMilestone(number: number): Promise<BlockDTO|null> {
if (this.hasMilestonesPages === undefined) {
try {
const mlPage = await this.node.getMilestonesPage()
this.hasMilestonesPages = mlPage.chunkSize === this.chunkSize
this.milestonesPerPage = mlPage.milestonesPerPage
this.maxPage = mlPage.totalPages
} catch (e) {
this.hasMilestonesPages = false
}
}
if (!this.hasMilestonesPages) {
return this.getBlock(number)
}
if (this.milestones[number]) {
return this.milestones[number]
}
if ((number + 1) % this.chunkSize !== 0) {
// Something went wrong: we cannot rely on milestones method
this.hasMilestonesPages = false
return this.getBlock(number)
}
const chunkNumber = (number + 1) / this.chunkSize
const pageNumber = (chunkNumber - (chunkNumber % this.milestonesPerPage)) / this.milestonesPerPage + 1
if (pageNumber > this.maxPage) {
// The page is not available: we cannot rely on milestones method at this point
this.hasMilestonesPages = false
return this.getBlock(number)
}
const mlPage = await this.node.getMilestones(pageNumber)
mlPage.blocks.forEach(b => this.milestones[b.number] = b)
if (this.milestones[number]) {
return this.milestones[number]
}
// Even after the download, it seems we don't have our milestone. We will download normally.
this.hasMilestonesPages = false
return this.getBlock(number)
}
static async test(host: string, port: number, keypair: Keypair): Promise<BlockDTO> {
const syncApi = await RemoteSynchronizer.getSyncAPI([{ host, port }], keypair)
const current = await syncApi.api.getCurrent()
......
......@@ -34,6 +34,14 @@ export class WS2PRemoteContacter implements IRemoteContacter {
return this.requester.getBlock(number)
}
getMilestones(page: number): Promise<{ chunkSize: number; totalPages: number; currentPage: number; milestonesPerPage: number; blocks: BlockDTO[] }> {
return this.requester.getMilestones(page) as any
}
getMilestonesPage(): Promise<{ chunkSize: number; totalPages: number; milestonesPerPage: number }> {
return this.requester.getMilestonesPage()
}
getCurrent(): Promise<BlockDTO | null> {
return this.requester.getCurrent()
}
......
......@@ -15,6 +15,7 @@ export class P2pCandidate {
private nbSuccess = 0
private isExcluded: boolean
private failures = 0
private reserved = false
constructor(
private p: PeerDTO,
......@@ -35,7 +36,7 @@ export class P2pCandidate {
}
isReady() {
return this.apiPromise.isResolved() && this.dlPromise.isResolved() && this.api && !this.isExcluded
return !this.reserved && this.apiPromise.isResolved() && this.dlPromise.isResolved() && this.api && !this.isExcluded
}
async waitAvailability(maxWait: number): Promise<boolean> {
......@@ -64,6 +65,7 @@ export class P2pCandidate {
async downloadBlocks(count: number, from: number) {
const start = Date.now()
let error: Error|undefined
this.reserved = false
this.dlPromise = querablep((async () => {
// We try to download the blocks
let blocks: BlockDTO[]|null
......@@ -125,6 +127,10 @@ export class P2pCandidate {
}
})())
}
reserve() {
this.reserved = true
}
}
interface RemoteAPI {
......
......@@ -50,7 +50,7 @@ export class ValidatorStream extends Readable {
try {
const bNumber = Math.min(this.to, (i + 1) * this.syncStrategy.chunkSize - 1)
if (bNumber > maximumCacheNumber) {
block = await this.syncStrategy.getBlock(bNumber)
block = await this.syncStrategy.getMilestone(bNumber)
} else {
block = await this.getBlockFromCache(bNumber)
}
......
......@@ -35,6 +35,8 @@ export class WS2PDocpoolPuller {
getCurrent: async () => null,
getBlock: async () => null,
getBlocks: async () => [],
getMilestonesPage: async () => ({ chunkSize: 0, totalPages: 0, milestonesPerPage: 0 }),
getMilestones: async () => ({ chunkSize: 0, totalPages: 0, currentPage: 0, milestonesPerPage: 0, blocks: [] }),
hostName: ''
}, this.server, this.server.logger)
}
......
......@@ -13,7 +13,8 @@
import {WS2PConnection} from "./WS2PConnection"
import {BlockDTO} from "../../../lib/dto/BlockDTO"
import {PeerDTO} from "../../../lib/dto/PeerDTO";
import {PeerDTO} from "../../../lib/dto/PeerDTO"
import {HttpMilestonePage} from "../../bma/lib/dtos"
export enum WS2P_REQ {
KNOWN_PEERS,
......@@ -21,7 +22,9 @@ export enum WS2P_REQ {
WOT_REQUIREMENTS_OF_PENDING,
BLOCKS_CHUNK,
BLOCK_BY_NUMBER,
CURRENT
CURRENT,
MILESTONES_PAGE,
MILESTONES
}
export class WS2PRequester {
......@@ -45,6 +48,14 @@ export class WS2PRequester {
return this.query(WS2P_REQ.CURRENT)
}
getMilestonesPage(): Promise<HttpMilestonePage> {
return this.query(WS2P_REQ.MILESTONES_PAGE)
}
getMilestones(page: number): Promise<HttpMilestonePage> {
return this.query(WS2P_REQ.MILESTONES, { page })
}
getBlock(number:number): Promise<BlockDTO> {
return this.query(WS2P_REQ.BLOCK_BY_NUMBER, { number })
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment