Skip to content
Snippets Groups Projects
Commit ac79f4be authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

refac for pinning

parent 7d6cd684
No related branches found
No related tags found
No related merge requests found
import { CID } from 'multiformats' import { CID } from 'multiformats'
import { kubo } from './kubo' import { kubo } from './kubo'
import { Buffer } from 'buffer' import { Buffer } from 'buffer'
import { timestampToKey, mergeInodesSync, arrayToVinode } from './processor' import { timestampToKey, arrayToVinode, mergeInodesSyncCID } from './processor'
import { type IndexRequest } from './types' import { type IndexRequest } from './types'
import { CESIUM_PLUS_PROFILE_IMPORT } from './consts' import { CESIUM_PLUS_PROFILE_IMPORT } from './consts'
...@@ -112,9 +112,8 @@ export async function cplusIndexRequestsToAMT(cplusrootCID: CID, rootNodeCid: CI ...@@ -112,9 +112,8 @@ export async function cplusIndexRequestsToAMT(cplusrootCID: CID, rootNodeCid: CI
for (let i = 0; i < n / chunkSize; i++) { for (let i = 0; i < n / chunkSize; i++) {
console.log(Date.now() + ' chunk number ' + i) console.log(Date.now() + ' chunk number ' + i)
const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize) const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize)
const rootNode = (await kubo.dag.get(rootNodeCid)).value
const tree = arrayToVinode(chunk) // partial tree for this chunk const tree = arrayToVinode(chunk) // partial tree for this chunk
rootNodeCid = await mergeInodesSync(rootNode, tree) rootNodeCid = await mergeInodesSyncCID(rootNodeCid, tree)
console.log('new root node ' + rootNodeCid.toString()) console.log('new root node ' + rootNodeCid.toString())
} }
} }
import { TOPIC } from '../consts' import { TOPIC } from '../consts'
import { timestampToKey, mergeInodesSync, arrayToVinode, publishHistory } from '../processor' import { timestampToKey, arrayToVinode, publishHistory, mergeInodesSyncCID } from '../processor'
import { getPubSubHandler } from '../collector' import { getPubSubHandler } from '../collector'
import { kubo } from '../kubo' import { kubo } from '../kubo'
import type { IndexRequest } from '../types' import type { IndexRequest } from '../types'
...@@ -12,8 +12,13 @@ import { getRootCIDfromArgs } from './utils' ...@@ -12,8 +12,13 @@ import { getRootCIDfromArgs } from './utils'
// pubsub message handler // pubsub message handler
const handleMessage = getPubSubHandler((cid, dag) => { const handleMessage = getPubSubHandler((cid, dag) => {
// add index request to the process list
processQueue.push([cid, dag]) processQueue.push([cid, dag])
// ask to process the request
events.emit(evtype.trigger) events.emit(evtype.trigger)
// try to pin the new data so that it is available to the indexer
// there might be a UND_ERR_HEADERS_TIMEOUT if data could not be retrieved in time
kubo.pin.add(cid, { recursive: true }).catch(() => console.error('could not pin ' + cid))
}) })
// queue event handler // queue event handler
...@@ -34,17 +39,11 @@ function handleBatch() { ...@@ -34,17 +39,11 @@ function handleBatch() {
const tree = arrayToVinode(requests) const tree = arrayToVinode(requests)
// insert them // insert them
kubo.dag mergeInodesSyncCID(rootCID, tree).then((cid) => {
.get(rootCID)
.then((v) => mergeInodesSync(v.value, tree))
.then((cid) => {
// ➡️ update CID // ➡️ update CID
const oldCID = rootCID const oldCID = rootCID
rootCID = cid rootCID = cid
console.log(`👉 new root CID ${cid}`) console.log(`👉 new root CID ${cid}`)
// ➡️ pin recursively (for "pinned only" reprovide strategy)
// FIXME this causes a UND_ERR_HEADERS_TIMEOUT
// kubo.pin.add(rootCID, { recursive: true })
// ➡️ publish new CID // ➡️ publish new CID
kubo.name.publish(rootCID, { ttl: '1s' }) kubo.name.publish(rootCID, { ttl: '1s' })
// ➡️ publish history TODO limit rate at lower frequency (e.g. 1/minute) // ➡️ publish history TODO limit rate at lower frequency (e.g. 1/minute)
......
...@@ -111,29 +111,6 @@ function insertRequest(indexRequestCid: CID): ProcessFunction { ...@@ -111,29 +111,6 @@ function insertRequest(indexRequestCid: CID): ProcessFunction {
} }
} }
// UNUSED
/// Builds a process function that merges the virtual node `node` into the node
/// currently found at the target position.
/// - when nothing is there yet (`null` or `undefined`), `node` is handed to `concretizeCid`
/// - otherwise `node` is merged into the existing node with `mergeInodesSync`
function mergeVinodeRequest(node: IndexVinode | IndexLeaf): ProcessFunction {
  return (input: null | IndexLeaf | IndexInode) =>
    input == null ? concretizeCid(node) : mergeInodesSync(input, node)
}
// UNUSED
/// Builds a process function that merges the node stored under the CID `node`
/// into the node currently found at the target position.
/// - when something is already there, the node behind `node` is fetched with
///   `kubo.dag.get` and merged into it with `mergeInodesSync`
/// - when nothing is there yet (`null` or `undefined`), `node` is passed to `kubo.dag.put`
// NOTE(review): the empty case puts the CID itself as a dag node instead of
// returning it as-is; confirm this is intended before reusing this function
function mergeInodeRequest(node: CID): ProcessFunction {
  return async (input: null | IndexLeaf | IndexInode) => {
    if (input != null) {
      const fetched = await kubo.dag.get(node)
      return mergeInodesSync(input, fetched.value)
    }
    return kubo.dag.put(node) as Promise<CID>
  }
}
// ===================== tree edit algorithm ===================== // ===================== tree edit algorithm =====================
/// process internal node /// process internal node
...@@ -217,6 +194,19 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> { ...@@ -217,6 +194,19 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> {
// ===================== merge algorithm ===================== // ===================== merge algorithm =====================
/// Wrapper around `mergeInodesSync` for the case where node A is only known by its CID.
/// Fetches node A with `kubo.dag.get`, merges `nodeB` into it, and returns the CID
/// of the merged node.
/// When the merge produced a different CID, the old one is unpinned.
export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | IndexLeaf): Promise<CID> {
  const { value: nodeA } = await kubo.dag.get(nodeACID)
  const newCID = await mergeInodesSync(nodeA, nodeB)
  const cidChanged = nodeACID.toString() !== newCID.toString()
  if (cidChanged) {
    // best effort and not awaited: a failure (e.g. the old node was never pinned) is ignored on purpose
    kubo.pin.rm(nodeACID).catch(() => {})
  }
  // the new CID needs no explicit kubo.pin.add(newCID) here:
  // mergeInodesSync writes nodes with kubo.dag.put(node, { pin: true })
  return newCID
}
/// merge internal nodes, synchronous implementation /// merge internal nodes, synchronous implementation
// useful to merge trees // useful to merge trees
export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> { export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> {
...@@ -229,7 +219,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde ...@@ -229,7 +219,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
const newLeaf: IndexLeaf = { const newLeaf: IndexLeaf = {
leaf: cidList leaf: cidList
} }
return kubo.dag.put(newLeaf) as Promise<CID> return kubo.dag.put(newLeaf, { pin: true }) as Promise<CID>
} else if (isAleaf || isBleaf) { } else if (isAleaf || isBleaf) {
throw Error('should not be possible, are keys same size?') throw Error('should not be possible, are keys same size?')
} }
...@@ -254,18 +244,16 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde ...@@ -254,18 +244,16 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
switch (comp.type) { switch (comp.type) {
// when keys are the same, this is a "enter", recurse // when keys are the same, this is a "enter", recurse
case resType.Same: case resType.Same:
const childAnode: IndexInode | IndexLeaf = (await kubo.dag.get(childA)).value noda.children[b] = [kA, await mergeInodesSyncCID(childA, childB)]
noda.children[b] = [kA, await mergeInodesSync(childAnode, childB)]
break break
// B is child of A (inside), we add a level of hierarchy // B is child of A (inside), we add a level of hierarchy
case resType.Child: case resType.Child:
const e = comp as inRes const e = comp as inRes
// since B is child (longer key), A can only be an inode, not a leaf // since B is child (longer key), A can only be an inode, not a leaf
const enterChildAnode: IndexInode = (await kubo.dag.get(childA)).value
const newcNode = emptyVinode() const newcNode = emptyVinode()
newcNode.children[e.b] = [e.nk, childB] newcNode.children[e.b] = [e.nk, childB]
const mergec = await mergeInodesSync(enterChildAnode, newcNode) const mergec = await mergeInodesSyncCID(childA, newcNode)
noda.children[b] = [e.common, mergec] noda.children[b] = [e.common, mergec]
break break
...@@ -284,7 +272,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde ...@@ -284,7 +272,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
const newNode = emptyInode() const newNode = emptyInode()
newNode.children[c.b1] = [c.nk1, childA] newNode.children[c.b1] = [c.nk1, childA]
newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)] newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)]
const newNodeCid = (await kubo.dag.put(newNode)) as CID const newNodeCid = (await kubo.dag.put(newNode, { pin: true })) as CID
noda.children[b] = [c.common, newNodeCid] noda.children[b] = [c.common, newNodeCid]
break break
} }
...@@ -294,7 +282,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde ...@@ -294,7 +282,7 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
} }
} }
// now that we have the new node, we can upload it and return its cid // now that we have the new node, we can upload it and return its cid
return kubo.dag.put(noda) as Promise<CID> return kubo.dag.put(noda, { pin: true }) as Promise<CID>
} }
// ===================== virtual nodes management ===================== // ===================== virtual nodes management =====================
......
...@@ -26,45 +26,17 @@ async function doImport() { ...@@ -26,45 +26,17 @@ async function doImport() {
processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid)) processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid))
} }
// doImport()
// UNUSED
// // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18 // // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18
// // but this shows we should optimise AMT inserting for higher throughput // // but this shows we should optimise AMT inserting for higher throughput
// function doImportToAMT() { // function doImportToAMT() {
// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import // REMOVED
// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
// importCplusToAMTSync(cplus, amt).then(console.log)
// } // }
// UNUSED
// // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second // // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second
// // this version sometimes crashes with EADDRNOTAVAIL because it exceeds the number of concurrent connections // // this version sometimes crashes with EADDRNOTAVAIL because it exceeds the number of concurrent connections
// async function doMergeAMT() { // async function doMergeAMT() {
// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import // REMOVED
// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
// const rootNode: IndexInode = (await kubo.dag.get(amt)).value
// allCplusCids(cplus)
// // .then((x) => {
// // fs.writeFile('./cplus.txt', x.toString())
// // return x
// // })
// .then(sortCidsAndConvertKeys)
// // .then((x) => fs.writeFile('./cplus3.txt', x.toString()))
// .then((all) => {
// console.log(Date.now() + ' converting to virtual tree ')
// return arrayToVinode(all)
// })
// .then((inode) => {
// console.log(Date.now() + ' merging')
// return mergeInodesSync(rootNode, inode)
// })
// .then((cid) => {
// console.log(Date.now() + ' finished')
// console.log(cid)
// })
// } // }
/// import all previously imported C+ data as index requests into a single AMT /// import all previously imported C+ data as index requests into a single AMT
...@@ -75,4 +47,6 @@ async function doAllCplusCidsToAMT() { ...@@ -75,4 +47,6 @@ async function doAllCplusCidsToAMT() {
cplusIndexRequestsToAMT(cplusCID, rootNodeCid) cplusIndexRequestsToAMT(cplusCID, rootNodeCid)
} }
doAllCplusCidsToAMT() // TODO use command line args to choose what to do
// doImport()
// doAllCplusCidsToAMT()
...@@ -49,7 +49,7 @@ async function importIndex() { ...@@ -49,7 +49,7 @@ async function importIndex() {
// reinit export // reinit export
exportedCid.value = [] exportedCid.value = []
// create new root node // create new root node
importedCid.value = await kubo.dag.put(emptyInode()) as CID importedCid.value = (await kubo.dag.put(emptyInode())) as CID
importCid.value.split('\n').forEach(async (line) => { importCid.value.split('\n').forEach(async (line) => {
if (line != '') { if (line != '') {
try { try {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment