diff --git a/src/indexer/bootstrap.ts b/src/indexer/bootstrap.ts
index a286f7e9d0d0bdd8e0e7d40483dc114026abb2d8..20596ada86eaad4b697b8a28ef03905a2335831b 100644
--- a/src/indexer/bootstrap.ts
+++ b/src/indexer/bootstrap.ts
@@ -1,10 +1,10 @@
 import { kubo } from '../kubo'
-import { getSelfDdKeys } from './ipns'
+import { DD_ROOT_OPT, getSelfDdKeys } from './ipns'
 
 // setup the root index from scratch
 // after that, an other node can setup its own root index from another peer
 export async function publishKeys() {
   const keys = await getSelfDdKeys()
   const cid = await kubo.dag.put(keys)
-  await kubo.name.publish(cid, { key: 'dd_root', ttl: '1s', lifetime: '24h' })
+  await kubo.name.publish(cid, DD_ROOT_OPT)
 }
diff --git a/src/indexer/ipns.ts b/src/indexer/ipns.ts
index d09d749232348e90cbc291c5bb9ff7725a593143..2aba41a861882834f4fcd67278b0a8c879fc31e1 100644
--- a/src/indexer/ipns.ts
+++ b/src/indexer/ipns.ts
@@ -1,19 +1,31 @@
+import type { NamePublishOptions } from 'kubo-rpc-client'
 import { kubo } from '../kubo'
 import { type DdKeys } from './types'
 
+// key names by kubo
+export const DD_ROOT = 'dd_root'
+export const DD_TAMT = 'dd_tamt'
+export const DD_TAMT_HIST = 'dd_tamt_hist'
+export const DD_PROFILES = 'dd_profiles'
+
 // get keys of self node by name to find all IPNS entries
 // the keys should be defined per node with the configure.sh script
 // using dd_ prefix like dd_tamt, dd_tamt_hist...
 export async function getSelfDdKeys(): Promise<DdKeys> {
   const ks = new Map((await kubo.key.list()).map((k) => [k.name, k.id]))
   const keys: DdKeys = {
-    root: ks.get('dd_root')!,
-    tamt: ks.get('dd_tamt')!,
-    tamt_hist: ks.get('dd_tamt_hist')!,
-    profiles: ks.get('dd_profiles')!,
+    root: ks.get(DD_ROOT)!,
+    tamt: ks.get(DD_TAMT)!,
+    tamt_hist: ks.get(DD_TAMT_HIST)!,
+    profiles: ks.get(DD_PROFILES)!
   }
   return keys
 }
 
 // make keys available for deps
 export const ddKeys = await getSelfDdKeys()
+
+// publish options
+export const DD_TAMT_HIST_OPT: NamePublishOptions = { key: DD_TAMT_HIST, ttl: '1m', lifetime: '1m' }
+export const DD_TAMT_OPT: NamePublishOptions = { key: DD_TAMT, ttl: '1m', lifetime: '24h' }
+export const DD_ROOT_OPT: NamePublishOptions = { key: DD_ROOT, ttl: '1h', lifetime: '24h' }
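For context on how these exported key names and publish options are meant to be consumed, here is a minimal sketch (not part of the diff) of a peer reading the `DdKeys` record that `publishKeys()` publishes under `dd_root`. The function name `fetchPeerDdKeys` and the `peerRootName` argument are hypothetical; only `kubo.name.resolve`, `kubo.dag.get` and the `/ipfs/` slicing convention come from this codebase.

```ts
// Sketch only, not part of this PR: resolve a peer's dd_root pointer and read its DdKeys DAG node.
import { CID } from 'multiformats'
import { kubo } from '../kubo'
import type { DdKeys } from './types'

// peerRootName is a hypothetical '/ipns/...' name of a trusted peer
export async function fetchPeerDdKeys(peerRootName: string): Promise<DdKeys> {
  for await (const path of kubo.name.resolve(peerRootName, { nocache: true })) {
    // strip the '/ipfs/' prefix, as done elsewhere in this codebase
    const cid = CID.parse(path.slice(6))
    return (await kubo.dag.get(cid)).value as DdKeys
  }
  throw Error('could not resolve ' + peerRootName)
}
```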
diff --git a/src/indexer/start.ts b/src/indexer/start.ts
index 97ef7e23074367952fccbc831647b9a272a5a648..9b034edee4c72273d1cd63a10b1b665fcc0933d7 100644
--- a/src/indexer/start.ts
+++ b/src/indexer/start.ts
@@ -1,16 +1,17 @@
 import { TOPIC } from '../consts'
-import { timestampToKey, arrayToVinode, publishHistory, mergeInodesSyncCID } from '../processor'
+import { timestampToKey, arrayToVinode, publishHistory, mergeInodesSyncCID, resolveHist } from '../processor'
 import { getPubSubHandler } from '../collector'
-import { kubo, kubo2 } from '../kubo'
-import type { IndexRequest } from '../types'
+import { KUBO_RPC, kubo, kubo2 } from '../kubo'
+import type { IndexHist, IndexRequest } from '../types'
 import { CID } from 'multiformats'
 import { events, evtype, indexKnownDiff, indexStart } from './handlers'
 import type { DiffData } from './types'
 import { getRootCIDfromArgs } from './utils'
+import { DD_TAMT_HIST_OPT, DD_TAMT_OPT } from './ipns'
 
 // === HANDLERS ===
 
-// pubsub message handler
+/// ----- pubsub message handler -----
 async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise<void> {
   // re-build the index request (because the type is loosely defined)
   const ir: IndexRequest = {
@@ -22,11 +23,13 @@ async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise<void>
   }
   // then store the index request locally
   kubo.dag
-    .put(ir, { pin: true })
+    .put(ir)
     .then((cid) => {
       // cids should be the same
       if (cid.toString() != _cid.toString()) console.log('⚠️ ' + cid + ' != ' + _cid)
       console.log('adding valid index request to process queue')
+      // pin the index request we just added
+      kubo.pin.add(cid).catch(() => console.log('📌 could not pin index request that we just added ' + cid))
       // add index request to the process list
       processQueue.push([cid, ir])
       // ask to process the request
@@ -37,7 +40,7 @@ async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise<void>
 }
 const handleMessage = getPubSubHandler(validMessageHandler)
 
-// queue event handler
+/// ----- queue event handler -----
 function handleBatch() {
   // console.log('entering handleBatch')
   // ignore event if already processing something or if queue is empty
@@ -49,7 +52,7 @@ function handleBatch() {
     // console.log('nothing to process')
     return
   }
-  // if not processing, do process
+  // if not processing, lock the process
   isProcessingQueue = true
   // console.log('processing handleBatch')
   // take elements from queue
@@ -71,22 +74,46 @@ function handleBatch() {
       rootCID = cid
      console.log(`👉 new root CID ${cid}`)
       // ➡️ publish new CID
-      kubo.name.publish(rootCID, { ttl: '1s', key: 'self' }).catch(console.log)
-      // ➡️ publish history TODO limit rate at lower frequency (e.g. 1/minute)
-      publishHistory(rootCID).catch(console.log)
+      kubo.name.publish(rootCID, DD_TAMT_OPT).catch(console.log)
       isProcessingQueue = false
       // trigger an other event in case new requests arrived meanwhile
       events.emit(evtype.trigger)
       // emit event to be processed by indexer
+      // FIXME there is a vulnerability here since we do not check that items were not submitted twice
+      // this allows submitting data multiple times for repeated adds in the database
+      // the merge function should return a list of cids that were *really* added
       const diffData: DiffData = { oldCID: oldCID, newCID: rootCID, newItems: items }
       events.emit(evtype.indexDiff, diffData)
     })
     .catch((e) => {
       console.log('error merging ' + rootCID + ' ' + requests)
+      isProcessingQueue = false
+      events.emit(evtype.trigger)
       console.log(e)
     })
 }
 
+// ----- regularly publish history
+function periodicHistPublish() {
+  setInterval(async () => {
+    // if history is up to date, do nothing
+    if (hist.current_index.toString() == rootCID.toString()) return
+    // else, update the history
+    const newHist: IndexHist = {
+      last_history: histCID,
+      current_index: rootCID,
+      number: hist.number + 1,
+      timestamp: Date.now()
+    }
+    const newHistCID = await kubo.dag.put(newHist)
+    kubo.name.publish(newHistCID, DD_TAMT_HIST_OPT)
+    // update global vars
+    hist = newHist
+    histCID = newHistCID
+    return
+  }, HIST_PUBLISH_PERIOD_MS)
+}
+
 //
 // const controller = new AbortController()
 // const signal = controller.signal
@@ -122,6 +149,9 @@ console.log('✨ starting')
 
 // === GLOBALS ===
 
+const trusted_peer_list = ['/ipns/k51qzi5uqu5dhhixrgo46nblv2l4ph7wnppjvvbtr9yjkcuo2v9uzrqx85xk09']
+const HIST_PUBLISH_PERIOD_MS = 60000 // 1 minute in milliseconds
+
 // queue of index requests waiting to be processed
 const processQueue: Array<[CID, IndexRequest]> = []
 // lock to avoid triggering multiple simultaneous edits
@@ -130,23 +160,24 @@ let isProcessingQueue = false
 
 /// global rootCID variable
 // set it from CLI args
-let rootCID = await getRootCIDfromArgs(process.argv)
+let rootCID = await getRootCIDfromArgs(process.argv, trusted_peer_list)
+let histCID: CID = await resolveHist()
+let hist: IndexHist = (await kubo.dag.get(histCID)).value
 
 // bind event handlers
 events.on(evtype.trigger, handleBatch)
 events.on(evtype.indexDiff, indexKnownDiff)
 events.on(evtype.indexStart, indexStart)
 pubsubSubscribe()
+periodicHistPublish()
 
 // emit event to tell indexer to start indexing to database
 // if it is starting from scratch, it will iterate over all values
 // if it already indexed up to a given cid, it will only iterate over the diff
 events.emit(evtype.indexStart, rootCID)
-// // publish new cid and history for next start
-// kubo.name.publish(rootCID, { ttl: '1s', key: 'self' })
-// publishHistory(rootCID)
 
 // process loop
+console.log('🔌 connected to ' + KUBO_RPC)
 console.log('👂 listening on topic ' + TOPIC + '...')
 setInterval(() => {}, 1 << 30)
 ;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => process.on(signal, process.exit))
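The FIXME in handleBatch notes that duplicate index requests are not filtered before they are merged, so the same item can be re-added repeatedly. As a rough illustration of one possible guard (a hypothetical sketch, not code from this PR), a module-level cache of already-queued request CIDs could drop repeats before they reach `processQueue`:

```ts
// Sketch of a dedup guard for validMessageHandler (hypothetical, not in this PR).
import type { CID } from 'multiformats'

const seenRequests = new Set<string>() // CIDs of index requests already queued

export function isDuplicateRequest(cid: CID): boolean {
  const key = cid.toString()
  if (seenRequests.has(key)) return true
  seenRequests.add(key)
  return false
}

// usage inside validMessageHandler, before processQueue.push([cid, ir]):
//   if (isDuplicateRequest(cid)) return
```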
diff --git a/src/indexer/types.ts b/src/indexer/types.ts
index b37cd4f75ca5d1946d365f1b5f93021a226feb0d..cddd70d8b434262ad5594b5115d96d14bcd39449 100644
--- a/src/indexer/types.ts
+++ b/src/indexer/types.ts
@@ -20,5 +20,3 @@ export interface DdKeys {
   // C+ profiles kinds grouped in a single tree for efficient indexing
   profiles: string
 }
-// names of keys for iter since we can not easily get them programatically
-export const ddKeysNames = ['root', 'tamt', 'tamt_hist', 'profiles']
diff --git a/src/indexer/utils.ts b/src/indexer/utils.ts
index 5cc96981e9798bdf1ebcb9d08c066b213e2f8b0b..8e5f398df03c2a912b18ca90c757606e75812302 100644
--- a/src/indexer/utils.ts
+++ b/src/indexer/utils.ts
@@ -2,16 +2,13 @@ import { EMPTY_NODE_CID } from '../consts'
 import { CID } from 'multiformats'
 import { kubo } from '../kubo'
 
-const does_not_work_like_this_anymore = "update this"
-
-
 // get root cid from arg if given
 // initialize it:
 // - from CID if given
 // - from IPNS if given
 // - from default IPNS else
 // - as empty node else
-export async function getRootCIDfromArgs(argv: string[]): Promise<CID> {
+export async function getRootCIDfromArgs(argv: string[], trusted_peer_list: string[]): Promise<CID> {
   if (argv.length >= 3) {
     const arg = process.argv[2]
     // --- using arg ---
@@ -34,13 +31,14 @@ export async function getRootCIDfromArgs(argv: string[]): Promise<CID> {
       }
     }
   } else {
-    // --- no arg given, using hardcoded default ---
+    // --- no arg given, using any of the default trust list ---
+    const bootstrap = trusted_peer_list[0]
     try {
-      // --- try resolve from default IPNS
-      for await (const name of kubo.name.resolve(does_not_work_like_this_anymore, { nocache: true })) {
+      // --- try resolve from friend IPNS
+      for await (const name of kubo.name.resolve(bootstrap, { nocache: true })) {
         const cid = CID.parse(name.slice(6))
         console.log(`🔨 using ${cid} as startup root node`)
-        console.log(` resolved from default IPNS ${does_not_work_like_this_anymore}`)
+        console.log(` resolved from peer ${bootstrap}`)
         return cid
       }
     } catch {
diff --git a/src/kubo.ts b/src/kubo.ts
index 9928059d709495ee14d7d2fa03a73a44757c92d1..66508469f42df8eddab34582db9f2c60b86fd460 100644
--- a/src/kubo.ts
+++ b/src/kubo.ts
@@ -1,13 +1,33 @@
 import { create } from 'kubo-rpc-client'
+import type { KuboRPCClient } from 'kubo-rpc-client'
+import { Agent } from 'http'
 
 // env
-export const KUBO_RPC = 'http://127.0.0.1:5001'
-export const KUBO_GATEWAY = 'http://127.0.0.1:8080'
+export const KUBO_RPC = process.env.KUBO_RPC || 'http://127.0.0.1:5001'
+export const KUBO_GATEWAY = process.env.KUBO_GATEWAY || 'http://127.0.0.1:8080'
 
-export function getKuboClientsBrower() {
-  return { kubo: create(KUBO_RPC), kubo2: null }
+function getKuboClientsNode() {
+  // create an RPC HTTP client // TODO unix socket for optimization
+  const kubo: KuboRPCClient = create({
+    url: new URL(KUBO_RPC),
+    agent: new Agent({
+      maxSockets: 50000
+    })
+  })
+  // create another RPC client only for pubsub
+  const kubo2: KuboRPCClient = create({
+    url: new URL(KUBO_RPC),
+    agent: new Agent({
+      keepAlive: true, // to prevent UND_ERR_BODY_TIMEOUT
+      keepAliveMsecs: 1000
+      // maxSockets: 1,
+      // timeout: 100
+    })
+  })
+
+  return { kubo, kubo2 }
 }
 
-const getKuboClientsPlatform = getKuboClientsBrower
+const getKuboClientsPlatform = getKuboClientsNode
 
-export const {kubo, kubo2} = getKuboClientsPlatform()
\ No newline at end of file
+export const { kubo, kubo2 } = getKuboClientsPlatform()
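The split into `kubo` and `kubo2` is only explained by the inline comments above. The sketch below illustrates the intended division of labour; the function name, the message handling and the exact subscribe call shape are assumptions on my part based on the kubo-rpc-client pubsub API, not code from this PR.

```ts
// Sketch only: how the two clients are meant to be used (assumed usage, not part of this PR).
import { kubo, kubo2 } from './kubo'
import { TOPIC } from './consts'

export async function subscribeWithKeepAliveClient() {
  // kubo2 has a keep-alive Agent, so it holds the long-lived pubsub connection
  await kubo2.pubsub.subscribe(TOPIC, (msg) => {
    console.log('received ' + msg.data.length + ' bytes on ' + TOPIC)
  })
  // ordinary short-lived RPC calls keep going through the main client
  const version = await kubo.version()
  console.log('connected to kubo ' + version.version)
}
```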
diff --git a/src/processor.ts b/src/processor.ts
index dda96b26af9bcd98faa944c63a2fcf7d6a36c64e..bcc2a25f25e83320664bd5c0992c69f5da494c65 100644
--- a/src/processor.ts
+++ b/src/processor.ts
@@ -3,6 +3,7 @@ import { kubo } from './kubo'
 import { BASE, KEYSIZE } from './consts'
 import { emptyInode, emptyVinode, emptyLeaf } from './types'
 import type { IndexInode, IndexLeaf, IndexVinode, IndexRequest, IndexHist } from './types'
+import { DD_TAMT_HIST_OPT, DD_TAMT_OPT, ddKeys } from './indexer/ipns'
 
 // ===================== utils =====================
 
@@ -16,12 +17,9 @@ export function bucket(letter: string): number {
   return parseInt(letter[0], BASE)
 }
 
-const do_not_use = 'should not be used like this anymore'
-const do_not_use_hist = 'should not be used like this anymore'
-
 /// resolve main IPNS history
-async function resolveHist(): Promise<CID> {
-  for await (const name of kubo.name.resolve(do_not_use_hist, { nocache: true })) {
+export async function resolveHist(): Promise<CID> {
+  for await (const name of kubo.name.resolve(ddKeys.tamt_hist, { nocache: true })) {
     return CID.parse(name.slice(6))
   }
   throw Error('no history')
@@ -38,7 +36,7 @@ export async function publishHistory(cid: CID): Promise<void> {
       timestamp: Date.now()
     }
     kubo.dag.put(newHist).then((histcid) => {
-      kubo.name.publish(histcid, { ttl: '1s', key: 'index_history' })
+      kubo.name.publish(histcid, DD_TAMT_HIST_OPT)
     })
   })
 }
@@ -55,7 +53,7 @@ async function addToIndex(cid: CID, indexRequest: IndexRequest) {
   // see this presentation for more info about the merkle tree
   // https://polkadot-blockchain-academy.github.io/pba-book/substrate/storage/slides.html#/
 
-  for await (const name of kubo.name.resolve(do_not_use, { nocache: true })) {
+  for await (const name of kubo.name.resolve(ddKeys.root, { nocache: true })) {
     // get root CID from IPNS
     const rootCID = CID.parse(name.slice(6))
     // insert request into it
@@ -63,7 +61,7 @@ async function addToIndex(cid: CID, indexRequest: IndexRequest) {
     // pin new root CID recursively to be sure to have all data and to meet the "pinned only" reprovide strategy
     kubo.pin.add(newRootCID, { recursive: true })
     // publish the new root CID to IPNS
-    await kubo.name.publish(newRootCID, { ttl: '1s', key: 'self' })
+    await kubo.name.publish(newRootCID, DD_TAMT_OPT)
     // also update history
     publishHistory(newRootCID)
   }
@@ -202,7 +200,7 @@ export async function processLeaf(node: IndexLeaf, val: CID): Promise<CID> {
 export async function mergeInodesSyncCID(nodeACID: CID, nodeB: IndexVinode | IndexLeaf): Promise<CID> {
   // fail with small timeout since this data is supposed to be pinned locally
   // console.log('fetching ' + nodeACID)
-  const nodeA = (await kubo.dag.get(nodeACID, { timeout: 1, verbose: true /*, signal: new AbortSignal() */ })).value
+  const nodeA = (await kubo.dag.get(nodeACID, { timeout: 100 })).value
   const newCID = await mergeInodesSync(nodeA, nodeB)
   // unpin old node CID if different
   // we do not mind if it was not pinned
@@ -225,7 +223,10 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
     const newLeaf: IndexLeaf = {
       leaf: cidList
     }
-    return kubo.dag.put(newLeaf, { pin: true }) as Promise<CID>
+    return kubo.dag.put(newLeaf).then((cid) => {
+      kubo.pin.add(cid).catch(() => console.log('could not pin newly created leaf'))
+      return cid
+    })
   } else if (isAleaf || isBleaf) {
     throw Error('should not be possible, are keys same size?')
   }
@@ -278,7 +279,10 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
         const newNode = emptyInode()
         newNode.children[c.b1] = [c.nk1, childA]
         newNode.children[c.b2] = [c.nk2, await concretizeCid(childB)]
-        const newNodeCid = (await kubo.dag.put(newNode, { pin: true })) as CID
+        const newNodeCid = await kubo.dag.put(newNode).then((cid) => {
+          kubo.pin.add(cid).catch(() => console.log('could not pin newly created node'))
+          return cid
+        })
         noda.children[b] = [c.common, newNodeCid]
         break
       }
@@ -288,7 +292,10 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
     }
   }
   // now that we have the new node, we can upload it and return its cid
-  return kubo.dag.put(noda).then((cid) => {
+  return kubo.dag.put(noda).then((cid) => {
+    kubo.pin.add(cid).catch(() => console.log('could not pin newly merged node'))
+    return cid
+  })
 }
 
 // ===================== virtual nodes management =====================
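Every `dag.put` in this file now follows the same put-then-pin shape because the `{ pin: true }` option was dropped. A small helper along these lines (my sketch, not something the PR adds; `putPinned` and its `label` parameter are hypothetical) could factor the pattern out if it spreads further:

```ts
// Hypothetical helper, not part of this PR: put a DAG node and best-effort pin it.
import type { CID } from 'multiformats'
import { kubo } from './kubo'

export async function putPinned(node: unknown, label: string): Promise<CID> {
  const cid = await kubo.dag.put(node)
  // pin failures are logged but not fatal, mirroring mergeInodesSync above
  await kubo.pin.add(cid).catch(() => console.log('could not pin ' + label + ' ' + cid))
  return cid
}

// e.g. in mergeInodesSync: return putPinned(newLeaf, 'newly created leaf')
```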
diff --git a/src/types.ts b/src/types.ts
index 46b42749058911bb6dfcd74f5c7fd68a564e9438..a182eafbe131efd1edc369b15f52e825196b3e96 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -126,6 +126,8 @@ interface Geoloc {
 
 // ================== tx comment
 export interface TxComment {
+  /// extrinsic hash as string
   tx_id: string
+  /// comment
   comment: string
 }
diff --git a/src/views/IpnsView.vue b/src/views/IpnsView.vue
index dcc9e35e8be4cca9602f13c72a4acf88290f618e..4bef58f00f4628dd7fe00b535a0b6d4166d0f5e9 100644
--- a/src/views/IpnsView.vue
+++ b/src/views/IpnsView.vue
@@ -4,8 +4,7 @@ import { kubo } from '@/kubo'
 import { emptyInode, type IndexHist } from '../types'
 import { EMPTY_NODE_CID } from '../consts'
 import { CID } from 'multiformats'
-import { ddKeys as _ddKeys } from '../indexer/ipns'
-import { ddKeysNames, type DdKeys } from '../indexer/types'
+import { DD_TAMT_HIST_OPT, ddKeys as _ddKeys } from '../indexer/ipns'
 import IpnsLink from '@/components/IpnsLink.vue'
 import CidLink from '@/components/CidLink.vue'
 
@@ -68,7 +67,7 @@ async function initTamtHistOn(cid: CID) {
     timestamp: Date.now()
   }
   const firstHistCID = await kubo.dag.put(firstHist)
-  kubo.name.publish(firstHistCID, { key: 'dd_tamt_hist', ttl: '1s' }).then(console.log)
+  kubo.name.publish(firstHistCID, DD_TAMT_HIST_OPT).then(console.log)
 }
 
 // resolve given ipns
@@ -90,10 +89,12 @@ async function updateHist(cid: CID): Promise<CID> {
     timestamp: Date.now()
   }
   const newHistCID = await kubo.dag.put(newHist)
-  kubo.name.publish(newHistCID, { ttl: '1s', key: 'index_history' })
+  kubo.name.publish(newHistCID, DD_TAMT_HIST_OPT)
   return newHistCID
 }
 
+// names of keys for iter since we can not easily get them programmatically
+const ddKeysNames = ['root', 'tamt', 'tamt_hist', 'profiles']
 onMounted(() => {
   for (let k of ddKeysNames) {
     resolveIPNS(ddKeys[k])
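The re-introduced `ddKeysNames` list repeats the field names of `DdKeys` by hand because interface keys cannot be enumerated at runtime. A typed variant (just a suggestion sketched here, not something the diff contains) would at least make the compiler flag this list whenever the interface changes:

```ts
// Hypothetical typed variant of the hardcoded list above (not part of this PR).
import type { DdKeys } from '../indexer/types'

const ddKeysNames: (keyof DdKeys)[] = ['root', 'tamt', 'tamt_hist', 'profiles']
```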