diff --git a/src/indexer/database.ts b/src/indexer/database.ts index da91c81ea38cfbe0f0fd5fe5b70a7b5e2c4428f8..e2a72e9a5059449ad7e65e862e5149a51e78e1d1 100644 --- a/src/indexer/database.ts +++ b/src/indexer/database.ts @@ -201,38 +201,35 @@ async function handleIrWithNonNullData<T>(irCID: CID, ir: IndexRequest, q: Query } // insert index request in database -export async function handleInsertRequest(irCID: CID, ir: IndexRequest) { +export async function handleInsertRequest(irCID: CID, ir: IndexRequest): Promise<void> { // console.debug('💾 indexing ' + irCID) switch (ir.kind) { // insert cesium plus profile case CESIUM_PLUS_PROFILE_INSERT: - handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile) - break + return handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile) // insert cesium plus import case CESIUM_PLUS_PROFILE_IMPORT: // transform base58 pubkey to ss58 address with gdev prefix ir.pubkey = base58ToSS58(ir.pubkey, GDEV_PREFIX) - handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw) - break + return handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw) // delete cesium plus profile case CESIUM_PLUS_PROFILE_DELETE: // FIXME if delete instruction is received from past, this should be ignored // i.e.: database should keep track of deleted profiles with a timestamp to allow items to be inserted in any order await client.query(`DELETE FROM profiles WHERE pubkey = $1;`, [ir.pubkey]) - break + return // insert transaction comment case TRANSACTION_COMMENT: - handleIrWithNonNullData<TxComment>(irCID, ir, txComment) - break + return handleIrWithNonNullData<TxComment>(irCID, ir, txComment) // unimplemented default: console.log('🔴 unimplemented kind ' + ir.kind) - break + return } } diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts index 10fa6b733afcd91c6ba7d2027674a4f55631798b..58ce1e15b85de9c8d2821519125411d004a38d1a 100644 --- a/src/indexer/handlers.ts +++ b/src/indexer/handlers.ts @@ -22,7 +22,7 @@ const MAX_PROCESS_QUEUE = 2000 // root CID of tree export const GLOB_root: { cid: CID } = { cid: undefined! } -// queue of index requests waiting to be processed +// queue of index requests waiting to be processed (added to tree then indexed) // these are the requests received by the network export const GLOB_processQueue: Array<[CID, IndexRequest]> = [] // lock to avoid triggering multiple simultaneous edits @@ -48,9 +48,9 @@ export enum evtype { // event to trigger collecting peer index requests from queue triggerCollect = 'triggerCollect', // event to trigger injestion of new requests in database - indexDiff = 'indexDiff', + triggerIndex = 'triggerIndex', // event to trigger computing diff - computeDiff = 'computeDiff' + triggerComputeDiff = 'triggerComputeDiff' } // === HANDLERS === @@ -98,10 +98,10 @@ async function insertBatch(rootCID: CID, items: Array<[CID, IndexRequest]>): Pro export async function takeFromProcessQueue(): Promise<void> { // ignore event if already processing something or if queue is empty if (GLOB_lockTree) { - console.log('busy') + // console.log('busy on tree') return } - if (GLOB_processQueue.length == 0) { + if (!GLOB_processQueue.length) { return } // if not processing, do lock process @@ -117,50 +117,53 @@ export async function takeFromProcessQueue(): Promise<void> { // try inserting items try { // insert batch and get diff - console.log('inserting', items.length, 'items to tree') + // console.log('inserting', items.length, 'items to tree') const diffData = await insertBatch(GLOB_root.cid, items) - // update root cid and publishes it + GLOB_diffQueue.push(diffData) const newCID = diffData.newCID - console.log(`👉 new root CID ${newCID}`) + // root CID could be untouched if processed data is already in tree + if (GLOB_root.cid.toString() != newCID.toString()) console.log(`👉 new root CID ${newCID}`) + // update root cid and publishes it GLOB_root.cid = newCID - GLOB_lockTree = false kubo.name.publish(newCID, DD_TAMT_OPT).catch(console.log) - // add new data to database - events.emit(evtype.indexDiff, diffData) } catch (e) { console.error(`🥊 error merging ${GLOB_root.cid} with ${items.length} items`, e) GLOB_processQueue.push(...items) // add them back to the process queue - GLOB_lockTree = false - // try again to collect items - events.emit(evtype.triggerProcess) } + GLOB_lockTree = false + checkQueues() return // nothing } -// diff event handler -export async function indexKnownDiff(diff: DiffData): Promise<void> { - // --- 1. manage lock - // if locked, add to index queue +// index diff item from diff queue to database +export async function takeFromDiffQueue(): Promise<void> { + // ignore event if already indexing something or if queue is empty if (GLOB_isIndexing) { - GLOB_diffQueue.push(diff) + // console.log('busy on indexing') + return + } + if (!GLOB_diffQueue.length) { return } // else lock GLOB_isIndexing = true - // --- 2. handle - // check that last indexed CID match old CID - const latestCID = await getLatestIndexedCID() - // if not, unlock and go to indexStart with the new CID instead - if (latestCID == null || diff.oldCID.toString() != latestCID?.toString()) { + // take diff + const diff: DiffData = GLOB_diffQueue.shift()! // we know that the queue is not empty + const latestCID = (await getLatestIndexedCID())! // (must be set at node start) + // check that last indexed CID in db match old CID of diff + // if not, unlock and compute diff with the new CID to make sure we get all items in db + if (diff.oldCID.toString() != latestCID.toString()) { console.log('🤔 db is not at diff start, computing missing diff') - events.emit(evtype.computeDiff, diff.oldCID, diff.newCID) + events.emit(evtype.triggerComputeDiff, diff.oldCID, diff.newCID) } - console.log('➕ adding', diff.newItems.length, 'items to the db') - // insert all index requests - diff.newItems.forEach((ir) => handleInsertRequest(ir[0], ir[1])) - // write new CID as latest indexed (this is purely informative) - console.log(`🦐 latest db cid ${diff.newCID}`) + // still index the diff data + console.log('➕ indexing', diff.newItems.length, 'items to the db') + // insert all index requests (prevents from adding too many at the same time) + await Promise.all(diff.newItems.map((ir) => handleInsertRequest(ir[0], ir[1]))) + // write new CID as latest indexed (but more data might be needed to be really at this stage) + if (diff.oldCID.toString() != diff.newCID.toString()) console.log(`🦐 latest db cid ${diff.newCID}`) await setLatestIndexedCID(diff.newCID) + // TODO compute what db latest cid really should be // unlock GLOB_isIndexing = false // --- 3. check queues @@ -198,12 +201,11 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> { } // takes item from merge queue, fetch index request, and put them in process queue -export async function collectFromMergeQueue() { +export async function takeFromMergeQueue() { // prevent process queue from growing too much if (GLOB_processQueue.length > MAX_PROCESS_QUEUE) { - if (!GLOB_lockTree) { - events.emit(evtype.triggerProcess) - } + // make sure it is aware + events.emit(evtype.triggerProcess) return } // sort merge queue by key to give a better chance of good batching @@ -220,42 +222,53 @@ export async function collectFromMergeQueue() { items.push(Promise.all([cid, kubo.dag.get(cid).then((r) => r.value)])) } const awaitedItems = await Promise.all(items) - awaitedItems.forEach(([c, _ir]) => { - // make sure to pin the index request to be able to serve its content later - kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`)) - }) + // awaitedItems.forEach(([c, _ir]) => { + // // make sure to pin the index request to be able to serve its content later + // kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`)) + // }) // add index requests to process queue GLOB_processQueue.push(...awaitedItems) // ask them to be processed if not already processing - if (!GLOB_lockTree) { - events.emit(evtype.triggerProcess) - } + checkProcessQueue() // ask next batch if any + checkMergeQueue() +} + +// check queues to see if new diff came in the meanwile +function checkQueues() { + checkQueueDiff() + checkProcessQueue() + checkMergeQueue() +} + +// if merge queue is not empty, collect +function checkMergeQueue() { if (GLOB_mergeQueue.length) { events.emit(evtype.triggerCollect) } } -// check queues -function checkQueues() { - // check queue to see if new diff came in the meanwile - checkQueueDiff() +// if process queue is not empty, process it +function checkProcessQueue() { + if (!GLOB_lockTree && GLOB_processQueue.length) { + events.emit(evtype.triggerProcess) + } } // if diff came in the meanwhile, index them function checkQueueDiff() { - if (GLOB_diffQueue.length != 0) { - indexKnownDiff(GLOB_diffQueue.shift()!) + if (GLOB_diffQueue.length) { + events.emit(evtype.triggerIndex) } } -setInterval(() => { - console.log( - 'merge queue', - GLOB_mergeQueue.length, - 'process queue', - GLOB_processQueue.length, - 'diff queue', - GLOB_diffQueue.length - ) -}, 10000) +// setInterval(() => { +// console.log( +// 'merge queue', +// GLOB_mergeQueue.length, +// 'process queue', +// GLOB_processQueue.length, +// 'diff queue', +// GLOB_diffQueue.length +// ) +// }, 10000) diff --git a/src/indexer/start.ts b/src/indexer/start.ts index 47b9a3b0ea04a1f908be84f1cb053836fa2007c9..6627761497ccdaaf6f776d421e8169f01594291a 100644 --- a/src/indexer/start.ts +++ b/src/indexer/start.ts @@ -2,16 +2,10 @@ import { EMPTY_NODE_CID, TOPIC } from '../consts' import { resolveHist } from '../processor' import { getPubSubHandler } from '../collector' import { KUBO_RPC, kubo, kubo2 } from '../kubo' -import type { IndexHist, IndexRequest } from '../types' +import type { IndexHist } from '../types' import { CID } from 'multiformats' -import { - indexKnownDiff, - computeDiff, - GLOB_root, - takeFromProcessQueue, - collectFromMergeQueue, - validMessageHandler -} from './handlers' +import { computeDiff, GLOB_root } from './handlers' +import { takeFromDiffQueue, takeFromProcessQueue, takeFromMergeQueue, validMessageHandler } from './handlers' import { events, evtype } from './handlers' import type { DdKeys } from './types' import { getRootCIDfromArgs } from './utils' @@ -57,7 +51,7 @@ async function peerSync(trusted_peer_list: string[]): Promise<void> { for await (const name of kubo.name.resolve(peerDdKeys.tamt)) { const cid = CID.parse(name.slice(6)) // found peer tree, request index diff - events.emit(evtype.computeDiff, GLOB_root.cid, cid) + events.emit(evtype.triggerComputeDiff, GLOB_root.cid, cid) } } } catch (e) { @@ -119,28 +113,29 @@ GLOB_root.cid = await getRootCIDfromArgs(process.argv) await initHistIfNull(GLOB_root.cid) // make sure history is available until then let GLOB_histCID: CID = await resolveHist() let GLOB_hist: IndexHist = (await kubo.dag.get(GLOB_histCID)).value +histPublish() // publish history to track new start root // get latest db CID let latestCID = await getLatestIndexedCID() if (latestCID == null) { latestCID = EMPTY_NODE_CID - setLatestIndexedCID(latestCID) + await setLatestIndexedCID(latestCID) } console.log(`🛢 latest indexed cid ${latestCID}`) // bind event handlers events.on(evtype.triggerProcess, takeFromProcessQueue) -events.on(evtype.indexDiff, indexKnownDiff) -events.on(evtype.computeDiff, computeDiff) -events.on(evtype.triggerCollect, collectFromMergeQueue) +events.on(evtype.triggerIndex, takeFromDiffQueue) +events.on(evtype.triggerComputeDiff, computeDiff) +events.on(evtype.triggerCollect, takeFromMergeQueue) pubsubSubscribe() // subscribe to index requests channel periodicHistPublish(HIST_PUBLISH_PERIOD) // regularly publish history -// periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers +periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers // emit event to tell indexer to start indexing to database // if it is starting from scratch (emtpy database), it will iterate over all values // if it already indexed up to a given cid (last_indexed_cid in db), it will only iterate over the diff -events.emit(evtype.computeDiff, latestCID, GLOB_root.cid) +events.emit(evtype.triggerComputeDiff, latestCID, GLOB_root.cid) // at startup browse peer list peerSync(trusted_peer_list) diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 740f6cd952bb48da817cc84a46f6e73169173bd1..b2c3c5cf52b2619bbd34faf3607edb7ae51525d9 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -131,7 +131,10 @@ async function importIrToAMT(rootNodeCid: CID, input: string) { // 3 minutes // import by batch and logs successive cids // importIrToAMT(EMPTY_NODE_CID, './input/cplusIR.txt') -const rootCID = CID.parse("bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4") +const rootCID = CID.parse("bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze") importIrToAMT(rootCID, './input/devIr+labels.txt') // → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey (old with simple nodes) -// → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels) \ No newline at end of file +// → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels) + +// bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze +// bafyreifhhss6h5j72ewdcr6b75wda4573wtskjfp2pqiae5l73efwvrvjy \ No newline at end of file diff --git a/src/scripts/getAll.ts b/src/scripts/getAll.ts index a57437bc7daaddac38ebc090765b424971aa71ff..dcbf3a1dc54a0ad3bbaf2bb56cf41beccc6c8f4f 100644 --- a/src/scripts/getAll.ts +++ b/src/scripts/getAll.ts @@ -8,9 +8,7 @@ const LOG_EVERY = 1000 // const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') // 10 latest // const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a') -const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') -// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a') -// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a') +const cid = CID.parse('bafyreic6ozgzlzkp6kkrvhqubttz47mkix36ms6r5inanhsmd6d3mwi7cy')