diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts index 8203bc2dedcd5fd93fa5603567a89e7943932c7f..b0ae64bc2657cb8199dd0538eeae25a00e866486 100644 --- a/src/cesium-plus.ts +++ b/src/cesium-plus.ts @@ -26,7 +26,11 @@ async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> { // ========================= merge functions /// wrap cplus raw document in index request -export async function wrapCplusInIndexRequest(cplus: CplusProfile, cplusRaw: string, convertImg: boolean): Promise<CID> { +export async function wrapCplusInIndexRequest( + cplus: CplusProfile, + cplusRaw: string, + convertImg: boolean +): Promise<CID> { // process avatar separately if requested const avatar = cplus.avatar as Avatar if (convertImg && avatar != undefined && avatar._content != undefined) { @@ -55,8 +59,9 @@ export async function cplusIrToAMT(requests: Array<[string, CID]>, rootNodeCid: for (let i = 0; i < n / chunkSize; i++) { console.log(Date.now() + ' chunk number ' + i) const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize) - const tree = arrayToVinode(chunk) // partial tree for this chunk - rootNodeCid = await mergeInodesSyncCID(rootNodeCid, tree) - console.log('new root node ' + rootNodeCid.toString()) + const tree = arrayToVinode('', chunk) // partial tree for this chunk + const mergeResult = await mergeInodesSyncCID(rootNodeCid, tree) + rootNodeCid = mergeResult[1] + console.log(`new root node ${rootNodeCid} with ${mergeResult[0]} items`) } } diff --git a/src/consts.ts b/src/consts.ts index b655b4bb7dfab0290ed007f56ccc06932ce3e54a..c45fe7becf0cf545c74a0af6337b87ca74e9b635 100644 --- a/src/consts.ts +++ b/src/consts.ts @@ -11,7 +11,7 @@ export const BASE = 16 export const KEYSIZE = (64 * Math.log(2)) / Math.log(BASE) // empty root cid -export const EMPTY_NODE_CID = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') +export const EMPTY_NODE_CID = CID.parse('bafyreic3z6fetku5dpudahsne3dwbmwkddfnqxapshknel6a2i5xhiwige') // document kind of 
old cesium plus profile imported in the indexer export const CESIUM_PLUS_PROFILE_IMPORT = 'cplus_raw' diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts index cbb1488f8a599b8a334e2cca98d8b5df2dac2384..10fa6b733afcd91c6ba7d2027674a4f55631798b 100644 --- a/src/indexer/handlers.ts +++ b/src/indexer/handlers.ts @@ -7,6 +7,7 @@ import { getDiff } from '../interface' import type { IndexRequest } from '../types' import { arrayToVinode, mergeInodesSyncCID, timestampToKey } from '../processor' import { DD_TAMT_OPT } from './ipns' +import { setTimeout } from 'timers/promises' // config @@ -86,9 +87,9 @@ export async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise async function insertBatch(rootCID: CID, items: Array<[CID, IndexRequest]>): Promise<DiffData> { // convert it to a list of [key, cid] for batch insert (merge) const requests = items.map(([cid, ir]) => [timestampToKey(ir.time), cid]).sort() as Array<[string, CID]> - const tree = arrayToVinode(requests) + const tree = arrayToVinode('', requests) // insert them - const newCID = await mergeInodesSyncCID(rootCID, tree) + const [_ctx, newCID] = await mergeInodesSyncCID(rootCID, tree) const diffData: DiffData = { oldCID: rootCID, newCID, newItems: items } return diffData } @@ -185,6 +186,9 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> { if (num > BATCH_SIZE) { num = 0 events.emit(evtype.triggerCollect) + // This is a hack to limit ingestion of new data and leave time to process everything + // for 100 000 documents with a batch size of 1000 and 3 seconds, it is adding 5 minutes overall + await setTimeout(3000) // 3 sec } } events.emit(evtype.triggerCollect) diff --git a/src/interface.ts b/src/interface.ts index a6dd3cc94234ade33119c03d3a94e3e67206e636..a582fccc015614ff3a364911552fd349aa71aa15 100644 --- a/src/interface.ts +++ b/src/interface.ts @@ -2,6 +2,7 @@ import type { CID } from 'multiformats' import { kubo } from './kubo' import { emptyInode, type IndexInode, 
type IndexLeaf } from './types' import { BASE, KEYSIZE } from './consts' +import assert from 'assert' // interface to easily iterate over the AMT and get all CIDs inside leaves export async function* getAll(cid: CID): AsyncIterable<CID[]> { @@ -50,6 +51,8 @@ export async function* getDiff(cid1: CID, cid2: CID): AsyncIterable<CID[]> { // same as getDiff, but operates on index internal nodes only, not cids that can represent something else async function* getDiffInodes(inode1: IndexInode, inode2: IndexInode): AsyncIterable<CID[]> { + assert(inode1.ctx == inode2.ctx) + const ctx = inode1.ctx // iterate over nodes children for (let i = 0; i < BASE; i++) { // ======== simple case ======== @@ -89,7 +92,7 @@ async function* getDiffInodes(inode1: IndexInode, inode2: IndexInode): AsyncIter if (lik.length > rik.length && lik.startsWith(rik)) { // intermediate inode might have been added to the right // create virtual node then dig deeper in right - const lvnode = emptyInode() + const lvnode = emptyInode(ctx + rik) const b = parseInt(lik[rik.length], BASE) lvnode.children[b] = [lik.slice(rik.length), lic] const r = await kubo.dag.get(ric) @@ -99,7 +102,7 @@ async function* getDiffInodes(inode1: IndexInode, inode2: IndexInode): AsyncIter if (lik.length < rik.length && rik.startsWith(lik)) { // intermediate inode might have been added to the left // create virtual node then dig deeper in left - const rvnode = emptyInode() + const rvnode = emptyInode(ctx + lik) const b = parseInt(rik[lik.length], BASE) rvnode.children[b] = [rik.slice(lik.length), ric] const l = await kubo.dag.get(lic) @@ -190,7 +193,7 @@ export async function compareInodes( console.log('diff ' + nk) // intermediate inode might have been added to the right // create virtual node then dig deeper in right - const lvnode = emptyInode() + const lvnode = emptyInode(nk) const b = parseInt(lik[rik.length], BASE) lvnode.children[b] = [lik.slice(rik.length), lic] kubo.dag.get(ric).then((r) => { @@ -203,7 +206,7 @@ export 
async function compareInodes( console.log('diff ' + nk) // intermediate inode might have been added to the left // create virtual node then dig deeper in left - const rvnode = emptyInode() + const rvnode = emptyInode(nk) const b = parseInt(rik[lik.length], BASE) rvnode.children[b] = [rik, ric] kubo.dag.get(ric).then((l) => { diff --git a/src/processor.ts b/src/processor.ts index a9065cab9ad253d1b4d8d73f2f26bd6180a722c2..9deef90546541e1251e7ba0df3b8fbdd1e3b8eb9 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -5,6 +5,7 @@ import { emptyInode, emptyVinode, emptyLeaf } from './types' import type { IndexInode, IndexLeaf, IndexVinode, IndexRequest, IndexHist } from './types' import { DD_TAMT_HIST_OPT, DD_TAMT_OPT, ddKeys } from './indexer/ipns' import { uniqueby } from './utils' +import assert from 'assert' // ===================== utils ===================== @@ -45,160 +46,160 @@ export async function publishHistory(cid: CID): Promise<void> { // ===================== main API ===================== -// UNUSED -// example on how we would add cid to index -async function addToIndex(cid: CID, indexRequest: IndexRequest) { - // at this point we might want to add a queue to avoid recomputing the whole tree at each new entry - // but for the moment we will not group entries and do re-compute the whole tree each time - // - // see this presentation for more info about the merkle tree - // https://polkadot-blockchain-academy.github.io/pba-book/substrate/storage/slides.html#/ - - for await (const name of kubo.name.resolve(ddKeys.root, { nocache: true })) { - // get root CID from IPNS - const rootCID = CID.parse(name.slice(6)) - // insert request into it - const newRootCID = await insertKnownIndexRequest(rootCID, cid, indexRequest) - // pin new root CID recursively to be sure to have all data and to meet the "pinned only" reprovide strategy - kubo.pin.add(newRootCID, { recursive: true }) - // publish the new root CID to IPNS - await kubo.name.publish(newRootCID, 
DD_TAMT_OPT) - // also update history - publishHistory(newRootCID) - } -} - -/// simplest way to insert index request given its CID -// rootCid: root node of the AMT to add the index request in -// indexRequestCid: index request to add -// returns the new root cid -export async function insertIndexRequest(rootCid: CID, indexRequestCid: CID): Promise<CID> { - const [rootDag, indexDag] = await Promise.all([kubo.dag.get(rootCid), kubo.dag.get(indexRequestCid)]) - const key = timestampToKey(indexDag.value.timestamp) - return processInode(rootDag.value, key, insertRequest(indexRequestCid)) -} - -/// same as above but with known request -export async function insertKnownIndexRequest( - rootCid: CID, - indexRequestCid: CID, - indexRequest: IndexRequest -): Promise<CID> { - const key = timestampToKey(indexRequest.time) - const rootDag = await kubo.dag.get(rootCid) - return processInode(rootDag.value, key, insertRequest(indexRequestCid)) -} +// // UNUSED +// // example on how we would add cid to index +// async function addToIndex(cid: CID, indexRequest: IndexRequest) { +// // at this point we might want to add a queue to avoid recomputing the whole tree at each new entry +// // but for the moment we will not group entries and do re-compute the whole tree each time +// // +// // see this presentation for more info about the merkle tree +// // https://polkadot-blockchain-academy.github.io/pba-book/substrate/storage/slides.html#/ + +// for await (const name of kubo.name.resolve(ddKeys.root, { nocache: true })) { +// // get root CID from IPNS +// const rootCID = CID.parse(name.slice(6)) +// // insert request into it +// const newRootCID = await insertKnownIndexRequest(rootCID, cid, indexRequest) +// // pin new root CID recursively to be sure to have all data and to meet the "pinned only" reprovide strategy +// kubo.pin.add(newRootCID, { recursive: true }) +// // publish the new root CID to IPNS +// await kubo.name.publish(newRootCID, DD_TAMT_OPT) +// // also update history +// 
publishHistory(newRootCID) +// } +// } + +// /// simplest way to insert index request given its CID +// // rootCid: root node of the AMT to add the index request in +// // indexRequestCid: index request to add +// // returns the new root cid +// export async function insertIndexRequest(rootCid: CID, indexRequestCid: CID): Promise<CID> { +// const [rootDag, indexDag] = await Promise.all([kubo.dag.get(rootCid), kubo.dag.get(indexRequestCid)]) +// const key = timestampToKey(indexDag.value.timestamp) +// return processInode(rootDag.value, key, insertRequest(indexRequestCid)) +// } + +// /// same as above but with known request +// export async function insertKnownIndexRequest( +// rootCid: CID, +// indexRequestCid: CID, +// indexRequest: IndexRequest +// ): Promise<CID> { +// const key = timestampToKey(indexRequest.time) +// const rootDag = await kubo.dag.get(rootCid) +// return processInode(rootDag.value, key, insertRequest(indexRequestCid)) +// } // ===================== process function utils ===================== -/// function used to process node -// /!\ this is suboptimal to return a CID, the node (or something else) should be returned instead -// this would allow to write more function like ones that delete content -// TODO change this -export interface ProcessFunction { - // input is the input data at the place where we want to process - (input: null | IndexLeaf | IndexInode): Promise<CID> -} - -// returns a process function suitable for processInode that simply inserts request in a leaf -// WIP does not handle inserting to a node yet -function insertRequest(indexRequestCid: CID): ProcessFunction { - return (input: null | IndexLeaf | IndexInode) => { - if (input == null) { - // in this case we want to create a new leaf - return processLeaf(emptyLeaf(), indexRequestCid) - } else { - // in this case we want to insert indexRequestCid in existing leaf - return processLeaf(input as IndexLeaf, indexRequestCid) - } - } -} +// /// function used to process node +// // 
/!\ this is suboptimal to return a CID, the node (or something else) should be returned instead +// // this would allow to write more function like ones that delete content +// // TODO change this +// export interface ProcessFunction { +// // input is the input data at the place where we want to process +// (input: null | IndexLeaf | IndexInode): Promise<CID> +// } + +// // returns a process function suitable for processInode that simply inserts request in a leaf +// // WIP does not handle inserting to a node yet +// function insertRequest(indexRequestCid: CID): ProcessFunction { +// return (input: null | IndexLeaf | IndexInode) => { +// if (input == null) { +// // in this case we want to create a new leaf +// return processLeaf(emptyLeaf(), indexRequestCid) +// } else { +// // in this case we want to insert indexRequestCid in existing leaf +// return processLeaf(input as IndexLeaf, indexRequestCid) +// } +// } +// } // ===================== tree edit algorithm ===================== -/// process internal node -/// insert the CID returned by `func` at the position given by `key` -/// returns the CID of the resulting modified parent node -export async function processInode(node: IndexInode, key: string, func: ProcessFunction): Promise<CID> { - // console.log("key: " + key) - - // bucket - const b = bucket(key) - // if bucket is available, place leaf in it - if (node.children[b] === null) { - node.children[b] = [key, await func(null)] - } else { - // must share bucket with a node - const [k1, cid1] = node.children[b] as [string, CID] - const comp = compareKey(k1, key) - - switch (comp.type) { - // enter - case resType.Child: - const e = comp as inRes - // console.log('enter "' + e.common + '" key "' + e.nk + '"') - const enterNode: IndexInode | IndexLeaf = (await kubo.dag.get(cid1)).value - const enterNodeAsLeaf = enterNode as IndexLeaf - const enterNodeAsInode = enterNode as IndexInode - if (enterNodeAsLeaf.leaf) { - // we can not enter a leaf at this stage - throw 
Error('should not enter a leaf, this should have been an end') - } - node.children[b] = [e.common, await processInode(enterNodeAsInode, e.nk, func)] - break - - // an additionnal hierarchy level is required - case resType.Parent: - const p = comp as inRes - // console.log('enter "' + p.common + '" key "' + p.nk + '"') - const newiNode = emptyInode() - newiNode.children[p.b] = [p.nk, cid1] - const newiNodeCid = await func(newiNode) - node.children[b] = [p.common, newiNodeCid] - break - - // reached dest - case resType.Same: - // console.log('end on ' + key) - const other = (await kubo.dag.get(cid1)).value - node.children[b] = [k1, await func(other)] - break - - // diff found - case resType.Diff: - const c = comp as diffRes - // console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"') - const newNode = emptyInode() - newNode.children[c.b1] = [c.nk1, cid1] - newNode.children[c.b2] = [c.nk2, await func(null)] - const newNodeCid = (await kubo.dag.put(newNode)) as CID - node.children[b] = [c.common, newNodeCid] - break - } - } - // now that we have the new node save it and return - const newCid = (await kubo.dag.put(node)) as CID - console.log('new inode: ' + newCid.toString() + ' at ' + key) - return newCid -} - -/// process leaf in the tree -/// children have to be unique and ordered -export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> { - if (!node.leaf.some((c) => c.toString() == val.toString())) { - // only insert if not already there (avoid duplicate) - node.leaf.push(val) - // ensure leaf order is predictible - node.leaf.sort((a, b) => (a.toString() < b.toString() ? 
-1 : 1)) - } - const newCid = (await kubo.dag.put(node)) as CID - return newCid -} +// /// process internal node +// /// insert the CID returned by `func` at the position given by `key` +// /// returns the CID of the resulting modified parent node +// export async function processInode(node: IndexInode, key: string, func: ProcessFunction): Promise<CID> { +// // console.log("key: " + key) + +// // bucket +// const b = bucket(key) +// // if bucket is available, place leaf in it +// if (node.children[b] === null) { +// node.children[b] = [key, await func(null)] +// } else { +// // must share bucket with a node +// const [k1, cid1] = node.children[b] as [string, CID] +// const comp = compareKey(k1, key) + +// switch (comp.type) { +// // enter +// case resType.Child: +// const e = comp as inRes +// // console.log('enter "' + e.common + '" key "' + e.nk + '"') +// const enterNode: IndexInode | IndexLeaf = (await kubo.dag.get(cid1)).value +// const enterNodeAsLeaf = enterNode as IndexLeaf +// const enterNodeAsInode = enterNode as IndexInode +// if (enterNodeAsLeaf.leaf) { +// // we can not enter a leaf at this stage +// throw Error('should not enter a leaf, this should have been an end') +// } +// node.children[b] = [e.common, await processInode(enterNodeAsInode, e.nk, func)] +// break + +// // an additionnal hierarchy level is required +// case resType.Parent: +// const p = comp as inRes +// // console.log('enter "' + p.common + '" key "' + p.nk + '"') +// const newiNode = emptyInode() +// newiNode.children[p.b] = [p.nk, cid1] +// const newiNodeCid = await func(newiNode) +// node.children[b] = [p.common, newiNodeCid] +// break + +// // reached dest +// case resType.Same: +// // console.log('end on ' + key) +// const other = (await kubo.dag.get(cid1)).value +// node.children[b] = [k1, await func(other)] +// break + +// // diff found +// case resType.Diff: +// const c = comp as diffRes +// // console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"') 
+// const newNode = emptyInode() +// newNode.children[c.b1] = [c.nk1, cid1] +// newNode.children[c.b2] = [c.nk2, await func(null)] +// const newNodeCid = (await kubo.dag.put(newNode)) as CID +// node.children[b] = [c.common, newNodeCid] +// break +// } +// } +// // now that we have the new node save it and return +// const newCid = (await kubo.dag.put(node)) as CID +// console.log('new inode: ' + newCid.toString() + ' at ' + key) +// return newCid +// } + +// /// process leaf in the tree +// /// children have to be unique and ordered +// export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> { +// if (!node.leaf.some((c) => c.toString() == val.toString())) { +// // only insert if not already there (avoid duplicate) +// node.leaf.push(val) +// // ensure leaf order is predictible +// node.leaf.sort((a, b) => (a.toString() < b.toString() ? -1 : 1)) +// } +// const newCid = (await kubo.dag.put(node)) as CID +// return newCid +// } // ===================== merge algorithm ===================== /// wrapper for below function when merging a node which has a CID -export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | IndexLeaf): Promise<CID> { +export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | IndexLeaf): Promise<[number, CID]> { // fail with small timeout since this data is supposed to be pinned locally // console.log('fetching ' + nodeACID) const nodeA = (await kubo.dag.get(nodeACID, { timeout: 1000 })).value // FIXME decrease this timeout without bug @@ -215,23 +216,33 @@ export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | Ind /// merge internal nodes, synchronous implementation // useful to merge trees -export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> { +// returns the count of elements and a CID +export async function mergeInodesSync( + nodeA: IndexInode | IndexLeaf, + nodeB: IndexVinode | IndexLeaf +): 
Promise<[number, CID]> { const isAleaf = (nodeA as IndexLeaf).leaf != undefined const isBleaf = (nodeB as IndexLeaf).leaf != undefined + const [leafA, leafB] = [nodeA as IndexLeaf, nodeB as IndexLeaf] if (isAleaf && isBleaf) { + // should only merge leaves with same key + assert(leafA.key == leafB.key) // these are not internal nodes, but leaves, and we should merge them // problem: Set will not work on CIDs because they are objects and differ even if they have the same data // const cidSet = new Set([...(nodeA as unknown as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf]) // const cidListUniqueSorted = Array.from(cidSet).sort() const cidList = [...(nodeA as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf] - const cidListUniqueSorted = uniqueby(cidList, (k) => k.toString()).sort() + const cidListUniqueSorted = uniqueby(cidList, (k) => k.toString()).sort((a, b) => + a.toString() < b.toString() ? -1 : 1 + ) const newLeaf: IndexLeaf = { - leaf: cidListUniqueSorted + leaf: cidListUniqueSorted, + key: leafA.key } return kubo.dag.put(newLeaf).then((cid) => { // WIP pin des not work well // kubo.pin.add(cid).catch((_e) => console.log(`📌📌 could not pin newly created leaf ${cid}`)) - return cid + return [newLeaf.leaf.length, cid] }) } else if (isAleaf || isBleaf) { throw Error('should not be possible, are keys same size?') @@ -241,13 +252,16 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde const nodb = nodeB as IndexVinode // process each bucket sequencially + let itemCount = 0 for (let b = 0; b < BASE; b++) { const nAcb = noda.children[b] const nBcb = nodb.children[b] if (nAcb == null && nBcb != null) { // we can concretize nodeB directly const [kB, childB] = nBcb - noda.children[b] = [kB, await concretizeCid(childB)] + const [count, childBCID] = await concretizeCid(childB) + itemCount += count + noda.children[b] = [kB, childBCID] } else if (nAcb != null && nBcb != null) { // both are non null const [kA, childA] = nAcb @@ -257,34 +271,42 @@ 
export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde switch (comp.type) { // when keys are the same, this is a "enter", recurse case resType.Same: - noda.children[b] = [kA, await mergeInodesSyncCID(childA, childB)] + const [countS, nodeCID] = await mergeInodesSyncCID(childA, childB) + itemCount += countS + noda.children[b] = [kA, nodeCID] break // B is child of A (inside), we add a level of hierarchy case resType.Child: const e = comp as inRes // since B is child (longer key), A can only be an inode, not a leaf - const newcNode = emptyVinode() + const newcNode = emptyVinode(e.common) newcNode.children[e.b] = [e.nk, childB] - const mergec = await mergeInodesSyncCID(childA, newcNode) + const [countC, mergec] = await mergeInodesSyncCID(childA, newcNode) + itemCount += countC noda.children[b] = [e.common, mergec] break // B is parent of A, an additional hierachy level is required case resType.Parent: const p = comp as inRes - const newiNode = emptyInode() + const newiNode = emptyInode(p.common) newiNode.children[p.b] = [p.nk, childA] - const mergep = await mergeInodesSync(newiNode, childB) + const [countP, mergep] = await mergeInodesSync(newiNode, childB) + itemCount += countP noda.children[b] = [p.common, mergep] break // there is a diff case resType.Diff: const c = comp as diffRes - const newNode = emptyInode() + const newNode = emptyInode(c.common) newNode.children[c.b1] = [c.nk1, childA] - newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)] + // here, this is suboptimal since we are forced to fetch childA to take into account its item count + itemCount += await getItemCount(childA) + const [countD, childBCID] = await concretizeCid(childB) + itemCount += countD + newNode.children[c.b2] = [c.nk2, childBCID] const newNodeCid = await kubo.dag.put(newNode).then((cid) => { // WIP pinning does not work well // kubo.pin.add(cid).catch((_e) => console.log(`📌📌 could not pin newly created node ${cid}`)) @@ -295,24 +317,29 @@ export async 
function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde } } else { // nBcb is null, keep nAcb bucket untouched (might be null or not) + if (nAcb == null) continue + const [_kA, childA] = nAcb + itemCount += await getItemCount(childA) // suboptimal too continue } } // now that we have the new node, we can upload it and return its cid - return kubo.dag.put(noda).then((cid) => { + noda.count = itemCount + const nodaCID = kubo.dag.put(noda).then((cid) => { // WIP pinning does not work well // kubo.pin.add(cid).catch((e) => console.log(`📌📌 could not pin newly merged node ${cid}`)) return cid }) + return Promise.all([itemCount, nodaCID]) } // ===================== virtual nodes management ===================== /// convert array of key/value pairs to virtual inode (tree) // opti: use a dichotomy search instead of iterating over all elements -export function arrayToVinode(array: Array<[string, CID]>): IndexVinode { +export function arrayToVinode(ctx: string, array: Array<[string, CID]>): IndexVinode { // initialize empty virtual node - const node = emptyVinode() + const node = emptyVinode(ctx) // process each bucket in order since elements are ordered for (let b = 0; b < BASE; b++) { // initialize list of elements that should go in this bucket @@ -339,44 +366,72 @@ export function arrayToVinode(array: Array<[string, CID]>): IndexVinode { const k1 = subArray.at(0)![0] const k2 = subArray.at(-1)![0] if (k1 == k2) { - node.children[b] = [k1, arrayToLeaf(subArray.map(([_k, v]) => v))] + // if they are the same, all items have the same key, this is a leaf + node.children[b] = [ + k1, + arrayToLeaf( + ctx + k1, + subArray.map(([_k, v]) => v) + ) + ] continue } // keys have the same size, so if they are not equal and not "enter", they are "diff" const c = compareKey(k1, k2) as diffRes const minimalSubArray: Array<[string, CID]> = subArray.map(([k, v]) => [k.slice(c.common.length), v]) - node.children[b] = [c.common, arrayToVinode(minimalSubArray)] + node.children[b] = 
[c.common, arrayToVinode(ctx + c.common, minimalSubArray)] } } return node } /// transform array of cid to leaf object -export function arrayToLeaf(array: CID[]): IndexLeaf { - return { leaf: array.sort((a, b) => (a.toString() < b.toString() ? -1 : 1)) } +export function arrayToLeaf(key: string, array: CID[]): IndexLeaf { + return { key, leaf: array.sort((a, b) => (a.toString() < b.toString() ? -1 : 1)) } } /// concretize virtual node to CID -export async function concretizeCid(node: IndexVinode | IndexLeaf): Promise<CID> { - if ((node as unknown as IndexLeaf).leaf) { - return kubo.dag.put(node) as Promise<CID> +export async function concretizeCid(node: IndexVinode | IndexLeaf): Promise<[number, CID]> { + const nodeL = node as IndexLeaf + if (nodeL.leaf) { + return Promise.all([nodeL.leaf.length, kubo.dag.put(node) as Promise<CID>]) } - const newNode = await concretize(node as unknown as IndexVinode) - return kubo.dag.put(newNode) as Promise<CID> + const newNode = await concretize(node as IndexVinode) + return Promise.all([newNode.count, kubo.dag.put(newNode) as Promise<CID>]) } /// concretize virtual node async function concretize(node: IndexVinode): Promise<IndexInode> { + let itemCount = 0 // count total items of nodes const childrenPromise: Array<null | Promise<[string, CID]>> = node.children.map((c) => { if (c == null) { return null } const [k, v] = c - return concretizeCid(v).then((cid) => [k, cid as CID]) + return concretizeCid(v).then(([count, cid]) => { + itemCount += count // increment item count + return [k, cid as CID] + }) }) return { - children: await Promise.all(childrenPromise) + children: await Promise.all(childrenPromise), + count: itemCount, + ctx: node.ctx + } +} + +/// get item count contained in a given CID +async function getItemCount(cid: CID): Promise<number> { + const node = (await kubo.dag.get(cid)).value + const nodeL = node as IndexLeaf + const nodeN = node as IndexInode + if (nodeL.leaf) { + return nodeL.leaf.length + } + if 
(nodeN.children) { + return nodeN.count } + throw Error('can not get item count of this object: ' + cid) } // ===================== key comparison ===================== diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 642a21f44405f846deb433c392acd5ca2f38986b..9584481b799518c6132f1e1aaa10c2ea21c26cb8 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -4,6 +4,7 @@ import { createInterface } from 'readline' import { appendFile, createReadStream } from 'fs' import { timestampToKey } from '../processor' import { readFile } from 'fs/promises' +import { EMPTY_NODE_CID } from '../consts' // fetch raw profile // fetchRawCplus('38QzVPhRLbEiqJtvCmRY6A6SraheNA6fJbomFX75b2qb').then(console.log) @@ -116,7 +117,7 @@ async function importAllCplusToAMT() { }) ) .then((l) => l.sort()) - const rootNodeCid = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid + const rootNodeCid = EMPTY_NODE_CID // empty root cid await cplusIrToAMT(requests, rootNodeCid) } @@ -130,5 +131,5 @@ async function importAllCplusToAMT() { // 3 minutes // import by batch and logs successive cids -// importAllCplusToAMT() +importAllCplusToAMT() // → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey \ No newline at end of file diff --git a/src/scripts/diff.ts b/src/scripts/diff.ts index d605ba91b198368b6118a80d87489a9b3a0b07fe..ebdfe54ee18111d75d3d68af69095b1bc2797485 100644 --- a/src/scripts/diff.ts +++ b/src/scripts/diff.ts @@ -1,6 +1,9 @@ import { EMPTY_NODE_CID } from '../consts' import { getDiff } from '../interface' import { CID } from 'multiformats' +import { setTimeout } from 'timers/promises' + +console.log('start') const fromCID = EMPTY_NODE_CID // const fromCID = CID.parse('bafyreic6qy2k5w6324uayfzoybmzypdv57zk3zxezaiws4h553jjogw6o4') @@ -18,7 +21,9 @@ async function doit() { if (num % BATCH_SIZE == 0) { console.log(num) } - // console.log(irCID.toString()) + await 
setTimeout(1000) // wait 1 sec + + console.log(irCID.toString()) } } } diff --git a/src/scripts/emptyRootCID.ts b/src/scripts/emptyRootCID.ts new file mode 100644 index 0000000000000000000000000000000000000000..dfb3927456a298884a93d14e459dfe12b3e760c1 --- /dev/null +++ b/src/scripts/emptyRootCID.ts @@ -0,0 +1,12 @@ +import { EMPTY_NODE_CID } from '../consts' +import { kubo } from '../kubo' +import { concretizeCid } from '../processor' +import { emptyInode, emptyVinode } from '../types' + +const node = emptyInode('') +kubo.dag.put(node).then((cid) => console.log('node cid', cid)) + +const vnode = emptyVinode('') +concretizeCid(vnode).then(([_, vcid]) => console.log('vnode cid', vcid)) + +console.log('empty node cid', EMPTY_NODE_CID) diff --git a/src/types.ts b/src/types.ts index c18fb025cb9f07fde9d6cf04c98127eb22dfbb61..0359932f9e6c30c1168ed894b3b947b66e688f71 100644 --- a/src/types.ts +++ b/src/types.ts @@ -33,17 +33,23 @@ export interface IndexRequest { // internal node export interface IndexInode { - // children + /// children // the length of the children array is equal to the BASE children: (null | [string, CID])[] // string is the key // we use a string here for demo since javascript does not support easy u32 bitwise operation // but in real world, we will use a bitvector, likely in base16, groups of 4 bits + /// context about the key prefix of the node + ctx: string + /// count of elements below this node + count: number } // default constructor -export function emptyInode(): IndexInode { +export function emptyInode(ctx: string): IndexInode { return { - children: new Array(BASE).fill(null) + children: new Array(BASE).fill(null), + ctx, + count: 0 } } @@ -53,11 +59,13 @@ export function emptyInode(): IndexInode { export interface IndexVinode { // same as IndexInode children: (null | [string, IndexVinode | IndexLeaf])[] + ctx: string } // default constructor -export function emptyVinode(): IndexVinode { +export function emptyVinode(ctx: string): IndexVinode { 
return { - children: new Array(BASE).fill(null) + children: new Array(BASE).fill(null), + ctx, } } @@ -65,11 +73,14 @@ export function emptyVinode(): IndexVinode { export interface IndexLeaf { // array of cid, expected to be quite small, without duplicates, and sorted leaf: CID[] + // key of this leaf + key: string } -export function emptyLeaf(): IndexLeaf { +export function emptyLeaf(key: string): IndexLeaf { return { - leaf: [] + leaf: [], + key } }