Skip to content
Snippets Groups Projects
Commit aa7afc85 authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

refac to prepare database ingestion

parent 693f66a4
No related branches found
No related tags found
No related merge requests found
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
```sh ```sh
pnpm install pnpm install
pnpm dev pnpm dev # for vue dev UI
``` ```
## Import Cesium+ data ## Import Cesium+ data
...@@ -32,6 +32,14 @@ time npx tsx src/scripts/cesium-plus-import.ts ...@@ -32,6 +32,14 @@ time npx tsx src/scripts/cesium-plus-import.ts
# bafyreigczogsiuhaqus7eucalkwsy4vfkh3f4zg3c3rkvltxrwji6p5rnq # bafyreigczogsiuhaqus7eucalkwsy4vfkh3f4zg3c3rkvltxrwji6p5rnq
``` ```
## Start collector and indexer
To start the pubsub collector (to IPFS) and the database indexer:
```sh
npx tsx src/scripts/start-indexer.ts
```
## TODO ## TODO
When using Kubo node, libp2p is not needed at all. When using Kubo node, libp2p is not needed at all.
import type { DiffData } from './types'
import { CID } from 'multiformats'
/**
 * Diff indexer event handler.
 *
 * The body is currently empty, so the returned promise resolves
 * without doing any work.
 *
 * @param diff - data added between two CIDs (see `DiffData`)
 */
export async function indexDiff(diff: DiffData): Promise<void> {
  // nothing to index yet
}
/**
 * Start indexer event handler.
 *
 * The body is currently empty, so the returned promise resolves
 * without doing any work.
 *
 * @param cid - CID passed along with the start event
 */
export async function indexStart(cid: CID): Promise<void> {
  // nothing to index yet
}
import { CID } from 'multiformats'
import type { IndexRequest } from '../types'
/** Data added between two CIDs. */
export interface DiffData {
  /** CID before the new items were added */
  oldCID: CID
  /** CID after the new items were added */
  newCID: CID
  /** items added in between, as [CID, index request] pairs */
  newItems: Array<[CID, IndexRequest]>
}
...@@ -6,17 +6,17 @@ import type { IndexRequest } from '../types' ...@@ -6,17 +6,17 @@ import type { IndexRequest } from '../types'
import { IPNS, EMPTY_NODE_CID } from '../consts' import { IPNS, EMPTY_NODE_CID } from '../consts'
import { CID } from 'multiformats' import { CID } from 'multiformats'
import EventEmitter from 'events' import EventEmitter from 'events'
import { indexDiff, indexStart } from '../indexer/handlers'
// this script allows to start an indexer // this script allows to start an indexer
// === GLOBALS ===
// queue of index requests waiting to be processed // queue of index requests waiting to be processed
const processQueue: Array<[CID, IndexRequest]> = [] const processQueue: Array<[CID, IndexRequest]> = []
const queueEvents = new EventEmitter() const events = new EventEmitter()
let isProcessingQueue = false let isProcessingQueue = false
// show args
// console.log(process.argv)
/// global rootCID variable /// global rootCID variable
// initialize it: // initialize it:
// - from CID if given // - from CID if given
...@@ -25,44 +25,60 @@ let isProcessingQueue = false ...@@ -25,44 +25,60 @@ let isProcessingQueue = false
// - as empty node else // - as empty node else
let rootCID = EMPTY_NODE_CID let rootCID = EMPTY_NODE_CID
// === EVENTS ===
/** Event types used by this script. */
enum evtype {
  /** triggers collecting the new index requests waiting in the queue */
  trigger = 'trigger',
  /** triggers ingestion of the new requests in the database */
  indexDiff = 'indexThat',
  /** triggers a database sync, from scratch or from the last state */
  indexStart = 'start'
}
// === INIT ===
// get root cid from arg if given // get root cid from arg if given
if (process.argv.length >= 3) { async function setRootCIDfromArgs(argv: string[]) {
const arg = process.argv[2] if (argv.length >= 3) {
try { const arg = process.argv[2]
// try parse as CID
rootCID = CID.parse(arg)
console.log(`using ${rootCID} as startup root node`)
} catch {
try { try {
// try resolve as ipns // try parse as CID
for await (const name of kubo.name.resolve(arg, { nocache: true })) { rootCID = CID.parse(arg)
console.log(`using ${rootCID} as startup root node`)
} catch {
try {
// try resolve as ipns
for await (const name of kubo.name.resolve(arg, { nocache: true })) {
const cid = CID.parse(name.slice(6))
console.log(`using resolved ${cid} as startup root node`)
rootCID = cid
break
}
} catch {
console.error(`can not parse ${arg} as CID or IPNS entry`)
process.exit()
}
}
} else {
try {
// no arg given, try to resolve from default IPNS
for await (const name of kubo.name.resolve(IPNS, { nocache: true })) {
const cid = CID.parse(name.slice(6)) const cid = CID.parse(name.slice(6))
console.log(`using resolved ${cid} as startup root node`) console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`)
rootCID = cid rootCID = cid
break break
} }
} catch { } catch {
console.error(`can not parse ${arg} as CID or IPNS entry`) console.log('using empty node insead')
process.exit()
} }
} }
} else {
try {
// no arg given, try to resolve from default IPNS
for await (const name of kubo.name.resolve(IPNS, { nocache: true })) {
const cid = CID.parse(name.slice(6))
console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`)
rootCID = cid
break
}
} catch {
console.log('using empty node insead')
}
} }
// === define what to do of pubsub messages and start listening === // === HANDLERS ===
// message handler // pubsub message handler
function handleMessage(message: any) { function handleMessage(message: any) {
// console.log('received message') // console.log('received message')
// console.log(message) // console.log(message)
...@@ -90,7 +106,7 @@ function handleMessage(message: any) { ...@@ -90,7 +106,7 @@ function handleMessage(message: any) {
// low trust => sandboxed HAMT // low trust => sandboxed HAMT
processQueue.push([cid, dag]) processQueue.push([cid, dag])
queueEvents.emit('trigger') events.emit(evtype.trigger)
// this is all that this indexer does // this is all that this indexer does
// the rest (autopinning, postgres indexing... will be managed by an other part of the indexer) // the rest (autopinning, postgres indexing... will be managed by an other part of the indexer)
...@@ -108,11 +124,11 @@ function handleBatch() { ...@@ -108,11 +124,11 @@ function handleBatch() {
// ignore event if already processing something or if queue is empty // ignore event if already processing something or if queue is empty
if (isProcessingQueue) return if (isProcessingQueue) return
if (processQueue.length == 0) return if (processQueue.length == 0) return
// if not processing do process // if not processing, do process
isProcessingQueue = true isProcessingQueue = true
// take elements from queue // take elements from queue
let i = undefined let i = undefined
const items = [] const items: Array<[CID, IndexRequest]> = []
while ((i = processQueue.shift()) != undefined) { while ((i = processQueue.shift()) != undefined) {
items.push(i) items.push(i)
} }
...@@ -126,22 +142,39 @@ function handleBatch() { ...@@ -126,22 +142,39 @@ function handleBatch() {
.then((v) => mergeInodesSync(v.value, tree)) .then((v) => mergeInodesSync(v.value, tree))
.then((cid) => { .then((cid) => {
// update CID and schedule publishing of new CID in history // update CID and schedule publishing of new CID in history
const oldCID = rootCID
rootCID = cid rootCID = cid
console.log(`new root CID ${cid}`) console.log(`new root CID ${cid}`)
kubo.name.publish(rootCID, { ttl: '1s' }) kubo.name.publish(rootCID, { ttl: '1s' })
publishHistory(rootCID) publishHistory(rootCID)
isProcessingQueue = false isProcessingQueue = false
queueEvents.emit('trigger') // trigger an other event in case new requests arrived meanwhile
events.emit(evtype.trigger)
// emit event to be processed by indexer
events.emit(evtype.indexDiff, { oldCID: oldCID, newCID: rootCID, newItems: items })
}) })
} }
queueEvents.on('trigger', handleBatch) // ===================== START ===========================
queueEvents.on('trigger', handleBatch)
// console.log(await kubo.pubsub.ls()) // set root CID from CLI args
// console.log(await kubo.pubsub.peers(TOPIC)) await setRootCIDfromArgs(process.argv)
// bind event handlers
events.on(evtype.trigger, handleBatch)
events.on(evtype.indexDiff, indexDiff)
events.on(evtype.indexStart, indexStart)
kubo.pubsub.subscribe(TOPIC, handleMessage) kubo.pubsub.subscribe(TOPIC, handleMessage)
// emit event to tell indexer to start indexing to database
// if it is starting from scratch, it will iterate over all values
// if it already indexed up to a given cid, it will only iterate over the diff
events.emit(evtype.indexStart, rootCID)
// // optionally publish new cid and history with
// kubo.name.publish(rootCID, { ttl: '1s' })
// publishHistory(rootCID)
// process loop
console.log('listening...') console.log('listening...')
setInterval(() => {}, 1 << 30) setInterval(() => {}, 1 << 30)
;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => process.on(signal, process.exit)) ;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => process.on(signal, process.exit))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment