diff --git a/README.md b/README.md index 1a61b021aa47b8065a44889b68d788910eae9d9e..72e96076c9a2fa2d80daf63c024974257e18412a 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ ```sh pnpm install -pnpm dev +pnpm dev # for vue dev UI ``` ## Import Cesium+ data @@ -32,6 +32,14 @@ time npx tsx src/scripts/cesium-plus-import.ts # bafyreigczogsiuhaqus7eucalkwsy4vfkh3f4zg3c3rkvltxrwji6p5rnq ``` +## Start collector and indexer + +To start pubsub collector to IPFS and database indexer + +```sh +npx tsx src/scripts/start-indexer.ts +``` + ## TODO When using Kubo node, libp2p is not needed at all. diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts new file mode 100644 index 0000000000000000000000000000000000000000..d5472ea2d071165c602b9fa846c99fb10dfa73f4 --- /dev/null +++ b/src/indexer/handlers.ts @@ -0,0 +1,8 @@ +import type { DiffData } from './types' +import { CID } from 'multiformats' + +// diff indexer event handler +export async function indexDiff(diff: DiffData) {} + +// start indexer event handler +export async function indexStart(cid: CID) {} diff --git a/src/indexer/types.ts b/src/indexer/types.ts new file mode 100644 index 0000000000000000000000000000000000000000..832592337157e2b32a0d189cdbff3f1b8815e1bf --- /dev/null +++ b/src/indexer/types.ts @@ -0,0 +1,9 @@ +import { CID } from 'multiformats' +import type { IndexRequest } from '../types' + +// data added between two CIDs +export interface DiffData { + oldCID: CID + newCID: CID + newItems: Array<[CID, IndexRequest]> +} diff --git a/src/scripts/start-indexer.ts b/src/scripts/start-indexer.ts index e2665dbbf741463b490094bd7ee4e48563f388ec..910d0658cdb3b3c4a17639a9ec181c78da1fef6e 100644 --- a/src/scripts/start-indexer.ts +++ b/src/scripts/start-indexer.ts @@ -6,17 +6,17 @@ import type { IndexRequest } from '../types' import { IPNS, EMPTY_NODE_CID } from '../consts' import { CID } from 'multiformats' import EventEmitter from 'events' +import { indexDiff, indexStart } from '../indexer/handlers' // this script allows to start an indexer +// === GLOBALS === + // queue of index requests waiting to be processed const processQueue: Array<[CID, IndexRequest]> = [] -const queueEvents = new EventEmitter() +const events = new EventEmitter() let isProcessingQueue = false -// show args -// console.log(process.argv) - /// global rootCID variable // initialize it: // - from CID if given @@ -25,44 +25,60 @@ let isProcessingQueue = false // - as empty node else let rootCID = EMPTY_NODE_CID +// === EVENTS === + +// event type +enum evtype { + // event to trigger collecting new index requests in queue + trigger = 'trigger', + // event to trigger injestion of new requests in database + indexDiff = 'indexThat', + // event to trigger database sync from scratch or last state + indexStart = 'start' +} + +// === INIT === + // get root cid from arg if given -if (process.argv.length >= 3) { - const arg = process.argv[2] - try { - // try parse as CID - rootCID = CID.parse(arg) - console.log(`using ${rootCID} as startup root node`) - } catch { +async function setRootCIDfromArgs(argv: string[]) { + if (argv.length >= 3) { + const arg = process.argv[2] try { - // try resolve as ipns - for await (const name of kubo.name.resolve(arg, { nocache: true })) { + // try parse as CID + rootCID = CID.parse(arg) + console.log(`using ${rootCID} as startup root node`) + } catch { + try { + // try resolve as ipns + for await (const name of kubo.name.resolve(arg, { nocache: true })) { + const cid = CID.parse(name.slice(6)) + console.log(`using resolved ${cid} as startup root node`) + rootCID = cid + break + } + } catch { + console.error(`can not parse ${arg} as CID or IPNS entry`) + process.exit() + } + } + } else { + try { + // no arg given, try to resolve from default IPNS + for await (const name of kubo.name.resolve(IPNS, { nocache: true })) { const cid = CID.parse(name.slice(6)) - console.log(`using resolved ${cid} as startup root node`) + console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`) rootCID = cid break } } catch { - console.error(`can not parse ${arg} as CID or IPNS entry`) - process.exit() + console.log('using empty node insead') } } -} else { - try { - // no arg given, try to resolve from default IPNS - for await (const name of kubo.name.resolve(IPNS, { nocache: true })) { - const cid = CID.parse(name.slice(6)) - console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`) - rootCID = cid - break - } - } catch { - console.log('using empty node insead') - } } -// === define what to do of pubsub messages and start listening === +// === HANDLERS === -// message handler +// pubsub message handler function handleMessage(message: any) { // console.log('received message') // console.log(message) @@ -90,7 +106,7 @@ function handleMessage(message: any) { // low trust => sandboxed HAMT processQueue.push([cid, dag]) - queueEvents.emit('trigger') + events.emit(evtype.trigger) // this is all that this indexer does // the rest (autopinning, postgres indexing... will be managed by an other part of the indexer) @@ -108,11 +124,11 @@ function handleBatch() { // ignore event if already processing something or if queue is empty if (isProcessingQueue) return if (processQueue.length == 0) return - // if not processing do process + // if not processing, do process isProcessingQueue = true // take elements from queue let i = undefined - const items = [] + const items: Array<[CID, IndexRequest]> = [] while ((i = processQueue.shift()) != undefined) { items.push(i) } @@ -126,22 +142,39 @@ function handleBatch() { .then((v) => mergeInodesSync(v.value, tree)) .then((cid) => { // update CID and schedule publishing of new CID in history + const oldCID = rootCID rootCID = cid console.log(`new root CID ${cid}`) kubo.name.publish(rootCID, { ttl: '1s' }) publishHistory(rootCID) isProcessingQueue = false - queueEvents.emit('trigger') + // trigger an other event in case new requests arrived meanwhile + events.emit(evtype.trigger) + // emit event to be processed by indexer + events.emit(evtype.indexDiff, { oldCID: oldCID, newCID: rootCID, newItems: items }) }) } -queueEvents.on('trigger', handleBatch) -queueEvents.on('trigger', handleBatch) +// ===================== START =========================== -// console.log(await kubo.pubsub.ls()) -// console.log(await kubo.pubsub.peers(TOPIC)) +// set root CID from CLI args +await setRootCIDfromArgs(process.argv) + +// bind event handlers +events.on(evtype.trigger, handleBatch) +events.on(evtype.indexDiff, indexDiff) +events.on(evtype.indexStart, indexStart) kubo.pubsub.subscribe(TOPIC, handleMessage) +// emit event to tell indexer to start indexing to database +// if it is starting from scratch, it will iterate over all values +// if it already indexed up to a given cid, it will only iterate over the diff +events.emit(evtype.indexStart, rootCID) +// // optionally publish new cid and history with +// kubo.name.publish(rootCID, { ttl: '1s' }) +// publishHistory(rootCID) + +// process loop console.log('listening...') setInterval(() => {}, 1 << 30) ;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => process.on(signal, process.exit))