Commit 0564923c authored by Hugo Trentesaux

custom indexing for raw cplus data

parent e441b7dc
@@ -7,30 +7,10 @@ import type { CplusProfile, Avatar, IndexRequest } from './types'
// ========================= import functions
-/// C+ profile to index request
-function cplusProfileToIndexRequest(profile: CplusProfile, profileCid: CID): IndexRequest {
-  return {
-    pubkey: profile.issuer,
-    kind: CESIUM_PLUS_PROFILE_IMPORT,
-    data: profileCid,
-    time: profile.time * 1000,
-    sig: '' // signature is inside document for old C+ data
-  }
-}
-/// adds all cids by groups of size `groupBy`
-export async function processCesiumPlusImport(profileCids: CID[], groupBy: number): Promise<CID> {
-  const rootNode: Array<Promise<CID>> = []
-  for (let i = 0; i < Math.floor(profileCids.length / groupBy); i++) {
-    const group = profileCids.slice(groupBy * i, groupBy * (i + 1))
-    const cid = kubo.dag.put(group) as Promise<CID>
-    rootNode.push(cid)
-  }
-  return Promise.all(rootNode).then((r: CID[]) => kubo.dag.put(r) as Promise<CID>)
-}
-/// if avatar is present, upload it as a separate file instead
-export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> {
+// UNUSED
+/// upload cesium plus profile as dag instead of raw json
+// if avatar is present, upload it as a separate file instead
+async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> {
  const { avatar, ...profileWithoutAvatar } = obj
  if (avatar != undefined && (avatar as Avatar)._content != undefined) {
    const buffer = Buffer.from((avatar as Avatar)._content, 'base64')
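The remainder of processCesiumPlusProfile falls outside this hunk. A minimal sketch of what the elided part presumably does, continuing from the context lines above (the exact calls and field names are assumptions, not confirmed by the diff):

    // sketch (assumption): upload the avatar as a separate unixfs file, then store the profile
    // as a dag node that references the avatar by CID instead of embedding the base64 content
    const fileCandidate = { content: new Uint8Array(buffer) }
    const added = await kubo.add(fileCandidate)
    return kubo.dag.put({ ...profileWithoutAvatar, avatar: added.cid })
  }
  // no avatar: store the profile object as-is
  return kubo.dag.put(profileWithoutAvatar)
}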
@@ -45,59 +25,29 @@ export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID>
// ========================= merge functions
-/// retrieve all C+ data as index requests sorted by key (timestamp)
-async function allCplusAsIndexRequestCids(cplusrootCID: CID): Promise<Array<[string, CID]>> {
-  console.log(Date.now() + ' getting all cplus data')
-  const allCIDs: Array<Promise<[string, CID]>> = []
-  const cplusroot = await kubo.dag.get(cplusrootCID)
-  for (let chunkcid of cplusroot.value) {
-    const chunk = await kubo.dag.get(chunkcid)
-    for (let pcid of chunk.value) {
-      const profileIR: Promise<[string, CID]> = kubo.dag
-        .get(pcid)
-        .then((v) => cplusProfileToIndexRequest(v.value, pcid))
-        .then((r: IndexRequest) => Promise.all([timestampToKey(r.time), kubo.dag.put(r)] as [string, Promise<CID>]))
-      allCIDs.push(profileIR)
-    }
-  }
-  return Promise.all(allCIDs).then((r) => r.sort())
-}
-/// import cplus index requests to AMT
-// about 90 seconds to get C+ data and convert to index requests
-// about 90 seconds to merge data 1000 by 1000
-// this makes less than 3 minutes in total
-// requires maxSockets to be quite high
-export async function cplusIndexRequestsToAMT(cplusrootCID: CID, rootNodeCid: CID) {
-  const chunkSize = 5000
-  console.log('getting all cplus index requests')
-  const requests = await allCplusAsIndexRequestCids(cplusrootCID)
-  const n = requests.length
-  console.log(Date.now() + ' merging')
-  for (let i = 0; i < n / chunkSize; i++) {
-    console.log(Date.now() + ' chunk number ' + i)
-    const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize)
-    const tree = arrayToVinode(chunk) // partial tree for this chunk
-    rootNodeCid = await mergeInodesSyncCID(rootNodeCid, tree)
-    console.log('new root node ' + rootNodeCid.toString())
-  }
-}
/// wrap cplus raw document in index request
-export async function cplusRawIrCID(cplus: CplusProfile, cplusRaw: string): Promise<CID> {
+export async function wrapCplusInIndexRequest(cplus: CplusProfile, cplusRaw: string, convertImg: boolean): Promise<CID> {
+  // process avatar separately if requested
+  const avatar = cplus.avatar as Avatar
+  if (convertImg && avatar != undefined && avatar._content != undefined) {
+    const buffer = Buffer.from(avatar._content, 'base64')
+    const fileCandidate = { content: new Uint8Array(buffer) }
+    kubo.add(fileCandidate)
+  }
+  // return index request CID
  return kubo
    .add(cplusRaw)
    .then((cid) => ({
      pubkey: cplus.issuer,
      kind: CESIUM_PLUS_PROFILE_IMPORT,
-      data: cid,
+      data: cid.cid,
      time: cplus.time * 1000,
      sig: null
    }))
    .then((ir) => kubo.dag.put(ir))
}
-/// import cplus index requests to AMT
+/// import sorted cplus index requests to AMT
export async function cplusIrToAMT(requests: Array<[string, CID]>, rootNodeCid: CID) {
  const chunkSize = 5000
  const n = requests.length
...
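Taken together, the new flow in the cesium-plus module is: wrap each raw C+ document in an index request keyed by its timestamp, sort the (key, CID) pairs, then merge them chunk by chunk into the AMT. A minimal sketch of that flow, to be read inside an async context; the sample document and the empty-root CID reuse values visible elsewhere in this commit and are illustrative only:

// illustrative sketch, not part of the commit
const raw = '{"issuer":"38QzVPhRLbEiqJtvCmRY6A6SraheNA6fJbomFX75b2qb","time":1600000000,"title":"example"}'
const profile: CplusProfile = JSON.parse(raw)
const irCid = await wrapCplusInIndexRequest(profile, raw, false) // false: do not upload the avatar separately
const requests: Array<[string, CID]> = [[timestampToKey(profile.time), irCid]]
requests.sort() // cplusIrToAMT expects requests sorted by key
const emptyRoot = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty AMT root used elsewhere in this commit
await cplusIrToAMT(requests, emptyRoot)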
@@ -19,9 +19,17 @@ const env = {
// group query and param builder to limit error
interface QueryBuilder {
+  // SQL query
  query: string
+  // data getter when there can be some manipulation with data CID
+  dataGetter: (dataCID: CID) => Promise<any>
+  // data transform before param builder
+  dataTransform: (irCID: CID, ir: IndexRequest, dataCID: CID, data: any) => Promise<any>
+  // build params for query
  paramBuilder: (irCID: CID, ir: IndexRequest, dataCID: CID, data: any) => any[]
}
+const defaultDataGetter = (dataCID: CID) => kubo.dag.get(dataCID).then((d) => d.value)
+const defaultDataTransform = (_a: CID, _b: IndexRequest, _c: CID, d: any) => d
// initialize client
const { Client } = pg
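For orientation, the three QueryBuilder callbacks are meant to chain as sketched below: fetch the data behind the CID, normalize it, then run the SQL with bound parameters. runQueryBuilder is a hypothetical helper, not part of the commit (the actual call chain is in handleIrWithNonNullData further down); client is the pg Client initialized in this file.

// illustrative sketch of how a QueryBuilder is driven
async function runQueryBuilder(q: QueryBuilder, irCID: CID, ir: IndexRequest, dataCID: CID) {
  const raw = await q.dataGetter(dataCID) // fetch the data behind the CID (dag node or unixfs content)
  const data = await q.dataTransform(irCID, ir, dataCID, raw) // normalize it before building params
  await client.query(q.query, q.paramBuilder(irCID, ir, dataCID, data)) // run the SQL with bound params
}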
@@ -81,6 +89,8 @@ const cesiumPlusProfile: QueryBuilder = {
    city = EXCLUDED.city,
    socials = EXCLUDED.socials;
  `,
+  dataGetter: defaultDataGetter,
+  dataTransform: defaultDataTransform,
  paramBuilder: (irCID: CID, ir: IndexRequest, dataCID: CID, data: CplusProfile) => [
    // $1 index_request_cid
    irCID.toString(),
@@ -107,6 +117,38 @@ const cesiumPlusProfile: QueryBuilder = {
  ]
}
+// query builder for raw cplus data
+const cesiumPlusProfileRaw: QueryBuilder = {
+  // same query
+  query: cesiumPlusProfile.query,
+  // data is not stored directly as dag but as unix fs json
+  dataGetter: async (dataCID: CID) => {
+    const stream = kubo.cat(dataCID)
+    const decoder = new TextDecoder()
+    let str = ''
+    for await (const chunk of stream) {
+      str += decoder.decode(chunk)
+    }
+    return str
+  },
+  // transform data before indexing
+  // here "data" is a dag-pb UnixFS
+  dataTransform: async (irCID, ir, dataCID, data) => {
+    const cplus: any = JSON.parse(data)
+    const avatar: any = cplus.avatar
+    // transform base64 avatar to CID if present
+    if (avatar != undefined && avatar._content != undefined) {
+      const buffer = Buffer.from(avatar._content, 'base64')
+      const fileCandidate = { content: new Uint8Array(buffer) }
+      // optimization: compute the hash locally without submitting it to kubo
+      // difficulty: check that the hash is the same
+      cplus.avatar = (await kubo.add(fileCandidate, { onlyHash: true })).cid
+    }
+    return cplus
+  },
+  paramBuilder: cesiumPlusProfile.paramBuilder
+}
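The dataTransform above swaps the inline base64 avatar for its IPFS CID computed with onlyHash, so indexing never re-uploads the image bytes; they only reach kubo when wrapCplusInIndexRequest is called with convertImg enabled. A quick way to check the "hash is the same" assumption noted in the comment, to be read inside an async context (illustrative, not part of the commit):

// illustrative check: onlyHash must yield the same CID as an actual add with the same options
const base64Content = 'iVBORw0KGgo=' // hypothetical sample payload
const bytes = new Uint8Array(Buffer.from(base64Content, 'base64'))
const hashed = await kubo.add({ content: bytes }, { onlyHash: true }) // compute CID without storing the block
const added = await kubo.add({ content: bytes }) // actually add the block
console.assert(hashed.cid.equals(added.cid), 'onlyHash CID should match the CID of the real add')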
// transaction comment query and param builder
// prevents overwrite
const txComment: QueryBuilder = {
@@ -116,6 +158,8 @@ const txComment: QueryBuilder = {
    ON CONFLICT (pubkey, tx_id)
    DO NOTHING;
  `,
+  dataGetter: defaultDataGetter,
+  dataTransform: defaultDataTransform,
  paramBuilder: (irCID: CID, ir: IndexRequest, _dataCID: CID, data: TxComment) => [
    // $1 index_request_cid
    irCID.toString(),
@@ -148,13 +192,11 @@ async function handleIrWithNonNullData<T>(irCID: CID, ir: IndexRequest, q: Query
    console.log('no data when required')
    return
  }
-  kubo.dag
-    .get(dataCID)
-    .then((d) => d.value)
-    .then(dataHandler<T>(q, irCID, ir, dataCID))
+  q.dataGetter(dataCID)
+    .then((data) => q.dataTransform(irCID, ir, dataCID, data).then(dataHandler<T>(q, irCID, ir, dataCID)))
    .catch((e) => {
-      console.log(e)
-      console.log('☁️ could not get data to index ' + dataCID)
+      console.log('☁️ error indexing ' + dataCID)
+      console.error(e)
    })
}
@@ -172,7 +214,7 @@ export async function handleInsertRequest(irCID: CID, ir: IndexRequest) {
    case CESIUM_PLUS_PROFILE_IMPORT.toString():
      // transform base58 pubkey to ss58 address with gdev prefix
      ir.pubkey = base58ToSS58(ir.pubkey, GDEV_PREFIX)
-      handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile)
+      handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw)
      break
    // delete cesium plus profile
...
import { CID } from 'multiformats'
-import {
-  processCesiumPlusImport,
-  processCesiumPlusProfile,
-  cplusIndexRequestsToAMT,
-  cplusRawIrCID,
-  cplusIrToAMT
-} from '../cesium-plus'
+import { wrapCplusInIndexRequest, cplusIrToAMT } from '../cesium-plus'
import { createInterface } from 'readline'
import { appendFile, createReadStream } from 'fs'
import { timestampToKey } from '../processor'
import { readFile } from 'fs/promises'
-// profile files
-// const PROFILES = '/home/hugo/ipfs/v2s-datapod/migrate_csplus/profile_csplus.json'
-const profiles = (n: number) => `/home/hugo/ipfs/v2s-datapod/migrate_csplus/profile_csplus_${n}.json`
-const CHUNKS = 11
-const GROUPBY = 256
-/// do import cesium data from chunk files to a single basic root dag
-// use json chunks as input
-// process base64 images apart
-// groups output in a single dag for easy import
-async function doImport() {
-  const cids: CID[] = []
-  // manage chunk by chunk to limit memory usage
-  for (let chunk = 0; chunk < CHUNKS; chunk++) {
-    const result = await readFile(profiles(chunk), 'utf8')
-    const obj = JSON.parse(result)
-    console.log('chunk ' + chunk)
-    await Promise.all(obj.map(processCesiumPlusProfile)).then((r: CID[]) => cids.push(...r))
-  }
-  console.log('processing...')
-  console.log(cids.length)
-  processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid))
-}
-// // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18
-// // but this shows we should optimise AMT inserting for higher throughput
-// function doImportToAMT() {
-// REMOVED
-// }
-// // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second
-// // this version sometimes crashes with EADDRNOTAVAIL because it exceeds the number of concurrent connections
-// async function doMergeAMT() {
-// REMOVED
-// }
-/// import all previously imported C+ data as index requests into a single AMT
-async function doAllCplusCidsToAMT() {
-  const cplusCID = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import
-  const rootNodeCid = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
-  cplusIndexRequestsToAMT(cplusCID, rootNodeCid)
-}
// fetch raw profile
// fetchRawCplus('38QzVPhRLbEiqJtvCmRY6A6SraheNA6fJbomFX75b2qb').then(console.log)
async function fetchRawCplus(id: string): Promise<string> {
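The body of fetchRawCplus is elided by this diff. Judging from the usage comment above and the ENDPOINT used below, it presumably fetches the raw _source document of one profile; a hedged sketch under that assumption (the helper name and the exact path are assumptions, not confirmed by the diff):

// sketch (assumption): fetch the raw C+ profile document for a given id from the Cesium+ pod
async function fetchRawCplusSketch(id: string): Promise<string> {
  const ENDPOINT = 'https://g1.data.e-is.pro'
  return fetch(`${ENDPOINT}/user/profile/${id}/_source`).then((r) => r.text())
}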
@@ -64,43 +13,32 @@ async function fetchRawCplus(id: string): Promise<string> {
}
/// download all c+ data and add them to a file
-// strange behavior: all lines are written (`wc -l input/cplusimport.jsonl`) way before logging finishes
-async function getAllCplusIr() {
-  const filename = './input/cplusimport.jsonl'
+async function downloadAllCplusProfilesRaw(filename: string) {
  const SCROLL_TIME = '5m'
-  const PAGE_SIZE = 1000
+  const PAGE_SIZE = 100
  const ENDPOINT = 'https://g1.data.e-is.pro'
-  const URL = ENDPOINT + '/user/profile/_search?scroll=' + SCROLL_TIME
+  const URL = `${ENDPOINT}/user/profile/_search?scroll=${SCROLL_TIME}&size=${PAGE_SIZE}`
+  const NOTIF = 1000
  // first batch
-  let batch = await fetch(URL + '&_source=false&filter_path=took,_scroll_id,hits.hits._id,hits.total', {
+  let batch = await fetch(`${URL}&filter_path=_scroll_id,hits.total,hits.hits._id`, {
    method: 'post',
    body: JSON.stringify({
-      query: { match_all: {} },
-      size: PAGE_SIZE
+      query: { match_all: {} }
    })
  }).then((b) => b.json())
  let scroll_id = batch._scroll_id
  const total = batch.hits.total
-  let imported = 0
-  let totalimported = 0
-  console.log(`importing ${total} cplus profiles...`)
+  let scrolled = PAGE_SIZE
+  console.log(`downloading ${total} cplus profiles...`)
  // process batches while available
-  while (totalimported < total) {
-    // add to the list
+  while (scrolled < total) {
+    // add raw source to the file
    for (const hit of batch.hits.hits) {
      fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {}))
    }
-    // console.log(ids)
-    imported += batch.took
-    totalimported += batch.took
-    if (imported > 1000) {
-      console.log(`${totalimported.toString().padStart(5)}/${total}`)
-      imported = 0
-    }
    // take next batch
    batch = await fetch(ENDPOINT + '/_search/scroll', {
      method: 'post',
@@ -109,21 +47,25 @@ async function getAllCplusIr() {
        scroll_id: scroll_id
      })
    }).then((b) => b.json())
    scroll_id = batch._scroll_id
-    // console.log(scroll_id)
+    scrolled += PAGE_SIZE
+    if (scrolled % NOTIF == 0) {
+      console.log(`${scrolled.toString().padStart(5)}/${total}`)
+    }
  }
+  console.log(`${total}/${total}, done.`)
}
/// put all raw cplus profiles to ipfs node in index request and write result to a file
-async function putIrToIPFS() {
+async function wrapRawProfilesInIndexRequest() {
  const LIMIT = 500 // max number of lines to process simultaneously
  const NOTIF = 2000 // log every N lines processed
  const input = './input/cplusimport.jsonl'
  const output = './input/cplusIR.txt'
  const rejected = './input/cplusHS.txt'
+  const convertImg = false // also upload base64 image as separate file for later reference
  let queueSize = 0
-  let read = 0
+  let readTotal = 0
  function process(line: string) {
    queueSize++
@@ -132,24 +74,24 @@ async function putIrToIPFS() {
    }
    try {
      const cplus = JSON.parse(line)
-      cplusRawIrCID(cplus, line)
+      wrapCplusInIndexRequest(cplus, line, convertImg)
        .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n')
        .then((l) =>
          appendFile(output, l, () => {
-            read++
+            readTotal++
            queueSize--
            if (queueSize < LIMIT) {
              linereader.resume()
            }
-            if (read % NOTIF == 0) {
-              console.log(`processed ${read} lines`)
+            if (readTotal % NOTIF == 0) {
+              console.log(`processed ${readTotal} profiles`)
            }
          })
        )
        .catch((e) => {
          console.log(e)
          appendFile(rejected, line, () => {
-            read++
+            readTotal++
          })
        })
    } catch (e) {
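The creation of linereader and the pause logic are elided by this hunk. A minimal sketch of the backpressure wiring the visible code relies on, assuming it lives inside the same function and uses the readline imports at the top of the file (the event wiring shown here is an assumption, not part of the commit):

// sketch (assumption): feed lines to process() and pause the reader while too many are in flight
const linereader = createInterface({ input: createReadStream(input) })
linereader.on('line', (line) => {
  process(line)
  if (queueSize >= LIMIT) linereader.pause() // resumed in process() once the queue drains below LIMIT
})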
@@ -178,10 +120,15 @@ async function importAllCplusToAMT() {
  await cplusIrToAMT(requests, rootNodeCid)
}
-// TODO use command line args to choose what to do
-// doImport()
-// doAllCplusCidsToAMT()
-// importAllCplusToAMT()
-// getAllCplusIr()
-// putIrToIPFS()
-// importAllCplusToAMT()
+// 26 minutes
+// this can take a while because ~50000 profiles are downloaded in raw format independently
+// downloadAllCplusProfilesRaw('./input/cplusimport.jsonl')
+// 12 minutes
+// speed is reduced to limit RAM usage and concurrent writes to IPFS node
+// wrapRawProfilesInIndexRequest()
+// 3 minutes
+// import by batch and logs successive cids
+importAllCplusToAMT()
+// → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey
\ No newline at end of file