From d370b2bd77ad2a8588d6d34e1c9221599338d220 Mon Sep 17 00:00:00 2001
From: Hugo Trentesaux <hugo@trentesaux.fr>
Date: Sat, 14 Sep 2024 13:24:29 +0200
Subject: [PATCH] wip yesterday

---
 .../default/tables/public_profiles.yaml       |   1 +
 src/indexer/handlers.ts                       | 271 ++++++++++--------
 src/indexer/start.ts                          |  90 +++---
 src/interface.ts                              |   1 +
 src/processor.ts                              |  10 +-
 src/scripts/diff.ts                           |  26 ++
 src/scripts/set.ts                            |  17 ++
 src/scripts/timestamp.ts                      |   7 +
 src/types.ts                                  |   2 +-
 src/utils.ts                                  |  13 +
 10 files changed, 259 insertions(+), 179 deletions(-)
 create mode 100644 src/scripts/diff.ts
 create mode 100644 src/scripts/set.ts
 create mode 100644 src/scripts/timestamp.ts

diff --git a/hasura/metadata/databases/default/tables/public_profiles.yaml b/hasura/metadata/databases/default/tables/public_profiles.yaml
index 6d02240..7a8da5f 100644
--- a/hasura/metadata/databases/default/tables/public_profiles.yaml
+++ b/hasura/metadata/databases/default/tables/public_profiles.yaml
@@ -17,4 +17,5 @@ select_permissions:
         - title
       filter: {}
       limit: 500
+      allow_aggregations: true
     comment: ""
diff --git a/src/indexer/handlers.ts b/src/indexer/handlers.ts
index 0b4baf8..cbb1488 100644
--- a/src/indexer/handlers.ts
+++ b/src/indexer/handlers.ts
@@ -7,9 +7,33 @@ import { getDiff } from '../interface'
 import type { IndexRequest } from '../types'
 import { arrayToVinode, mergeInodesSyncCID, timestampToKey } from '../processor'
 import { DD_TAMT_OPT } from './ipns'
-import type { Globals } from './start'
 
+// config
+
+// size of batch insert in tree
 const BATCH_SIZE = 1000
+// max concurrent index requests fetches
+const MAX_PARALLEL_IR_FETCH = 500
+// max process queue size for synchronisation
+const MAX_PROCESS_QUEUE = 2000
+
+// === GLOBALS ===
+
+// root CID of tree
+export const GLOB_root: { cid: CID } = { cid: undefined! }
+// queue of index requests waiting to be processed
+// these are the requests received by the network
+export const GLOB_processQueue: Array<[CID, IndexRequest]> = []
+// lock to avoid triggering multiple simultaneous edits
+let GLOB_lockTree: boolean = false
+// queue of index requests waiting to be added to the tree
+// these are the requests coming from other pods
+const GLOB_mergeQueue: Array<[string, CID]> = [] // FIXME at the moment the key is not used
+// queue of DiffData waiting to be processed
+const GLOB_diffQueue: Array<DiffData> = []
+// lock to prevent multiple database edits which would fake the last root cid
+// this prevents from overwriting latest indexed data
+let GLOB_isIndexing = false
 
 // === EVENTS ===
 
@@ -19,57 +43,43 @@ export const events = new EventEmitter()
 // event type
 export enum evtype {
   // event to trigger collecting new index requests in queue
-  trigger = 'trigger',
+  triggerProcess = 'triggerProcess',
   // event to trigger collecting peer index requests from queue
   triggerCollect = 'triggerCollect',
   // event to trigger injestion of new requests in database
-  indexDiff = 'indexThat',
+  indexDiff = 'indexDiff',
   // event to trigger computing diff
   computeDiff = 'computeDiff'
 }
 
-// === INDEXING ===
-
-// queue of index requests waiting to be added to the tree
-// these are the requests coming from other pods
-const mergeQueue: Array<[string, CID]> = [] // FIXME at the moment the key is not used
-// queue of DiffData waiting to be processed
-const indexQueueDiff: Array<DiffData> = []
-// lock to prevent multiple database edits which would fake the last root cid
-// this prevents from overwriting latest indexed data
-let isIndexing = false
-
 // === HANDLERS ===
 
 /// ----- pubsub message handler -----
-export function getMessageHandler(processQueue: Array<[CID, IndexRequest]>) {
-  async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise<void> {
-    // re-build the index request (because the type is loosely defined)
-    const ir: IndexRequest = {
-      kind: dag.kind,
-      time: dag.time,
-      data: dag.data,
-      pubkey: dag.pubkey,
-      sig: dag.sig
-    }
-    // then store the index request locally
-    kubo.dag
-      .put(ir)
-      .then((cid) => {
-        // cids should be the same
-        if (cid.toString() != _cid.toString()) console.log('👾 ', cid, '!=', _cid)
-        console.log('adding valid index request to process queue')
-        // pin the index request we just added
-        kubo.pin.add(cid).catch(() => console.log(`📌📌 could not pin index request that we just added ${cid}`))
-        // add index request to the process list
-        processQueue.push([cid, ir])
-        // ask to process the request
-        events.emit(evtype.trigger)
-      })
-      .catch(() => console.log(`📌 could not add valid index request ${_cid}`))
-    return
+export async function validMessageHandler(_cid: CID, dag: IndexRequest): Promise<void> {
+  // re-build the index request (because the type is loosely defined)
+  const ir: IndexRequest = {
+    kind: dag.kind,
+    time: dag.time,
+    data: dag.data,
+    pubkey: dag.pubkey,
+    sig: dag.sig
   }
-  return validMessageHandler
+  // then store the index request locally
+  kubo.dag
+    .put(ir)
+    .then((cid) => {
+      // cids should be the same
+      if (cid.toString() != _cid.toString()) console.log('👾 ', cid, '!=', _cid)
+      console.log('adding valid index request to process queue')
+      // pin the index request we just added
+      kubo.pin.add(cid).catch(() => console.log(`📌📌 could not pin index request that we just added ${cid}`))
+      // add index request to the process list
+      GLOB_processQueue.push([cid, ir])
+      // ask to process the request
+      events.emit(evtype.triggerProcess)
+    })
+    .catch(() => console.log(`📌 could not add valid index request ${_cid}`))
+  return
 }
 
 /// insert a batch of index requests in tree
@@ -84,62 +94,58 @@ async function insertBatch(rootCID: CID, items: Array<[CID, IndexRequest]>): Pro
 }
 
 /// ----- queue event handler -----
-export function getBatchHandler(glob: Globals) {
-  async function handleBatchFromQueue(): Promise<void> {
-    // ignore event if already processing something or if queue is empty
-    if (glob.lockTree) {
-      console.log('busy')
-      return
-    }
-    if (glob.processQueue.length == 0) {
-      return
-    }
-    // if not processing, do lock process
-    glob.lockTree = true
-    // take elements from queue
-    let i = undefined
-    const items: Array<[CID, IndexRequest]> = []
-    let num = 0
-    while ((i = glob.processQueue.shift()) != undefined && num < BATCH_SIZE) {
-      num += 1
-      items.push(i)
-    }
-    console.log('merge queue', mergeQueue.length, 'process queue', glob.processQueue.length)
-
-    // try inserting items
-    try {
-      // insert batch and get diff
-      const diffData = await insertBatch(glob.rootCID, items)
-      // update root cid and publishes it
-      const newCID = diffData.newCID
-      console.log(`👉 new root CID ${newCID}`)
-      glob.rootCID = newCID
-      glob.lockTree = false
-      kubo.name.publish(newCID, DD_TAMT_OPT).catch(console.log)
-      // add new data to database
-      events.emit(evtype.indexDiff, diffData)
-    } catch (e) {
-      console.error(`🥊 error merging ${glob.rootCID} with ${items.length} items`, e)
-      glob.processQueue.push(...items) // add them back to the process queue
-      glob.lockTree = false
-      // try again to collect items
-      events.emit(evtype.trigger)
-    }
-    return // nothing
+export async function takeFromProcessQueue(): Promise<void> {
+  // ignore event if already processing something or if queue is empty
+  if (GLOB_lockTree) {
+    console.log('busy')
+    return
+  }
+  if (GLOB_processQueue.length == 0) {
+    return
+  }
+  // if not processing, do lock process
+  GLOB_lockTree = true
+  // take elements from queue
+  let i = undefined
+  const items: Array<[CID, IndexRequest]> = []
+  let num = 0
+  while ((i = GLOB_processQueue.shift()) != undefined && num < BATCH_SIZE) {
+    num += 1
+    items.push(i)
+  }
+  // try inserting items
+  try {
+    // insert batch and get diff
+    console.log('inserting', items.length, 'items to tree')
+    const diffData = await insertBatch(GLOB_root.cid, items)
+    // update root cid and publishes it
+    const newCID = diffData.newCID
+    console.log(`👉 new root CID ${newCID}`)
+    GLOB_root.cid = newCID
+    GLOB_lockTree = false
+    kubo.name.publish(newCID, DD_TAMT_OPT).catch(console.log)
+    // add new data to database
+    events.emit(evtype.indexDiff, diffData)
+  } catch (e) {
+    console.error(`🥊 error merging ${GLOB_root.cid} with ${items.length} items`, e)
+    GLOB_processQueue.push(...items) // add them back to the process queue
+    GLOB_lockTree = false
+    // try again to collect items
+    events.emit(evtype.triggerProcess)
   }
-  return handleBatchFromQueue
+  return // nothing
 }
 
 // diff event handler
 export async function indexKnownDiff(diff: DiffData): Promise<void> {
   // --- 1. manage lock
   // if locked, add to index queue
-  if (isIndexing) {
-    indexQueueDiff.push(diff)
+  if (GLOB_isIndexing) {
+    GLOB_diffQueue.push(diff)
     return
   }
   // else lock
-  isIndexing = true
+  GLOB_isIndexing = true
   // --- 2. handle
   // check that last indexed CID match old CID
   const latestCID = await getLatestIndexedCID()
@@ -155,12 +161,13 @@ export async function indexKnownDiff(diff: DiffData): Promise<void> {
   console.log(`🦐 latest db cid ${diff.newCID}`)
   await setLatestIndexedCID(diff.newCID)
   // unlock
-  isIndexing = false
+  GLOB_isIndexing = false
   // --- 3. check queues
   checkQueues()
 }
 
 // compute diff with remote tree and add index requests to merge queue
+// TODO prevent computing diff from several peers at the same time, otherwise CIDs will be added multiple times
 export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> {
   // if they differ
   if (fromCID.toString() != toCID.toString()) {
@@ -171,7 +178,7 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> {
     for await (const leaf of iterator) {
       for (let irCID of leaf) {
         // add it to the process queue to be added in the tree
-        mergeQueue.push(['key', irCID]) // FIXME get key while iterating
+        GLOB_mergeQueue.push(['key', irCID]) // FIXME get key while iterating
         num += 1
       }
       // make sure that collection is triggered regularly
@@ -186,44 +193,43 @@ export async function computeDiff(fromCID: CID, toCID: CID): Promise<void> {
   }
 }
 
-// get collector from mergeQueue to processQueue
-export function getItemsCollector(glob: Globals) {
-  // takes item from merge queue, fetch index request, and put them in process queue
-  async function fetchItemsFromMergeQueue() {
-    // sort merge queue by key to give a better chance of good batching
-    // mergeQueue.sort() // FIXME while key is not there, we can not sort
-    // number of items retreived
-    let num = 0
-    // item taker
-    let i = undefined
-    const items: Array<Promise<[CID, IndexRequest]>> = []
-    // takes items in queue but no more than batch and avoid duplicates and not when queue is full
-    const seen: Map<string, boolean> = new Map()
-    while ((i = mergeQueue.shift()) != undefined && num < BATCH_SIZE && glob.processQueue.length < BATCH_SIZE) {
-      const [_k, cid] = i
-      if (!seen.get(cid.toString())) {
-        seen.set(cid.toString(), true)
-        num += 1
-        items.push(Promise.all([cid, kubo.dag.get(cid).then((r) => r.value)]))
-      }
-    }
-    const awaitedItems = await Promise.all(items)
-    awaitedItems.forEach(([c, _ir]) => {
-      // make sure to pin the index request to be able to serve its content later
-      kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`))
-    })
-    // add index requests to process queue
-    glob.processQueue.push(...awaitedItems)
-    // ask them to be processed if not processing
-    if (!glob.lockTree) {
-      events.emit(evtype.trigger)
-    }
-    // ask next batch if any
-    if (mergeQueue.length) {
-      events.emit(evtype.triggerCollect)
+// takes item from merge queue, fetch index request, and put them in process queue
+export async function collectFromMergeQueue() {
+  // prevent process queue from growing too much
+  if (GLOB_processQueue.length > MAX_PROCESS_QUEUE) {
+    if (!GLOB_lockTree) {
+      events.emit(evtype.triggerProcess)
     }
+    return
+  }
+  // sort merge queue by key to give a better chance of good batching
+  // mergeQueue.sort() // FIXME while key is not there, we can not sort
+  // number of items retreived
+  let num = 0
+  // item taker
+  let i = undefined
+  const items: Array<Promise<[CID, IndexRequest]>> = []
+  // takes items in queue but not too many
+  while ((i = GLOB_mergeQueue.shift()) != undefined && num < MAX_PARALLEL_IR_FETCH) {
+    const [_k, cid] = i
+    num += 1
+    items.push(Promise.all([cid, kubo.dag.get(cid).then((r) => r.value)]))
+  }
+  const awaitedItems = await Promise.all(items)
+  awaitedItems.forEach(([c, _ir]) => {
+    // make sure to pin the index request to be able to serve its content later
+    kubo.pin.add(c, { recursive: true }).catch((_e) => console.log(`📌 could not pin remote index request ${c}`))
+  })
+  // add index requests to process queue
+  GLOB_processQueue.push(...awaitedItems)
+  // ask them to be processed if not already processing
+  if (!GLOB_lockTree) {
+    events.emit(evtype.triggerProcess)
+  }
+  // ask next batch if any
+  if (GLOB_mergeQueue.length) {
+    events.emit(evtype.triggerCollect)
   }
-  return fetchItemsFromMergeQueue
 }
 
 // check queues
@@ -234,7 +240,18 @@ function checkQueues() {
 
 // if diff came in the meanwhile, index them
 function checkQueueDiff() {
-  if (indexQueueDiff.length != 0) {
-    indexKnownDiff(indexQueueDiff.shift()!)
+  if (GLOB_diffQueue.length != 0) {
+    indexKnownDiff(GLOB_diffQueue.shift()!)
   }
 }
+
+setInterval(() => {
+  console.log(
+    'merge queue',
+    GLOB_mergeQueue.length,
+    'process queue',
+    GLOB_processQueue.length,
+    'diff queue',
+    GLOB_diffQueue.length
+  )
+}, 10000)
diff --git a/src/indexer/start.ts b/src/indexer/start.ts
index c7065dd..47b9a3b 100644
--- a/src/indexer/start.ts
+++ b/src/indexer/start.ts
@@ -4,7 +4,14 @@ import { getPubSubHandler } from '../collector'
 import { KUBO_RPC, kubo, kubo2 } from '../kubo'
 import type { IndexHist, IndexRequest } from '../types'
 import { CID } from 'multiformats'
-import { getBatchHandler, getMessageHandler, indexKnownDiff, computeDiff, getItemsCollector } from './handlers'
+import {
+  indexKnownDiff,
+  computeDiff,
+  GLOB_root,
+  takeFromProcessQueue,
+  collectFromMergeQueue,
+  validMessageHandler
+} from './handlers'
 import { events, evtype } from './handlers'
 import type { DdKeys } from './types'
 import { getRootCIDfromArgs } from './utils'
@@ -12,61 +19,52 @@ import { DD_TAMT_HIST_OPT, ddKeys } from './ipns'
 import { initHistIfNull, publishKeys, trusted_peer_list } from './bootstrap'
 import { getLatestIndexedCID, setLatestIndexedCID } from './database'
 
-// === GLOBALS ===
-export interface Globals {
-  // queue of index requests waiting to be processed
-  // these are the requests received by the network
-  processQueue: Array<[CID, IndexRequest]>
-  // lock to avoid triggering multiple simultaneous edits
-  lockTree: boolean
-  // root CID of tree
-  rootCID: CID
-}
-
 // ----- regularly publish history
 function periodicHistPublish(interval: number) {
-  setInterval(async () => {
-    // if history is up to date, to nothing
-    if (hist.current_index.toString() == glob.rootCID.toString()) return
-    // else, update the history
-    const newHist: IndexHist = {
-      last_history: histCID,
-      current_index: glob.rootCID,
-      number: hist.number + 1,
-      timestamp: Date.now()
-    }
-    const newHistCID = await kubo.dag.put(newHist)
-    kubo.name.publish(newHistCID, DD_TAMT_HIST_OPT)
-    // update global vars
-    hist = newHist
-    histCID = newHistCID
-    return
-  }, interval)
+  setInterval(histPublish, interval)
+}
+async function histPublish(): Promise<void> {
+  // if history is up to date, to nothing
+  if (GLOB_hist.current_index.toString() == GLOB_root.cid.toString()) return
+  // else, update the history
+  const newHist: IndexHist = {
+    last_history: GLOB_histCID,
+    current_index: GLOB_root.cid,
+    number: GLOB_hist.number + 1,
+    timestamp: Date.now()
+  }
+  const newHistCID = await kubo.dag.put(newHist)
+  kubo.name.publish(newHistCID, DD_TAMT_HIST_OPT)
+  // update global vars
+  GLOB_hist = newHist
+  GLOB_histCID = newHistCID
+  return
 }
 
 // ----- regularly synchronize from peers
 function periodicPeerSync(interval: number) {
   setInterval(async () => peerSync(trusted_peer_list), interval)
 }
-async function peerSync(trusted_peer_list: string[]) {
+async function peerSync(trusted_peer_list: string[]): Promise<void> {
   for (const peer of trusted_peer_list) {
     console.log('🔄 syncing from peer', peer)
     try {
       // resolve peer root keys
-      for await (const name of kubo.name.resolve(peer, { nocache: true, timeout: 100 })) {
+      for await (const name of kubo.name.resolve(peer)) {
         const cid = CID.parse(name.slice(6))
-        const peerDdKeys: DdKeys = (await kubo.dag.get(cid, { timeout: 100 })).value
+        const peerDdKeys: DdKeys = (await kubo.dag.get(cid)).value
         // resolve tamt key
-        for await (const name of kubo.name.resolve(peerDdKeys.tamt, { nocache: true, timeout: 100 })) {
+        for await (const name of kubo.name.resolve(peerDdKeys.tamt)) {
           const cid = CID.parse(name.slice(6))
           // found peer tree, request index diff
-          events.emit(evtype.computeDiff, glob.rootCID, cid)
+          events.emit(evtype.computeDiff, GLOB_root.cid, cid)
         }
       }
     } catch (e) {
       console.error('❌ could not find index of trusted peer', peer, 'due to', e)
     }
   }
+  return
 }
 
 // pubsub configuration
@@ -93,7 +91,7 @@ function anyErrorCallback(e: any) {
 }
 function pubsubSubscribe() {
   kubo2.pubsub
-    .subscribe(TOPIC, getPubSubHandler(getMessageHandler(glob.processQueue)), pubsubSubscribeOptions)
+    .subscribe(TOPIC, getPubSubHandler(validMessageHandler), pubsubSubscribeOptions)
     .catch(pubsubAbortCallback)
   console.log('🔌 connected to', KUBO_RPC)
   console.log('👂 listening on topic', TOPIC)
@@ -116,15 +114,11 @@ const HIST_PUBLISH_PERIOD = 10 * MINUTE
 const PEERSYNC_PERIOD = 1 * DAY
 
 // init globals
-const glob: Globals = {
-  processQueue: [],
-  lockTree: false,
-  // set global rootCID from CLI args
-  rootCID: await getRootCIDfromArgs(process.argv)
-}
-await initHistIfNull(glob.rootCID) // make sure history is available until then
-let histCID: CID = await resolveHist()
-let hist: IndexHist = (await kubo.dag.get(histCID)).value
+// set global rootCID from CLI args
+GLOB_root.cid = await getRootCIDfromArgs(process.argv)
+await initHistIfNull(GLOB_root.cid) // make sure history is available until then
+let GLOB_histCID: CID = await resolveHist()
+let GLOB_hist: IndexHist = (await kubo.dag.get(GLOB_histCID)).value
 
 // get latest db CID
 let latestCID = await getLatestIndexedCID()
@@ -135,18 +129,18 @@ if (latestCID == null) {
 console.log(`🛢 latest indexed cid ${latestCID}`)
 
 // bind event handlers
-events.on(evtype.trigger, getBatchHandler(glob))
+events.on(evtype.triggerProcess, takeFromProcessQueue)
 events.on(evtype.indexDiff, indexKnownDiff)
 events.on(evtype.computeDiff, computeDiff)
-events.on(evtype.triggerCollect, getItemsCollector(glob))
+events.on(evtype.triggerCollect, collectFromMergeQueue)
 pubsubSubscribe() // subscribe to index requests channel
 periodicHistPublish(HIST_PUBLISH_PERIOD) // regularly publish history
-periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers
+// periodicPeerSync(PEERSYNC_PERIOD) // regularly sync from peers
 
 // emit event to tell indexer to start indexing to database
 // if it is starting from scratch (emtpy database), it will iterate over all values
 // if it already indexed up to a given cid (last_indexed_cid in db), it will only iterate over the diff
-events.emit(evtype.computeDiff, latestCID, glob.rootCID)
+events.emit(evtype.computeDiff, latestCID, GLOB_root.cid)
 // at startup browse peer list
 peerSync(trusted_peer_list)
 
diff --git a/src/interface.ts b/src/interface.ts
index d75b5b2..a6dd3cc 100644
--- a/src/interface.ts
+++ b/src/interface.ts
@@ -220,6 +220,7 @@ export async function compareInodes(
 }
 
 // compare leaves by "eating" cids one by one
+// based on the assumption that leaves are sorted
 function compareLeafs(
   k: string,
   left: IndexLeaf,
diff --git a/src/processor.ts b/src/processor.ts
index 350418f..a9065ca 100644
--- a/src/processor.ts
+++ b/src/processor.ts
@@ -4,6 +4,7 @@ import { BASE, KEYSIZE } from './consts'
 import { emptyInode, emptyVinode, emptyLeaf } from './types'
 import type { IndexInode, IndexLeaf, IndexVinode, IndexRequest, IndexHist } from './types'
 import { DD_TAMT_HIST_OPT, DD_TAMT_OPT, ddKeys } from './indexer/ipns'
+import { uniqueby } from './utils'
 
 // ===================== utils =====================
 
@@ -219,10 +220,13 @@ export async function mergeInodesSync(nodeA: IndexInode | IndexLeaf, nodeB: Inde
   const isBleaf = (nodeB as IndexLeaf).leaf != undefined
   if (isAleaf && isBleaf) {
     // these are not internal nodes, but leaves, and we should merge them
-    const cidSet = new Set([...(nodeA as unknown as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf])
-    const cidList = Array.from(cidSet).sort()
+    // problem: Set will not work on CIDs because they are objects and differ even if they have the same data
+    // const cidSet = new Set([...(nodeA as unknown as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf])
+    // const cidListUniqueSorted = Array.from(cidSet).sort()
+    const cidList = [...(nodeA as IndexLeaf).leaf, ...(nodeB as IndexLeaf).leaf]
+    const cidListUniqueSorted = uniqueby(cidList, (k) => k.toString()).sort()
     const newLeaf: IndexLeaf = {
-      leaf: cidList
+      leaf: cidListUniqueSorted
     }
     return kubo.dag.put(newLeaf).then((cid) => {
       // WIP pin des not work well
diff --git a/src/scripts/diff.ts b/src/scripts/diff.ts
new file mode 100644
index 0000000..d605ba9
--- /dev/null
+++ b/src/scripts/diff.ts
@@ -0,0 +1,26 @@
+import { EMPTY_NODE_CID } from '../consts'
+import { getDiff } from '../interface'
+import { CID } from 'multiformats'
+
+const fromCID = EMPTY_NODE_CID
+// const fromCID = CID.parse('bafyreic6qy2k5w6324uayfzoybmzypdv57zk3zxezaiws4h553jjogw6o4')
+const toCID = CID.parse('bafyreihls2kmwx2ufuwx4kbl67f3ipl5wbc6j6snfegy3sttymrhxsgvpa')
+
+const iterator = getDiff(fromCID, toCID)
+
+const BATCH_SIZE = 1000
+
+async function doit() {
+  let num = 0
+  for await (const leaf of iterator) {
+    for (let irCID of leaf) {
+      num += 1
+      if (num % BATCH_SIZE == 0) {
+        console.log(num)
+      }
+      // console.log(irCID.toString())
+    }
+  }
+}
+
+doit()
diff --git a/src/scripts/set.ts b/src/scripts/set.ts
new file mode 100644
index 0000000..ddde1e8
--- /dev/null
+++ b/src/scripts/set.ts
@@ -0,0 +1,17 @@
+import { uniqueby } from '../utils'
+import { CID } from 'multiformats'
+
+let a = [
+  CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy'),
+  CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy')
+]
+
+let s = new Set(a)
+
+console.log(s)
+// two elements
+
+let u = uniqueby(a, (e) => e.toString())
+
+console.log(u)
+// one element
\ No newline at end of file
diff --git a/src/scripts/timestamp.ts b/src/scripts/timestamp.ts
new file mode 100644
index 0000000..28516e6
--- /dev/null
+++ b/src/scripts/timestamp.ts
@@ -0,0 +1,7 @@
+import { timestampToKey } from '../processor'
+
+console.log(timestampToKey(1519405679000)) // 00000161c3a2c198
+console.log(timestampToKey(1519405679)) // 000000005a904a6f
+console.log(timestampToKey(1523008319)) // 000000005ac7433f
+console.log(timestampToKey(1625151291)) // 0000000060ddd73b
+console.log(timestampToKey(1625151291000)) // 0000017a6290be78
diff --git a/src/types.ts b/src/types.ts
index e2373df..c18fb02 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -63,7 +63,7 @@ export function emptyVinode(): IndexVinode {
 
 // leaf
 export interface IndexLeaf {
-  // array of cid, expected to be quite small
+  // array of cid, expected to be quite small, without duplicates, and sorted
   leaf: CID[]
 }
 
diff --git a/src/utils.ts b/src/utils.ts
index 02e9d1c..4cd1b17 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -14,3 +14,16 @@ export function formatDate(d: Date) {
     d.getSeconds().toString().padStart(2, '0')
   )
 }
+
+export function uniqueby<T>(a: Array<T>, k: (e: T) => string) {
+  const seen = new Map<string, boolean>()
+  const unique = []
+  for (const e of a) {
+    const key = k(e)
+    if (!seen.get(key)) {
+      seen.set(key, true)
+      unique.push(e)
+    }
+  }
+  return unique
+}
-- 
GitLab