Skip to content
Snippets Groups Projects
Commit 73691b92 authored by Hugo Trentesaux's avatar Hugo Trentesaux
Browse files

await for db edits

parent c84c730d
Branches
No related tags found
No related merge requests found
...@@ -201,38 +201,35 @@ async function handleIrWithNonNullData<T>(irCID: CID, ir: IndexRequest, q: Query ...@@ -201,38 +201,35 @@ async function handleIrWithNonNullData<T>(irCID: CID, ir: IndexRequest, q: Query
} }
// insert index request in database // insert index request in database
export async function handleInsertRequest(irCID: CID, ir: IndexRequest) { export async function handleInsertRequest(irCID: CID, ir: IndexRequest): Promise<void> {
// console.debug('💾 indexing ' + irCID) // console.debug('💾 indexing ' + irCID)
switch (ir.kind) { switch (ir.kind) {
// insert cesium plus profile // insert cesium plus profile
case CESIUM_PLUS_PROFILE_INSERT: case CESIUM_PLUS_PROFILE_INSERT:
handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile) return handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile)
break
// insert cesium plus import // insert cesium plus import
case CESIUM_PLUS_PROFILE_IMPORT: case CESIUM_PLUS_PROFILE_IMPORT:
// transform base58 pubkey to ss58 address with gdev prefix // transform base58 pubkey to ss58 address with gdev prefix
ir.pubkey = base58ToSS58(ir.pubkey, GDEV_PREFIX) ir.pubkey = base58ToSS58(ir.pubkey, GDEV_PREFIX)
handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw) return handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw)
break
// delete cesium plus profile // delete cesium plus profile
case CESIUM_PLUS_PROFILE_DELETE: case CESIUM_PLUS_PROFILE_DELETE:
// FIXME if delete instruction is received from past, this should be ignored // FIXME if delete instruction is received from past, this should be ignored
// i.e.: database should keep track of deleted profiles with a timestamp to allow items to be inserted in any order // i.e.: database should keep track of deleted profiles with a timestamp to allow items to be inserted in any order
await client.query(`DELETE FROM profiles WHERE pubkey = $1;`, [ir.pubkey]) await client.query(`DELETE FROM profiles WHERE pubkey = $1;`, [ir.pubkey])
break return
// insert transaction comment // insert transaction comment
case TRANSACTION_COMMENT: case TRANSACTION_COMMENT:
handleIrWithNonNullData<TxComment>(irCID, ir, txComment) return handleIrWithNonNullData<TxComment>(irCID, ir, txComment)
break
// unimplemented // unimplemented
default: default:
console.log('🔴 unimplemented kind ' + ir.kind) console.log('🔴 unimplemented kind ' + ir.kind)
break return
} }
} }
......
...@@ -22,7 +22,7 @@ const MAX_PROCESS_QUEUE = 2000 ...@@ -22,7 +22,7 @@ const MAX_PROCESS_QUEUE = 2000
// root CID of tree // root CID of tree
export const GLOB_root: { cid: CID } = { cid: undefined! } export const GLOB_root: { cid: CID } = { cid: undefined! }
// queue of index requests waiting to be processed // queue of index requests waiting to be processed (added to tree then indexed)
// these are the requests received by the network // these are the requests received by the network
export const GLOB_processQueue: Array<[CID, IndexRequest]> = [] export const GLOB_processQueue: Array<[CID, IndexRequest]> = []
// lock to avoid triggering multiple simultaneous edits // lock to avoid triggering multiple simultaneous edits
...@@ -48,9 +48,9 @@ export enum evtype { ...@@ -48,9 +48,9 @@ export enum evtype {
// event to trigger collecting peer index requests from queue // event to trigger collecting peer index requests from queue
triggerCollect = 'triggerCollect', triggerCollect = 'triggerCollect',
// event to trigger ingestion of new requests in database // event to trigger ingestion of new requests in database
indexDiff = 'indexDiff', triggerIndex = 'triggerIndex',
// event to trigger computing diff // event to trigger computing diff
computeDiff = 'computeDiff' triggerComputeDiff = 'triggerComputeDiff'
} }
// === HANDLERS === // === HANDLERS ===
...@@ -98,10 +98,10 @@ async function insertBatch(rootCID: CID, items: Array<[CID, IndexRequest]>): Pro ...@@ -98,10 +98,10 @@ async function insertBatch(rootCID: CID, items: Array<[CID, IndexRequest]>): Pro
export async function takeFromProcessQueue(): Promise<void> { export async function takeFromProcessQueue(): Promise<void> {
// ignore event if already processing something or if queue is empty // ignore event if already processing something or if queue is empty
if (GLOB_lockTree) { if (GLOB_lockTree) {
console.log('busy') // console.log('busy on tree')
return return
} }
if (GLOB_processQueue.length == 0) { if (!GLOB_processQueue.length) {
return return
} }
// if not processing, do lock process // if not processing, do lock process
...@@ -117,50 +117,53 @@ export async function takeFromProcessQueue(): Promise<void> { ...@@ -117,50 +117,53 @@ export async function takeFromProcessQueue(): Promise<void> {
// try inserting items // try inserting items
try { try {
// insert batch and get diff // insert batch and get diff
console.log('inserting', items.length, 'items to tree') // console.log('inserting', items.length, 'items to tree')
const diffData = await insertBatch(GLOB_root.cid, items) const diffData = await insertBatch(GLOB_root.cid, items)
// update root cid and publishes it GLOB_diffQueue.push(diffData)
const newCID = diffData.newCID const newCID = diffData.newCID
console.log(`👉 new root CID ${newCID}`) // root CID could be untouched if processed data is already in tree
if (GLOB_root.cid.toString() != newCID.toString()) console.log(`👉 new root CID ${newCID}`)
// update root cid and publishes it
GLOB_root.cid = newCID GLOB_root.cid = newCID
GLOB_lockTree = false
kubo.name.publish(newCID, DD_TAMT_OPT).catch(console.log) kubo.name.publish(newCID, DD_TAMT_OPT).catch(console.log)
// add new data to database
events.emit(evtype.indexDiff, diffData)
} catch (e) { } catch (e) {
console.error(`🥊 error merging ${GLOB_root.cid} with ${items.length} items`, e) console.error(`🥊 error merging ${GLOB_root.cid} with ${items.length} items`, e)
GLOB_processQueue.push(...items) // add them back to the process queue GLOB_processQueue.push(...items) // add them back to the process queue
GLOB_lockTree = false
// try again to collect items
events.emit(evtype.triggerProcess)
} }
GLOB_lockTree = false
checkQueues()
return // nothing return // nothing
} }
// diff event handler // index diff item from diff queue to database
export async function indexKnownDiff(diff: DiffData): Promise<void> { export async function takeFromDiffQueue(): Promise<void> {
// --- 1. manage lock // ignore event if already indexing something or if queue is empty
// if locked, add to index queue
if (GLOB_isIndexing) { if (GLOB_isIndexing) {
GLOB_diffQueue.push(diff) // console.log('busy on indexing')
return
}
if (!GLOB_diffQueue.length) {
return return
} }
// else lock // else lock
GLOB_isIndexing = true GLOB_isIndexing = true
// --- 2. handle // take diff
// check that last indexed CID match old CID const diff: DiffData = GLOB_diffQueue.shift()! // we know that the queue is not empty
const latestCID = await getLatestIndexedCID() const latestCID = (await getLatestIndexedCID())! // (must be set at node start)
// if not, unlock and go to indexStart with the new CID instead // check that last indexed CID in db match old CID of diff
if (latestCID == null || diff.oldCID.toString() != latestCID?.toString()) { // if not, unlock and compute diff with the new CID to make sure we get all items in db
if (diff.oldCID.toString() != latestCID.toString()) {
console.log('🤔 db is not at diff start, computing missing diff') console.log('🤔 db is not at diff start, computing missing diff')
events.emit(evtype.computeDiff, diff.oldCID, diff.newCID) events.emit(evtype.triggerComputeDiff, diff.oldCID, diff.newCID)
} }
console.log('➕ adding', diff.newItems.length, 'items to the db') // still index the diff data
// insert all index requests console.log('➕ indexing', diff.newItems.length, 'items to the db')
diff.newItems.forEach((ir) => handleInsertRequest(ir[0], ir[1])) // insert all index requests (prevents from adding too many at the same time)
// write new CID as latest indexed (this is purely informative) await Promise.all(diff.newItems.map((ir) => handleInsertRequest(ir[0], ir[1])))
console.log(`🦐 latest db cid ${diff.newCID}`) // write new CID as latest indexed (but more data might be needed to be really at this stage)
if (diff.oldCID.toString() != diff.newCID.toString()) console.log(`🦐 latest db cid ${diff.newCID}`)
await setLatestIndexedCID(diff.newCID) await setLatestIndexedCID(diff.newCID)
// TODO compute what db latest cid really should be
// unlock // unlock
GLOB_isIndexing = false GLOB_isIndexing = false
// --- 3. check queues // --- 3. check queues
...@@ -198,12 +201,11 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> { ...@@ -198,12 +201,11 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> {
} }
// takes item from merge queue, fetch index request, and put them in process queue // takes item from merge queue, fetch index request, and put them in process queue
export async function collectFromMergeQueue() { export async function takeFromMergeQueue() {
// prevent process queue from growing too much // prevent process queue from growing too much
if (GLOB_processQueue.length > MAX_PROCESS_QUEUE) { if (GLOB_processQueue.length > MAX_PROCESS_QUEUE) {
if (!GLOB_lockTree) { // make sure it is aware
events.emit(evtype.triggerProcess) events.emit(evtype.triggerProcess)
}
return return
} }
// sort merge queue by key to give a better chance of good batching // sort merge queue by key to give a better chance of good batching
...@@ -220,42 +222,53 @@ export async function collectFromMergeQueue() { ...@@ -220,42 +222,53 @@ export async function collectFromMergeQueue() {
items.push(Promise.all([cid, kubo.dag.get(cid).then((r) => r.value)])) items.push(Promise.all([cid, kubo.dag.get(cid).then((r) => r.value)]))
} }
const awaitedItems = await Promise.all(items) const awaitedItems = await Promise.all(items)
awaitedItems.forEach(([c, _ir]) => { // awaitedItems.forEach(([c, _ir]) => {
// make sure to pin the index request to be able to serve its content later // // make sure to pin the index request to be able to serve its content later
kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`)) // kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`))
}) // })
// add index requests to process queue // add index requests to process queue
GLOB_processQueue.push(...awaitedItems) GLOB_processQueue.push(...awaitedItems)
// ask them to be processed if not already processing // ask them to be processed if not already processing
if (!GLOB_lockTree) { checkProcessQueue()
events.emit(evtype.triggerProcess)
}
// ask next batch if any // ask next batch if any
checkMergeQueue()
}
// check queues to see if new diff came in the meanwhile
function checkQueues() {
checkQueueDiff()
checkProcessQueue()
checkMergeQueue()
}
// if merge queue is not empty, collect
function checkMergeQueue() {
if (GLOB_mergeQueue.length) { if (GLOB_mergeQueue.length) {
events.emit(evtype.triggerCollect) events.emit(evtype.triggerCollect)
} }
} }
// check queues // if process queue is not empty, process it
function checkQueues() { function checkProcessQueue() {
// check queue to see if new diff came in the meanwhile if (!GLOB_lockTree && GLOB_processQueue.length) {
checkQueueDiff() events.emit(evtype.triggerProcess)
}
} }
// if diff came in the meanwhile, index them // if diff came in the meanwhile, index them
function checkQueueDiff() { function checkQueueDiff() {
if (GLOB_diffQueue.length != 0) { if (GLOB_diffQueue.length) {
indexKnownDiff(GLOB_diffQueue.shift()!) events.emit(evtype.triggerIndex)
} }
} }
setInterval(() => { // setInterval(() => {
console.log( // console.log(
'merge queue', // 'merge queue',
GLOB_mergeQueue.length, // GLOB_mergeQueue.length,
'process queue', // 'process queue',
GLOB_processQueue.length, // GLOB_processQueue.length,
'diff queue', // 'diff queue',
GLOB_diffQueue.length // GLOB_diffQueue.length
) // )
}, 10000) // }, 10000)
...@@ -2,16 +2,10 @@ import { EMPTY_NODE_CID, TOPIC } from '../consts' ...@@ -2,16 +2,10 @@ import { EMPTY_NODE_CID, TOPIC } from '../consts'
import { resolveHist } from '../processor' import { resolveHist } from '../processor'
import { getPubSubHandler } from '../collector' import { getPubSubHandler } from '../collector'
import { KUBO_RPC, kubo, kubo2 } from '../kubo' import { KUBO_RPC, kubo, kubo2 } from '../kubo'
import type { IndexHist, IndexRequest } from '../types' import type { IndexHist } from '../types'
import { CID } from 'multiformats' import { CID } from 'multiformats'
import { import { computeDiff, GLOB_root } from './handlers'
indexKnownDiff, import { takeFromDiffQueue, takeFromProcessQueue, takeFromMergeQueue, validMessageHandler } from './handlers'
computeDiff,
GLOB_root,
takeFromProcessQueue,
collectFromMergeQueue,
validMessageHandler
} from './handlers'
import { events, evtype } from './handlers' import { events, evtype } from './handlers'
import type { DdKeys } from './types' import type { DdKeys } from './types'
import { getRootCIDfromArgs } from './utils' import { getRootCIDfromArgs } from './utils'
...@@ -57,7 +51,7 @@ async function peerSync(trusted_peer_list: string[]): Promise<void> { ...@@ -57,7 +51,7 @@ async function peerSync(trusted_peer_list: string[]): Promise<void> {
for await (const name of kubo.name.resolve(peerDdKeys.tamt)) { for await (const name of kubo.name.resolve(peerDdKeys.tamt)) {
const cid = CID.parse(name.slice(6)) const cid = CID.parse(name.slice(6))
// found peer tree, request index diff // found peer tree, request index diff
events.emit(evtype.computeDiff, GLOB_root.cid, cid) events.emit(evtype.triggerComputeDiff, GLOB_root.cid, cid)
} }
} }
} catch (e) { } catch (e) {
...@@ -119,28 +113,29 @@ GLOB_root.cid = await getRootCIDfromArgs(process.argv) ...@@ -119,28 +113,29 @@ GLOB_root.cid = await getRootCIDfromArgs(process.argv)
await initHistIfNull(GLOB_root.cid) // make sure history is available until then await initHistIfNull(GLOB_root.cid) // make sure history is available until then
let GLOB_histCID: CID = await resolveHist() let GLOB_histCID: CID = await resolveHist()
let GLOB_hist: IndexHist = (await kubo.dag.get(GLOB_histCID)).value let GLOB_hist: IndexHist = (await kubo.dag.get(GLOB_histCID)).value
histPublish() // publish history to track new start root
// get latest db CID // get latest db CID
let latestCID = await getLatestIndexedCID() let latestCID = await getLatestIndexedCID()
if (latestCID == null) { if (latestCID == null) {
latestCID = EMPTY_NODE_CID latestCID = EMPTY_NODE_CID
setLatestIndexedCID(latestCID) await setLatestIndexedCID(latestCID)
} }
console.log(`🛢 latest indexed cid ${latestCID}`) console.log(`🛢 latest indexed cid ${latestCID}`)
// bind event handlers // bind event handlers
events.on(evtype.triggerProcess, takeFromProcessQueue) events.on(evtype.triggerProcess, takeFromProcessQueue)
events.on(evtype.indexDiff, indexKnownDiff) events.on(evtype.triggerIndex, takeFromDiffQueue)
events.on(evtype.computeDiff, computeDiff) events.on(evtype.triggerComputeDiff, computeDiff)
events.on(evtype.triggerCollect, collectFromMergeQueue) events.on(evtype.triggerCollect, takeFromMergeQueue)
pubsubSubscribe() // subscribe to index requests channel pubsubSubscribe() // subscribe to index requests channel
periodicHistPublish(HIST_PUBLISH_PERIOD) // regularly publish history periodicHistPublish(HIST_PUBLISH_PERIOD) // regularly publish history
// periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers
// emit event to tell indexer to start indexing to database // emit event to tell indexer to start indexing to database
// if it is starting from scratch (empty database), it will iterate over all values // if it is starting from scratch (empty database), it will iterate over all values
// if it already indexed up to a given cid (last_indexed_cid in db), it will only iterate over the diff // if it already indexed up to a given cid (last_indexed_cid in db), it will only iterate over the diff
events.emit(evtype.computeDiff, latestCID, GLOB_root.cid) events.emit(evtype.triggerComputeDiff, latestCID, GLOB_root.cid)
// at startup browse peer list // at startup browse peer list
peerSync(trusted_peer_list) peerSync(trusted_peer_list)
......
...@@ -131,7 +131,10 @@ async function importIrToAMT(rootNodeCid: CID, input: string) { ...@@ -131,7 +131,10 @@ async function importIrToAMT(rootNodeCid: CID, input: string) {
// 3 minutes // 3 minutes
// import by batch and logs successive cids // import by batch and logs successive cids
// importIrToAMT(EMPTY_NODE_CID, './input/cplusIR.txt') // importIrToAMT(EMPTY_NODE_CID, './input/cplusIR.txt')
const rootCID = CID.parse("bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4") const rootCID = CID.parse("bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze")
importIrToAMT(rootCID, './input/devIr+labels.txt') importIrToAMT(rootCID, './input/devIr+labels.txt')
// → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey (old with simple nodes) // → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey (old with simple nodes)
// → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels) // → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels)
// bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze
// bafyreifhhss6h5j72ewdcr6b75wda4573wtskjfp2pqiae5l73efwvrvjy
\ No newline at end of file
...@@ -8,9 +8,7 @@ const LOG_EVERY = 1000 ...@@ -8,9 +8,7 @@ const LOG_EVERY = 1000
// const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') // 10 latest // const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') // 10 latest
// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a') // const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a')
const cid = CID.parse('bafyreifdwwsnv4p2ag7egt2hjbxne63u2mfbstvnjde4b6blfvijhpiuri') const cid = CID.parse('bafyreic6ozgzlzkp6kkrvhqubttz47mkix36ms6r5inanhsmd6d3mwi7cy')
// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a')
// const cid = CID.parse('bafyreifsq3dtwbilnccpmjlgenqofhhgs4o5duh3mj6j5jiknmg3rdxl5a')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment