diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts index b428229fb7789e8d9b3b17e685fc4cb6d28f6d8c..a171c52a43087df7bbdae248bc3cbeea77475da7 100644 --- a/src/cesium-plus.ts +++ b/src/cesium-plus.ts @@ -1,7 +1,7 @@ import { CID } from 'multiformats' import { kubo } from './kubo' import { Buffer } from 'buffer' -import { timestampToKey, mergeInodesSync, arrayToVinode } from './processor' +import { timestampToKey, arrayToVinode, mergeInodesSyncCID } from './processor' import { type IndexRequest } from './types' import { CESIUM_PLUS_PROFILE_IMPORT } from './consts' @@ -112,9 +112,8 @@ export async function cplusIndexRequestsToAMT(cplusrootCID: CID, rootNodeCid: CI for (let i = 0; i < n / chunkSize; i++) { console.log(Date.now() + ' chunk number ' + i) const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize) - const rootNode = (await kubo.dag.get(rootNodeCid)).value const tree = arrayToVinode(chunk) // partial tree for this chunk - rootNodeCid = await mergeInodesSync(rootNode, tree) + rootNodeCid = await mergeInodesSyncCID(rootNodeCid, tree) console.log('new root node ' + rootNodeCid.toString()) } } diff --git a/src/indexer/start.ts b/src/indexer/start.ts index 7addd5f20d11c80e4d1aec8eb9cb363bceac3d13..c034979c166d23a0c937f8fbe5d80936c3c04935 100644 --- a/src/indexer/start.ts +++ b/src/indexer/start.ts @@ -1,5 +1,5 @@ import { TOPIC } from '../consts' -import { timestampToKey, mergeInodesSync, arrayToVinode, publishHistory } from '../processor' +import { timestampToKey, arrayToVinode, publishHistory, mergeInodesSyncCID } from '../processor' import { getPubSubHandler } from '../collector' import { kubo } from '../kubo' import type { IndexRequest } from '../types' @@ -12,8 +12,13 @@ import { getRootCIDfromArgs } from './utils' // pubsub message handler const handleMessage = getPubSubHandler((cid, dag) => { + // add index request to the process list processQueue.push([cid, dag]) + // ask to process the request events.emit(evtype.trigger) + // try pin the new data so that it is available to the indexer + // there might be a UND_ERR_HEADERS_TIMEOUT if data could not be retreived in time + kubo.pin.add(cid, { recursive: true }).catch(() => console.error('could not pin ' + cid)) }) // queue event handler @@ -34,28 +39,22 @@ function handleBatch() { const tree = arrayToVinode(requests) // insert them - kubo.dag - .get(rootCID) - .then((v) => mergeInodesSync(v.value, tree)) - .then((cid) => { - // ➡️ update CID - const oldCID = rootCID - rootCID = cid - console.log(`👉 new root CID ${cid}`) - // ➡️ pin recursively (for "pinned only" reprovide strategy) - // FIXME this causes a UND_ERR_HEADERS_TIMEOUT - // kubo.pin.add(rootCID, { recursive: true }) - // ➡️ publish new CID - kubo.name.publish(rootCID, { ttl: '1s' }) - // ➡️ publish history TODO limit rate at lower frequency (e.g. 1/minute) - publishHistory(rootCID) - isProcessingQueue = false - // trigger an other event in case new requests arrived meanwhile - events.emit(evtype.trigger) - // emit event to be processed by indexer - const diffData: DiffData = { oldCID: oldCID, newCID: rootCID, newItems: items } - events.emit(evtype.indexDiff, diffData) - }) + mergeInodesSyncCID(rootCID, tree).then((cid) => { + // ➡️ update CID + const oldCID = rootCID + rootCID = cid + console.log(`👉 new root CID ${cid}`) + // ➡️ publish new CID + kubo.name.publish(rootCID, { ttl: '1s' }) + // ➡️ publish history TODO limit rate at lower frequency (e.g. 1/minute) + publishHistory(rootCID) + isProcessingQueue = false + // trigger an other event in case new requests arrived meanwhile + events.emit(evtype.trigger) + // emit event to be processed by indexer + const diffData: DiffData = { oldCID: oldCID, newCID: rootCID, newItems: items } + events.emit(evtype.indexDiff, diffData) + }) } // ===================== START =========================== diff --git a/src/processor.ts b/src/processor.ts index 535da3bcf7b1ef2a96a2fb9663be053acfbcf17b..d6526b1e47e86341c613248105a770db7eb74af4 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -111,29 +111,6 @@ function insertRequest(indexRequestCid: CID): ProcessFunction { } } -// UNUSED -/// returns a process function working on virtual inodes -function mergeVinodeRequest(node: IndexVinode | IndexLeaf): ProcessFunction { - return (input: null | IndexLeaf | IndexInode) => { - if (input == null) { - return concretizeCid(node) - } else { - return mergeInodesSync(input, node) - } - } -} - -// UNUSED -/// returns a process function working on inodes -function mergeInodeRequest(node: CID): ProcessFunction { - return async (input: null | IndexLeaf | IndexInode) => { - if (input == null) { - return kubo.dag.put(node) as Promise<CID> - } - return mergeInodesSync(input, (await kubo.dag.get(node)).value) - } -} - // ===================== tree edit algorithm ===================== /// process internal node @@ -217,6 +194,19 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> { // ===================== merge algorithm ===================== +/// wrapper for below function when merging a node which has a CID +export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | IndexLeaf): Promise<CID> { + const nodeA = (await kubo.dag.get(nodeACID)).value + const newCID = await mergeInodesSync(nodeA, nodeB) + // unpin old node CID if different + // we do not mind if it was not pinned + if (nodeACID.toString() != newCID.toString()) kubo.pin.rm(nodeACID).catch(() => {}) + // no need to pin new node CID like: + // kubo.pin.add(newCID) + // this is already done with the pin option kubo.dag.put(cid, { pin: true }) + return newCID +} + /// merge internal nodes, synchronous implementation // useful to merge trees export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> { @@ -229,7 +219,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde const newLeaf: IndexLeaf = { leaf: cidList } - return kubo.dag.put(newLeaf) as Promise<CID> + return kubo.dag.put(newLeaf, { pin: true }) as Promise<CID> } else if (isAleaf || isBleaf) { throw Error('should not be possible, are keys same size?') } @@ -254,18 +244,16 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde switch (comp.type) { // when keys are the same, this is a "enter", recurse case resType.Same: - const childAnode: IndexInode | IndexLeaf = (await kubo.dag.get(childA)).value - noda.children[b] = [kA, await mergeInodesSync(childAnode, childB)] + noda.children[b] = [kA, await mergeInodesSyncCID(childA, childB)] break // B is child of A (inside), we add a level of hierarchy case resType.Child: const e = comp as inRes // since B is child (longer key), A can only be an inode, not a leaf - const enterChildAnode: IndexInode = (await kubo.dag.get(childA)).value const newcNode = emptyVinode() newcNode.children[e.b] = [e.nk, childB] - const mergec = await mergeInodesSync(enterChildAnode, newcNode) + const mergec = await mergeInodesSyncCID(childA, newcNode) noda.children[b] = [e.common, mergec] break @@ -284,7 +272,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde const newNode = emptyInode() newNode.children[c.b1] = [c.nk1, childA] newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)] - const newNodeCid = (await kubo.dag.put(newNode)) as CID + const newNodeCid = (await kubo.dag.put(newNode, { pin: true })) as CID noda.children[b] = [c.common, newNodeCid] break } @@ -294,7 +282,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde } } // now that we have the new node, we can upload it and return its cid - return kubo.dag.put(noda) as Promise<CID> + return kubo.dag.put(noda, { pin: true }) as Promise<CID> } // ===================== virtual nodes management ===================== diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 380d9835f7fddf4ce5e701111a57d0112d52665b..f64ea1e7bf182e3a285f6adf3d50efe41621b0fd 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -26,45 +26,17 @@ async function doImport() { processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid)) } -// doImport() -// UNUSED // // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18 // // but this shows we should optimise AMT inserting for higher throughput // function doImportToAMT() { -// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import -// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid -// importCplusToAMTSync(cplus, amt).then(console.log) +// REMOVED // } -// UNUSED // // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second // // this version sometimes crashes with EADDRNOTAVAIL because it exceeds the number of concurrent connections // async function doMergeAMT() { -// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import -// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid - -// const rootNode: IndexInode = (await kubo.dag.get(amt)).value - -// allCplusCids(cplus) -// // .then((x) => { -// // fs.writeFile('./cplus.txt', x.toString()) -// // return x -// // }) -// .then(sortCidsAndConvertKeys) -// // .then((x) => fs.writeFile('./cplus3.txt', x.toString())) -// .then((all) => { -// console.log(Date.now() + ' converting to virtual tree ') -// return arrayToVinode(all) -// }) -// .then((inode) => { -// console.log(Date.now() + ' merging') -// return mergeInodesSync(rootNode, inode) -// }) -// .then((cid) => { -// console.log(Date.now() + ' finished') -// console.log(cid) -// }) +// REMOVED // } /// import all previously imported C+ data as index requests into a single AMT @@ -75,4 +47,6 @@ async function doAllCplusCidsToAMT() { cplusIndexRequestsToAMT(cplusCID, rootNodeCid) } -doAllCplusCidsToAMT() +// TODO use command line args to choose what to do +// doImport() +// doAllCplusCidsToAMT() diff --git a/src/views/IoView.vue b/src/views/IoView.vue index 81a64bde1a52c120728ea2e880fe5c2010314724..c4109712abcbf29d32e2aad5689397e17c088781 100644 --- a/src/views/IoView.vue +++ b/src/views/IoView.vue @@ -49,7 +49,7 @@ async function importIndex() { // reinit export exportedCid.value = [] // create new root node - importedCid.value = await kubo.dag.put(emptyInode()) as CID + importedCid.value = (await kubo.dag.put(emptyInode())) as CID importCid.value.split('\n').forEach(async (line) => { if (line != '') { try {