diff --git a/src/collector.ts b/src/collector.ts index e82f5a034f3d47b7df9d8aa32b314dc1e1985f85..68bb81159c196923bfe5d466815d27fb6dd9f876 100644 --- a/src/collector.ts +++ b/src/collector.ts @@ -105,24 +105,27 @@ export function getPubSubHandler( // build index request from dag or throw error function dagAsIndexRequest(dag: any): IndexRequest { + // check fields + // 1. kind must be a string with a length lower than 64 + assert(typeof dag.kind === "string") + assert(dag.kind.length <= 64) + // 2. time must be not too old (one minute for example) + // should not be in the future + // (prevents too much control on the storage key) + assert(typeof dag.time === "number") + assert(Date.now() - dag.time < MAX_IR_TIME_DIFF) + // 3. data must be null or a cid + if(dag.data != null) { + assert(!!CID.asCID(dag.data)) + } + // all checks passed, pubkey and signature are checked later on // extracts fields of interest from dag // note: extra fields are theoretically allowed but will not be taken into account and not saved - const ir: IndexRequest = { + return { kind: dag.kind, time: dag.time, data: dag.data, pubkey: dag.pubkey, sig: dag.sig } - // check fields - // 1. kind must be a string with a lenght lower than 64 - assert(typeof ir.kind === "string") - assert(ir.kind.length <= 64) - // 2. time must be not too old (one minute for example) - // should not be in the future - // prevents too much control on the storage key - assert(typeof ir.time === "number") - assert(Date.now() - ir.time < MAX_IR_TIME_DIFF) - // all checks passed - return ir } \ No newline at end of file diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts index 18b437d8421be8cc954c1e60011d21638f419b50..649a268a5c9d959765ae4f9c8ff9d884b748e75c 100644 --- a/src/indexer/handlers.ts +++ b/src/indexer/handlers.ts @@ -57,7 +57,7 @@ export enum evtype { /// ----- pubsub message handler ----- export async function validMessageHandler(_cid: CID, ir: IndexRequest): Promise<void> { - // store the index request locally + // store the index request locally (it is safe because of checks in collector) kubo.dag .put(ir) .then((cid) => { @@ -65,7 +65,7 @@ export async function validMessageHandler(_cid: CID, ir: IndexRequest): Promise< // however, only the final index request cid is taken into account for safety if (cid.toString() != _cid.toString()) console.log('👾 ', cid, '!=', _cid) console.log('adding valid index request to process queue') - // pin the index request we just added + // asynchronously pin the index request we just added (non recursive at this point) kubo.pin.add(cid).catch(() => console.log(`📌📌 could not pin index request that we just added ${cid}`)) // add index request to the process list GLOB_processQueue.push([cid, ir]) @@ -222,6 +222,7 @@ export async function takeFromMergeQueue() { const awaitedItems = await Promise.all(items) // awaitedItems.forEach(([c, _ir]) => { // // make sure to pin the index request to be able to serve its content later + // // note: pubsub IRs are already pinned but peers IRs might not be // kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`)) // }) // add index requests to process queue