cesium-plus-import.ts
    import { CID } from 'multiformats'
    import { wrapCplusInIndexRequest, indexRequestsToAMT } from '../cesium-plus'
    import { createInterface } from 'readline'
    import { appendFile, createReadStream } from 'fs'
    import { timestampToKey } from '../processor'
    import { readFile } from 'fs/promises'
    import { EMPTY_NODE_CID } from '../consts'
    
    // fetch a single raw profile (its Elasticsearch _source document) by public key, e.g.:
    // fetchRawCplus('38QzVPhRLbEiqJtvCmRY6A6SraheNA6fJbomFX75b2qb').then(console.log)
    async function fetchRawCplus(id: string): Promise<string> {
      const ENDPOINT = 'https://g1.data.e-is.pro'
      return fetch(ENDPOINT + '/user/profile/' + id + '/_source').then((b) => b.text())
    }
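
    // Shape of the only field this script relies on from the raw profile JSON
    // (indicative sketch; the real Cesium+ document carries more fields):
    interface CplusProfileSubset {
      time: number // unix timestamp, fed to timestampToKey below
    }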
    
    /// download all Cesium+ profiles and append them, one raw JSON document per line, to a file
    async function downloadAllCplusProfilesRaw(filename: string) {
      const SCROLL_TIME = '5m'
      const PAGE_SIZE = 100
      const ENDPOINT = 'https://g1.data.e-is.pro'
      const URL = `${ENDPOINT}/user/profile/_search?scroll=${SCROLL_TIME}&size=${PAGE_SIZE}`
      const NOTIF = 1000 // log progress every NOTIF profiles
    
      // first batch
      let batch = await fetch(`${URL}&filter_path=_scroll_id,hits.total,hits.hits._id`, {
        method: 'post',
        body: JSON.stringify({
          query: { match_all: {} }
        })
      }).then((b) => b.json())
      let scroll_id = batch._scroll_id
      const total = batch.hits.total
      let scrolled = PAGE_SIZE
    
      console.log(`downloading ${total} cplus profiles...`)
    
      // process batches while available
      while (scrolled < total) {
        // add raw source to the file
        for (const hit of batch.hits.hits) {
          fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {}))
        }
        // take next batch
        batch = await fetch(ENDPOINT + '/_search/scroll', {
          method: 'post',
          body: JSON.stringify({
            scroll: SCROLL_TIME,
            scroll_id: scroll_id
          })
        }).then((b) => b.json())
        scroll_id = batch._scroll_id
        scrolled += PAGE_SIZE
        if (scrolled % NOTIF == 0) {
          console.log(`${scrolled.toString().padStart(5)}/${total}`)
        }
      }
      // the loop above fetches one batch more than it processes: flush the final batch
      for (const hit of batch.hits.hits) {
        fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {}))
      }
      console.log(`${total}/${total}, done.`)
    }
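
    // The scroll context opened above stays alive server-side until SCROLL_TIME
    // expires. A minimal sketch of releasing it explicitly once the download is
    // done, assuming the pod exposes the standard Elasticsearch clear-scroll
    // endpoint (not verified against this pod; illustration only):
    async function clearScroll(endpoint: string, scrollId: string): Promise<void> {
      await fetch(endpoint + '/_search/scroll', {
        method: 'DELETE',
        body: JSON.stringify({ scroll_id: scrollId })
      })
    }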
    
    /// wrap each raw Cesium+ profile in an index request, add it to the IPFS node, and append the resulting "<key> <CID>" pairs to a file
    async function wrapRawProfilesInIndexRequest() {
      const LIMIT = 500 // max number of lines to process simultaneously
      const NOTIF = 2000 // log every N lines processed
      const input = './input/cplusimport.jsonl'
      const output = './input/cplusIR.txt'
      const rejected = './input/cplusHS.txt'
      const convertImg = false // also upload base64 image as separate file for later reference
      let queueSize = 0
      let readTotal = 0
    
      function process(line: string) {
        queueSize++
        if (queueSize > LIMIT) {
          linereader.pause()
        }
        try {
          const cplus = JSON.parse(line)
          wrapCplusInIndexRequest(cplus, line, convertImg)
            .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n')
            .then((l) =>
              appendFile(output, l, () => {
                readTotal++
                queueSize--
                if (queueSize < LIMIT) {
                  linereader.resume()
                }
                if (readTotal % NOTIF == 0) {
                  console.log(`processed ${readTotal} profiles`)
                }
              })
            )
            .catch((e) => {
              console.log(e)
              // also release the backpressure slot, or the reader could stall
              appendFile(rejected, line + '\n', () => {
                readTotal++
                queueSize--
                if (queueSize < LIMIT) linereader.resume()
              })
            })
        } catch (e) {
          // invalid JSON: reject the line and release the slot
          appendFile(rejected, line + '\n\n\n', () => {
            queueSize--
            if (queueSize < LIMIT) linereader.resume()
          })
        }
      }
      const linereader = createInterface(createReadStream(input))
      linereader.on('line', process)
      linereader.on('close', () => console.log('done'))
    }
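
    // timestampToKey (imported from '../processor') derives the key written in
    // front of each CID. A hypothetical sketch with the same intent, assuming
    // keys only need lexicographic order to match chronological order (the real
    // implementation lives in ../processor and may differ):
    const timestampToKeySketch = (time: number): string => time.toString().padStart(12, '0')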
    
    // expects a file with one "<label> <index request CID>" pair per line, as produced by wrapRawProfilesInIndexRequest
    async function importIrToAMT(rootNodeCid: CID, input: string) {
      const requests = await readFile(input, 'utf8')
        .then((r) => r.split('\n'))
        .then((p) =>
          p
            .filter((e) => e.length > 0)
            .map((e) => {
              const parts = e.split(' ')
              return [parts[0], CID.parse(parts[1])] as [string, CID]
            })
        )
        .then((l) => l.sort()) // default sort compares stringified pairs, i.e. orders by label
      await indexRequestsToAMT(requests, rootNodeCid)
    }
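
    // input shape consumed above, one index request per line:
    //   <timestampToKey(time)> <index request CID>
    // (entries are sorted by key before being inserted into the AMT)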
    
    // 26 minutes
    // this can take a while because ~50000 profiles are downloaded in raw format independently
    // downloadAllCplusProfilesRaw('./input/cplusimport.jsonl')
    
    // 12 minutes
    // speed is deliberately limited to bound RAM usage and concurrent writes to the IPFS node
    // wrapRawProfilesInIndexRequest()
    
    // 3 minutes
    // imports by batch and logs the successive root CIDs
    // importIrToAMT(EMPTY_NODE_CID, './input/cplusIR.txt')
    const rootCID = CID.parse('bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze')
    importIrToAMT(rootCID, './input/devIr+labels.txt')
    // → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey (old with simple nodes)
    // → bafyreicklp6mtqzubxxti2uddggmsttjbdcuahm2uqxuri4z6duypliax4 (new with more context and fixed labels)
    
    // bafyreieybuh6l6bpz3jn76wqbf7jweb4ptq55n3avbaxe3nhkeiabxzmze
    // bafyreifhhss6h5j72ewdcr6b75wda4573wtskjfp2pqiae5l73efwvrvjy