diff --git a/app/lib/dal/drivers/LokiFsAdapter.ts b/app/lib/dal/drivers/LokiFsAdapter.ts index d703da8909dd89fab046a013fa413dcb2cb4bed9..4d278753e0dfbc251b79bace76e7de17f595ba3e 100644 --- a/app/lib/dal/drivers/LokiFsAdapter.ts +++ b/app/lib/dal/drivers/LokiFsAdapter.ts @@ -17,13 +17,13 @@ import {CFSCore} from "../fileDALs/CFSCore" import {getNanosecondsTime} from "../../../ProcessCpuProfiler" import {NewLogger} from "../../logger" -interface Iterator<T> { +export interface Iterator<T> { next(value?: any): IteratorResult<T> return?(value?: any): IteratorResult<T> throw?(e?: any): IteratorResult<T> } -interface IteratorResult<T> { +export interface IteratorResult<T> { done: boolean value: T } @@ -185,9 +185,7 @@ export class LokiFsAdapter { li = this.generateDestructured({ partition: pinext.value }); // iterate each of the lines generated by generateDestructured() - for(let outline of li) { - await this.cfs.appendFile(filename, outline + "\n") - } + await this.cfs.fsStreamTo(filename, li) self.saveNextPartition(commit, pi, callback) }; @@ -234,6 +232,10 @@ export class LokiFsAdapter { for(docidx=0; docidx<doccount; docidx++) { yield JSON.stringify(coll.data[docidx]); } + + if (doccount === 0) { + yield '' + } } }; diff --git a/app/lib/dal/fileDALs/CFSCore.ts b/app/lib/dal/fileDALs/CFSCore.ts index 9e0e0d3ad9373a7c83858ba75b1f2a3effdd1b55..28971e6c545fc7e8fc39f9513c8fe53ec2f69a41 100644 --- a/app/lib/dal/fileDALs/CFSCore.ts +++ b/app/lib/dal/fileDALs/CFSCore.ts @@ -228,11 +228,7 @@ export class CFSCore { return path.normalize(filePath).replace(/\//g, '__').replace(/\\/g, '__'); } - getPath(file: string) { - return path.join(this.rootPath, file) - } - - appendFile(filename: string, content: string) { - return this.qfs.fsAppend(path.join(this.rootPath, filename), content) + fsStreamTo(filename: string, iterator: IterableIterator<string>) { + return this.qfs.fsStreamTo(path.join(this.rootPath, filename), iterator) } } diff --git a/app/lib/system/directory.ts b/app/lib/system/directory.ts index 8254b42995500fe7d07714a691892e8ff87880d1..d0717051c95173a964ba3107e5ec18faf280d5a5 100644 --- a/app/lib/system/directory.ts +++ b/app/lib/system/directory.ts @@ -12,7 +12,7 @@ // GNU Affero General Public License for more details. import * as path from "path" -import * as fs from "fs" +import * as fs from 'fs' import {SQLiteDriver} from "../dal/drivers/SQLiteDriver" import {CFSCore} from "../dal/fileDALs/CFSCore" import {WoTBInstance, WoTBObject} from "../wot" @@ -41,12 +41,12 @@ export interface FileSystem { fsWrite(file:string, content:string): Promise<void> fsMakeDirectory(dir:string): Promise<void> fsRemoveTree(dir:string): Promise<void> - fsAppend(file: string, content: string): Promise<void> + fsStreamTo(file: string, iterator: IterableIterator<string>): Promise<void> } class QioFileSystem implements FileSystem { - constructor(private qio:any) {} + constructor(private qio:any, private isMemory:boolean = false) {} async fsExists(file:string) { return this.qio.exists(file) @@ -68,8 +68,28 @@ class QioFileSystem implements FileSystem { return this.qio.write(file, content) } - fsAppend(file: string, content: string): Promise<void> { - return this.qio.append(file, content) + async fsStreamTo(file: string, iterator: IterableIterator<string>): Promise<void> { + if (this.isMemory) { + for (const line of iterator) { + await this.qio.append(file, line) + } + } else { + // Use NodeJS streams for faster writing + let wstream = fs.createWriteStream(file) + await new Promise(async (res, rej) => { + // When done, return + wstream.on('close', (err:any) => { + if (err) return rej(err) + res() + }) + // Write each line + for (const line of iterator) { + wstream.write(line + "\n") + } + // End the writing + wstream.end() + }) + } } fsMakeDirectory(dir: string): Promise<void> {