diff --git a/README.md b/README.md index 16dd0333968ae9c78404419858212bc7d8b87d18..fb174c30f7fe7428f13ad2ee7bdb3f973caad5e2 100644 --- a/README.md +++ b/README.md @@ -37,7 +37,7 @@ time npx tsx src/scripts/cesium-plus-import.ts To start pubsub collector to IPFS and database indexer ```sh -npx tsx src/scripts/start-indexer.ts +npx tsx src/indexer/start.ts ``` ## TODO diff --git a/src/indexer/database.ts b/src/indexer/database.ts index 951295a9ba5beac4dec3748bfca29e08d6276d3c..d8e3d33953f817f9a251ca613605f3748cc1cef2 100644 --- a/src/indexer/database.ts +++ b/src/indexer/database.ts @@ -26,22 +26,33 @@ await client.connect() // functions -// latest root cid that was indexed -export async function getLatestIndexedCID(): Promise<CID | null> { - const res = await client.query('SELECT value FROM meta WHERE key=$1', ['last_indexed_cid']) +// get key in key/value meta +async function getKey(key: string): Promise<string | null> { + const res = await client.query('SELECT value FROM meta WHERE key=$1', [key]) if (res.rows.length != 0) { - return CID.parse(res.rows[0].value) + return res.rows[0].value } else { return null } } -export async function setLatestIndexedCID(cid: CID) { +// set key in key/value meta +async function setKey(key: string, value: string) { await client.query( - 'INSERT INTO meta(key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = meta.value', - ['last_indexed_cid', cid.toString()] + 'INSERT INTO meta(key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value', + [key, value] ) } +// latest root cid that was indexed +export async function getLatestIndexedCID(): Promise<CID | null> { + const val = await getKey('last_indexed_cid') + return val ? CID.parse(val) : null +} +// set latest root cid +export async function setLatestIndexedCID(cid: CID) { + await setKey('last_indexed_cid', cid.toString()) +} + // cesium plus profile query and param builder const cesiumPlusProfile: QueryBuilder = { query: `INSERT INTO diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts index 7e5e18abe5f36d05ab87a59a016ff8e563f5cba1..be60c0d49ad5d53e7a4ff79f95b70120c1b930e4 100644 --- a/src/indexer/handlers.ts +++ b/src/indexer/handlers.ts @@ -5,23 +5,57 @@ import { getLatestIndexedCID, handleInsertRequest, setLatestIndexedCID } from '. import { kubo } from '../kubo' import { getDiff, getAll } from '../interface' +// === EVENTS === + // events to communicate between processes export const events = new EventEmitter() +// event type +export enum evtype { + // event to trigger collecting new index requests in queue + trigger = 'trigger', + // event to trigger injestion of new requests in database + indexDiff = 'indexThat', + // event to trigger database sync from scratch or last state + indexStart = 'start' +} + +// === INDEXING === + +// queue of index requests waiting to be processed +const indexQueue: Array<DiffData> = [] // lock to prevent multiple database edits which would fake the last root cid +// this prevents from overwriting latest indexed data let isIndexing = false +// === HANDLERS === + // diff indexer event handler -export async function indexDiff(diff: DiffData) { +export async function indexKnownDiff(diff: DiffData) { // if locked, add to index queue + if (isIndexing) { + indexQueue.push(diff) + return + } // else lock + isIndexing = true // check that last indexed CID match old CID + const latestCID = await getLatestIndexedCID() // if not, unlock and go to indexStart with the new CID instead - // filter by kind - // update database - // write new CID as latest indexed - // unlock - // send check queue event to see if diff came in the meanwhile + if (latestCID == null || diff.oldCID.toString() != latestCID?.toString()) { + console.log('diff is not based on same cid as database, re-computing diff') + isIndexing = false + await indexStart(diff.newCID) + } else { + // insert all index requests + diff.newItems.forEach((ir) => handleInsertRequest(ir[0], ir[1])) + // write new CID as latest indexed + await setLatestIndexedCID(diff.newCID) + // unlock + isIndexing = false + } + // check queue to see if diff came in the meanwile + checkQueue() } // start indexer event handler @@ -48,6 +82,13 @@ export async function indexStart(cid: CID) { } // unlock isIndexing = false - // send check queue event to see if diff came in the meanwile - // TODO + // check queue to see if diff came in the meanwile + checkQueue() +} + +// if diff came in the meanwhile, index them +function checkQueue() { + if (indexQueue.length != 0) { + indexKnownDiff(indexQueue.shift()!) + } } diff --git a/src/indexer/start.ts b/src/indexer/start.ts index 9c6dc392d82c08cefab93c7a7d950f273ada9dca..1fc959d4f4534d6bef699e6c50572d99dbee7561 100644 --- a/src/indexer/start.ts +++ b/src/indexer/start.ts @@ -3,78 +3,10 @@ import { timestampToKey, mergeInodesSync, arrayToVinode, publishHistory } from ' import { buildBytesPayload, isValidSignature } from '../collector' import { kubo } from '../kubo' import type { IndexRequest } from '../types' -import { IPNS, EMPTY_NODE_CID } from '../consts' import { CID } from 'multiformats' -import { events, indexDiff, indexStart } from './handlers' +import { events, evtype, indexKnownDiff, indexStart } from './handlers' import type { DiffData } from './types' - -// this script allows to start an indexer - -// === GLOBALS === - -// queue of index requests waiting to be processed -const processQueue: Array<[CID, IndexRequest]> = [] -// lock to avoid triggering multiple simultaneous edits -let isProcessingQueue = false - -/// global rootCID variable -// initialize it: -// - from CID if given -// - from IPNS if given -// - from default IPNS else -// - as empty node else -let rootCID = EMPTY_NODE_CID - -// === EVENTS === - -// event type -enum evtype { - // event to trigger collecting new index requests in queue - trigger = 'trigger', - // event to trigger injestion of new requests in database - indexDiff = 'indexThat', - // event to trigger database sync from scratch or last state - indexStart = 'start' -} - -// === INIT === - -// get root cid from arg if given -async function setRootCIDfromArgs(argv: string[]) { - if (argv.length >= 3) { - const arg = process.argv[2] - try { - // try parse as CID - rootCID = CID.parse(arg) - console.log(`using ${rootCID} as startup root node`) - } catch { - try { - // try resolve as ipns - for await (const name of kubo.name.resolve(arg, { nocache: true })) { - const cid = CID.parse(name.slice(6)) - console.log(`using resolved ${cid} as startup root node`) - rootCID = cid - break - } - } catch { - console.error(`can not parse ${arg} as CID or IPNS entry`) - process.exit() - } - } - } else { - try { - // no arg given, try to resolve from default IPNS - for await (const name of kubo.name.resolve(IPNS, { nocache: true })) { - const cid = CID.parse(name.slice(6)) - console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`) - rootCID = cid - break - } - } catch { - console.log('using empty node insead') - } - } -} +import { getRootCIDfromArgs } from './utils' // === HANDLERS === @@ -162,12 +94,21 @@ function handleBatch() { // ===================== START =========================== -// set root CID from CLI args -await setRootCIDfromArgs(process.argv) +// === GLOBALS === + +// queue of index requests waiting to be processed +const processQueue: Array<[CID, IndexRequest]> = [] +// lock to avoid triggering multiple simultaneous edits +// this prevents from branching the local AMT +let isProcessingQueue = false + +/// global rootCID variable +// set it from CLI args +let rootCID = await getRootCIDfromArgs(process.argv) // bind event handlers events.on(evtype.trigger, handleBatch) -events.on(evtype.indexDiff, indexDiff) +events.on(evtype.indexDiff, indexKnownDiff) events.on(evtype.indexStart, indexStart) kubo.pubsub.subscribe(TOPIC, handleMessage) diff --git a/src/indexer/utils.ts b/src/indexer/utils.ts new file mode 100644 index 0000000000000000000000000000000000000000..1d4edfe8fce8f20a7094d3725bbf3b15b1de68c1 --- /dev/null +++ b/src/indexer/utils.ts @@ -0,0 +1,47 @@ +import { IPNS, EMPTY_NODE_CID } from '../consts' +import { CID } from 'multiformats' +import { kubo } from '../kubo' + +// get root cid from arg if given +// initialize it: +// - from CID if given +// - from IPNS if given +// - from default IPNS else +// - as empty node else +export async function getRootCIDfromArgs(argv: string[]): Promise<CID> { + if (argv.length >= 3) { + const arg = process.argv[2] + try { + // try parse as CID + const rootCID = CID.parse(arg) + console.log(`using ${rootCID} as startup root node`) + return rootCID + } catch { + try { + // try resolve as ipns + for await (const name of kubo.name.resolve(arg, { nocache: true })) { + const cid = CID.parse(name.slice(6)) + console.log(`using resolved ${cid} as startup root node`) + return cid + } + } catch { + console.error(`can not parse ${arg} as CID or IPNS entry`) + process.exit() + } + } + } else { + try { + // no arg given, try to resolve from default IPNS + for await (const name of kubo.name.resolve(IPNS, { nocache: true })) { + const cid = CID.parse(name.slice(6)) + console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`) + return cid + } + } catch { + console.log('using empty node insead') + return EMPTY_NODE_CID + } + } + console.log('unreachable') + return EMPTY_NODE_CID +} diff --git a/src/main.ts b/src/main.ts index 5179324ab1ef7572bd4e73bf6e6df1210037d570..f8cbf195e914cbbb3a66e81b67b7c7b79ec5e9ce 100644 --- a/src/main.ts +++ b/src/main.ts @@ -37,7 +37,7 @@ pubsub.addEventListener('message', (message) => { const isValid = isValidSignature(bytesPayload, dag.sig, dag.pubk) if (isValid) { // here we would do the processing - addToIndex(cid, dag) + // addToIndex(cid, dag) } else { feed.value.push('[invalid sig] ' + msg) }