Commit 6dc4598f authored by Hugo Trentesaux

add intermediate files

parent 728dfdac
.gitignore
@@ -9,3 +9,5 @@ node_modules

 # Env
 .env
+input/*
\ No newline at end of file
@@ -6,7 +6,8 @@ import {
   cplusRawIrCID,
   cplusIrToAMT
 } from '../cesium-plus'
-import * as fs from 'fs/promises'
+import { createInterface } from 'readline'
+import { appendFile, createReadStream } from 'fs'
 import { timestampToKey } from '../processor'

 // profile files
@@ -61,17 +62,15 @@ async function fetchRawCplus(id: string): Promise<string> {
   return fetch(ENDPOINT + '/user/profile/' + id + '/_source').then((b) => b.text())
 }

-/// download all c+ data and add them to kubo
-async function getAllCplusIr(): Promise<Array<[string, CID]>> {
+/// download all c+ data and add them to a file
+// strange behavior: all lines are written (`wc -l input/cplusimport.jsonl`) way before logging finishes
+async function getAllCplusIr() {
+  const filename = './input/cplusimport.jsonl'
   const SCROLL_TIME = '5m'
-  const PAGE_SIZE = 100
+  const PAGE_SIZE = 1000
   const ENDPOINT = 'https://g1.data.e-is.pro'
-  const cids: Array<Promise<[string, CID]>> = []
   const URL = ENDPOINT + '/user/profile/_search?scroll=' + SCROLL_TIME
-  const decoder = new TextDecoder()

   // first batch
   let batch = await fetch(URL + '&_source=false&filter_path=took,_scroll_id,hits.hits._id,hits.total', {
     method: 'post',
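Note on the "strange behavior" comment above: the new code never awaits its writes. Each appendFile(filename, ..., () => {}) is fired as soon as the corresponding fetchRawCplus promise resolves, so every line can already be on disk while the scroll loop is still printing progress. A minimal awaited variant, as a sketch only (it reuses the file's fetchRawCplus helper and is not part of this commit):

import { appendFile } from 'fs/promises'

// Sketch: fetch one batch of profiles concurrently, then await a single append,
// so progress logs only count lines that are actually on disk.
async function writeBatch(ids: string[], filename: string): Promise<void> {
  const raws = await Promise.all(ids.map((id) => fetchRawCplus(id)))
  await appendFile(filename, raws.map((r) => r + '\n').join(''))
}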
@@ -88,22 +87,17 @@ async function getAllCplusIr(): Promise<Array<[string, CID]>> {
   console.log(`importing ${total} cplus profiles...`)

   // process batches while available
-  while (batch.took > 0) {
+  while (totalimported < total) {
     // add to the list
     for (const hit of batch.hits.hits) {
-      cids.push(
-        fetchRawCplus(hit._id)
-          .then((cplusRaw) => [cplusRaw, JSON.parse(cplusRaw)])
-          .then(([cplusRaw, cplus]) => Promise.all([timestampToKey(cplus.time), cplusRawIrCID(cplus, cplusRaw)]))
-      )
+      fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {}))
     }
     // console.log(ids)
     imported += batch.took
     totalimported += batch.took
-    if (imported > 100) {
-      console.log(`${totalimported.toString().padStart(5)}/${total} imported`)
+    if (imported > 1000) {
+      console.log(`${totalimported.toString().padStart(5)}/${total}`)
       imported = 0
-      await Promise.all(cids)
     }

     // take next batch
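The "take next batch" request itself is elided from this hunk; only scroll_id = batch._scroll_id is visible below. It is presumably the standard Elasticsearch scroll continuation, sketched here under that assumption (endpoint and body fields follow the generic scroll API, not code shown in this diff):

// Assumed shape of the scroll continuation request; names mirror the
// Elasticsearch scroll API, not code visible in this commit.
async function nextBatch(endpoint: string, scrollId: string, scrollTime: string): Promise<any> {
  const res = await fetch(endpoint + '/_search/scroll', {
    method: 'post',
    body: JSON.stringify({ scroll: scrollTime, scroll_id: scrollId })
  })
  return res.json()
}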
@@ -118,8 +112,51 @@ async function getAllCplusIr(): Promise<Array<[string, CID]>> {
     scroll_id = batch._scroll_id
     // console.log(scroll_id)
   }
+}

-  return Promise.all(cids).then((l) => l.sort())
+/// put all raw cplus profiles to ipfs node in index request and write result to a file
+async function putIrToIPFS() {
+  const LIMIT = 500 // max number of lines to process simultaneously
+  const NOTIF = 2000 // log every N lines processed
+  const input = './input/cplusimport.jsonl'
+  const output = './input/cplusIR.txt'
+  const rejected = './input/cplusHS.txt'
+
+  let queueSize = 0
+  let read = 0
+
+  function process(line: string) {
+    queueSize++
+    if (queueSize > LIMIT) {
+      linereader.pause()
+    }
+    try {
+      const cplus = JSON.parse(line)
+      cplusRawIrCID(cplus, line)
+        .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n')
+        .then((l) =>
+          appendFile(output, l, () => {
+            read++
+            queueSize--
+            if (queueSize < LIMIT) {
+              linereader.resume()
+            }
+            if (read % NOTIF == 0) {
+              console.log(`processed ${read} lines`)
+            }
+          })
+        )
+        .catch((e) => {
+          console.log(e)
+          appendFile(rejected, line, () => {
+            read++
+          })
+        })
+    } catch (e) {
+      appendFile(rejected, line + '\n\n\n', () => {})
+    }
+  }
+
+  const linereader = createInterface(createReadStream(input))
+  linereader.on('line', process)
+  linereader.on('close', () => console.log('done'))
 }

 async function importAllCplusToAMT() {
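putIrToIPFS bounds its memory use with a manual in-flight counter: the line reader pauses once LIMIT lines are being processed and resumes as appends complete. The same pause/resume backpressure pattern in isolation, as a generic sketch (function and parameter names are illustrative, not from the commit):

import { createInterface } from 'readline'
import { createReadStream } from 'fs'

// Generic sketch of bounded line processing: at most `limit` lines are in
// flight at once; the stream is paused while the queue is full.
function forEachLineBounded(path: string, limit: number, work: (line: string) => Promise<void>) {
  let inFlight = 0
  const rl = createInterface(createReadStream(path))
  rl.on('line', (line) => {
    if (++inFlight >= limit) rl.pause() // stop reading while the queue is full
    work(line).finally(() => {
      if (--inFlight < limit) rl.resume() // safe to read again
    })
  })
  rl.on('close', () => console.log('done'))
}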
@@ -131,4 +168,6 @@ async function importAllCplusToAMT() {
 // TODO use command line args to choose what to do
 // doImport()
 // doAllCplusCidsToAMT()
-importAllCplusToAMT()
+// importAllCplusToAMT()
+// getAllCplusIr()
+putIrToIPFS()