Commit e8ee7c44 authored by Cédric Moreau's avatar Cédric Moreau

[enh] dump-ww: better save all blocks, simpler

parent dea2bf46
Pipeline #5004 failed with stages
in 8 minutes and 18 seconds
import {Server} from "../../../../../server"
import {WotWizardDAL} from "../wotwizard.init.structure"
import {DBBlock} from "../../../../lib/db/DBBlock"
import {IdentityDTO} from "../../../../lib/dto/IdentityDTO"
import {CertificationDTO} from "../../../../lib/dto/CertificationDTO"
import {MembershipDTO} from "../../../../lib/dto/MembershipDTO"
import {BlockDTO} from "../../../../lib/dto/BlockDTO"
import {wwHasBlock} from "../wotwizard.copy.mempool"
import {filterAsync} from "../../../../lib/common-libs/filter-async"
import {Underscore} from "../../../../lib/common-libs/underscore"
import {NewLogger} from "../../../../lib/logger"
export interface WWBlockAccumulator {
accumulate(blocks: DBBlock[]): void
persist(): Promise<void>
}
export function requiredBlocksAccumulator(server: Server, wwDAL: WotWizardDAL): WWBlockAccumulator {
const logger = NewLogger()
const blockstamps: { [k: string]: boolean } = {}
const blockNumbers: { [k: number]: boolean } = {}
return {
accumulate(blocks: DBBlock[]) {
blocks.forEach(b => {
b.identities.forEach(i => blockstamps[IdentityDTO.fromInline(i).buid] = true)
b.certifications.forEach(i => blockNumbers[CertificationDTO.fromInline(i).block_number] = true)
b.joiners
.concat(b.actives)
.concat(b.leavers)
.forEach(i => blockstamps[MembershipDTO.fromInline(i).blockstamp] = true)
})
},
/**
* Returns the blocks that are not in WW but that requires to be because of inserted blocks containing
* interesting blockstamps (from identifies, certifications, memberships)
*/
async persist() {
const chunkLen = 250
const numbers = Object.keys(blockNumbers).map(n => parseInt(n))
const blocksForCertifications: (DBBlock|null)[] = []
for (let i = 0; i < numbers.length; i += chunkLen) {
const chunk = numbers.slice(i, i + chunkLen)
logger.debug('Chunk %s-%s', chunk[0], chunk[chunk.length - 1])
;(await Promise.all(chunk.map(n => server.dal.getBlock(n))))
.forEach(b => blocksForCertifications.push(b))
}
const blocksForBlockstamps: (DBBlock|null)[] = await Promise.all(
Object
.keys(blockstamps)
.map(b => server.dal.getAbsoluteBlockByBlockstamp(b))
)
const reducedBlocks: { [k: string]: DBBlock } = blocksForCertifications
.concat(blocksForBlockstamps)
.reduce((acc, b) => {
if (b) {
acc[BlockDTO.blockstamp(b)] = b
}
return acc
}, {} as { [k: string]: DBBlock })
const blocksToInsert = await filterAsync(Object.values(reducedBlocks), (b: DBBlock) => wwHasBlock(wwDAL, b).then(b => !b))
const blocksSorted = Underscore.sortBy(blocksToInsert, b => b.number)
await wwDAL.blockDao.insertBatch(blocksSorted.map(f => { (f as any).legacy = true; return f }))
}
}
}
\ No newline at end of file
......@@ -3,9 +3,8 @@ import {Server} from "../../../../server"
import {DBBlock} from "../../../lib/db/DBBlock"
import {Underscore} from "../../../lib/common-libs/underscore"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function copyMemPool(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
export async function copyMemPool(server: Server, wwDAL: WotWizardDAL) {
const logger = NewLogger()
......@@ -17,7 +16,6 @@ export async function copyMemPool(server: Server, wwDAL: WotWizardDAL, acc: WWBl
const toPersist: DBBlock[] = Underscore.uniq(blocks.filter(b => b) as DBBlock[], false, b => [b.number, b.hash].join('-'))
logger.debug('Persisting %s blocks for identities...', toPersist.length)
acc.accumulate(toPersist)
await wwDAL.blockDao.insertBatch(toPersist.map(b => { (b as any).legacy = true; return b }))
await wwDAL.idtyDao.insertBatch(identities)
await wwDAL.certDao.insertBatch(await server.dal.certDAL.sqlListAll())
......
......@@ -5,28 +5,21 @@ import {addLegacyBlocks} from "./wotwizard.legacy.blocks"
import {addNewBlocks} from "./wotwizard.new.blocks"
import {deleteNonLegacy} from "./wotwizard.delete"
import {copyMemPool} from "./wotwizard.copy.mempool"
import {requiredBlocksAccumulator} from "./hooks/wotwizard.block.insert"
export async function dumpWotWizard(server: Server) {
// 1. Create dump structure if it does not exist
const wwDAL = await createExportStructure(WotWizardConstants.DB_NAME)
// Prepare accumulator for blocks with data refering blocks
const accumulator = requiredBlocksAccumulator(server, wwDAL)
// 2. Integrate legacy blocks (= non-forkable)
await addLegacyBlocks(server, wwDAL, accumulator)
await addLegacyBlocks(server, wwDAL)
// 3. Delete non-legacy data
await deleteNonLegacy(wwDAL)
// 4. Integrate new blocks (= forkable)
await addNewBlocks(server, wwDAL, accumulator)
await addNewBlocks(server, wwDAL)
// 5. Copy mempool
await copyMemPool(server, wwDAL, accumulator)
// 6. Persist blocks referenced by a wot data (identities, certifications, memberships)
await accumulator.persist()
await copyMemPool(server, wwDAL)
}
......@@ -3,9 +3,8 @@ import {Server} from "../../../../server"
import {CommonConstants} from "../../../lib/common-libs/constants"
import {DBBlock} from "../../../lib/db/DBBlock"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL) {
const logger = NewLogger()
......@@ -14,29 +13,23 @@ export async function addLegacyBlocks(server: Server, wwDAL: WotWizardDAL, acc:
const start = currentWW && currentWW.number + 1 || 0
const end = current && Math.max(-1, current.number - 100) || -1
const blocksSaved: DBBlock[] = []
let blocksSaved: DBBlock[] = []
logger.debug('Reading blocks...')
// We loop taking care of archives structure
for (let i = start; i <= end; i += CommonConstants.ARCHIVES_BLOCKS_CHUNK) {
const blocks = await server.dal.getBlocksBetween(i, Math.min(end, i + CommonConstants.ARCHIVES_BLOCKS_CHUNK) - 1)
const filtered = blocks.filter(b => b.joiners.length
|| b.actives.length
|| b.leavers.length
|| b.revoked.length
|| b.excluded.length
|| b.certifications.length
)
if (filtered.length) {
const legacies = filtered.map(f => { (f as any).legacy = true; return f })
legacies.forEach(l => blocksSaved.push(l))
// blocksSaved.push(wwDAL.blockDao.insertBatch(legacies))
const legacies = blocks.map(f => { (f as any).legacy = true; return f })
legacies.forEach(l => blocksSaved.push(l))
if (i % 25000 === 0) {
logger.debug('Saving 25 blocks... (%s yet stored)', i)
await wwDAL.blockDao.insertBatch(blocksSaved)
blocksSaved = []
}
}
logger.debug('Saving blocks...')
await wwDAL.blockDao.insertBatch(blocksSaved)
acc.accumulate(blocksSaved)
await Promise.all(blocksSaved)
......
......@@ -2,9 +2,8 @@ import {WotWizardDAL} from "./wotwizard.init.structure"
import {Server} from "../../../../server"
import {CommonConstants} from "../../../lib/common-libs/constants"
import {NewLogger} from "../../../lib/logger"
import {WWBlockAccumulator} from "./hooks/wotwizard.block.insert"
export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL, acc: WWBlockAccumulator) {
export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL) {
const logger = NewLogger()
......@@ -26,7 +25,6 @@ export async function addNewBlocks(server: Server, wwDAL: WotWizardDAL, acc: WWB
const all = blocks.concat(forks).map(f => { (f as any).legacy = false; return f })
logger.debug('Saving %s pending blocks...', all.length)
blocksSaved.push(wwDAL.blockDao.insertBatch(all))
acc.accumulate(all)
}
await Promise.all(blocksSaved)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment