diff --git a/.gitignore b/.gitignore index 118821d6b2af69d91b87eb11664624e315448843..7e12058844ee7ffd32658444a3752fb22802a537 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,6 @@ node_modules *.local # Env -.env \ No newline at end of file +.env + +input/* \ No newline at end of file diff --git a/input/.gitkeep b/input/.gitkeep new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 99d0b3ca3e79e5ead28ac0c418c0e02b6dc98084..c92eafc78f1cad9615e398d9a832ed9a49d78650 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -6,7 +6,8 @@ import { cplusRawIrCID, cplusIrToAMT } from '../cesium-plus' -import * as fs from 'fs/promises' +import { createInterface } from 'readline' +import { appendFile, createReadStream } from 'fs' import { timestampToKey } from '../processor' // profile files @@ -61,17 +62,15 @@ async function fetchRawCplus(id: string): Promise<string> { return fetch(ENDPOINT + '/user/profile/' + id + '/_source').then((b) => b.text()) } -/// download all c+ data and add them to kubo -async function getAllCplusIr(): Promise<Array<[string, CID]>> { +/// download all c+ data and add them to a file +// strange behavior: all lines are written (`wc -l input/cplusimport.jsonl`) way before logging finishes +async function getAllCplusIr() { + const filename = './input/cplusimport.jsonl' const SCROLL_TIME = '5m' - const PAGE_SIZE = 100 + const PAGE_SIZE = 1000 const ENDPOINT = 'https://g1.data.e-is.pro' - - const cids: Array<Promise<[string, CID]>> = [] const URL = ENDPOINT + '/user/profile/_search?scroll=' + SCROLL_TIME - const decoder = new TextDecoder() - // first batch let batch = await fetch(URL + '&_source=false&filter_path=took,_scroll_id,hits.hits._id,hits.total', { method: 'post', @@ -88,22 +87,17 @@ async function getAllCplusIr(): Promise<Array<[string, CID]>> { console.log(`importing ${total} cplus profiles...`) // process batches while available - while (batch.took > 0) { + while (totalimported < total) { // add to the list for (const hit of batch.hits.hits) { - cids.push( - fetchRawCplus(hit._id) - .then((cplusRaw) => [cplusRaw, JSON.parse(cplusRaw)]) - .then(([cplusRaw, cplus]) => Promise.all([timestampToKey(cplus.time), cplusRawIrCID(cplus, cplusRaw)])) - ) + fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {})) } // console.log(ids) imported += batch.took totalimported += batch.took - if (imported > 100) { - console.log(`${totalimported.toString().padStart(5)}/${total} imported`) + if (imported > 1000) { + console.log(`${totalimported.toString().padStart(5)}/${total}`) imported = 0 - await Promise.all(cids) } // take next batch @@ -118,8 +112,51 @@ async function getAllCplusIr(): Promise<Array<[string, CID]>> { scroll_id = batch._scroll_id // console.log(scroll_id) } +} - return Promise.all(cids).then((l) => l.sort()) +/// put all raw cplus profiles to ipfs node in index request and write result to a file +async function putIrToIPFS() { + const LIMIT = 500 // max number of lines to process simultaneously + const NOTIF = 2000 // log every N lines processed + const input = './input/cplusimport.jsonl' + const output = './input/cplusIR.txt' + const rejected = './input/cplusHS.txt' + let queueSize = 0 + let read = 0 + + function process(line: string) { + queueSize++ + if (queueSize > LIMIT) { + linereader.pause() + } + try { + const cplus = JSON.parse(line) + cplusRawIrCID(cplus, line) + .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n') + .then((l) => + appendFile(output, l, () => { + read++ + queueSize-- + if (queueSize < LIMIT) { + linereader.resume() + } + if (read % NOTIF == 0) { + console.log(`processed ${read} lines`) + } + }) + ).catch(e => { + console.log(e); + appendFile(rejected, line, () => { + read++ + }) + }) + } catch (e) { + appendFile(rejected, line + '\n\n\n', () => {}) + } + } + const linereader = createInterface(createReadStream(input)) + linereader.on('line', process) + linereader.on('close', () => console.log('done')) } async function importAllCplusToAMT() { @@ -131,4 +168,6 @@ async function importAllCplusToAMT() { // TODO use command line args to choose what to do // doImport() // doAllCplusCidsToAMT() -importAllCplusToAMT() +// importAllCplusToAMT() +// getAllCplusIr() +putIrToIPFS()