diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts index 41dd7980808f690c9f04e0d395dd5f25377f5357..e245330ac304b3419395bf3fdf82143fd72a6565 100644 --- a/src/cesium-plus.ts +++ b/src/cesium-plus.ts @@ -6,8 +6,9 @@ import { timestampToKey, bucket, compareKey, - type resultType, - type diffResult + mergeInodesSync, + concretizeCid, + type diffRes } from './processor' import { emptyInode, type IndexLeaf, type IndexRequest, type IndexVinode, type Pointer } from './types' import { BASE, KEYSIZE } from './consts' @@ -40,7 +41,7 @@ interface CplusProfileMore extends CplusProfile { avatar: Avatar | CID } -// adds all cids +/// adds all cids by groups ot size `groupBy` export async function processCesiumPlusImport(profileCids: CID[], groupBy: number): Promise<CID> { const rootNode: Array<Promise<CID>> = [] for (let i = 0; i < Math.floor(profileCids.length / groupBy); i++) { @@ -51,7 +52,7 @@ export async function processCesiumPlusImport(profileCids: CID[], groupBy: numbe return Promise.all(rootNode).then((r: CID[]) => kubo.dag.put(r) as Promise<CID>) } -// if avatar is present, upload it as a separate file instead +/// if avatar is present, upload it as a separate file instead export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> { const profile = obj as CplusProfileMore const { avatar, ...profileWithoutAvatar } = profile @@ -66,8 +67,9 @@ export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> } } +// UNUSED // import these cid to target AMT, naive approach one by one and asynchronous -export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) { +async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) { const cplusroot = await kubo.dag.get(cplusCID) for (let chunkcid of cplusroot.value) { // process each chunk sequentially to avoid memory overflow @@ -78,7 +80,7 @@ export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) { const indexRequest: IndexRequest = { pubkey: profile.issuer, cid: pcid, - timestamp: profile.time, + timestamp: profile.time * 1000, signature: '' // signature is inside document for C+ data } kubo.dag.put(indexRequest).then(async (indexRequestCid) => { @@ -90,9 +92,10 @@ export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) { } } +// UNUSED // alternative slow synchronous function to avoid overflowing kubo with connections // exectued in chromium, it can import about 18 profiles per second -export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<CID> { +async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<CID> { const cplusroot = await kubo.dag.get(cplusCID) let importedCount = 0 for (let chunkcid of cplusroot.value) { @@ -103,7 +106,7 @@ export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise< const indexRequest: IndexRequest = { pubkey: profile.issuer, cid: pcid, - timestamp: profile.time, + timestamp: profile.time * 1000, signature: '' // signature is inside document for C+ data } const indexRequestCid = await kubo.dag.put(indexRequest) @@ -115,35 +118,42 @@ export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise< return amtCid } -// build virtual AMT from cesium root CID to prepare merge - /// convert array of key/value pairs to virtual inode (tree) // opti: use a dichotomy search instead of iterating over all elements export function arrayToVinode(array: Array<[string, CID]>): IndexVinode { + // initialize empty virtual node const node = emptyInode() as IndexVinode + // process each bucket in order since elements are ordered for (let b = 0; b < BASE; b++) { + // initialize list of elements that should go in this bucket const subArray: Array<[string, CID]> = [] - do { + // shift element from the array until an end condition is reached + while (true) { const elt = array.shift() if (elt == undefined) { + // we reached the end of the array break } - const k = elt[0] - if (bucket(k) != b) { + const k = bucket(elt[0]) + if (k != b) { array.unshift(elt) + // element does not belong to this bucket, go to next break } + // element goes in the current bucket, add it subArray.push(elt) - } while (true) + } + // then process the elements in this bucket if (subArray.length > 0) { - // not empty + // not empty, take first and last keys const k1 = subArray.at(0)![0] const k2 = subArray.at(-1)![0] if (k1 == k2) { - node.children[b] = [k1, arrayToLeaf(subArray.map(([k, v]) => v))] + node.children[b] = [k1, arrayToLeaf(subArray.map(([_k, v]) => v))] continue } - const c = compareKey(k1, k2) as diffResult + // keys have the same size, so if they are not equal and not "enter", they are "diff" + const c = compareKey(k1, k2) as diffRes const minimalSubArray: Array<[string, CID]> = subArray.map(([k, v]) => [k.slice(c.common.length), v]) node.children[b] = [c.common, arrayToVinode(minimalSubArray)] } @@ -151,17 +161,18 @@ export function arrayToVinode(array: Array<[string, CID]>): IndexVinode { return node } +/// transform array of cid to leaf object export function arrayToLeaf(array: CID[]): IndexLeaf { return { leaf: array.sort((a, b) => (a.toString() < b.toString() ? -1 : 1)) } } -// sort all cids and convert timestamp to key -export function sortCids(allCIDs: Array<[number, CID]>): Array<[string, CID]> { +/// sort all cids and convert timestamp to key +export function sortCidsAndConvertKeys(allCIDs: Array<[number, CID]>): Array<[string, CID]> { allCIDs.sort() return allCIDs.map(([t, c]) => [timestampToKey(t), c]) } -// retreive all CIDs +/// retreive all CIDs export async function allCplusCids(cplusCID: CID): Promise<Array<[number, CID]>> { console.log(Date.now() + ' getting all cplus data') const allCIDs: Array<Promise<[number, CID]>> = [] @@ -170,9 +181,37 @@ export async function allCplusCids(cplusCID: CID): Promise<Array<[number, CID]>> const chunk = await kubo.dag.get(chunkcid) for (let pcid of chunk.value) { const p = kubo.dag.get(pcid) - const profile: Promise<[number, CID]> = p.then((v) => [v.value.time, pcid]) + const profile: Promise<[number, CID]> = p.then((v) => [v.value.time * 1000, pcid]) allCIDs.push(profile) } } return Promise.all(allCIDs) } + +/// import all cplus cid to AMT chunk by chunk +// this allows to decrease maximum amount of concurrent connections +// 183 seconds +export async function allCplusCidsToAMTChunked(cplusCID: CID, rootNodeCid: CID): Promise<CID> { + console.log(Date.now() + ' getting all cplus data') + const cplusroot = await kubo.dag.get(cplusCID) + for (let chunkcid of cplusroot.value) { + const allCIDs: Array<Promise<[number, CID]>> = [] + const chunk = await kubo.dag.get(chunkcid) + for (let pcid of chunk.value) { + const p = kubo.dag.get(pcid) + const profile: Promise<[number, CID]> = p.then((v) => [v.value.time * 1000, pcid]) + allCIDs.push(profile) + } + const rootNode = (await kubo.dag.get(rootNodeCid)).value + rootNodeCid = await Promise.all(allCIDs) + .then(sortCidsAndConvertKeys) + .then(arrayToVinode) + .then(async (inode) => { + // console.log(await concretizeCid(inode)) + console.log(Date.now() + ' merging') + return mergeInodesSync(rootNode, inode) + }) + console.log(rootNodeCid) + } + return rootNodeCid +} diff --git a/src/processor.ts b/src/processor.ts index 06ce664ffbed205e84156cfdd4fccc6fb76c1a5f..e128e831238e98c411a4f83ec8b796b86bcb9d40 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -37,18 +37,39 @@ export async function addToIndexQueue(cid: CID, indexRequest: IndexRequest) { } // returns a process function suitable for processInode that simply inserts request in a leaf +// WIP does not handle inserting to a node yet function insertRequest(indexRequestCid: CID): ProcessFunction { - return (maybeLeaf) => { - if (maybeLeaf == null) { + return (input: null | IndexLeaf | IndexInode) => { + if (input == null) { // in this case we want to create a new leaf return processLeaf(emptyLeaf(), indexRequestCid) } else { // in this case we want to insert indexRequestCid in existing leaf - return processLeaf(maybeLeaf, indexRequestCid) + return processLeaf(input as IndexLeaf, indexRequestCid) } } } +/// returns a process function working on virtual nodes +function mergeVinodeRequest(node: IndexVinode | IndexLeaf): ProcessFunction { + return (input: null | IndexLeaf | IndexInode) => { + if (input == null) { + return concretizeCid(node) + } else { + return mergeInodesSync(input, node) + } + } +} + +function mergeInodeRequest(node: CID): ProcessFunction { + return async (input: null | IndexLeaf | IndexInode) => { + if (input == null) { + return kubo.dag.put(node) as Promise<CID> + } + return mergeInodesSync(input, (await kubo.dag.get(node)).value) + } +} + // simplest way to insert index request given its CID // rootCid: root node of the AMT to add the index request in // indexRequestCid: index request to add @@ -94,9 +115,13 @@ function publishHistory(cid: CID) { }) } -// function used to process node +/// function used to process node +// /!\ this is suboptimal to return a CID, the node (or something else) should be returned instead +// this would allow to write more function like ones that delete content +// TODO change this export interface ProcessFunction { - (maybeLeaf: null | IndexLeaf): Promise<CID> + // input is the input data at the place where we want to process + (input: null | IndexLeaf | IndexInode): Promise<CID> } // return bucket corresponding to given letter @@ -119,12 +144,12 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF // must share bucket with a node const [k1, cid1] = node.children[b] as [string, CID] const comp = compareKey(k1, key) - // console.log(comp) switch (comp.type) { - case resultType.Enter: - const e = comp as enterResult - console.log('enter "' + e.common + '" key "' + e.nk + '"') + // enter + case resType.Child: + const e = comp as inRes + // console.log('enter "' + e.common + '" key "' + e.nk + '"') const enterNode: IndexInode | IndexLeaf = (await kubo.dag.get(cid1)).value const enterNodeAsLeaf = enterNode as IndexLeaf const enterNodeAsInode = enterNode as IndexInode @@ -135,15 +160,27 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF node.children[b] = [e.common, await processInode(enterNodeAsInode, e.nk, func)] break - case resultType.End: - console.log('end') - const otherLeaf = (await kubo.dag.get(cid1)).value as IndexLeaf - node.children[b] = [k1, await func(otherLeaf)] + // an additionnal hierarchy level is required + case resType.Parent: + const p = comp as inRes + // console.log('enter "' + p.common + '" key "' + p.nk + '"') + const newiNode = emptyInode() + newiNode.children[p.b] = [p.nk, cid1] + const newiNodeCid = await func(newiNode) + node.children[b] = [p.common, newiNodeCid] + break + + // reached dest + case resType.Same: + // console.log('end on ' + key) + const other = (await kubo.dag.get(cid1)).value + node.children[b] = [k1, await func(other)] break - case resultType.Diff: - const c = comp as diffResult - console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"') + // diff found + case resType.Diff: + const c = comp as diffRes + // console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"') const newNode = emptyInode() newNode.children[c.b1] = [c.nk1, cid1] newNode.children[c.b2] = [c.nk2, await func(null)] @@ -154,7 +191,7 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF } // now that we have the new node save it and return const newCid = (await kubo.dag.put(node)) as CID - console.log('new inode: ' + newCid.toString()) + console.log('new inode: ' + newCid.toString() + ' at ' + key) return newCid } @@ -174,7 +211,9 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> { /// merge internal nodes, synchronous implementation // useful to merge trees export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> { - if ((nodeB as IndexLeaf).leaf) { + const isAleaf = (nodeA as IndexLeaf).leaf != undefined + const isBleaf = (nodeB as IndexLeaf).leaf != undefined + if (isAleaf && isBleaf) { // these are not internal nodes, but leaves, and we should merge them const cidSet = new Set([...(nodeA as unknown as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf]) const cidList = Array.from(cidSet).sort() @@ -182,6 +221,8 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde leaf: cidList } return kubo.dag.put(newLeaf) as Promise<CID> + } else if (isAleaf || isBleaf) { + throw Error('should not be possible, are keys same size?') } // if we are not yet at the leaf level, we can safely assume that nodeA and nodeB are indeed inodes const noda = nodeA as IndexInode @@ -194,83 +235,136 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde if (nAcb == null && nBcb != null) { // we can concretize nodeB directly const [kB, childB] = nBcb - noda.children[b] = [kB, await concretize(childB)] + noda.children[b] = [kB, await concretizeCid(childB)] } else if (nAcb != null && nBcb != null) { // both are non null const [kA, childA] = nAcb const [kB, childB] = nBcb - if (kA == kB) { - const childAnode = (await kubo.dag.get(childA)).value - noda.children[b] = [kA, await mergeInodesSync(childAnode, childB)] - } else { - // both keys must have same size since we can only merge nodes from same depth - // then because they are different, the only result type is diffResult - const c = compareKey(kA, kB) as diffResult - const newNode = emptyInode() - newNode.children[c.b1] = [c.nk1, childA] - newNode.children[c.b2] = [c.nk2, await concretize(childB)] - const newNodeCid = (await kubo.dag.put(newNode)) as CID - noda.children[b] = [c.common, newNodeCid] + const comp = compareKey(kA, kB) + + switch (comp.type) { + // when keys are the same, this is a "enter", recurse + case resType.Same: + const childAnode: IndexInode | IndexLeaf = (await kubo.dag.get(childA)).value + noda.children[b] = [kA, await mergeInodesSync(childAnode, childB)] + break + + // B is child of A (inside), we add a level of hierarchy + case resType.Child: + const e = comp as inRes + // since B is child (longer key), A can only be an inode, not a leaf + const enterChildAnode: IndexInode = (await kubo.dag.get(childA)).value + const newcNode = emptyInode() as IndexVinode + newcNode.children[e.b] = [e.nk, childB] + const mergec = await mergeInodesSync(enterChildAnode, newcNode) + noda.children[b] = [e.common, mergec] + break + + // B is parent of A, an additional hierachy level is required + case resType.Parent: + const p = comp as inRes + const newiNode = emptyInode() + newiNode.children[p.b] = [p.nk, childA] + const mergep = await mergeInodesSync(newiNode, childB) + noda.children[b] = [p.common, mergep] + break + + // there is a diff + case resType.Diff: + const c = comp as diffRes + const newNode = emptyInode() + newNode.children[c.b1] = [c.nk1, childA] + newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)] + const newNodeCid = (await kubo.dag.put(newNode)) as CID + noda.children[b] = [c.common, newNodeCid] + break } } else { - // keep node untouched + // nBcb is null, keep nAcb bucket untouched (might be null or not) + continue } } // now that we have the new node, we can upload it and return its cid return kubo.dag.put(noda) as Promise<CID> } -/// concretize virtual node -async function concretize(node: IndexVinode | IndexLeaf): Promise<CID> { +/// concretize virtual node to CID +export async function concretizeCid(node: IndexVinode | IndexLeaf): Promise<CID> { if ((node as unknown as IndexLeaf).leaf) { return kubo.dag.put(node) as Promise<CID> } - // this is a virtual inode - const childrenPromise: Array<null | Promise<[string, CID]>> = (node as unknown as IndexVinode).children.map((c) => { + const newNode = await concretize(node as unknown as IndexVinode) + return kubo.dag.put(newNode) as Promise<CID> +} + +/// concretize virtual node +async function concretize(node: IndexVinode): Promise<IndexInode> { + const childrenPromise: Array<null | Promise<[string, CID]>> = node.children.map((c) => { if (c == null) { return null } const [k, v] = c - return concretize(v).then((cid) => [k, cid as CID]) + return concretizeCid(v).then((cid) => [k, cid as CID]) }) - const newNode: IndexInode = { + return { children: await Promise.all(childrenPromise) } - return kubo.dag.put(newNode) as Promise<CID> } -export interface diffResult { - type: resultType +/// result where key differ at some point +export interface diffRes { + /// type of result + type: resType + /// common part of keys common: string + /// bucket of key 1 b1: number + /// rest of key 1 nk1: string + /// bucket of key 2 b2: number + /// rest of key 2 nk2: string } -export interface enterResult { - type: resultType +/// result where key 1 is included in key 2 start +export interface inRes { + /// type of result + type: resType + /// common part (key 1) common: string + /// bucket for key 2 b: number + /// rest of key 2 nk: string } -export interface endResult { - type: resultType +/// result where keys are the same +export interface sameRes { + /// type of result + type: resType } -export type compResult = diffResult | enterResult | endResult -export enum resultType { - Enter, +export type compResult = diffRes | inRes | sameRes +export enum resType { + Child, Diff, - End + Same, + Parent } /// compare keys and return comp result -// expects k1 to be shorter than k2 export function compareKey(k1: string, k2: string): compResult { + // if keys are the same + if (k1 == k2) { + return { + type: resType.Same + } + } + + // start comparison let common = '' - const l = k1.length + const l = Math.min(k1.length, k2.length) for (let i = 0; i < l; i++) { const c1 = k1[i] const c2 = k2[i] @@ -279,7 +373,7 @@ export function compareKey(k1: string, k2: string): compResult { } else { // return the comparison result return { - type: resultType.Diff, + type: resType.Diff, common, b1: bucket(c1), nk1: k1.slice(common.length), @@ -288,19 +382,22 @@ export function compareKey(k1: string, k2: string): compResult { } } } + + // we reached the end of the shortest key without diff + if (k1.length < k2.length) { - const b = bucket(k2[l]) - // we can enter the bucket return { - type: resultType.Enter, + type: resType.Child, common, - b, + b: bucket(k2[l]), nk: k2.slice(common.length) } } else { - // we reached the end return { - type: resultType.End + type: resType.Parent, + common, + b: bucket(k1[l]), + nk: k1.slice(common.length) } } } diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 5a5575792e7a45a632d4eca543a2960cae63675d..457ed7b717cba406ea7100a3bf4da1f078390f24 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -2,10 +2,10 @@ import { CID } from 'multiformats' import { processCesiumPlusImport, processCesiumPlusProfile, - importCplusToAMTSync, allCplusCids, - sortCids, - arrayToVinode + sortCidsAndConvertKeys, + arrayToVinode, + allCplusCidsToAMTChunked } from '../cesium-plus' import * as fs from 'fs/promises' import { kubo } from '../kubo' @@ -18,6 +18,10 @@ const profiles = (n: number) => `/home/hugo/ipfs/v2s-datapod/migrate_csplus/prof const CHUNKS = 11 const GROUPBY = 256 +/// do import cesium data +// use json chunks as input +// process base64 images apart +// groups output in a single dag for easy import async function doImport() { const cids: CID[] = [] // manage chunk by chunk to limit memory usage @@ -32,16 +36,16 @@ async function doImport() { processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid)) } - // doImport() -// in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18 -// but this shows we should optimise AMT inserting for higher throughput -function doImportToAMT() { - const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import - const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid - importCplusToAMTSync(cplus, amt).then(console.log) -} +// UNUSED +// // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18 +// // but this shows we should optimise AMT inserting for higher throughput +// function doImportToAMT() { +// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import +// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid +// importCplusToAMTSync(cplus, amt).then(console.log) +// } // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second async function doMergeAMT() { @@ -51,7 +55,12 @@ async function doMergeAMT() { const rootNode: IndexInode = (await kubo.dag.get(amt)).value allCplusCids(cplus) - .then(sortCids) + // .then((x) => { + // fs.writeFile('./cplus.txt', x.toString()) + // return x + // }) + .then(sortCidsAndConvertKeys) + // .then((x) => fs.writeFile('./cplus3.txt', x.toString())) .then((all) => { console.log(Date.now() + ' converting to virtual tree ') return arrayToVinode(all) @@ -66,4 +75,13 @@ async function doMergeAMT() { }) } -doMergeAMT() +// doMergeAMT() + +async function doAllCplusCidsToAMTChunked() { + const cplusCID = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import + const rootNodeCid = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid + + allCplusCidsToAMTChunked(cplusCID, rootNodeCid) +} + +doAllCplusCidsToAMTChunked()