Skip to content
Snippets Groups Projects
Commit a7ff543a authored by Cédric Moreau's avatar Cédric Moreau
Browse files

[enh] WW: also persist blocks refered by WoT data

parent 3ff4894d
Branches
Tags
No related merge requests found
export async function filterAsync<T>(arr: T[], filter: (t: T) => Promise<boolean>) {
const filtered: T[] = []
await Promise.all(arr.map(async t => {
if (await filter(t)) {
filtered.push(t)
}
}))
return filtered
}
\ No newline at end of file
......@@ -210,7 +210,7 @@ export class BlockDTO implements Cloneable {
}
get blockstamp() {
return [this.number, this.getHash()].join('-')
return BlockDTO.blockstamp({ number: this.number, hash: this.getHash() })
}
@MonitorExecutionTime()
......@@ -292,4 +292,8 @@ export class BlockDTO implements Cloneable {
static getHash(block:any) {
return BlockDTO.fromJSONObject(block).getHash()
}
static blockstamp(b: { number: number, hash: string }) {
return [b.number, b.hash].join('-')
}
}
import {Server} from "../../../../../server"
import {WotWizardDAL} from "../wotwizard.init.structure"
import {DBBlock} from "../../../../lib/db/DBBlock"
import {IdentityDTO} from "../../../../lib/dto/IdentityDTO"
import {CertificationDTO} from "../../../../lib/dto/CertificationDTO"
import {MembershipDTO} from "../../../../lib/dto/MembershipDTO"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {wwHasBlock} from "../wotwizard.copy.mempool"
import {filterAsync} from "../../../../lib/common-libs/filter-async"
import {Underscore} from "../../../../lib/common-libs/underscore"
import {NewLogger} from "../../../../lib/logger"
export interface WWBlockAccumulator {
accumulate(blocks: DBBlock[]): void
persist(): Promise<void>
}
export function requiredBlocksAccumulator(server: Server, wwDAL: WotWizardDAL): WWBlockAccumulator {
const logger = NewLogger()
const blockstamps: { [k: string]: boolean } = {}
const blockNumbers: { [k: number]: boolean } = {}
return {
accumulate(blocks: DBBlock[]) {
blocks.forEach(b => {
b.identities.forEach(i => blockstamps[IdentityDTO.fromInline(i).buid] = true)
b.certifications.forEach(i => blockNumbers[CertificationDTO.fromInline(i).block_number] = true)
b.joiners
.concat(b.actives)
.concat(b.leavers)
.forEach(i => blockstamps[MembershipDTO.fromInline(i).blockstamp] = true)
})
},
/**
* Returns the blocks that are not in WW but that requires to be because of inserted blocks containing
* interesting blockstamps (from identifies, certifications, memberships)
*/
async persist() {
const chunkLen = 250
const numbers = Object.keys(blockNumbers).map(n => parseInt(n))
const blocksForCertifications: (DBBlock|null)[] = []
for (let i = 0; i < numbers.length; i += chunkLen) {
const chunk = numbers.slice(0, chunkLen)
logger.debug('Chunk %s-%s', i, i + chunkLen)
;(await Promise.all(chunk.map(n => server.dal.getBlock(n))))
.forEach(b => blocksForCertifications.push(b))
}
const blocksForBlockstamps: (DBBlock|null)[] = await Promise.all(
Object
.keys(blockstamps)
.map(b => server.dal.getAbsoluteBlockByBlockstamp(b))
)
const reducedBlocks: { [k: string]: DBBlock } = blocksForCertifications
.concat(blocksForBlockstamps)
.reduce((acc, b) => {
if (b) {
acc[BlockDTO.blockstamp(b)] = b
}
return acc
}, {} as { [k: string]: DBBlock })
const blocksToInsert = await filterAsync(Object.values(reducedBlocks), (b: DBBlock) => wwHasBlock(wwDAL, b).then(b => !b))
const blocksSorted = Underscore.sortBy(blocksToInsert, b => b.number)
await wwDAL.blockDao.insertBatch(blocksSorted.map(f => { (f as any).legacy = true; return f }))
}
}
}
\ No newline at end of file
......@@ -3,31 +3,49 @@ import {Server} from "../../../../server"
import {DBBlock} from "../../../lib/db/DBBlock"
import {Underscore} from "../../../lib/common-libs/underscore"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function copyMemPool(server: Server, wwDAL: WotWizardDAL) {
export async function copyMemPool(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
const logger = NewLogger()
const identities = await server.dal.idtyDAL.sqlListAll()
// Blocks on which are based identities
const blocks = await Promise.all(identities.map(async idty => {
let b = await server.dal.getAbsoluteBlockByBlockstamp(idty.buid)
if (b) {
const b2 = await wwDAL.blockDao.getAbsoluteBlock(b.number, b.hash)
if (!b2) {
return b
}
}
return null
}))
const blocks = await Promise.all(identities.map(async idty => returnBlockIfPresentInServerButNotInWW(idty.buid, server, wwDAL)))
const toPersist: DBBlock[] = Underscore.uniq(blocks.filter(b => b) as DBBlock[], false, b => [b.number, b.hash].join('-'))
logger.debug('Persisting %s blocks for identities...', toPersist.length)
acc.accumulate(toPersist)
await wwDAL.blockDao.insertBatch(toPersist.map(b => { (b as any).legacy = true; return b }))
await wwDAL.idtyDao.insertBatch(identities)
await wwDAL.certDao.insertBatch(await server.dal.certDAL.sqlListAll())
await wwDAL.msDao.insertBatch(await server.dal.msDAL.sqlListAll())
}
/**
* Returns the server's block of given blockstamp if found in server and not found in WW
* @param blockstamp
* @param server
* @param wwDAL
*/
async function returnBlockIfPresentInServerButNotInWW(blockstamp: string, server: Server, wwDAL: WotWizardDAL) {
let b = await server.dal.getAbsoluteBlockByBlockstamp(blockstamp)
if (b) {
if (!(await wwHasBlock(wwDAL, b))) {
return b
}
}
return null
}
/**
* Tells wether given block is found in WW database or not.
* @param wwDAL
* @param b
*/
export async function wwHasBlock(wwDAL: WotWizardDAL, b: { number: number, hash: string}) {
const wwBlock = await wwDAL.blockDao.getAbsoluteBlock(b.number, b.hash)
return !!wwBlock
}
......@@ -5,21 +5,28 @@ import {addLegacyBlocks} from "./wotwizard.legacy.blocks"
import {addNewBlocks} from "./wotwizard.new.blocks"
import {deleteNonLegacy} from "./wotwizard.delete"
import {copyMemPool} from "./wotwizard.copy.mempool"
import {requiredBlocksAccumulator} from "./hooks/wotwizard.block.insert"
export async function dumpWotWizard(server: Server) {
// 1. Create dump structure if it does not exist
const wwDAL = await createExportStructure(WotWizardConstants.DB_NAME)
// Prepare accumulator for blocks with data refering blocks
const accumulator = requiredBlocksAccumulator(server, wwDAL)
// 2. Integrate legacy blocks (= non-forkable)
await addLegacyBlocks(server, wwDAL)
await addLegacyBlocks(server, wwDAL, accumulator)
// 3. Delete non-legacy data
await deleteNonLegacy(wwDAL)
// 4. Integrate new blocks (= forkable)
await addNewBlocks(server, wwDAL)
await addNewBlocks(server, wwDAL, accumulator)
// 5. Copy mempool
await copyMemPool(server, wwDAL)
await copyMemPool(server, wwDAL, accumulator)
// 6. Persist blocks referenced by a wot data (identities, certifications, memberships)
await accumulator.persist()
}
......@@ -3,8 +3,9 @@ import {Server} from "../../../../server"
import {CommonConstants} from "../../../lib/common-libs/constants"
import {DBBlock} from "../../../lib/db/DBBlock"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL) {
export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
const logger = NewLogger()
......@@ -35,6 +36,7 @@ export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL) {
logger.debug('Saving blocks...')
await wwDAL.blockDao.insertBatch(blocksSaved)
acc.accumulate(blocksSaved)
await Promise.all(blocksSaved)
......
......@@ -2,8 +2,9 @@ import {WotWizardDAL} from "./wotwizard.init.structure"
import {Server} from "../../../../server"
import {CommonConstants} from "../../../lib/common-libs/constants"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL) {
export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
const logger = NewLogger()
......@@ -25,6 +26,7 @@ export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL) {
const all = blocks.concat(forks).map(f => { (f as any).legacy = false; return f })
logger.debug('Saving %s pending blocks...', all.length)
blocksSaved.push(wwDAL.blockDao.insertBatch(all))
acc.accumulate(all)
}
await Promise.all(blocksSaved)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment