diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts index b0ae64bc2657cb8199dd0538eeae25a00e866486..e20f5c0c5dd93ded6ddc0348c00f61e0665511f3 100644 --- a/src/cesium-plus.ts +++ b/src/cesium-plus.ts @@ -52,7 +52,7 @@ export async function wrapCplusInIndexRequest( } /// import sorted cplus index requests to AMT -export async function cplusIrToAMT(requests: Array<[string, CID]>, rootNodeCid: CID) { +export async function indexRequestsToAMT(requests: Array<[string, CID]>, rootNodeCid: CID) { const chunkSize = 5000 const n = requests.length console.log(Date.now() + ' merging') diff --git a/src/processor.ts b/src/processor.ts index 9deef90546541e1251e7ba0df3b8fbdd1e3b8eb9..86ac55cc5f588501560c092701c4eacc4e28eb7b 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -250,6 +250,8 @@ export async function mergeInodesSync( // if we are not yet at the leaf level, we can safely assume that nodeA and nodeB are indeed inodes const noda = nodeA as IndexInode const nodb = nodeB as IndexVinode + assert(noda.ctx == nodb.ctx) + const ctx = noda.ctx // process each bucket sequencially let itemCount = 0 @@ -280,7 +282,7 @@ export async function mergeInodesSync( case resType.Child: const e = comp as inRes // since B is child (longer key), A can only be an inode, not a leaf - const newcNode = emptyVinode(e.common) + const newcNode = emptyVinode(ctx + e.common) newcNode.children[e.b] = [e.nk, childB] const [countC, mergec] = await mergeInodesSyncCID(childA, newcNode) itemCount += countC @@ -290,7 +292,7 @@ export async function mergeInodesSync( // B is parent of A, an additional hierachy level is required case resType.Parent: const p = comp as inRes - const newiNode = emptyInode(p.common) + const newiNode = emptyInode(ctx + p.common) newiNode.children[p.b] = [p.nk, childA] const [countP, mergep] = await mergeInodesSync(newiNode, childB) itemCount += countP @@ -300,7 +302,7 @@ export async function mergeInodesSync( // there is a diff case resType.Diff: const c = comp as diffRes - 
const newNode = emptyInode(c.common) + const newNode = emptyInode(ctx + c.common) newNode.children[c.b1] = [c.nk1, childA] // here, this is suboptimal since we are forced to fetch childA to take into account its item count itemCount += await getItemCount(childA)
diff --git a/src/scripts/addLabelsToIR.ts b/src/scripts/addLabelsToIR.ts
new file mode 100644
index 0000000000000000000000000000000000000000..b566274de79e1b027892fd9e823d0fb12af52e22
--- /dev/null
+++ b/src/scripts/addLabelsToIR.ts
@@ -0,0 +1,79 @@
+import { kubo } from '../kubo'
+import { timestampToKey } from '../processor'
+import { CID } from 'multiformats'
+import { appendFile, createReadStream } from 'fs'
+import { createInterface } from 'readline'
+
+/**
+ * Label index requests with their time key.
+ *
+ * Reads `input` line by line (one index request CID per line), fetches each index request
+ * with `kubo.dag.get` and appends `<timestampToKey(time)> <cid>` to `output`.
+ * Lines whose CID cannot be parsed or fetched are appended to `rejected`, one per line.
+ *
+ * @param input - path of the file listing index request CIDs
+ * @param output - path of the file receiving the labelled lines
+ * @param rejected - path of the file receiving the lines that could not be processed
+ */
+async function addLabels(input: string, output: string, rejected = './input/HS.txt') {
+  const LIMIT = 500 // max number of lines to process simultaneously
+  const NOTIF = 2000 // log every N lines processed
+  let queueSize = 0
+  let readTotal = 0
+  const linereader = createInterface(createReadStream(input))
+
+  // bookkeeping done exactly once per line, on success and on failure alike:
+  // a failed line must free its slot too, otherwise the reader stays paused forever
+  function settle() {
+    readTotal++
+    queueSize--
+    if (queueSize < LIMIT) {
+      linereader.resume()
+    }
+    if (readTotal % NOTIF == 0) {
+      console.log(`processed ${readTotal} profiles`)
+    }
+  }
+
+  // record a line that could not be labelled, newline-terminated so that entries stay separated
+  function reject(line: string, reason: unknown) {
+    console.log(reason)
+    appendFile(rejected, line + '\n', (err) => {
+      if (err) console.log(err)
+      settle()
+    })
+  }
+
+  function processLine(line: string) {
+    const irCid = line.trim()
+    if (irCid.length == 0) return // blank line, nothing to label
+    queueSize++
+    // pause() does not stop 'line' events for data that was already read, so LIMIT is a soft bound
+    if (queueSize >= LIMIT) {
+      linereader.pause()
+    }
+    let irCID: CID
+    try {
+      irCID = CID.parse(irCid)
+    } catch (e) {
+      reject(irCid, e)
+      return
+    }
+    kubo.dag
+      .get(irCID)
+      .then((x) => timestampToKey(x.value.time) + ' ' + irCid + '\n')
+      .then((l) =>
+        appendFile(output, l, (err) => {
+          if (err) console.log(err)
+          settle()
+        })
+      )
+      .catch((e) => reject(irCid, e))
+  }
+
+  linereader.on('line', processLine)
+  linereader.on('close', () => console.log('done'))
+}
+
+// addLabels('./input/cids.txt', './input/cids+labels.txt')
+addLabels('./input/devIr.txt', './input/devIr+labels.txt')
diff --git a/src/scripts/cesium-plus-import.ts
b/src/scripts/cesium-plus-import.ts index 9584481b799518c6132f1e1aaa10c2ea21c26cb8..740f6cd952bb48da817cc84a46f6e73169173bd1 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -1,5 +1,5 @@ import { CID } from 'multiformats' -import { wrapCplusInIndexRequest, cplusIrToAMT } from '../cesium-plus' +import { wrapCplusInIndexRequest, indexRequestsToAMT } from '../cesium-plus' import { createInterface } from 'readline' import { appendFile, createReadStream } from 'fs' import { timestampToKey } from '../processor' @@ -104,8 +104,8 @@ async function wrapRawProfilesInIndexRequest() { linereader.on('close', () => console.log('done')) } -async function importAllCplusToAMT() { - const input = './input/cplusIR.txt' +// expects to receive a file with on each line a label and the index request CID +async function importIrToAMT(rootNodeCid: CID, input: string) { const requests = await readFile(input, 'utf8') .then((r) => r.split('\n')) .then((p) => @@ -117,8 +117,7 @@ async function importAllCplusToAMT() { }) ) .then((l) => l.sort()) - const rootNodeCid = EMPTY_NODE_CID // empty root cid - await cplusIrToAMT(requests, rootNodeCid) + await indexRequestsToAMT(requests, rootNodeCid) } // 26 minutes @@ -131,5 +130,8 @@ async function importAllCplusToAMT() { // 3 minutes // import by batch and logs successive cids -importAllCplusToAMT() -// → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey \ No newline at end of file +// importIrToAMT(EMPTY_NODE_CID, './input/cplusIR.txt') +const rootCID = CID.parse("bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4") +importIrToAMT(rootCID, './input/devIr+labels.txt') +// → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey (old with simple nodes) +// → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels) \ No newline at end of file diff --git a/src/scripts/getAll.ts b/src/scripts/getAll.ts new file mode 100644 index 
0000000000000000000000000000000000000000..a57437bc7daaddac38ebc090765b424971aa71ff
--- /dev/null
+++ b/src/scripts/getAll.ts
@@ -0,0 +1,51 @@
+import { kubo } from '../kubo'
+import { getAll } from '../interface'
+import { CID } from 'multiformats'
+import type { IndexRequest } from '../types'
+import { CESIUM_PLUS_PROFILE_IMPORT, CESIUM_PLUS_PROFILE_INSERT } from '../consts'
+
+const LOG_EVERY = 1000
+
+// CID handed to getAll; the alternative used during development is kept for reference
+// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a')
+const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') // 10 latest
+
+const iterator = getAll(cid)
+
+console.log('in', cid)
+
+/**
+ * Walks every leaf yielded by `getAll(cid)`, fetches each index request it lists and prints
+ * the CID of those whose kind is not CESIUM_PLUS_PROFILE_IMPORT.
+ * Prints a running item count every LOG_EVERY items and the total at the end.
+ */
+async function doit() {
+  let num = 0
+  for await (const leaf of iterator) {
+    const pending: Promise<void>[] = []
+    for (const irCID of leaf) {
+      num += 1
+      pending.push(
+        kubo.dag
+          .get(irCID)
+          .then((res) => {
+            const ir = res.value as IndexRequest
+            if (ir.kind != CESIUM_PLUS_PROFILE_IMPORT) {
+              console.log(irCID.toString())
+            }
+          })
+          // a failed fetch must neither abort the scan nor end up as an unhandled rejection
+          .catch((e) => console.log('could not get ' + irCID.toString(), e))
+      )
+      if (num % LOG_EVERY == 0) {
+        console.log(num)
+      }
+    }
+    // wait for the whole leaf before moving on: at most one leaf of requests is in flight
+    // and the final count is only printed once every check has completed
+    await Promise.all(pending)
+  }
+  console.log(num)
+}
+
+doit().catch((e) => console.log(e))
diff --git a/src/scripts/timestamp.ts b/src/scripts/timestamp.ts
index 28516e6e13cf23f939e1bbfd0ff1ef57976ec3e0..79dd020c183c8a58e9d6b9b58ddc8bdb68f96e93 100644
--- a/src/scripts/timestamp.ts
+++ b/src/scripts/timestamp.ts
@@ -5,3 +5,7 @@ console.log(timestampToKey(1519405679)) // 000000005a904a6f
 console.log(timestampToKey(1523008319)) // 000000005ac7433f
 console.log(timestampToKey(1625151291)) // 0000000060ddd73b
 console.log(timestampToKey(1625151291000)) // 0000017a6290be78
+
+console.log(timestampToKey(1725985291005)) // 00000191dcbd7afd
+
+console.log(timestampToKey(1539382746000)) // 000001666a5c9b90