diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts index e20f5c0c5dd93ded6ddc0348c00f61e0665511f3..6e11658229c5a3170874e05f85aea37b0776b8b2 100644 --- a/src/cesium-plus.ts +++ b/src/cesium-plus.ts @@ -56,7 +56,7 @@ export async function indexRequestsToAMT(requests: Array<[string, CID]>, rootNod const chunkSize = 5000 const n = requests.length console.log(Date.now() + ' merging') - for (let i = 0; i < n / chunkSize; i++) { + for (let i = 0; i < Math.ceil(n / chunkSize); i++) { console.log(Date.now() + ' chunk number ' + i) const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize) const tree = arrayToVinode('', chunk) // partial tree for this chunk diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts index 83988825b9aed9f2a6fe8b522b60dcfc5571b1f6..a106b9d206b3cef263dba1205b38cdeb7a6290e3 100644 --- a/src/scripts/cesium-plus-import.ts +++ b/src/scripts/cesium-plus-import.ts @@ -59,6 +59,7 @@ async function downloadAllCplusProfilesRaw(endpoint: string, filename: string) { /// put all raw cplus profiles to ipfs node in index request and write result to a file async function wrapRawProfilesInIndexRequest(input: string, output: string) { + const TIMESTAMP_MULTIPLIER = 1000 // cesium plus data was using seconds instead of milliseconds const LIMIT = 500 // max number of lines to process simultaneously const NOTIF = 2000 // log every N lines processed const rejected = './input/cplusHS.txt' @@ -66,40 +67,47 @@ async function wrapRawProfilesInIndexRequest(input: string, output: string) { let queueSize = 0 let readTotal = 0 - function process(line: string) { - queueSize++ - if (queueSize > LIMIT) { - linereader.pause() - } - try { - const cplus = JSON.parse(line) - wrapCplusInIndexRequest(cplus, line, convertImg) - .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n') - .then((l) => - appendFile(output, l, () => { - readTotal++ - queueSize-- - if (queueSize < LIMIT) { - linereader.resume() - } - if (readTotal % NOTIF == 0) { - console.log(`processed ${readTotal} profiles`) - } - }) - ) - .catch((e) => { - console.log(e) - appendFile(rejected, line, () => { - readTotal++ + await new Promise<void>((resolve, reject) => { + // line process function + function process(line: string) { + queueSize++ + if (queueSize > LIMIT) { + linereader.pause() + } + try { + const cplus = JSON.parse(line) + wrapCplusInIndexRequest(cplus, line, convertImg) + .then((cid) => timestampToKey(cplus.time * TIMESTAMP_MULTIPLIER) + ' ' + cid.toString() + '\n') + .then((l) => + appendFile(output, l, () => { + readTotal++ + queueSize-- + if (queueSize < LIMIT) { + linereader.resume() + } + if (readTotal % NOTIF == 0) { + console.log(`processed ${readTotal} profiles`) + } + }) + ) + .catch((e) => { + console.log(e) + appendFile(rejected, line, () => { + readTotal++ + }) }) - }) - } catch (e) { - appendFile(rejected, line + '\n\n\n', () => {}) + } catch (e) { + appendFile(rejected, line + '\n\n\n', () => {}) + } } - } - const linereader = createInterface(createReadStream(input)) - linereader.on('line', process) - linereader.on('close', () => console.log('done')) + // stream + const linereader = createInterface(createReadStream(input)) + linereader.on('line', process) + linereader.on('close', resolve) + linereader.on('error', reject) + }) + + console.log('done.') } // expects to receive a file with on each line a label and the index request CID @@ -124,9 +132,9 @@ async function main() { // 26 minutes // this can take a while because ~50000 profiles are downloaded in raw format independantly // 'https://g1.data.e-is.pro' - await downloadAllCplusProfilesRaw('https://g1data.dns1.us', './input/cplusimport.jsonl') + await downloadAllCplusProfilesRaw('https://g1.data.e-is.pro', './input/cplusimport.jsonl') - console.log(Date.now(), 'start wraping in index requests') + console.log(Date.now(), 'start wrapping in index requests') // 12 minutes // speed is reduced to limit RAM usage and concurrent writes to IPFS node