diff --git a/src/cesium-plus.ts b/src/cesium-plus.ts
index a1d0fe252500e96a5d83af450416a8ae738c6538..8203bc2dedcd5fd93fa5603567a89e7943932c7f 100644
--- a/src/cesium-plus.ts
+++ b/src/cesium-plus.ts
@@ -7,30 +7,10 @@ import type { CplusProfile, Avatar, IndexRequest } from './types'
 
 // ========================= import functions
 
-/// C+ profile to index request
-function cplusProfileToIndexRequest(profile: CplusProfile, profileCid: CID): IndexRequest {
-  return {
-    pubkey: profile.issuer,
-    kind: CESIUM_PLUS_PROFILE_IMPORT,
-    data: profileCid,
-    time: profile.time * 1000,
-    sig: '' // signature is inside document for old C+ data
-  }
-}
-
-/// adds all cids by groups ot size `groupBy`
-export async function processCesiumPlusImport(profileCids: CID[], groupBy: number): Promise<CID> {
-  const rootNode: Array<Promise<CID>> = []
-  for (let i = 0; i < Math.floor(profileCids.length / groupBy); i++) {
-    const group = profileCids.slice(groupBy * i, groupBy * (i + 1))
-    const cid = kubo.dag.put(group) as Promise<CID>
-    rootNode.push(cid)
-  }
-  return Promise.all(rootNode).then((r: CID[]) => kubo.dag.put(r) as Promise<CID>)
-}
-
-/// if avatar is present, upload it as a separate file instead
-export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> {
+// UNUSED
+/// upload cesium plus profile as dag instead of raw json
+// if avatar is present, upload it as a separate file instead
+async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID> {
   const { avatar, ...profileWithoutAvatar } = obj
   if (avatar != undefined && (avatar as Avatar)._content != undefined) {
     const buffer = Buffer.from((avatar as Avatar)._content, 'base64')
@@ -45,59 +25,29 @@ export async function processCesiumPlusProfile(obj: CplusProfile): Promise<CID>
 
 // ========================= merge functions
 
-/// retreive all C+ data as index requests sorted by key (timestamp)
-async function allCplusAsIndexRequestCids(cplusrootCID: CID): Promise<Array<[string, CID]>> {
-  console.log(Date.now() + ' getting all cplus data')
-  const allCIDs: Array<Promise<[string, CID]>> = []
-  const cplusroot = await kubo.dag.get(cplusrootCID)
-  for (let chunkcid of cplusroot.value) {
-    const chunk = await kubo.dag.get(chunkcid)
-    for (let pcid of chunk.value) {
-      const profileIR: Promise<[string, CID]> = kubo.dag
-        .get(pcid)
-        .then((v) => cplusProfileToIndexRequest(v.value, pcid))
-        .then((r: IndexRequest) => Promise.all([timestampToKey(r.time), kubo.dag.put(r)] as [string, Promise<CID>]))
-      allCIDs.push(profileIR)
-    }
-  }
-  return Promise.all(allCIDs).then((r) => r.sort())
-}
-
-/// import cplus index requests to AMT
-// about 90 seconds to get C+ data and convert to index requests
-// about 90 seconds to merge data 1000 by 1000
-// this makes less than 3 minutes in total
-// requires maxSockets to be quite high
-export async function cplusIndexRequestsToAMT(cplusrootCID: CID, rootNodeCid: CID) {
-  const chunkSize = 5000
-  console.log('getting all cplus index requests')
-  const requests = await allCplusAsIndexRequestCids(cplusrootCID)
-  const n = requests.length
-  console.log(Date.now() + ' merging')
-  for (let i = 0; i < n / chunkSize; i++) {
-    console.log(Date.now() + ' chunk number ' + i)
-    const chunk = requests.slice(i * chunkSize, (i + 1) * chunkSize)
-    const tree = arrayToVinode(chunk) // partial tree for this chunk
-    rootNodeCid = await mergeInodesSyncCID(rootNodeCid, tree)
-    console.log('new root node ' + rootNodeCid.toString())
-  }
-}
-
 /// wrap cplus raw document in index request
-export async function cplusRawIrCID(cplus: CplusProfile, cplusRaw: string): Promise<CID> {
+export async function wrapCplusInIndexRequest(cplus: CplusProfile, cplusRaw: string, convertImg: boolean): Promise<CID> {
+  // process avatar separately if requested
+  const avatar = cplus.avatar as Avatar
+  if (convertImg && avatar != undefined && avatar._content != undefined) {
+    const buffer = Buffer.from(avatar._content, 'base64')
+    const fileCandidate = { content: new Uint8Array(buffer) }
+    kubo.add(fileCandidate)
+  }
+  // return index request CID
   return kubo
     .add(cplusRaw)
     .then((cid) => ({
       pubkey: cplus.issuer,
       kind: CESIUM_PLUS_PROFILE_IMPORT,
-      data: cid,
+      data: cid.cid,
       time: cplus.time * 1000,
       sig: null
     }))
     .then((ir) => kubo.dag.put(ir))
 }
 
-/// import cplus index requests to AMT
+/// import sorted cplus index requests to AMT
 export async function cplusIrToAMT(requests: Array<[string, CID]>, rootNodeCid: CID) {
   const chunkSize = 5000
   const n = requests.length
diff --git a/src/indexer/database.ts b/src/indexer/database.ts
index aaaad2694026af4bcf9e94a192f7814e0401f36e..dffc32238b274f63b8358d37b08255a43a85cde6 100644
--- a/src/indexer/database.ts
+++ b/src/indexer/database.ts
@@ -19,9 +19,17 @@ const env = {
 
 // group query and param builder to limit error
 interface QueryBuilder {
+  // SQL query
   query: string
+  // data getter when there can be some manipulation with data CID
+  dataGetter: (dataCID: CID) => Promise<any>
+  // data transform before param builder
+  dataTransform: (irCID: CID, ir: IndexRequest, dataCID: CID, data: any) => Promise<any>
+  // build params for query
   paramBuilder: (irCID: CID, ir: IndexRequest, dataCID: CID, data: any) => any[]
 }
+const defaultDataGetter = (dataCID: CID) => kubo.dag.get(dataCID).then((d) => d.value)
+const defaultDataTransform = (_a: CID, _b: IndexRequest, _c: CID, d: any) => d
 
 // initialize client
 const { Client } = pg
@@ -81,6 +89,8 @@ const cesiumPlusProfile: QueryBuilder = {
       city = EXCLUDED.city,
       socials = EXCLUDED.socials;
     `,
+  dataGetter: defaultDataGetter,
+  dataTransform: defaultDataTransform,
   paramBuilder: (irCID: CID, ir: IndexRequest, dataCID: CID, data: CplusProfile) => [
     // $1 index_request_cid
     irCID.toString(),
@@ -107,6 +117,38 @@ const cesiumPlusProfile: QueryBuilder = {
   ]
 }
 
+// query builder for raw cplus data
+const cesiumPlusProfileRaw: QueryBuilder = {
+  // same query
+  query: cesiumPlusProfile.query,
+  // data is not stored directly as dag but as unix fs json
+  dataGetter: async (dataCID: CID) => {
+    const stream = kubo.cat(dataCID)
+    const decoder = new TextDecoder()
+    let str = ''
+    for await (const chunk of stream) {
+      str += decoder.decode(chunk)
+    }
+    return str
+  },
+  // transform data before indexing
+  // here "data" is a dag-pb UnixFS
+  dataTransform: async (irCID, ir, dataCID, data) => {
+    const cplus: any = JSON.parse(data)
+    const avatar: any = cplus.avatar
+    // transform base64 avatar to CID if present
+    if (avatar != undefined && avatar._content != undefined) {
+      const buffer = Buffer.from(avatar._content, 'base64')
+      const fileCandidate = { content: new Uint8Array(buffer) }
+      // optimization: compute the hash locally without submitting it to kubo
+      // difficulty: check that the hash is the same
+      cplus.avatar = (await kubo.add(fileCandidate, { onlyHash: true })).cid
+    }
+    return cplus
+  },
+  paramBuilder: cesiumPlusProfile.paramBuilder
+}
+
 // transaction comment query and param builder
 // prevents overwrite
 const txComment: QueryBuilder = {
@@ -116,6 +158,8 @@ const txComment: QueryBuilder = {
 
     ON CONFLICT (pubkey, tx_id) DO NOTHING;
     `,
+  dataGetter: defaultDataGetter,
+  dataTransform: defaultDataTransform,
   paramBuilder: (irCID: CID, ir: IndexRequest, _dataCID: CID, data: TxComment) => [
     // $1 index_request_cid
     irCID.toString(),
@@ -148,13 +192,11 @@ async function handleIrWithNonNullData<T>(irCID: CID, ir: IndexRequest, q: Query
     console.log('no data when required')
     return
   }
-  kubo.dag
-    .get(dataCID)
-    .then((d) => d.value)
-    .then(dataHandler<T>(q, irCID, ir, dataCID))
+  q.dataGetter(dataCID)
+    .then((data) => q.dataTransform(irCID, ir, dataCID, data).then(dataHandler<T>(q, irCID, ir, dataCID)))
     .catch((e) => {
-      console.log(e)
-      console.log('☝️ could not get data to index ' + dataCID)
+      console.log('☝️ error indexing ' + dataCID)
+      console.error(e)
     })
 }
 
@@ -172,7 +214,7 @@ export async function handleInsertRequest(irCID: CID, ir: IndexRequest) {
     case CESIUM_PLUS_PROFILE_IMPORT.toString():
       // transform base58 pubkey to ss58 address with gdev prefix
      ir.pubkey = base58ToSS58(ir.pubkey, GDEV_PREFIX)
-      handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfile)
+      handleIrWithNonNullData<CplusProfile>(irCID, ir, cesiumPlusProfileRaw)
       break
 
     // delete cesium plus profile
diff --git a/src/scripts/cesium-plus-import.ts b/src/scripts/cesium-plus-import.ts
index 4a4d4e699d6557f69d8c9a22f3aae36abcc3cac6..0682028a883f2e67482cf973e668724fb5956a13 100644
--- a/src/scripts/cesium-plus-import.ts
+++ b/src/scripts/cesium-plus-import.ts
@@ -1,61 +1,10 @@
 import { CID } from 'multiformats'
-import {
-  processCesiumPlusImport,
-  processCesiumPlusProfile,
-  cplusIndexRequestsToAMT,
-  cplusRawIrCID,
-  cplusIrToAMT
-} from '../cesium-plus'
+import { wrapCplusInIndexRequest, cplusIrToAMT } from '../cesium-plus'
 import { createInterface } from 'readline'
 import { appendFile, createReadStream } from 'fs'
 import { timestampToKey } from '../processor'
 import { readFile } from 'fs/promises'
 
-// profile files
-// const PROFILES = '/home/hugo/ipfs/v2s-datapod/migrate_csplus/profile_csplus.json'
-const profiles = (n: number) => `/home/hugo/ipfs/v2s-datapod/migrate_csplus/profile_csplus_${n}.json`
-const CHUNKS = 11
-const GROUPBY = 256
-
-/// do import cesium data from chunk files to a single basic root dag
-// use json chunks as input
-// process base64 images apart
-// groups output in a single dag for easy import
-async function doImport() {
-  const cids: CID[] = []
-  // manage chunk by chunk to limit memory usage
-  for (let chunk = 0; chunk < CHUNKS; chunk++) {
-    const result = await readFile(profiles(chunk), 'utf8')
-    const obj = JSON.parse(result)
-    console.log('chunk ' + chunk)
-    await Promise.all(obj.map(processCesiumPlusProfile)).then((r: CID[]) => cids.push(...r))
-  }
-  console.log('processing...')
-  console.log(cids.length)
-
-  processCesiumPlusImport(cids, GROUPBY).then((cid) => console.log(cid))
-}
-
-// // in node, this is a bit faster than in chromium, and we get about 25 profiles per second instead of 18
-// // but this shows we should optimise AMT inserting for higher throughput
-// function doImportToAMT() {
-//   REMOVED
-// }
-
-// // this is a more optimized version that takes 50 seconds to import all 50000 profiles → 1000 profiles per second
-// // this version sometimes crashes with EADDRNOTAVAIL because it exceeds the number of concurrent connections
-// async function doMergeAMT() {
-//   REMOVED
-// }
-
-/// import all previously imported C+ data as index requests into a single AMT
-async function doAllCplusCidsToAMT() {
-  const cplusCID = CID.parse('bafyreie74jtf23zzz2tdgsz7axfrm4pidje43ypqn25v4gkdtfjbcj62km') // cesium plus import
-  const rootNodeCid = CID.parse('bafyreicvlp2p65agkxpzcboedba7zit55us4zvtyyq2wesvsdedy6irwfy') // empty root cid
-
-  cplusIndexRequestsToAMT(cplusCID, rootNodeCid)
-}
-
 // fetch raw profile
 // fetchRawCplus('38QzVPhRLbEiqJtvCmRY6A6SraheNA6fJbomFX75b2qb').then(console.log)
 async function fetchRawCplus(id: string): Promise<string> {
@@ -64,43 +13,32 @@ async function fetchRawCplus(id: string): Promise<string> {
 }
 
 /// download all c+ data and add them to a file
-// strange behavior: all lines are written (`wc -l input/cplusimport.jsonl`) way before logging finishes
-async function getAllCplusIr() {
-  const filename = './input/cplusimport.jsonl'
+async function downloadAllCplusProfilesRaw(filename: string) {
   const SCROLL_TIME = '5m'
-  const PAGE_SIZE = 1000
+  const PAGE_SIZE = 100
   const ENDPOINT = 'https://g1.data.e-is.pro'
-  const URL = ENDPOINT + '/user/profile/_search?scroll=' + SCROLL_TIME
+  const URL = `${ENDPOINT}/user/profile/_search?scroll=${SCROLL_TIME}&size=${PAGE_SIZE}`
+  const NOTIF = 1000
 
   // first batch
-  let batch = await fetch(URL + '&_source=false&filter_path=took,_scroll_id,hits.hits._id,hits.total', {
+  let batch = await fetch(`${URL}&filter_path=_scroll_id,hits.total,hits.hits._id`, {
     method: 'post',
     body: JSON.stringify({
-      query: { match_all: {} },
-      size: PAGE_SIZE
+      query: { match_all: {} }
     })
   }).then((b) => b.json())
   let scroll_id = batch._scroll_id
   const total = batch.hits.total
-  let imported = 0
-  let totalimported = 0
+  let scrolled = PAGE_SIZE
 
-  console.log(`importing ${total} cplus profiles...`)
+  console.log(`downloading ${total} cplus profiles...`)
 
   // process batches while available
-  while (totalimported < total) {
-    // add to the list
+  while (scrolled < total) {
+    // add raw source to the file
     for (const hit of batch.hits.hits) {
      fetchRawCplus(hit._id).then((cplusRaw) => appendFile(filename, cplusRaw + '\n', () => {}))
    }
-    // console.log(ids)
-    imported += batch.took
-    totalimported += batch.took
-    if (imported > 1000) {
-      console.log(`${totalimported.toString().padStart(5)}/${total}`)
-      imported = 0
-    }
-
     // take next batch
     batch = await fetch(ENDPOINT + '/_search/scroll', {
       method: 'post',
@@ -109,21 +47,25 @@ async function getAllCplusIr() {
         scroll_id: scroll_id
       })
     }).then((b) => b.json())
-    scroll_id = batch._scroll_id
-    // console.log(scroll_id)
+    scrolled += PAGE_SIZE
+    if (scrolled % NOTIF == 0) {
+      console.log(`${scrolled.toString().padStart(5)}/${total}`)
+    }
   }
+  console.log(`${total}/${total}, done.`)
 }
 
 /// put all raw cplus profiles to ipfs node in index request and write result to a file
-async function putIrToIPFS() {
+async function wrapRawProfilesInIndexRequest() {
   const LIMIT = 500 // max number of lines to process simultaneously
   const NOTIF = 2000 // log every N lines processed
   const input = './input/cplusimport.jsonl'
   const output = './input/cplusIR.txt'
   const rejected = './input/cplusHS.txt'
+  const convertImg = false // also upload base64 image as separate file for later reference
 
   let queueSize = 0
-  let read = 0
+  let readTotal = 0
 
   function process(line: string) {
     queueSize++
@@ -132,24 +74,24 @@ async function putIrToIPFS() {
     }
     try {
       const cplus = JSON.parse(line)
-      cplusRawIrCID(cplus, line)
+      wrapCplusInIndexRequest(cplus, line, convertImg)
        .then((cid) => timestampToKey(cplus.time) + ' ' + cid.toString() + '\n')
        .then((l) =>
          appendFile(output, l, () => {
-            read++
+            readTotal++
            queueSize--
            if (queueSize < LIMIT) {
              linereader.resume()
            }
-            if (read % NOTIF == 0) {
-              console.log(`processed ${read} lines`)
+            if (readTotal % NOTIF == 0) {
+              console.log(`processed ${readTotal} profiles`)
            }
          })
        )
        .catch((e) => {
          console.log(e)
          appendFile(rejected, line, () => {
-            read++
+            readTotal++
          })
        })
     } catch (e) {
@@ -178,10 +120,15 @@ async function importAllCplusToAMT() {
   await cplusIrToAMT(requests, rootNodeCid)
 }
 
-// TODO use command line args to choose what to do
-// doImport()
-// doAllCplusCidsToAMT()
-// importAllCplusToAMT()
-// getAllCplusIr()
-// putIrToIPFS()
-// importAllCplusToAMT()
+// 26 minutes
+// this can take a while because ~50000 profiles are downloaded in raw format independently
+// downloadAllCplusProfilesRaw('./input/cplusimport.jsonl')
+
+// 12 minutes
+// speed is reduced to limit RAM usage and concurrent writes to IPFS node
+// wrapRawProfilesInIndexRequest()
+
+// 3 minutes
+// import by batch and log successive cids
+importAllCplusToAMT()
+// → bafyreih4jspnqnsd4o3sdqv7c765uyylhtlh5majjw6aq6clilkq7tmqey
\ No newline at end of file