From 15f88fed0be3af3150cdf89ae54ad5bb76c7d03f Mon Sep 17 00:00:00 2001
From: Benoit Lavenier <benoit.lavenier@e-is.pro>
Date: Wed, 10 May 2023 18:54:48 +0200
Subject: [PATCH] [fix] Optimize access to sources by pubkey, using specific
 index for complex condition - Close #1438

---
 app/lib/common-libs/constants.ts              |  4 +-
 app/lib/dal/fileDAL.ts                        | 23 ++++-
 app/lib/dal/indexDAL/leveldb/LevelDBSindex.ts | 95 +++++++++++++++----
 test/dal/sources-dal.ts                       | 60 ++++++++++--
 4 files changed, 149 insertions(+), 33 deletions(-)

diff --git a/app/lib/common-libs/constants.ts b/app/lib/common-libs/constants.ts
index 8a7e3f4a2..c58cf6bb5 100755
--- a/app/lib/common-libs/constants.ts
+++ b/app/lib/common-libs/constants.ts
@@ -53,7 +53,7 @@ const CONDITIONS =
   "\\)|CSV\\(" +
   CSV_INTEGER +
   "\\))))*";
-
+const CONDITION_SIG_PUBKEY = "SIG\\((" + PUBKEY +")\\)";
 const BMA_REGEXP = /^BASIC_MERKLED_API( ([a-z_][a-z0-9-_.]*))?( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/;
 const BMAS_REGEXP = /^BMAS( ([a-z_][a-z0-9-_.]*))?( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))( (\/.+))?$/;
 const BMATOR_REGEXP = /^BMATOR( ([a-z0-9]{16})\.onion)( ([0-9.]+))?( ([0-9a-f:]+))?( ([0-9]+))$/;
@@ -535,6 +535,8 @@ export const CommonConstants = {
     LOCKTIME: find("Locktime: (" + INTEGER + ")"),
     INLINE_COMMENT: exact(COMMENT),
     OUTPUT_CONDITION: exact(CONDITIONS),
+    OUTPUT_CONDITION_SIG_PUBKEY: find(CONDITION_SIG_PUBKEY),
+    OUTPUT_CONDITION_SIG_PUBKEY_UNIQUE: exact(CONDITION_SIG_PUBKEY)
   },
   PEER: {
     BLOCK: find("Block: (" + INTEGER + "-" + FINGERPRINT + ")"),
diff --git a/app/lib/dal/fileDAL.ts b/app/lib/dal/fileDAL.ts
index 8f4848f77..a53767ea0 100644
--- a/app/lib/dal/fileDAL.ts
+++ b/app/lib/dal/fileDAL.ts
@@ -470,8 +470,27 @@ export class FileDAL implements ServerDAO {
   }
 
   async getAvailableSourcesByPubkey(pubkey: string): Promise<HttpSource[]> {
-    const txAvailable = await this.sindexDAL.getAvailableForPubkey(pubkey);
-    const sources: UDSource[] = await this.dividendDAL.getUDSources(pubkey);
+    const start = Date.now();
+    logger.debug(`Reading sources of pubkey ${pubkey}...`);
+
+    const [txAvailable, sources] = await Promise.all([
+      this.sindexDAL.getAvailableForPubkey(pubkey)
+        .then(res => {
+          logger.debug(
+              "Reading sources of pubkey %s in %s ms",
+              pubkey,
+              Date.now() - start);
+          return res;
+        }),
+      this.dividendDAL.getUDSources(pubkey)
+        .then(res => {
+          logger.debug(
+              "Reading UDs of pubkey %s in %s ms",
+              pubkey,
+              Date.now() - start);
+          return res;
+        })
+    ]);
     return sources
       .map((d) => {
         return {
diff --git a/app/lib/dal/indexDAL/leveldb/LevelDBSindex.ts b/app/lib/dal/indexDAL/leveldb/LevelDBSindex.ts
index a8b4aceba..aeadcc076 100644
--- a/app/lib/dal/indexDAL/leveldb/LevelDBSindex.ts
+++ b/app/lib/dal/indexDAL/leveldb/LevelDBSindex.ts
@@ -1,23 +1,19 @@
-import { MonitorExecutionTime } from "../../../debug/MonitorExecutionTime";
-import {
-  FullSindexEntry,
-  Indexer,
-  SimpleTxEntryForWallet,
-  SimpleTxInput,
-  SindexEntry,
-} from "../../../indexer";
-import { LevelUp } from "levelup";
-import { LevelDBTable } from "./LevelDBTable";
-import { SIndexDAO } from "../abstract/SIndexDAO";
-import { Underscore } from "../../../common-libs/underscore";
-import { pint } from "../../../common-libs/pint";
-import { arrayPruneAllCopy } from "../../../common-libs/array-prune";
+import {MonitorExecutionTime} from "../../../debug/MonitorExecutionTime";
+import {FullSindexEntry, Indexer, SimpleTxEntryForWallet, SimpleTxInput, SindexEntry,} from "../../../indexer";
+import {LevelUp} from "levelup";
+import {LevelDBTable} from "./LevelDBTable";
+import {SIndexDAO} from "../abstract/SIndexDAO";
+import {Underscore} from "../../../common-libs/underscore";
+import {pint} from "../../../common-libs/pint";
+import {arrayPruneAllCopy} from "../../../common-libs/array-prune";
+import {CommonConstants} from "../../../common-libs/constants";
 
 export class LevelDBSindex extends LevelDBTable<SindexEntry>
   implements SIndexDAO {
   private indexForTrimming: LevelDBTable<string[]>;
   private indexForConsumed: LevelDBTable<string[]>;
   private indexForConditions: LevelDBTable<string[]>;
+  private indexOfComplexeConditionForPubkeys: LevelDBTable<string[]>;
 
   constructor(protected getLevelDB: (dbName: string) => Promise<LevelUp>) {
     super("level_sindex", getLevelDB);
@@ -41,9 +37,14 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
       "level_sindex/conditions",
       this.getLevelDB
     );
+    this.indexOfComplexeConditionForPubkeys = new LevelDBTable<string[]>(
+        "level_sindex/complex_condition_pubkeys",
+        this.getLevelDB
+    );
     await this.indexForTrimming.init();
     await this.indexForConsumed.init();
     await this.indexForConditions.init();
+    await this.indexOfComplexeConditionForPubkeys.init();
   }
 
   async close(): Promise<void> {
@@ -51,6 +52,7 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
     await this.indexForTrimming.close();
     await this.indexForConsumed.close();
     await this.indexForConditions.close();
+    await this.indexOfComplexeConditionForPubkeys.close();
   }
 
   /**
@@ -127,11 +129,9 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
       pos: number;
     }[]
   > {
-    // TODO: very costly: needs a full scan, would be better to change this implementatio
-    const entries = await this.findWhere((e) =>
-      e.conditions.includes(`SIG(${pubkey})`)
-    );
-    const reduced = Indexer.DUP_HELPERS.reduceBy(entries, [
+    const forConditions = await this.getForConditions(`SIG(${pubkey})`);
+    const forPubkeys = await this.getForComplexeConditionPubkey(pubkey);
+    const reduced = Indexer.DUP_HELPERS.reduceBy(forConditions.concat(forPubkeys), [
       "identifier",
       "pos",
     ]);
@@ -269,6 +269,19 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
     return found;
   }
 
+  async getForComplexeConditionPubkey(pubkey: string): Promise<SindexEntry[]> {
+    const ids = (await this.indexOfComplexeConditionForPubkeys.getOrNull(pubkey)) || [];
+    const found: SindexEntry[] = [];
+    for (const id of ids) {
+      const entries = await this.findByIdentifierAndPos(
+          id.split("-")[0],
+          pint(id.split("-")[1])
+      );
+      entries.forEach((e) => found.push(e));
+    }
+    return found;
+  }
+
   async removeBlock(blockstamp: string): Promise<void> {
     const writtenOn = pint(blockstamp);
     // We look at records written on this blockstamp: `indexForTrimming` allows to get them
@@ -393,6 +406,7 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
     const byConsumed: { [k: number]: SindexEntry[] } = {};
     const byWrittenOn: { [k: number]: SindexEntry[] } = {};
     const byConditions: { [k: string]: SindexEntry[] } = {};
+    const byPubkeys: { [k: string]: SindexEntry[] } = {};
     records
       .filter((r) => r.consumed)
       .forEach((r) => {
@@ -410,12 +424,25 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
         arrWO = byWrittenOn[r.writtenOn] = [];
       }
       arrWO.push(r);
-      // Conditiosn
+      // Conditions
       let arrCN = byConditions[r.conditions];
       if (!arrCN) {
         arrCN = byConditions[r.conditions] = [];
       }
       arrCN.push(r);
+
+      // If complex condition
+      if (!CommonConstants.TRANSACTION.OUTPUT_CONDITION_SIG_PUBKEY_UNIQUE.test(r.conditions)) {
+        // Fill complex condition by pubkey
+        const pubkeys = this.getDistinctPubkeysFromCondition(r.conditions);
+        pubkeys.forEach((pub) => {
+          let arrPub = byPubkeys[pub];
+          if (!arrPub) {
+            arrPub = byPubkeys[pub] = [];
+          }
+          arrPub.push(r);
+        });
+      }
     });
     // Index consumed => (identifier + pos)[]
     for (const k of Underscore.keys(byConsumed)) {
@@ -446,5 +473,33 @@ export class LevelDBSindex extends LevelDBTable<SindexEntry>
         Underscore.uniq(existing.concat(newSources))
       );
     }
+    // Index pubkeys => (identifier + pos)[]
+    for (const k of Underscore.keys(byPubkeys).map(String)) {
+      const existing = (await this.indexOfComplexeConditionForPubkeys.getOrNull(k)) || [];
+      const newSources = byPubkeys[k].map((r) =>
+          LevelDBSindex.trimPartialKey(r.identifier, r.pos)
+      );
+      await this.indexOfComplexeConditionForPubkeys.put(
+          k,
+          Underscore.uniq(existing.concat(newSources))
+      );
+    }
+  }
+
+  /**
+   * Get all pubkeys used by an output condition (e.g. 'SIG(A) && SIG(B)' will return ['A', 'B']
+   * @param condition
+   * @private
+   */
+  private getDistinctPubkeysFromCondition(condition: string): string[] {
+    const pubKeys: string[] = [];
+    if (!condition) return pubKeys;
+    let match: RegExpExecArray | null;
+    while ((match = CommonConstants.TRANSACTION.OUTPUT_CONDITION_SIG_PUBKEY.exec(condition)) !== null) {
+      pubKeys.push(match[1]);
+      condition = condition.substring(match.index + match[0].length);
+    }
+
+    return Underscore.uniq(pubKeys);
   }
 }
diff --git a/test/dal/sources-dal.ts b/test/dal/sources-dal.ts
index 7faac7713..ed4d9ab83 100644
--- a/test/dal/sources-dal.ts
+++ b/test/dal/sources-dal.ts
@@ -18,30 +18,70 @@ const should = require('should');
 
 let dal:FileDAL
 
-describe("Source DAL", function(){
+describe("Source DAL", function() {
+  const pubkeyA = 'BYfWYFrsyjpvpFysgu19rGK3VHBkz4MqmQbNyEuVU64g';
+  const pubkeyB = 'DSz4rgncXCytsUMW2JU2yhLquZECD2XpEkpP9gG5HyAx';
 
   before(async () => {
     dal = new FileDAL(await Directory.getHomeParams(true, 'db0'), async (name: string) => Directory.getHomeDB(true, name), async (name: string) => Directory.getHomeLevelDB(true, name))
     await dal.init({} as any)
   })
 
-  it('should be able to feed the sindex with unordered rows', async () => {
+  it('should be able to fill the sindex with unordered rows', async () => {
     await dal.sindexDAL.insertBatch([
-      { op: 'UPDATE', tx: null, identifier: 'SOURCE_1', pos: 4, written_on: '139-H', writtenOn: 139, written_time: 4500, consumed: true,  conditions: 'SIG(ABC)' },
-      { op: 'CREATE', tx: null, identifier: 'SOURCE_1', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: 'SIG(ABC)' },
-      { op: 'CREATE', tx: null, identifier: 'SOURCE_2', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: 'SIG(ABC)' },
-      { op: 'CREATE', tx: null, identifier: 'SOURCE_3', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: 'SIG(DEF)' }
+      { op: 'UPDATE', tx: null, identifier: 'SOURCE_1', pos: 4, written_on: '139-H', writtenOn: 139, written_time: 4500, consumed: true,  conditions: `SIG(${pubkeyA})` },
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_1', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: `SIG(${pubkeyA})` },
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_2', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: `SIG(${pubkeyA})` },
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_3', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: `SIG(${pubkeyB})` }
     ] as any);
     (await dal.sindexDAL.findByIdentifier('SOURCE_1')).should.have.length(2);
     (await dal.sindexDAL.findByPos(4)).should.have.length(4);
     // Source availability
-    const sourcesOfDEF = await dal.sindexDAL.getAvailableForPubkey('DEF');
-    sourcesOfDEF.should.have.length(1);
-    const sourcesOfABC = await dal.sindexDAL.getAvailableForPubkey('ABC');
-    sourcesOfABC.should.have.length(1);
+    const sourcesOfA = await dal.sindexDAL.getAvailableForPubkey(pubkeyA);
+    sourcesOfA.should.have.length(1);
+    const sourcesOfB = await dal.sindexDAL.getAvailableForPubkey(pubkeyB);
+    sourcesOfB.should.have.length(1);
     const source1 = await dal.sindexDAL.getTxSource('SOURCE_1', 4) as any
     source1.should.have.property('consumed').equal(true);
     const source2 = await dal.sindexDAL.getTxSource('SOURCE_2', 4) as any
     source2.should.have.property('consumed').equal(false);
+
+    // Check sources not available after block deletion
+    await dal.sindexDAL.removeBlock('126-H');
+    (await dal.sindexDAL.findByIdentifier('SOURCE_1')).should.have.length(1);
+    should(await dal.sindexDAL.getTxSource('SOURCE_2', 4) as any).be.null();
+    should(await dal.sindexDAL.getTxSource('SOURCE_3', 4) as any).be.null();
+    (await dal.sindexDAL.findByPos(4)).should.have.length(1);
+    await dal.sindexDAL.removeBlock('139-H');
+    (await dal.sindexDAL.findByIdentifier('SOURCE_1')).should.have.length(0);
+    (await dal.sindexDAL.findByPos(4)).should.have.length(0);
+    (await dal.sindexDAL.getAvailableForPubkey(pubkeyA)).should.have.length(0);
+    (await dal.sindexDAL.getAvailableForPubkey(pubkeyB)).should.have.length(0);
+    should(await dal.sindexDAL.getTxSource('SOURCE_1', 4) as any).be.null();
+    should(await dal.sindexDAL.getTxSource('SOURCE_2', 4) as any).be.null();
+    should(await dal.sindexDAL.getTxSource('SOURCE_3', 4) as any).be.null();
+  })
+
+  it('should be able to read sindex by pubkey', async () => {
+    // Test insertion, using complex condition
+    await dal.sindexDAL.insertBatch([
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_4', pos: 4, written_on: '139-H', writtenOn: 139, written_time: 2000, consumed: false, conditions: `SIG(${pubkeyA})` },
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_5', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: `(SIG(${pubkeyA}) && SIG(${pubkeyB}))` },
+      { op: 'CREATE', tx: null, identifier: 'SOURCE_6', pos: 4, written_on: '126-H', writtenOn: 126, written_time: 2000, consumed: false, conditions: `(XHX(3EB4702F2AC2FD3FA4FDC46A4FC05AE8CDEE1A85F2AC2FD3FA4FDC46A4FC01CA) || SIG(${pubkeyB}))` }
+    ] as any);
+
+    // Check sources availability by pubkey
+    let sourcesOfA = await dal.sindexDAL.getAvailableForPubkey(pubkeyA);
+    sourcesOfA.should.have.length(2);
+    let sourcesOfB = await dal.sindexDAL.getAvailableForPubkey(pubkeyB);
+    sourcesOfB.should.have.length(2);
+
+    // Check sources not available after block deletion
+    await dal.sindexDAL.removeBlock('126-H');
+    await dal.sindexDAL.removeBlock('139-H');
+    sourcesOfA = await dal.sindexDAL.getAvailableForPubkey(pubkeyA);
+    sourcesOfA.should.have.length(0);
+    sourcesOfB = await dal.sindexDAL.getAvailableForPubkey(pubkeyB);
+    sourcesOfB.should.have.length(0);
   })
 })
-- 
GitLab