Skip to content
Snippets Groups Projects
Commit 317b0a1b authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

refac merge algo

parent d0cfe89f
No related branches found
No related tags found
No related merge requests found
......@@ -6,8 +6,9 @@ import {
timestampToKey,
bucket,
compareKey,
type resultType,
type diffResult
mergeInodesSync,
concretizeCid,
type diffRes
} from './processor'
import { emptyInode, type IndexLeaf, type IndexRequest, type IndexVinode, type Pointer } from './types'
import { BASE, KEYSIZE } from './consts'
......@@ -40,7 +41,7 @@ interface CplusProfileMore extends CplusProfile {
avatar: Avatar | CID
}
// adds all cids
/// adds all cids by groups of size `groupBy`
export async function processCesiumPlusImport(profileCids: CID[], groupBy: number): Promise<CID> {
const rootNode: Array<Promise<CID>> = []
for (let i = 0; i < Math.floor(profileCids.length / groupBy); i++) {
......@@ -51,7 +52,7 @@ export async function processCesiumPlusImport(profileCids: CID[], groupBy: numbe
return Promise.all(rootNode).then((r: CID[]) => kubo.dag.put(r) as Promise<CID>)
}
// if avatar is present, upload it as a separate file instead
/// if avatar is present, upload it as a separate file instead
export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> {
const profile = obj as CplusProfileMore
const { avatar, ...profileWithoutAvatar } = profile
......@@ -66,8 +67,9 @@ export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID>
}
}
// UNUSED
// import these cid to target AMT, naive approach one by one and asynchronous
export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) {
async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) {
const cplusroot = await kubo.dag.get(cplusCID)
for (let chunkcid of cplusroot.value) {
// process each chunk sequentially to avoid memory overflow
......@@ -78,7 +80,7 @@ export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) {
const indexRequest: IndexRequest = {
pubkey: profile.issuer,
cid: pcid,
timestamp: profile.time,
timestamp: profile.time * 1000,
signature: '' // signature is inside document for C+ data
}
kubo.dag.put(indexRequest).then(async (indexRequestCid) => {
......@@ -90,9 +92,10 @@ export async function importCplusToAMT(cplusCID: CID, amtCid: Pointer<CID>) {
}
}
// UNUSED
// alternative slow synchronous function to avoid overflowing kubo with connections
// executed in chromium, it can import about 18 profiles per second
export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<CID> {
async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<CID> {
const cplusroot = await kubo.dag.get(cplusCID)
let importedCount = 0
for (let chunkcid of cplusroot.value) {
......@@ -103,7 +106,7 @@ export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<
const indexRequest: IndexRequest = {
pubkey: profile.issuer,
cid: pcid,
timestamp: profile.time,
timestamp: profile.time * 1000,
signature: '' // signature is inside document for C+ data
}
const indexRequestCid = await kubo.dag.put(indexRequest)
......@@ -115,35 +118,42 @@ export async function importCplusToAMTSync(cplusCID: CID, amtCid: CID): Promise<
return amtCid
}
// build virtual AMT from cesium root CID to prepare merge
/// convert array of key/value pairs to virtual inode (tree)
// optimization: use a binary search instead of iterating over all elements
export function arrayToVinode(array: Array<[string, CID]>): IndexVinode {
// initialize empty virtual node
const node = emptyInode() as IndexVinode
// process each bucket in order since elements are ordered
for (let b = 0; b < BASE; b++) {
// initialize list of elements that should go in this bucket
const subArray: Array<[string, CID]> = []
do {
// shift element from the array until an end condition is reached
while (true) {
const elt = array.shift()
if (elt == undefined) {
// we reached the end of the array
break
}
const k = elt[0]
if (bucket(k) != b) {
const k = bucket(elt[0])
if (k != b) {
array.unshift(elt)
// element does not belong to this bucket, go to next
break
}
// element goes in the current bucket, add it
subArray.push(elt)
} while (true)
}
// then process the elements in this bucket
if (subArray.length > 0) {
// not empty
// not empty, take first and last keys
const k1 = subArray.at(0)![0]
const k2 = subArray.at(-1)![0]
if (k1 == k2) {
node.children[b] = [k1, arrayToLeaf(subArray.map(([k, v]) => v))]
node.children[b] = [k1, arrayToLeaf(subArray.map(([_k, v]) => v))]
continue
}
const c = compareKey(k1, k2) as diffResult
// keys have the same size, so if they are not equal and not "enter", they are "diff"
const c = compareKey(k1, k2) as diffRes
const minimalSubArray: Array<[string, CID]> = subArray.map(([k, v]) => [k.slice(c.common.length), v])
node.children[b] = [c.common, arrayToVinode(minimalSubArray)]
}
......@@ -151,17 +161,18 @@ export function arrayToVinode(array: Array<[string, CID]>): IndexVinode {
return node
}
/// transform array of cids to a leaf object
// sorts a copy (ordered by CID string representation) instead of mutating the
// caller's array in place, which the original `array.sort(...)` did
export function arrayToLeaf(array: CID[]): IndexLeaf {
  return { leaf: [...array].sort((a, b) => (a.toString() < b.toString() ? -1 : 1)) }
}
// sort all cids and convert timestamp to key
export function sortCids(allCIDs: Array<[number, CID]>): Array<[string, CID]> {
/// sort all cids by timestamp and convert timestamp to key
// sort numerically: the default Array.prototype.sort comparator compares the
// tuples' string representations, which misorders timestamps whose decimal
// representations have different lengths (e.g. "999..." > "1000...")
export function sortCidsAndConvertKeys(allCIDs: Array<[number, CID]>): Array<[string, CID]> {
  allCIDs.sort((a, b) => a[0] - b[0])
  return allCIDs.map(([t, c]) => [timestampToKey(t), c])
}
// retreive all CIDs
/// retrieve all CIDs
export async function allCplusCids(cplusCID: CID): Promise<Array<[number, CID]>> {
console.log(Date.now() + ' getting all cplus data')
const allCIDs: Array<Promise<[number, CID]>> = []
......@@ -170,9 +181,37 @@ export async function allCplusCids(cplusCID: CID): Promise<Array<[number, CID]>>
const chunk = await kubo.dag.get(chunkcid)
for (let pcid of chunk.value) {
const p = kubo.dag.get(pcid)
const profile: Promise<[number, CID]> = p.then((v) => [v.value.time, pcid])
const profile: Promise<[number, CID]> = p.then((v) => [v.value.time * 1000, pcid])
allCIDs.push(profile)
}
}
return Promise.all(allCIDs)
}
/// import all cplus cid to AMT chunk by chunk
// this decreases the maximum number of concurrent connections
// 183 seconds
export async function allCplusCidsToAMTChunked(cplusCID: CID, rootNodeCid: CID): Promise<CID> {
  console.log(Date.now() + ' getting all cplus data')
  const cplusroot = await kubo.dag.get(cplusCID)
  // process one chunk at a time so that only the current chunk's profile
  // fetches run concurrently (bounds the number of open kubo connections)
  for (let chunkcid of cplusroot.value) {
    const allCIDs: Array<Promise<[number, CID]>> = []
    const chunk = await kubo.dag.get(chunkcid)
    for (let pcid of chunk.value) {
      const p = kubo.dag.get(pcid)
      // collect (timestamp, cid); the *1000 presumably converts seconds to
      // milliseconds — TODO confirm C+ `time` field unit
      const profile: Promise<[number, CID]> = p.then((v) => [v.value.time * 1000, pcid])
      allCIDs.push(profile)
    }
    // re-fetch the current root node, build a virtual tree from this chunk's
    // sorted entries, and merge it in; the new root CID replaces the old one
    const rootNode = (await kubo.dag.get(rootNodeCid)).value
    rootNodeCid = await Promise.all(allCIDs)
      .then(sortCidsAndConvertKeys)
      .then(arrayToVinode)
      .then(async (inode) => {
        // console.log(await concretizeCid(inode))
        console.log(Date.now() + ' merging')
        return mergeInodesSync(rootNode, inode)
      })
    console.log(rootNodeCid)
  }
  // root CID of the AMT after every chunk has been merged
  return rootNodeCid
}
......@@ -37,18 +37,39 @@ export async function addToIndexQueue(cid: CID, indexRequest: IndexRequest) {
}
// returns a process function suitable for processInode that simply inserts request in a leaf
// WIP does not handle inserting to a node yet
function insertRequest(indexRequestCid: CID): ProcessFunction {
return (maybeLeaf) => {
if (maybeLeaf == null) {
return (input: null | IndexLeaf | IndexInode) => {
if (input == null) {
// in this case we want to create a new leaf
return processLeaf(emptyLeaf(), indexRequestCid)
} else {
// in this case we want to insert indexRequestCid in existing leaf
return processLeaf(maybeLeaf, indexRequestCid)
return processLeaf(input as IndexLeaf, indexRequestCid)
}
}
}
/// returns a process function working on virtual nodes
function mergeVinodeRequest(node: IndexVinode | IndexLeaf): ProcessFunction {
  return (input: null | IndexLeaf | IndexInode) => {
    // nothing exists at the target position yet: materialize the virtual node as-is
    if (input == null) return concretizeCid(node)
    // otherwise merge the virtual node into the existing concrete one
    return mergeInodesSync(input, node)
  }
}
/// returns a process function that merges the concrete inode behind the given CID
function mergeInodeRequest(node: CID): ProcessFunction {
  return async (input: null | IndexLeaf | IndexInode) => {
    if (input == null) {
      // NOTE(review): this dag.put()s the CID value itself rather than returning
      // `node` directly — confirm the extra wrapping is intended
      return kubo.dag.put(node) as Promise<CID>
    }
    // fetch the node behind the CID and merge it into the existing input node
    return mergeInodesSync(input, (await kubo.dag.get(node)).value)
  }
}
// simplest way to insert index request given its CID
// rootCid: root node of the AMT to add the index request in
// indexRequestCid: index request to add
......@@ -94,9 +115,13 @@ function publishHistory(cid: CID) {
})
}
// function used to process node
/// function used to process node
// /!\ this is suboptimal to return a CID, the node (or something else) should be returned instead
// this would allow to write more function like ones that delete content
// TODO change this
export interface ProcessFunction {
(maybeLeaf: null | IndexLeaf): Promise<CID>
// input is the input data at the place where we want to process
(input: null | IndexLeaf | IndexInode): Promise<CID>
}
// return bucket corresponding to given letter
......@@ -119,12 +144,12 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF
// must share bucket with a node
const [k1, cid1] = node.children[b] as [string, CID]
const comp = compareKey(k1, key)
// console.log(comp)
switch (comp.type) {
case resultType.Enter:
const e = comp as enterResult
console.log('enter "' + e.common + '" key "' + e.nk + '"')
// enter
case resType.Child:
const e = comp as inRes
// console.log('enter "' + e.common + '" key "' + e.nk + '"')
const enterNode: IndexInode | IndexLeaf = (await kubo.dag.get(cid1)).value
const enterNodeAsLeaf = enterNode as IndexLeaf
const enterNodeAsInode = enterNode as IndexInode
......@@ -135,15 +160,27 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF
node.children[b] = [e.common, await processInode(enterNodeAsInode, e.nk, func)]
break
case resultType.End:
console.log('end')
const otherLeaf = (await kubo.dag.get(cid1)).value as IndexLeaf
node.children[b] = [k1, await func(otherLeaf)]
// an additional hierarchy level is required
case resType.Parent:
const p = comp as inRes
// console.log('enter "' + p.common + '" key "' + p.nk + '"')
const newiNode = emptyInode()
newiNode.children[p.b] = [p.nk, cid1]
const newiNodeCid = await func(newiNode)
node.children[b] = [p.common, newiNodeCid]
break
// reached dest
case resType.Same:
// console.log('end on ' + key)
const other = (await kubo.dag.get(cid1)).value
node.children[b] = [k1, await func(other)]
break
case resultType.Diff:
const c = comp as diffResult
console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"')
// diff found
case resType.Diff:
const c = comp as diffRes
// console.log('diff on "' + c.common + '" keys "' + c.nk1 + '" / "' + c.nk2 + '"')
const newNode = emptyInode()
newNode.children[c.b1] = [c.nk1, cid1]
newNode.children[c.b2] = [c.nk2, await func(null)]
......@@ -154,7 +191,7 @@ export async function processInode(node: IndexInode, key: string, func: ProcessF
}
// now that we have the new node save it and return
const newCid = (await kubo.dag.put(node)) as CID
console.log('new inode: ' + newCid.toString())
console.log('new inode: ' + newCid.toString() + ' at ' + key)
return newCid
}
......@@ -174,7 +211,9 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> {
/// merge internal nodes, synchronous implementation
// useful to merge trees
export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: IndexVinode | IndexLeaf): Promise<CID> {
if ((nodeB as IndexLeaf).leaf) {
const isAleaf = (nodeA as IndexLeaf).leaf != undefined
const isBleaf = (nodeB as IndexLeaf).leaf != undefined
if (isAleaf && isBleaf) {
// these are not internal nodes, but leaves, and we should merge them
const cidSet = new Set([...(nodeA as unknown as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf])
const cidList = Array.from(cidSet).sort()
......@@ -182,6 +221,8 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
leaf: cidList
}
return kubo.dag.put(newLeaf) as Promise<CID>
} else if (isAleaf || isBleaf) {
throw Error('should not be possible, are keys same size?')
}
// if we are not yet at the leaf level, we can safely assume that nodeA and nodeB are indeed inodes
const noda = nodeA as IndexInode
......@@ -194,83 +235,136 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
if (nAcb == null && nBcb != null) {
// we can concretize nodeB directly
const [kB, childB] = nBcb
noda.children[b] = [kB, await concretize(childB)]
noda.children[b] = [kB, await concretizeCid(childB)]
} else if (nAcb != null && nBcb != null) {
// both are non null
const [kA, childA] = nAcb
const [kB, childB] = nBcb
if (kA == kB) {
const childAnode = (await kubo.dag.get(childA)).value
const comp = compareKey(kA, kB)
switch (comp.type) {
// when keys are the same, this is a "enter", recurse
case resType.Same:
const childAnode: IndexInode | IndexLeaf = (await kubo.dag.get(childA)).value
noda.children[b] = [kA, await mergeInodesSync(childAnode, childB)]
} else {
// both keys must have same size since we can only merge nodes from same depth
// then because they are different, the only result type is diffResult
const c = compareKey(kA, kB) as diffResult
break
// B is child of A (inside), we add a level of hierarchy
case resType.Child:
const e = comp as inRes
// since B is child (longer key), A can only be an inode, not a leaf
const enterChildAnode: IndexInode = (await kubo.dag.get(childA)).value
const newcNode = emptyInode() as IndexVinode
newcNode.children[e.b] = [e.nk, childB]
const mergec = await mergeInodesSync(enterChildAnode, newcNode)
noda.children[b] = [e.common, mergec]
break
// B is parent of A, an additional hierarchy level is required
case resType.Parent:
const p = comp as inRes
const newiNode = emptyInode()
newiNode.children[p.b] = [p.nk, childA]
const mergep = await mergeInodesSync(newiNode, childB)
noda.children[b] = [p.common, mergep]
break
// there is a diff
case resType.Diff:
const c = comp as diffRes
const newNode = emptyInode()
newNode.children[c.b1] = [c.nk1, childA]
newNode.children[c.b2] = [c.nk2, await concretize(childB)]
newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)]
const newNodeCid = (await kubo.dag.put(newNode)) as CID
noda.children[b] = [c.common, newNodeCid]
break
}
} else {
// keep node untouched
// nBcb is null, keep nAcb bucket untouched (might be null or not)
continue
}
}
// now that we have the new node, we can upload it and return its cid
return kubo.dag.put(noda) as Promise<CID>
}
/// concretize virtual node
async function concretize(node: IndexVinode | IndexLeaf): Promise<CID> {
/// concretize virtual node to CID
export async function concretizeCid(node: IndexVinode | IndexLeaf): Promise<CID> {
if ((node as unknown as IndexLeaf).leaf) {
return kubo.dag.put(node) as Promise<CID>
}
// this is a virtual inode
const childrenPromise: Array<null | Promise<[string, CID]>> = (node as unknown as IndexVinode).children.map((c) => {
const newNode = await concretize(node as unknown as IndexVinode)
return kubo.dag.put(newNode) as Promise<CID>
}
/// concretize virtual node
async function concretize(node: IndexVinode): Promise<IndexInode> {
const childrenPromise: Array<null | Promise<[string, CID]>> = node.children.map((c) => {
if (c == null) {
return null
}
const [k, v] = c
return concretize(v).then((cid) => [k, cid as CID])
return concretizeCid(v).then((cid) => [k, cid as CID])
})
const newNode: IndexInode = {
return {
children: await Promise.all(childrenPromise)
}
return kubo.dag.put(newNode) as Promise<CID>
}
export interface diffResult {
type: resultType
/// result where key differ at some point
export interface diffRes {
/// type of result
type: resType
/// common part of keys
common: string
/// bucket of key 1
b1: number
/// rest of key 1
nk1: string
/// bucket of key 2
b2: number
/// rest of key 2
nk2: string
}
export interface enterResult {
type: resultType
/// result where key 1 is included in key 2 start
export interface inRes {
/// type of result
type: resType
/// common part (key 1)
common: string
/// bucket for key 2
b: number
/// rest of key 2
nk: string
}
export interface endResult {
type: resultType
/// result where keys are the same
export interface sameRes {
/// type of result
type: resType
}
export type compResult = diffResult | enterResult | endResult
export enum resultType {
Enter,
export type compResult = diffRes | inRes | sameRes
export enum resType {
Child,
Diff,
End
Same,
Parent
}
/// compare keys and return comp result
// expects k1 to be shorter than k2
export function compareKey(k1: string, k2: string): compResult {
// if keys are the same
if (k1 == k2) {
return {
type: resType.Same
}
}
// start comparison
let common = ''
const l = k1.length
const l = Math.min(k1.length, k2.length)
for (let i = 0; i < l; i++) {
const c1 = k1[i]
const c2 = k2[i]
......@@ -279,7 +373,7 @@ export function compareKey(k1: string, k2: string): compResult {
} else {
// return the comparison result
return {
type: resultType.Diff,
type: resType.Diff,
common,
b1: bucket(c1),
nk1: k1.slice(common.length),
......@@ -288,19 +382,22 @@ export function compareKey(k1: string, k2: string): compResult {
}
}
}
// we reached the end of the shortest key without diff
if (k1.length < k2.length) {
const b = bucket(k2[l])
// we can enter the bucket
return {
type: resultType.Enter,
type: resType.Child,
common,
b,
b: bucket(k2[l]),
nk: k2.slice(common.length)
}
} else {
// we reached the end
return {
type: resultType.End
type: resType.Parent,
common,
b: bucket(k1[l]),
nk: k1.slice(common.length)
}
}
}
......@@ -2,10 +2,10 @@ import { CID } from 'multiformats'
import {
processCesiumPlusImport,
processCesiumPlusProfile,
importCplusToAMTSync,
allCplusCids,
sortCids,
arrayToVinode
sortCidsAndConvertKeys,
arrayToVinode,
allCplusCidsToAMTChunked
} from '../cesium-plus'
import * as fs from 'fs/promises'
import { kubo } from '../kubo'
......@@ -18,6 +18,10 @@ const profiles = (n: number) => `/home/hugo/ipfs/v2s-datapod/migrate_csplus/prof
const CHUNKS = 11
const GROUPBY = 256
/// do import cesium data
// use json chunks as input
// process base64 images apart
// groups output in a single dag for easy import
async function doImport() {
const cids: CID[] = []
// manage chunk by chunk to limit memory usage
......@@ -32,16 +36,16 @@ async function doImport() {
processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid))
}
// doImport()
// in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18
// but this shows we should optimise AMT inserting for higher throughput
function doImportToAMT() {
const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import
const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
importCplusToAMTSync(cplus, amt).then(console.log)
}
// UNUSED
// // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18
// // but this shows we should optimise AMT inserting for higher throughput
// function doImportToAMT() {
// const cplus = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import
// const amt = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
// importCplusToAMTSync(cplus, amt).then(console.log)
// }
// this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second
async function doMergeAMT() {
......@@ -51,7 +55,12 @@ async function doMergeAMT() {
const rootNode: IndexInode = (await kubo.dag.get(amt)).value
allCplusCids(cplus)
.then(sortCids)
// .then((x) => {
// fs.writeFile('./cplus.txt', x.toString())
// return x
// })
.then(sortCidsAndConvertKeys)
// .then((x) => fs.writeFile('./cplus3.txt', x.toString()))
.then((all) => {
console.log(Date.now() + ' converting to virtual tree ')
return arrayToVinode(all)
......@@ -66,4 +75,13 @@ async function doMergeAMT() {
})
}
doMergeAMT()
// doMergeAMT()
/// run the chunked C+ → AMT import with hardcoded CIDs
async function doAllCplusCidsToAMTChunked() {
  const cplusCID = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import
  const rootNodeCid = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
  // NOTE(review): the returned promise is neither awaited nor its result (the new
  // root CID) used — rejections will be unhandled; confirm this is intended
  allCplusCidsToAMTChunked(cplusCID, rootNodeCid)
}
doAllCplusCidsToAMTChunked()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment