Commit 80d6ea32 authored by Hugo Trentesaux

improve cplus import

parent 87484913
@@ -56,7 +56,7 @@ export async function indexRequestsToAMT(requests: Array<[string, CID]>, rootNod
   const chunkSize = 5000
   const n = requests.length
   console.log(Date.now() + ' merging')
-  for (let i = 0; i < n / chunkSize; i++) {
+  for (let i = 0; i < Math.ceil(n / chunkSize); i++) {
     console.log(Date.now() + ' chunk number ' + i)
     const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize)
     const tree = arrayToVinode('', chunk) // partial tree for this chunk
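Note on the first hunk: for positive `n`, `i < n / chunkSize` and `i < Math.ceil(n / chunkSize)` iterate the same number of times in JavaScript, so this change mainly makes the intended chunk count explicit. A minimal sketch of the chunking pattern (placeholder data, not the project's `CID` type):

```ts
// Chunked iteration sketch with hypothetical data.
const requests: Array<[string, string]> = Array.from({ length: 12345 }, (_, i): [string, string] => [`key${i}`, `cid${i}`])
const chunkSize = 5000
const numChunks = Math.ceil(requests.length / chunkSize) // 3: two full chunks and one partial chunk

for (let i = 0; i < numChunks; i++) {
  // slice() clamps the end index, so the last chunk simply holds the remaining 2345 entries
  const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize)
  console.log(`chunk ${i}: ${chunk.length} entries`)
}
```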
@@ -59,6 +59,7 @@ async function downloadAllCplusProfilesRaw(endpoint: string, filename: string) {
 /// put all raw cplus profiles to ipfs node in index request and write result to a file
 async function wrapRawProfilesInIndexRequest(input: string, output: string) {
+  const TIMESTAMP_MULTIPLIER = 1000 // cesium plus data was using seconds instead of milliseconds
   const LIMIT = 500 // max number of lines to process simultaneously
   const NOTIF = 2000 // log every N lines processed
   const rejected = './input/cplusHS.txt'
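The new `TIMESTAMP_MULTIPLIER` constant reflects that Cesium+ profiles store `time` as a Unix timestamp in seconds, while JavaScript dates work in milliseconds, so the key derived from `cplus.time` was presumably off by a factor of 1000. A small illustration of the conversion, assuming `time` is a seconds-based Unix timestamp (the `cplus` object here is hypothetical):

```ts
const TIMESTAMP_MULTIPLIER = 1000 // seconds -> milliseconds
const cplus = { time: 1700000000 } // hypothetical Cesium+ timestamp, in seconds
console.log(new Date(cplus.time).toISOString()) // 1970-01-20T16:13:20.000Z: read as milliseconds, lands in 1970
console.log(new Date(cplus.time * TIMESTAMP_MULTIPLIER).toISOString()) // 2023-11-14T22:13:20.000Z
```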
@@ -66,40 +67,47 @@ async function wrapRawProfilesInIndexRequest(input: string, output: string) {
   let queueSize = 0
   let readTotal = 0
-  function process(line: string) {
-    queueSize++
-    if (queueSize > LIMIT) {
-      linereader.pause()
-    }
-    try {
-      const cplus = JSON.parse(line)
-      wrapCplusInIndexRequest(cplus, line, convertImg)
-        .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n')
-        .then((l) =>
-          appendFile(output, l, () => {
-            readTotal++
-            queueSize--
-            if (queueSize < LIMIT) {
-              linereader.resume()
-            }
-            if (readTotal % NOTIF == 0) {
-              console.log(`processed ${readTotal} profiles`)
-            }
-          })
-        )
-        .catch((e) => {
-          console.log(e)
-          appendFile(rejected, line, () => {
-            readTotal++
-          })
-        })
-    } catch (e) {
-      appendFile(rejected, line + '\n\n\n', () => {})
-    }
-  }
-  const linereader = createInterface(createReadStream(input))
-  linereader.on('line', process)
-  linereader.on('close', () => console.log('done'))
+  await new Promise<void>((resolve, reject) => {
+    // line process function
+    function process(line: string) {
+      queueSize++
+      if (queueSize > LIMIT) {
+        linereader.pause()
+      }
+      try {
+        const cplus = JSON.parse(line)
+        wrapCplusInIndexRequest(cplus, line, convertImg)
+          .then((cid) => timestampToKey(cplus.time * TIMESTAMP_MULTIPLIER) + ' ' + cid.toString() + '\n')
+          .then((l) =>
+            appendFile(output, l, () => {
+              readTotal++
+              queueSize--
+              if (queueSize < LIMIT) {
+                linereader.resume()
+              }
+              if (readTotal % NOTIF == 0) {
+                console.log(`processed ${readTotal} profiles`)
+              }
+            })
+          )
+          .catch((e) => {
+            console.log(e)
+            appendFile(rejected, line, () => {
+              readTotal++
+            })
+          })
+      } catch (e) {
+        appendFile(rejected, line + '\n\n\n', () => {})
+      }
+    }
+    // stream
+    const linereader = createInterface(createReadStream(input))
+    linereader.on('line', process)
+    linereader.on('close', resolve)
+    linereader.on('error', reject)
+  })
+  console.log('done.')
 }
 // expects to receive a file with on each line a label and the index request CID
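The rework of `wrapRawProfilesInIndexRequest` combines two things: readline backpressure (pause the stream once `LIMIT` lines are in flight, resume as the queue drains) and wrapping the whole stream in a Promise so the caller can `await` the end of the file instead of returning immediately. A stripped-down sketch of that structure, with a hypothetical `handleLine` standing in for the IPFS wrapping and file appends:

```ts
import { createReadStream } from 'fs'
import { createInterface } from 'readline'

// Hypothetical per-line async work (stands in for wrapCplusInIndexRequest + appendFile).
async function handleLine(line: string): Promise<void> {
  void line // e.g. wrap the line and push it to an IPFS node
}

async function processFile(input: string, limit = 500): Promise<void> {
  let inFlight = 0
  await new Promise<void>((resolve, reject) => {
    const linereader = createInterface(createReadStream(input))
    linereader.on('line', (line) => {
      inFlight++
      if (inFlight > limit) linereader.pause() // backpressure: stop reading while too much work is pending
      handleLine(line)
        .catch((e) => console.log(e)) // a single bad line should not abort the whole import
        .finally(() => {
          inFlight--
          if (inFlight < limit) linereader.resume()
        })
    })
    linereader.on('close', resolve) // every line has been emitted (pending work may still flush, as in the original)
    linereader.on('error', reject)
  })
  console.log('done.')
}
```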
@@ -124,9 +132,9 @@ async function main() {
   // 26 minutes
   // this can take a while because ~50000 profiles are downloaded in raw format independantly
   // 'https://g1.data.e-is.pro'
-  await downloadAllCplusProfilesRaw('https://g1data.dns1.us', './input/cplusimport.jsonl')
+  await downloadAllCplusProfilesRaw('https://g1.data.e-is.pro', './input/cplusimport.jsonl')
-  console.log(Date.now(), 'start wraping in index requests')
+  console.log(Date.now(), 'start wrapping in index requests')
   // 12 minutes
   // speed is reduced to limit RAM usage and concurrent writes to IPFS node