diff --git a/README.md b/README.md index e96dc9a4d4dccc4f31e3e4c331f0a48402bbd5dd..1a61b021aa47b8065a44889b68d788910eae9d9e 100644 --- a/README.md +++ b/README.md @@ -31,3 +31,7 @@ This will make easier to insert this data in any AMT or other data structure. time npx tsx src/scripts/cesium-plus-import.ts # bafyreigczogsiuhaqus7eucalkwsy4vfkh3f4zg3c3rkvltxrwji6p5rnq ``` + +## TODO + +When using Kubo node, libp2p is not needed at all. diff --git a/package.json b/package.json index 88c988c4ce649bb0e5994c2ec7134fdd634659ab..0248acbe5df058213a87bbf8d90bd2cdd57151a5 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,8 @@ "@chainsafe/libp2p-gossipsub": "^13.0.0", "@chainsafe/libp2p-noise": "^15.0.0", "@libp2p/bootstrap": "^10.0.16", + "@libp2p/mplex": "^10.0.18", + "@libp2p/ping": "^1.0.14", "@libp2p/tcp": "^9.0.18", "@libp2p/webtransport": "^4.0.21", "@multiformats/multiaddr": "^12.2.1", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c9b641faca2e67d76a9a7ba7b29906d6848b4017..380e4afaf08c0dfdad12be594bcb58e5955df0d7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -14,6 +14,12 @@ dependencies: '@libp2p/bootstrap': specifier: ^10.0.16 version: 10.0.18 + '@libp2p/mplex': + specifier: ^10.0.18 + version: 10.0.18 + '@libp2p/ping': + specifier: ^1.0.14 + version: 1.0.14 '@libp2p/tcp': specifier: ^9.0.18 version: 9.0.18 @@ -853,6 +859,21 @@ packages: - supports-color dev: false + /@libp2p/mplex@10.0.18: + resolution: {integrity: sha512-KoMFye2ameya9GNXWjY07eM5AQg0a2iTB3muCb29Z8GhtFME41awOHlYMjMdwEaShkxwnXRjDRyIGc6BS2z0wA==} + dependencies: + '@libp2p/interface': 1.1.6 + '@libp2p/utils': 5.2.8 + it-pipe: 3.0.1 + it-pushable: 3.2.3 + it-stream-types: 2.0.1 + uint8-varint: 2.0.4 + uint8arraylist: 2.4.8 + uint8arrays: 5.0.3 + transitivePeerDependencies: + - supports-color + dev: false + /@libp2p/multistream-select@5.1.6: resolution: {integrity: sha512-IlGnEs5/xkk5+zMWRaI+gF7XzzyJqiNQIzSqBiE/Q+zq1VrhdqRyRYriBgwxd6eJHq3+9byUCllfMN2E4bzlSw==} dependencies: @@ -938,6 
+959,18 @@ packages: - supports-color dev: false + /@libp2p/ping@1.0.14: + resolution: {integrity: sha512-AK6HfXDuEulH4rHD3axNk1l+q0oDdKaXJ2EbR/rl0CF8/ys/xkWHXoQnRwP6RqMjdx26Q7PhuJ/HlasaIkb4pw==} + dependencies: + '@libp2p/crypto': 4.0.5 + '@libp2p/interface': 1.1.6 + '@libp2p/interface-internal': 1.0.11 + '@multiformats/multiaddr': 12.2.1 + it-first: 3.0.4 + it-pipe: 3.0.1 + uint8arrays: 5.0.3 + dev: false + /@libp2p/pubsub@9.0.13: resolution: {integrity: sha512-t+4ulBUJ1R/s1RAN27JrbO9ZN5Bbg0YKDsRJPZtCNRWhuGe1jo6myl+boRFTFz+aF6/K9iU2FhPUpKgMKC1XIw==} dependencies: @@ -2653,6 +2686,10 @@ packages: engines: {node: '>=16.0.0', npm: '>=7.0.0'} dev: false + /it-first@3.0.4: + resolution: {integrity: sha512-FtQl84iTNxN5EItP/JgL28V2rzNMkCzTUlNoj41eVdfix2z1DBuLnBqZ0hzYhGGa1rMpbQf0M7CQSA2adlrLJg==} + dev: false + /it-glob@1.0.2: resolution: {integrity: sha512-Ch2Dzhw4URfB9L/0ZHyY+uqOnKvBNeS/SMcRiPmJfpHiM0TsUZn+GkpcZxAoF3dJVdPm/PuIk3A4wlV7SUo23Q==} dependencies: diff --git a/src/collector.ts b/src/collector.ts index 8116808bdc7e578092c5268f64582eb7bcb8be02..d421efb650c7c115229f3a8257c38c7bd28428c2 100644 --- a/src/collector.ts +++ b/src/collector.ts @@ -26,7 +26,8 @@ export function isValidSignature(signedMessage: Uint8Array, signature: string, a // message handling function // TODO allow more control on what to do on messages (filtering, trust list...) 
-export function handleMessage(message: CustomEvent<Message>) { +// NOTE: currently unused, more like a template +function handleMessage(message: CustomEvent<Message>) { const msg = new TextDecoder().decode(message.detail.data) // const log = `[${message.detail.topic}]: ` + msg // console.log(log) diff --git a/src/consts.ts b/src/consts.ts index bc03ae05c6ee63def47857fbfd6d9af2172354bc..efb887e0a10b5c2820aec77f5c597ee26dafd01d 100644 --- a/src/consts.ts +++ b/src/consts.ts @@ -1,3 +1,5 @@ +import { CID } from 'multiformats' + // base used for index key export const BASE = 16 // current root index @@ -7,3 +9,6 @@ export const IPNS_HIST = '/ipns/k51qzi5uqu5dm8rro4yfhnpjwpfv1lk44tsgoht6a4m0jwa1 // key size can be expressed in x chars depending on base export const KEYSIZE = (64 * Math.log(2)) / Math.log(BASE) + +// empty root cid +export const EMPTY_NODE_CID = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') diff --git a/src/main.ts b/src/main.ts index 74829f872f8507df8e7ffea21736b2e1b327656f..48007187a7616d1ab7c879f7b79a0e9035c53109 100644 --- a/src/main.ts +++ b/src/main.ts @@ -12,10 +12,10 @@ import { multiaddr } from '@multiformats/multiaddr' import { CID } from 'multiformats' import { kubo } from './kubo' import type { IndexRequest } from './types' -import { addToIndexQueue } from './processor' import { buildBytesPayload, isValidSignature } from './collector' // start libp2p +// there is no need to do that if we have a kubo node other than demo purpose const libp2p = await libp2pForBrowser() await libp2p.start() await libp2p.dial(local_peer.map(multiaddr)) @@ -23,7 +23,7 @@ const pubsub = libp2p.services.pubsub as PubSub // === define what to do of pubsub messages and start listening === -// message handler +// message handler used for demo purpose when in app mode, to display results in the feed pubsub.addEventListener('message', (message) => { const msg = new TextDecoder().decode(message.detail.data) // const log = 
`[${message.detail.topic}]: ` + msg @@ -35,7 +35,8 @@ pubsub.addEventListener('message', (message) => { const bytesPayload = buildBytesPayload(dag.timestamp, dag.cid) const isValid = isValidSignature(bytesPayload, dag.signature, dag.pubkey) if (isValid) { - addToIndexQueue(cid, dag) + // here we would do the processing + // addToIndexQueue(cid, dag) } else { feed.value.push('[invalid sig] ' + msg) } diff --git a/src/processor.ts b/src/processor.ts index 8d5d5c1c145cc6668cd95d84f81c7c58db18a179..549fab38bd857027c3f60310c7cfb287ea1cab2f 100644 --- a/src/processor.ts +++ b/src/processor.ts @@ -25,7 +25,7 @@ async function resolveHist(): Promise<CID> { } /// add cid as new value in history -function publishHistory(cid: CID) { +export function publishHistory(cid: CID) { resolveHist().then((lastHistCid) => { kubo.dag.get(lastHistCid).then((lastHist) => { const newHist: IndexHist = { diff --git a/src/scripts/start-indexer.ts b/src/scripts/start-indexer.ts new file mode 100644 index 0000000000000000000000000000000000000000..e2665dbbf741463b490094bd7ee4e48563f388ec --- /dev/null +++ b/src/scripts/start-indexer.ts @@ -0,0 +1,147 @@ +import { TOPIC } from '../p2p' +import { timestampToKey, mergeInodesSync, arrayToVinode, publishHistory } from '../processor' +import { buildBytesPayload, isValidSignature } from '../collector' +import { kubo } from '../kubo' +import type { IndexRequest } from '../types' +import { IPNS, EMPTY_NODE_CID } from '../consts' +import { CID } from 'multiformats' +import EventEmitter from 'events' + +// this script allows to start an indexer + +// queue of index requests waiting to be processed +const processQueue: Array<[CID, IndexRequest]> = [] +const queueEvents = new EventEmitter() +let isProcessingQueue = false + +// show args +// console.log(process.argv) + +/// global rootCID variable +// initialize it: +// - from CID if given +// - from IPNS if given +// - from default IPNS else +// - as empty node else +let rootCID = EMPTY_NODE_CID + +// get 
root cid from arg if given +if (process.argv.length >= 3) { + const arg = process.argv[2] + try { + // try parse as CID + rootCID = CID.parse(arg) + console.log(`using ${rootCID} as startup root node`) + } catch { + try { + // try resolve as ipns + for await (const name of kubo.name.resolve(arg, { nocache: true })) { + const cid = CID.parse(name.slice(6)) + console.log(`using resolved ${cid} as startup root node`) + rootCID = cid + break + } + } catch { + console.error(`can not parse ${arg} as CID or IPNS entry`) + process.exit() + } + } +} else { + try { + // no arg given, try to resolve from default IPNS + for await (const name of kubo.name.resolve(IPNS, { nocache: true })) { + const cid = CID.parse(name.slice(6)) + console.log(`using ${cid} resolved from default IPNS ${IPNS} as startup root node`) + rootCID = cid + break + } + } catch { + console.log('using empty node insead') + } +} + +// === define what to do of pubsub messages and start listening === + +// message handler +function handleMessage(message: any) { + // console.log('received message') + // console.log(message) + const msg = new TextDecoder().decode(message.data).trim() + + try { + const cid = CID.parse(msg) + // console.log(msg) + + kubo.dag.get(cid).then(function (d) { + const dag = d.value as IndexRequest + // TODO some validation on dag to ensure it is exact format + // TODO some validation on timestamp to prevent too much control on the key + // example: 0 < Date.now() - timestamp < 1 minute + const bytesPayload = buildBytesPayload(dag.timestamp, dag.cid) + const isValid = isValidSignature(bytesPayload, dag.signature, dag.pubkey) + if (isValid) { + // at this point we can apply different treatment based on the key + // we could for example have a trust list published in ipfs + // this is mostly for spam prevention + // but for the moment we will just assume that we trust any key and add the index request to the pool + + // we can also apply different treatment based on the key: + // high 
trust => common timestamp AMT + // low trust => sandboxed HAMT + + processQueue.push([cid, dag]) + queueEvents.emit('trigger') + + // this is all that this indexer does + // the rest (autopinning, postgres indexing... will be managed by an other part of the indexer) + } else { + console.log('[invalid sig] ' + msg) + } + }) + } catch { + console.log('[invalid] ' + msg) + } +} + +// queue event handler +function handleBatch() { + // ignore event if already processing something or if queue is empty + if (isProcessingQueue) return + if (processQueue.length == 0) return + // if not processing do process + isProcessingQueue = true + // take elements from queue + let i = undefined + const items = [] + while ((i = processQueue.shift()) != undefined) { + items.push(i) + } + // convert it to a list of [key, cid] for batch insert (merge) + const requests = items.map(([cid, dag]) => [timestampToKey(dag.timestamp), cid]).sort() as Array<[string, CID]> + const tree = arrayToVinode(requests) + + // insert them + kubo.dag + .get(rootCID) + .then((v) => mergeInodesSync(v.value, tree)) + .then((cid) => { + // update CID and schedule publishing of new CID in history + rootCID = cid + console.log(`new root CID ${cid}`) + kubo.name.publish(rootCID, { ttl: '1s' }) + publishHistory(rootCID) + isProcessingQueue = false + queueEvents.emit('trigger') + }) +} + +queueEvents.on('trigger', handleBatch) +queueEvents.on('trigger', handleBatch) + +// console.log(await kubo.pubsub.ls()) +// console.log(await kubo.pubsub.peers(TOPIC)) +kubo.pubsub.subscribe(TOPIC, handleMessage) + +console.log('listening...') +setInterval(() => {}, 1 << 30) +;['SIGINT', 'SIGTERM', 'SIGQUIT'].forEach((signal) => process.on(signal, process.exit)) diff --git a/src/scripts/start-standalone-indexer.ts b/src/scripts/start-standalone-indexer.ts deleted file mode 100644 index 11c7027b8ad4fef4500e303e826cdead28c5a8e4..0000000000000000000000000000000000000000 --- a/src/scripts/start-standalone-indexer.ts +++ /dev/null @@ 
-1,56 +0,0 @@ -import type { PubSub } from '@libp2p/interface' -import type { Libp2p } from '@libp2p/interface' -import { createLibp2p } from 'libp2p' -import { tcp } from '@libp2p/tcp' -import { bootstrap } from '@libp2p/bootstrap' -import { multiaddr } from '@multiformats/multiaddr' -import { noise } from '@chainsafe/libp2p-noise' -import { gossipsub } from '@chainsafe/libp2p-gossipsub' -import { local_peer, TOPIC } from '../p2p' -import { handleMessage } from '../collector' - -// this script allows to start an indexer without the web stack (vue indexer ui) - -// return libp2p object compatible with node transport -function libp2pForNode(): Promise<Libp2p> { - return createLibp2p({ - peerDiscovery: [ - bootstrap({ - list: local_peer - }) - ], - transports: [tcp()], - connectionEncryption: [noise()], - services: { - pubsub: gossipsub() - }, - connectionGater: { - denyDialMultiaddr: async () => false - } - }) -} - -console.log(process.argv) - -/// global rootCID variable -// initialize it: -// - from CID if given -// - from IPNS if given -// - from default IPNS else -// - as empty node else -export let rootCID = null - -// start libp2p -const libp2p = await libp2pForNode() -await libp2p.start() -await libp2p.dial(local_peer.map(multiaddr)) -const pubsub = libp2p.services.pubsub as PubSub - -// === define what to do of pubsub messages and start listening === - -// message handler -pubsub.addEventListener('message', handleMessage) -// subscribe -pubsub.subscribe(TOPIC) - -console.log('end') diff --git a/src/views/IndexView.vue b/src/views/IndexView.vue index 5c3c452869424aed65df71e5c9d01c4aed0827a2..1f66536653dbefeb9afd027220b5b374e0fa54b4 100644 --- a/src/views/IndexView.vue +++ b/src/views/IndexView.vue @@ -57,19 +57,28 @@ onMounted(() => { <h1>Index</h1> <h2>Tree preview</h2> - <p> + <p>You can enter the CID of a node to preview its tree. 
Click on "+" to expand children.</p> + <div> <input v-model="rootNodeMsg" @keyup.enter="setrootNodeCid" size="59" /> <button @click="setrootNodeCid">set</button> <div class="mono">{{ rootNodeCid }}</div> + </div> + <p> + <IndexNode + v-if="rootNodeCid" + :key="rootNodeCid.toString()" + :cid="rootNodeCid" + :parentkey="''" + :nodekey="''" + ></IndexNode> </p> - <p><IndexNode v-if="rootNodeCid" :key="rootNodeCid.toString()" :cid="rootNodeCid" :parentkey="''" :nodekey="''"></IndexNode></p> <h2>Index history</h2> - <p> - resolve from history IPNS + <div> + resolve from history IPNS <button @click="resolveHist">resolve</button> <div class="mono"> {{ IPNS_HIST }}</div> - </p> + </div> <p v-if="histCid" :key="histCid.toString()"><IndexHistory :cid="histCid" /></p> </div> </template>