From 5ccacc6701c6cacf61817fcd5439eff75e78042c Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Wed, 24 May 2017 17:33:40 +0200 Subject: [PATCH] [enh] add new index <currency>/movement --- .../duniter/core/client/model/ModelUtils.java | 15 - .../client/model/bma/BlockchainBlock.java | 6 +- .../client/model/bma/BlockchainBlocks.java | 223 +++++++++++++ .../core/client/model/bma/EndpointApi.java | 1 + .../model/bma/util/BlockchainBlockUtils.java | 97 ------ .../core/client/model/local/Movement.java | 76 +++-- .../core/client/model/local/Movements.java | 98 ++++++ .../core/client/model/MovementsTest.java | 44 +++ .../{gson => json}/JsonArrayParserTest.java | 2 +- .../src/test/resources/block_with_tx.json | 114 +++++++ .../org/duniter/elasticsearch/PluginInit.java | 11 +- .../elasticsearch/client/Duniter4jClient.java | 8 + .../client/Duniter4jClientImpl.java | 60 ++++ .../elasticsearch/dao/BlockStatDao.java | 2 +- .../duniter/elasticsearch/dao/DaoModule.java | 2 + .../elasticsearch/dao/MovementDao.java | 53 +++ .../elasticsearch/dao/impl/BlockDaoImpl.java | 1 - .../dao/impl/BlockStatDaoImpl.java | 57 +++- .../dao/impl/MovementDaoImpl.java | 274 +++++++++++++++ .../duniter/elasticsearch/model/Movement.java | 314 ++++++++++++++++++ .../elasticsearch/model/Movements.java | 95 ++++++ .../AbstractBlockchainListenerService.java | 3 - .../service/BlockchainListenerService.java | 134 ++++++++ .../service/BlockchainStatsService.java | 142 -------- .../service/CurrencyService.java | 16 +- .../elasticsearch/service/ServiceModule.java | 5 +- .../service/BlockchainUserEventService.java | 35 +- .../user/service/UserEventService.java | 48 +-- 28 files changed, 1555 insertions(+), 381 deletions(-) create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java delete mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movements.java create mode 100644 duniter4j-core-client/src/test/java/org/duniter/core/client/model/MovementsTest.java rename duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/{gson => json}/JsonArrayParserTest.java (97%) create mode 100644 duniter4j-core-client/src/test/resources/block_with_tx.json create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/MovementDao.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movement.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movements.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java delete mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java index 67a7390d..a8f976ee 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java @@ -118,21 +118,6 @@ public class ModelUtils { }; } - /** - * Transform a list of sources, into a Map, using the fingerprint as key - * @param movements - * @return - */ - public static Map<String, Movement> movementsToFingerprintMap(List<Movement> movements) { - - Map<String, Movement> result = new HashMap<>(); - for(Movement movement: movements) { - result.put(movement.getFingerprint(), movement); - } - - return result; - } - /** * Return a small string, for the given pubkey. * @param pubkey diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java index dee971bd..a242852e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java @@ -567,7 +567,7 @@ public class BlockchainBlock implements Serializable { private String[] signatures; - private String version; + private int version; private String currency; @@ -599,11 +599,11 @@ public class BlockchainBlock implements Serializable { this.signatures = signatures; } - public String getVersion() { + public int getVersion() { return version; } - public void setVersion(String version) { + public void setVersion(int version) { this.version = version; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java new file mode 100644 index 00000000..05040f22 --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java @@ -0,0 +1,223 @@ +package org.duniter.core.client.model.bma; + +/*- + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.math.BigInteger; +import java.util.*; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Created by blavenie on 26/04/17. + */ +public final class BlockchainBlocks { + + public static final Pattern SIG_PUBKEY_PATTERN = Pattern.compile("SIG\\(([^)]+)\\)"); + + public static final Pattern TX_UNLOCK_PATTERN = Pattern.compile("([0-9]+):SIG\\(([^)]+)\\)"); + public static final Pattern TX_OUTPUT_PATTERN = Pattern.compile("([0-9]+):([0-9]+):([^:]+)"); + public static final Pattern TX_INPUT_PATTERN = Pattern.compile("([0-9]+):([0-9]+):([TD]):([^:]+):([^:]+)"); + + private BlockchainBlocks() { + // helper class + } + + public static BigInteger getTxAmount(BlockchainBlock block) { + BigInteger result = BigInteger.valueOf(0l); + Arrays.stream(block.getTransactions()) + .forEach(tx -> result.add(BigInteger.valueOf(getTxAmount(tx)))); + return result; + } + + public static long getTxAmount(final BlockchainBlock.Transaction tx) { + return getTxAmount(tx, null/*no issuer filter*/); + } + + public static long getTxAmount(final BlockchainBlock.Transaction tx, + Predicate<String> issuerFilter) { + + final Map<Integer, Integer> inputIndexByIssuerIndex = Maps.newHashMap(); + Arrays.stream(tx.getUnlocks()) + .map(TX_UNLOCK_PATTERN::matcher) + .filter(Matcher::matches) + .forEach(matcher -> inputIndexByIssuerIndex.put( + Integer.parseInt(matcher.group(1)), + Integer.parseInt(matcher.group(2))) + ); + + return IntStream.range(0, tx.getIssuers().length) + .mapToLong(i -> { + final String issuer = tx.getIssuers()[i]; + + // Skip if issuerFilter test failed + if (issuerFilter != null && !issuerFilter.test(issuer)) return 0; + + long inputSum = IntStream.range(0, tx.getInputs().length) + .filter(j -> i == inputIndexByIssuerIndex.get(j)) + .mapToObj(j -> tx.getInputs()[j]) + .map(input -> input.split(":")) + .filter(inputParts -> inputParts.length > 2) + .mapToLong(inputParts -> powBase(Long.parseLong(inputParts[0]), Integer.parseInt(inputParts[1]))) + .sum(); + + long outputSum = Arrays.stream(tx.getOutputs()) + .map(TX_OUTPUT_PATTERN::matcher) + .filter(Matcher::matches) + .filter(matcher -> issuer.equals(matcher.group(3))) + .mapToLong(matcher -> powBase(Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2)))) + .sum(); + + return (inputSum - outputSum); + }) + .sum(); + } + + public static long powBase(long amount, int unitbase) { + if (unitbase == 0) return amount; + return amount * (long)Math.pow(10, unitbase); + } + + public static List<TxInput> getTxInputs(final BlockchainBlock.Transaction tx) { + Preconditions.checkNotNull(tx); + + final Function<Integer, String> issuerByInputIndex = transformInputIndex2Issuer(tx); + + return IntStream.range(0, tx.getInputs().length) + .mapToObj(i -> { + TxInput txInput = parseInput(tx.getInputs()[i]); + txInput.issuer = issuerByInputIndex.apply(i); + return txInput; + }) + .collect(Collectors.toList()); + } + + public static List<TxOutput> getTxOutputs(final BlockchainBlock.Transaction tx) { + Preconditions.checkNotNull(tx); + return Arrays.stream(tx.getOutputs()) + .map(output -> parseOuput(output)) + .collect(Collectors.toList()); + } + + public static TxInput parseInput(String input) { + TxInput result = null; + Matcher matcher = TX_INPUT_PATTERN.matcher(input); + if (matcher.matches()) { + result = new TxInput(); + result.amount = Long.parseLong(matcher.group(1)); + result.unitbase = Integer.parseInt(matcher.group(2)); + result.type = matcher.group(3); + result.txHashOrPubkey = matcher.group(4); + result.indexOrBlockId = matcher.group(5); + } + return result; + } + + public static TxOutput parseOuput(String output) { + TxOutput result = null; + Matcher matcher = TX_OUTPUT_PATTERN.matcher(output); + if (matcher.matches()) { + result = new TxOutput(); + result.amount = Long.parseLong(matcher.group(1)); + result.unitbase = Integer.parseInt(matcher.group(2)); + result.unlockCondition = matcher.group(3); + + // Parse unlock condition like 'SIG(<pubkey>)' + matcher = SIG_PUBKEY_PATTERN.matcher(result.unlockCondition); + if (matcher.matches()) { + result.recipient = matcher.group(1); + } + } + return result; + } + + public static long getTxInputAmountByIssuer(final List<TxInput> txInputs, final String issuer) { + Preconditions.checkNotNull(txInputs); + return txInputs.stream() + // only keep inputs from issuer + .filter(input -> Objects.equals(issuer, input.issuer)) + .mapToLong(input -> powBase(input.amount, input.unitbase)) + .sum(); + } + + public static long getTxOutputAmountByIssuerAndRecipient(final List<BlockchainBlocks.TxOutput> txOutputs, + final String issuer, + final String recipient) { + Preconditions.checkNotNull(txOutputs); + return txOutputs.stream() + // only keep the expected recipient, but not equals to the issuer + .filter(output -> Objects.equals(recipient, output.recipient) && !Objects.equals(issuer, output.recipient)) + .mapToLong(output -> powBase(output.amount, output.unitbase)) + .sum(); + } + + public static Set<String> getTxRecipients(Collection<TxOutput> txOutputs) { + Preconditions.checkNotNull(txOutputs); + return txOutputs.stream().map(output -> output.recipient).distinct().collect(Collectors.toSet()); + } + + public static class TxInput { + public long amount; + public int unitbase; + public String type; + public String txHashOrPubkey; + public String indexOrBlockId; + public String issuer; + + public boolean isUD() { + return "D".equals(type); + } + } + + public static class TxOutput { + public long amount; + public int unitbase; + public String recipient; + public String unlockCondition; + } + + /* -- Internal methods -- */ + + + private static Function<Integer, String> transformInputIndex2Issuer(final BlockchainBlock.Transaction tx) { + final Map<Integer, Integer> inputIndexByIssuerIndex = Maps.newHashMap(); + Arrays.stream(tx.getUnlocks()) + .map(TX_UNLOCK_PATTERN::matcher) + .filter(Matcher::matches) + .forEach(matcher -> inputIndexByIssuerIndex.put( + Integer.parseInt(matcher.group(1)), + Integer.parseInt(matcher.group(2))) + ); + + + return (inputIndex -> tx.getIssuers()[inputIndexByIssuerIndex.get(inputIndex)]); + } + +} diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java index f120a3e3..a9962535 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/EndpointApi.java @@ -29,5 +29,6 @@ public enum EndpointApi { BMATOR, ES_CORE_API, ES_USER_API, + MONIT_API, UNDEFINED } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java deleted file mode 100644 index 2fb098fe..00000000 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java +++ /dev/null @@ -1,97 +0,0 @@ -package org.duniter.core.client.model.bma.util; - -/*- - * #%L - * Duniter4j :: Core Client API - * %% - * Copyright (C) 2014 - 2017 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import jnr.ffi.annotations.In; -import org.duniter.core.client.model.bma.BlockchainBlock; - -import java.math.BigInteger; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.IntStream; - -/** - * Created by blavenie on 26/04/17. - */ -public final class BlockchainBlockUtils { - - public static final Pattern TX_UNLOCK_PATTERN = Pattern.compile("([0-9]+):SIG\\(([^)]+)\\)"); - public static final Pattern TX_OUTPUT_PATTERN = Pattern.compile("([0-9]+):([0-9]+):SIG\\(([^)]+)\\)"); - - private BlockchainBlockUtils () { - // helper class - } - - public static BigInteger getTxAmount(BlockchainBlock block) { - BigInteger result = BigInteger.valueOf(0l); - Arrays.stream(block.getTransactions()) - .forEach(tx -> result.add(BigInteger.valueOf(getTxAmount(tx)))); - return result; - } - - public static long getTxAmount(final BlockchainBlock.Transaction tx) { - - final Map<Integer, Integer> inputsByIssuer = Maps.newHashMap(); - Arrays.stream(tx.getUnlocks()) - .map(TX_UNLOCK_PATTERN::matcher) - .filter(Matcher::matches) - .forEach(matcher -> inputsByIssuer.put( - Integer.parseInt(matcher.group(1)), - Integer.parseInt(matcher.group(2))) - ); - - return IntStream.range(0, tx.getIssuers().length) - .mapToLong(i -> { - final String issuer = tx.getIssuers()[i]; - - long inputSum = IntStream.range(0, tx.getInputs().length) - .filter(j -> i == inputsByIssuer.get(j)) - .mapToObj(j -> tx.getInputs()[j]) - .map(input -> input.split(":")) - .filter(inputParts -> inputParts.length > 2) - .mapToLong(inputParts -> powBase(Long.parseLong(inputParts[0]), Integer.parseInt(inputParts[1]))) - .sum(); - - long outputSum = Arrays.stream(tx.getOutputs()) - .map(TX_OUTPUT_PATTERN::matcher) - .filter(Matcher::matches) - .filter(matcher -> issuer.equals(matcher.group(3))) - .mapToLong(matcher -> powBase(Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2)))) - .sum(); - - return (inputSum - outputSum); - }) - .sum(); - } - - - private static long powBase(long amount, int unitbase) { - if (unitbase == 0) return amount; - return amount * (long)Math.pow(10, unitbase); - } -} diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movement.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movement.java index 8e2d134e..be7efb7a 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movement.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movement.java @@ -22,6 +22,8 @@ package org.duniter.core.client.model.local; * #L% */ +import com.fasterxml.jackson.annotation.JsonIgnore; + import java.io.Serializable; /** @@ -30,17 +32,31 @@ import java.io.Serializable; */ public class Movement implements LocalEntity<Long>, Serializable { + public static final String PROPERTY_MEDIAN_TIME = "medianTime"; + public static final String PROPERTY_BLOCK_NUMBER= "blockNumber"; + public static final String PROPERTY_BLOCK_HASH = "blockHash"; + public static final String PROPERTY_DIVIDEND = "dividend"; + public static final String PROPERTY_IS_UD = "isUD"; + public static final String PROPERTY_ISSUER = "issuer"; + public static final String PROPERTY_RECIPIENT = "recipient"; + public static final String PROPERTY_AMOUNT = "amount"; + public static final String PROPERTY_UNITBASE = "unitbase"; + public static final String PROPERTY_COMMENT = "comment"; + public static final String PROPERTY_TX_VERSION = "txVersion"; + private Long id; private long walletId; - private long amount; - private Long time; + private Long medianTime; private Integer blockNumber; + private String blockHash; + private String issuer; + private String recipient; + private long amount; + private int unitbase; private long dividend; private boolean isUD = false; - private String fingerprint; private String comment; - private String issuers; - private String receivers; + private String txVersion; @Override public Long getId() { @@ -60,6 +76,22 @@ public class Movement implements LocalEntity<Long>, Serializable { this.walletId = walletId; } + public int getUnitbase() { + return unitbase; + } + + public void setUnitbase(int unitbase) { + this.unitbase = unitbase; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + public long getAmount() { return amount; } @@ -68,12 +100,12 @@ public class Movement implements LocalEntity<Long>, Serializable { this.amount = amount; } - public Long getTime() { - return time; + public Long getMedianTime() { + return medianTime; } - public void setTime(Long time) { - this.time = time; + public void setMedianTime(Long medianTime) { + this.medianTime = medianTime; } public Integer getBlockNumber() { @@ -84,6 +116,7 @@ public class Movement implements LocalEntity<Long>, Serializable { this.blockNumber = blockNumber; } + @JsonIgnore public boolean isUD() { return isUD; } @@ -92,14 +125,6 @@ public class Movement implements LocalEntity<Long>, Serializable { this.isUD = isUD; } - public String getFingerprint() { - return fingerprint; - } - - public void setFingerprint(String fingerprint) { - this.fingerprint = fingerprint; - } - public String getComment() { return comment; } @@ -108,24 +133,25 @@ public class Movement implements LocalEntity<Long>, Serializable { this.comment = comment; } + @JsonIgnore public boolean isValidate() { return blockNumber != null; } - public String getIssuers() { - return issuers; + public String getIssuer() { + return issuer; } - public void setIssuers(String issuers) { - this.issuers = issuers; + public void setIssuer(String issuer) { + this.issuer = issuer; } - public void setReceivers(String receivers) { - this.receivers = receivers; + public void setRecipient(String recipient) { + this.recipient = recipient; } - public String getReceivers() { - return receivers; + public String getRecipient() { + return recipient; } public long getDividend() { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movements.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movements.java new file mode 100644 index 00000000..c9cfa29a --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Movements.java @@ -0,0 +1,98 @@ +package org.duniter.core.client.model.local; + +/*- + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.google.common.base.Preconditions; +import org.duniter.core.client.model.bma.BlockchainBlock; +import static org.duniter.core.client.model.bma.BlockchainBlocks.*; +import org.duniter.core.util.CollectionUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Created by blavenie on 26/04/17. + */ +public final class Movements { + + private Movements() { + // helper class + } + + public static Stream<Movement> stream(final BlockchainBlock block) { + Preconditions.checkNotNull(block); + + // No Tx + if (CollectionUtils.isEmpty(block.getTransactions())) { + return Stream.empty(); + } + + return Arrays.stream(block.getTransactions()) + .flatMap(tx -> Movements.streamFromTx(block, tx)); + } + + public static List<Movement> getMovements(final BlockchainBlock block) { + return stream(block).collect(Collectors.toList()); + } + + /* -- Internal methods -- */ + + private static Stream<Movement> streamFromTx(final BlockchainBlock block, final BlockchainBlock.Transaction tx) { + + final List<TxInput> inputs = getTxInputs(tx); + final List<TxOutput> outputs = getTxOutputs(tx); + final Set<String> recipients = getTxRecipients(outputs); + + final long totalAmount = inputs.stream().mapToLong(input -> powBase(input.amount, input.unitbase)).sum(); + + return Arrays.stream(tx.getIssuers()) + .flatMap(issuer -> { + long issuerInputsAmount = getTxInputAmountByIssuer(inputs, issuer); + double issuerInputRatio = issuerInputsAmount / totalAmount; + + return recipients.stream() + // Compute the recipient amount + .map(recipient -> { + // If more than one issuer, apply a ratio (=input amount %) + Double amount = getTxOutputAmountByIssuerAndRecipient(outputs, issuer, recipient) * issuerInputRatio; + Movement movement = new Movement(); + movement.setBlockNumber(block.getNumber()); + movement.setBlockHash(block.getHash()); + movement.setMedianTime(block.getMedianTime()); + movement.setAmount(amount.longValue()); + movement.setUnitbase(0); // conversion has been done when computed 'amount' + movement.setIssuer(issuer); + movement.setRecipient(recipient); + movement.setBlockNumber(block.getNumber()); + movement.setComment(tx.getComment()); + return movement; + }) + // Exclude movements to itself (e.g. changes) + .filter(movement -> movement.getAmount() != 0); + }); + } + +} diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/MovementsTest.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/MovementsTest.java new file mode 100644 index 00000000..5f98558e --- /dev/null +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/MovementsTest.java @@ -0,0 +1,44 @@ +package org.duniter.core.client.model; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.local.Movement; +import org.duniter.core.client.model.local.Movements; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.List; + +/** + * Created by blavenie on 23/05/17. + */ +public class MovementsTest { + + @Test + public void testGetMovements() throws Exception { + + final BlockchainBlock block = readBlockFile("block_with_tx.json"); + List<Movement> mov = Movements.getMovements(block); + Assert.assertTrue(mov.size() > 0); + } + + /* -- internal methods -- */ + + private BlockchainBlock readBlockFile(String jsonFileName) { + try { + ObjectMapper om = JacksonUtils.newObjectMapper(); + BlockchainBlock block = om.readValue(Files.readAllBytes(new File("src/test/resources" , jsonFileName).toPath()), BlockchainBlock.class); + Assume.assumeNotNull(block); + return block; + } + catch(Exception e) { + Assume.assumeNoException(e); + return null; + } + } + +} diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/gson/JsonArrayParserTest.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/json/JsonArrayParserTest.java similarity index 97% rename from duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/gson/JsonArrayParserTest.java rename to duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/json/JsonArrayParserTest.java index 930f6c4d..b7fac04d 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/gson/JsonArrayParserTest.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/bma/json/JsonArrayParserTest.java @@ -1,4 +1,4 @@ -package org.duniter.core.client.model.bma.gson; +package org.duniter.core.client.model.bma.json; /* * #%L diff --git a/duniter4j-core-client/src/test/resources/block_with_tx.json b/duniter4j-core-client/src/test/resources/block_with_tx.json new file mode 100644 index 00000000..8b1623be --- /dev/null +++ b/duniter4j-core-client/src/test/resources/block_with_tx.json @@ -0,0 +1,114 @@ +{ + "version": 10, + "nonce": 10800000021731, + "number": 20833, + "powMin": 82, + "time": 1495484460, + "medianTime": 1495480888, + "membersCount": 125, + "monetaryMass": 6430000, + "unitbase": 0, + "issuersCount": 16, + "issuersFrame": 84, + "issuersFrameVar": -3, + "currency": "g1", + "issuer": "38MEAZN68Pz1DTvT3tqgxx4yQP6snJCQhPqEFxbDk4aE", + "signature": "Gk1XVUqumatpqyFbK9W+d5ZQmASYX7ytqJean3H/ZSAamkU5R5xQFkBFwFmDRdI1PKis5mrRt48IIWqYPIBhBg==", + "hash": "00000A6C6B739F3DCDE52B4F1B84169B1029D3E52AF0CF0A3A2B5BFFFAC4EFDF", + "parameters": "", + "previousHash": "00000C63C0153B97E5303F8D5A18A7DA49B695063C9113638E1098DC7EA55BF9", + "previousIssuer": "7vU9BMDhN6fBuRa2iK3JRbC6pqQKb4qDMGsFcQuT5cz", + "inner_hash": "443AD546FF0DC6E71B36700D8286F0837A11C3FE710D4DA670A1476497F2EBE6", + "dividend": null, + "identities": [], + "joiners": [], + "actives": [], + "leavers": [], + "revoked": [], + "excluded": [], + "certifications": [], + "transactions": [ + { + "version": 10, + "blockstamp": "20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F", + "locktime": 0, + "issuers": [ + "7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj" + ], + "inputs": [ + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8025", + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8282" + ], + "unlocks": [ + "0:SIG(0)", + "1:SIG(0)" + ], + "outputs": [ + "2000:0:SIG(4bD7J3uA5pH2N9Xqimspf2XxWN4ESM2Az2XBqtSeHvUZ)" + ], + "signatures": [ + "gCo1A/zMgo728R8TqJWQTgLVc9U2ohyTGGXfZTMtiJ+fxVszNH/qw6LGbpoYljG092i/7NTeSgC7ZRRJoyCCAA==" + ], + "comment": "Geco Clermont", + "currency": "g1", + "block_number": 20833, + "time": 1495480888, + "blockstampTime": 0 + }, + { + "version": 10, + "blockstamp": "20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F", + "locktime": 0, + "issuers": [ + "7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj" + ], + "inputs": [ + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8545", + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8816" + ], + "unlocks": [ + "0:SIG(0)", + "1:SIG(0)" + ], + "outputs": [ + "2000:0:SIG(7vU9BMDhN6fBuRa2iK3JRbC6pqQKb4qDMGsFcQuT5cz)" + ], + "signatures": [ + "IHX24W8uByDCZ3zg8UcPcdJba9gFKS2+xuuSlLVzmgJAtRsEzYs6nD7JOdHm+PIcnx++hnSZ7HX8tTQZuDV0DA==" + ], + "comment": "", + "currency": "g1", + "block_number": 20833, + "time": 1495480888, + "blockstampTime": 0 + }, + { + "version": 10, + "blockstamp": "20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F", + "locktime": 0, + "issuers": [ + "7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj" + ], + "inputs": [ + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:9106", + "1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:9413" + ], + "unlocks": [ + "0:SIG(0)", + "1:SIG(0)" + ], + "outputs": [ + "2000:0:SIG(82NdD9eEbXSjRJXeJdqf56xkpu6taTfTeEqtAtmtbyXY)" + ], + "signatures": [ + "8t5+2l+9TDhpQgkgnky69GIkbLuQFaPgmAUZK9NVTk2PL2pcfgkxNChLH48hfY/zi+BXjsM7Ynybdxex+rtuAQ==" + ], + "comment": "", + "currency": "g1", + "block_number": 20833, + "time": 1495480888, + "blockstampTime": 0 + } + ], + "raw": "Version: 10\nType: Block\nCurrency: g1\nNumber: 20833\nPoWMin: 82\nTime: 1495484460\nMedianTime: 1495480888\nUnitBase: 0\nIssuer: 38MEAZN68Pz1DTvT3tqgxx4yQP6snJCQhPqEFxbDk4aE\nIssuersFrame: 84\nIssuersFrameVar: -3\nDifferentIssuersCount: 16\nPreviousHash: 00000C63C0153B97E5303F8D5A18A7DA49B695063C9113638E1098DC7EA55BF9\nPreviousIssuer: 7vU9BMDhN6fBuRa2iK3JRbC6pqQKb4qDMGsFcQuT5cz\nMembersCount: 125\nIdentities:\nJoiners:\nActives:\nLeavers:\nRevoked:\nExcluded:\nCertifications:\nTransactions:\nTX:10:1:2:2:1:1:0\n20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F\n7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8025\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8282\n0:SIG(0)\n1:SIG(0)\n2000:0:SIG(4bD7J3uA5pH2N9Xqimspf2XxWN4ESM2Az2XBqtSeHvUZ)\nGeco Clermont\ngCo1A/zMgo728R8TqJWQTgLVc9U2ohyTGGXfZTMtiJ+fxVszNH/qw6LGbpoYljG092i/7NTeSgC7ZRRJoyCCAA==\nTX:10:1:2:2:1:0:0\n20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F\n7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8545\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:8816\n0:SIG(0)\n1:SIG(0)\n2000:0:SIG(7vU9BMDhN6fBuRa2iK3JRbC6pqQKb4qDMGsFcQuT5cz)\nIHX24W8uByDCZ3zg8UcPcdJba9gFKS2+xuuSlLVzmgJAtRsEzYs6nD7JOdHm+PIcnx++hnSZ7HX8tTQZuDV0DA==\nTX:10:1:2:2:1:0:0\n20830-0000001054671813DA273F2A93914F8CC86F1EACF744C8654EECE5E1265B9F3F\n7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:9106\n1000:0:D:7h7UPHXgTa296pYYcF85Y1zxiXoQNN8ApsUkQVYSrgLj:9413\n0:SIG(0)\n1:SIG(0)\n2000:0:SIG(82NdD9eEbXSjRJXeJdqf56xkpu6taTfTeEqtAtmtbyXY)\n8t5+2l+9TDhpQgkgnky69GIkbLuQFaPgmAUZK9NVTk2PL2pcfgkxNChLH48hfY/zi+BXjsM7Ynybdxex+rtuAQ==\nInnerHash: 443AD546FF0DC6E71B36700D8286F0837A11C3FE710D4DA670A1476497F2EBE6\nNonce: 10800000021731\n" +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 778b8b5d..44b58d8d 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -27,6 +27,7 @@ import org.duniter.core.client.model.local.Peer; import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.dao.BlockStatDao; import org.duniter.elasticsearch.dao.PeerDao; +import org.duniter.elasticsearch.dao.MovementDao; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.CurrencyService; @@ -151,7 +152,15 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { PeerDao.TYPE) .allowPostSearchIndexType( currency.getCurrencyName(), - PeerDao.TYPE); + PeerDao.TYPE) + + // Add access to <currency>/movement index + .allowIndexType(RestRequest.Method.GET, + currency.getCurrencyName(), + MovementDao.TYPE) + .allowPostSearchIndexType( + currency.getCurrencyName(), + MovementDao.TYPE); // Wait end of currency index creation, then index blocks threadPool.scheduleOnClusterHealthStatus(() -> { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java index 490633b1..242a0cfb 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java @@ -32,6 +32,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.search.SearchHit; @@ -105,6 +106,13 @@ public interface Duniter4jClient extends Bean, Client { void flushBulk(BulkRequestBuilder bulkRequest); + BulkRequestBuilder bulkDeleteFromSearch(String index, + String type, + SearchRequestBuilder searchRequest, + BulkRequestBuilder bulkRequest, + int bulkSize, + boolean flushAll); + void safeExecuteRequest(ActionRequestBuilder<?, ?, ?> request, boolean wait); ScheduledThreadPoolExecutor scheduler(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java index 4f6bf95a..525bab92 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java @@ -548,6 +548,66 @@ public class Duniter4jClientImpl implements Duniter4jClient { } } + @Override + public BulkRequestBuilder bulkDeleteFromSearch(final String index, + final String type, + final SearchRequestBuilder searchRequest, + BulkRequestBuilder bulkRequest, + final int bulkSize, + final boolean flushAll) { + + // Execute query, while there is some data + try { + + int counter = 0; + boolean loop = true; + searchRequest.setSize(bulkSize); + SearchResponse response = searchRequest.execute().actionGet(); + + // Execute query, while there is some data + do { + + // Read response + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + + // Add deletion to bulk + bulkRequest.add( + client.prepareDelete(index, type, searchHit.getId()) + ); + counter++; + + // Flush the bulk if not empty + if ((bulkRequest.numberOfActions() % bulkSize) == 0) { + flushDeleteBulk(index, type, bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // Prepare next iteration + if (counter == 0 || counter >= response.getHits().getTotalHits()) { + loop = false; + } + // Prepare next iteration + else { + searchRequest.setFrom(counter); + response = searchRequest.execute().actionGet(); + } + } while(loop); + + // last flush + if (flushAll && (bulkRequest.numberOfActions() % bulkSize) != 0) { + flushDeleteBulk(index, type, bulkRequest); + } + + } catch (SearchPhaseExecutionException e) { + // Failed or no item on index + logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e); + } + + return bulkRequest; + } + /* delegate methods */ @Override diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java index 7172a69f..b0919aa9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java @@ -61,6 +61,6 @@ public interface BlockStatDao extends Bean, TypeDao<BlockStatDao> { void delete(String currency, String id, String hash, boolean wait); - BlockchainBlockStat getById(String currencyName, String id); + BlockchainBlockStat toBlockStat(BlockchainBlock block); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java index 4e42309a..5aad0548 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java @@ -28,6 +28,7 @@ import org.duniter.core.client.dao.PeerDao; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.client.Duniter4jClientImpl; import org.duniter.elasticsearch.dao.impl.BlockStatDaoImpl; +import org.duniter.elasticsearch.dao.impl.MovementDaoImpl; import org.duniter.elasticsearch.service.ServiceLocator; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -38,6 +39,7 @@ public class DaoModule extends AbstractModule implements Module { bind(Duniter4jClient.class).to(Duniter4jClientImpl.class).asEagerSingleton(); bind(BlockStatDao.class).to(BlockStatDaoImpl.class).asEagerSingleton(); + bind(MovementDao.class).to(MovementDaoImpl.class).asEagerSingleton(); bindWithLocator(BlockDao.class); bindWithLocator(PeerDao.class); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/MovementDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/MovementDao.java new file mode 100644 index 00000000..fdb6be06 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/MovementDao.java @@ -0,0 +1,53 @@ +package org.duniter.elasticsearch.dao; + +/*- + * #%L + * Duniter4j :: ElasticSearch Core plugin + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.beans.Bean; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.elasticsearch.model.Movement; +import org.elasticsearch.action.bulk.BulkRequestBuilder; + +import java.util.List; + +/** + * Created by blavenie on 03/04/17. + */ +public interface MovementDao extends Bean, TypeDao<MovementDao> { + + String TYPE = "movement"; + + void create(Movement block, boolean wait); + + boolean isExists(String currencyName, String id); + + void update(Movement operation, boolean wait); + + void delete(String currency, String id, boolean wait); + + BulkRequestBuilder bulkDeleteByBlock(String currency, + String number, + String hash, + BulkRequestBuilder bulkRequest, + int bulkSize, + boolean flushAll); +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 9d5a47bd..df263021 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -75,7 +75,6 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { Preconditions.checkNotNull(block.getNumber()); // Serialize into JSON - // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { String json = objectMapper.writeValueAsString(block); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java index a5c930e7..e4b8e7b3 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -24,21 +24,26 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainBlocks; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.BlockStatDao; -import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; /** * Created by Benoit on 30/03/2015. @@ -46,7 +51,7 @@ import java.io.IOException; public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { public BlockStatDaoImpl(){ - super("duniter.dao.blockStat"); + super("duniter.dao.block.stat"); } @Override @@ -61,7 +66,6 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { Preconditions.checkNotNull(block.getNumber()); // Serialize into JSON - // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { String json = objectMapper.writeValueAsString(block); @@ -263,10 +267,53 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { } } - public BlockchainBlockStat getById(String currencyName, String id) { - return client.getSourceById(currencyName, TYPE, id, BlockchainBlockStat.class); + public BlockchainBlockStat toBlockStat(BlockchainBlock block) { + + BlockchainBlockStat result = newBlockStat(block); + + // Tx + if (CollectionUtils.isNotEmpty(block.getTransactions())) { + CounterMetric txChangeCounter = new CounterMetric(); + CounterMetric txAmountCounter = new CounterMetric(); + Arrays.stream(block.getTransactions()) + .forEach(tx -> { + long txAmount = BlockchainBlocks.getTxAmount(tx); + if (txAmount == 0l) { + txChangeCounter.inc(); + } + else { + txAmountCounter.inc(txAmount); + } + }); + result.setTxAmount(BigInteger.valueOf(txAmountCounter.count())); + result.setTxChangeCount((int)txChangeCounter.count()); + result.setTxCount(block.getTransactions().length); + } + else { + result.setTxAmount(BigInteger.valueOf(0)); + result.setTxChangeCount(0); + result.setTxCount(0); + } + + return result; } /* -- Internal methods -- */ + private BlockchainBlockStat newBlockStat(BlockchainBlock block) { + BlockchainBlockStat stat = new BlockchainBlockStat(); + + stat.setNumber(block.getNumber()); + stat.setCurrency(block.getCurrency()); + stat.setHash(block.getHash()); + stat.setIssuer(block.getIssuer()); + stat.setMedianTime(block.getMedianTime()); + stat.setMembersCount(block.getMembersCount()); + stat.setMonetaryMass(block.getMonetaryMass()); + stat.setUnitbase(block.getUnitbase()); + stat.setVersion(block.getVersion()); + stat.setDividend(block.getDividend()); + + return stat; + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java new file mode 100644 index 00000000..4160f437 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java @@ -0,0 +1,274 @@ +package org.duniter.elasticsearch.dao.impl; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.AbstractDao; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.dao.MovementDao; +import org.duniter.elasticsearch.model.Movement; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.io.IOException; + +/** + * Created by Benoit on 30/03/2015. + */ +public class MovementDaoImpl extends AbstractDao implements MovementDao { + + public MovementDaoImpl(){ + super("duniter.dao.movement"); + } + + @Override + public String getType() { + return TYPE; + } + + public void create(Movement operation, boolean wait) { + Preconditions.checkNotNull(operation); + Preconditions.checkArgument(StringUtils.isNotBlank(operation.getCurrency())); + Preconditions.checkNotNull(operation.getIssuer()); + Preconditions.checkNotNull(operation.getRecipient()); + Preconditions.checkNotNull(operation.getAmount()); + + // Serialize into JSON + try { + String json = objectMapper.writeValueAsString(operation); + + // Preparing + IndexRequestBuilder request = client.prepareIndex(operation.getCurrency(), TYPE) + .setRefresh(false) + .setSource(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + public boolean isExists(String currencyName, String id) { + return client.isDocumentExists(currencyName, TYPE, id); + } + + public void update(Movement operation, boolean wait) { + Preconditions.checkNotNull(operation); + Preconditions.checkArgument(StringUtils.isNotBlank(operation.getCurrency())); + Preconditions.checkNotNull(operation.getIssuer()); + Preconditions.checkNotNull(operation.getRecipient()); + Preconditions.checkNotNull(operation.getAmount()); + + // Serialize into JSON + try { + String json = objectMapper.writeValueAsString(operation); + + // Preparing + UpdateRequestBuilder request = client.prepareUpdate(operation.getCurrency(), TYPE, operation.getId()) + .setRefresh(true) + .setDoc(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + @Override + public void delete(String currency, String id, boolean wait) { + Preconditions.checkNotNull(currency); + Preconditions.checkNotNull(id); + + // Preparing request + DeleteRequestBuilder request = client.prepareDelete(currency, TYPE, id); + + // Execute + client.safeExecuteRequest(request, wait); + } + + @Override + public XContentBuilder createTypeMapping() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject(TYPE) + .startObject("properties") + + // --- BLOCK properties --- + + // currency + .startObject(Movement.PROPERTY_CURRENCY) + .field("type", "string") + .endObject() + + // medianTime + .startObject(Movement.PROPERTY_MEDIAN_TIME) + .field("type", "long") + .endObject() + + // --- TX properties --- + + // version + .startObject(Movement.PROPERTY_VERSION) + .field("type", "integer") + .endObject() + + // issuer + .startObject(Movement.PROPERTY_ISSUER) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // recipient + .startObject(Movement.PROPERTY_RECIPIENT) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // amount + .startObject(Movement.PROPERTY_AMOUNT) + .field("type", "long") + .endObject() + + // unitbase + .startObject(Movement.PROPERTY_UNITBASE) + .field("type", "integer") + .endObject() + + // comment + .startObject(Movement.PROPERTY_COMMENT) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // --- OTHER properties --- + + // is UD ? + .startObject(Movement.PROPERTY_IS_UD) + .field("type", "boolean") + .field("index", "not_analyzed") + .endObject() + + // reference + .startObject(Movement.PROPERTY_REFERENCE) + .field("type", "nested") + .field("dynamic", "false") + .startObject("properties") + // reference.index + .startObject(Movement.Reference.PROPERTY_INDEX) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + // reference.index + .startObject(Movement.Reference.PROPERTY_TYPE) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .startObject(Movement.Reference.PROPERTY_ID) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .startObject(Movement.Reference.PROPERTY_HASH) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .startObject(Movement.Reference.PROPERTY_ANCHOR) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException("Error while getting mapping for block operation index: " + ioe.getMessage(), ioe); + } + } + + public BulkRequestBuilder bulkDeleteByBlock(final String currency, + final String number, + final String hash, + BulkRequestBuilder bulkRequest, + final int bulkSize, + final boolean flushAll) { + + Preconditions.checkNotNull(currency); + Preconditions.checkNotNull(number); + Preconditions.checkNotNull(bulkRequest); + Preconditions.checkArgument(bulkSize > 0); + + // Prepare search request + SearchRequestBuilder searchRequest = client + .prepareSearch(currency) + .setTypes(TYPE) + .setFetchSource(false) + .setSearchType(SearchType.QUERY_AND_FETCH); + + // Query = filter on reference + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Movement.PROPERTY_REFERENCE + "." + Movement.Reference.PROPERTY_INDEX, currency)) + .filter(QueryBuilders.termQuery(Movement.PROPERTY_REFERENCE + "." + Movement.Reference.PROPERTY_TYPE, BlockDao.TYPE)) + .filter(QueryBuilders.termQuery(Movement.PROPERTY_REFERENCE + "." + Movement.Reference.PROPERTY_ID, number)); + if (StringUtils.isNotBlank(hash)) { + boolQuery.filter(QueryBuilders.termQuery(Movement.PROPERTY_REFERENCE + "." + Movement.Reference.PROPERTY_HASH, hash)); + } + + searchRequest.setQuery(QueryBuilders.nestedQuery(Movement.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); + + // Execute query, while there is some data + return client.bulkDeleteFromSearch(currency, TYPE, searchRequest, bulkRequest, bulkSize, flushAll); + } + + public BulkRequestBuilder bulkDeleteByBlock(final BlockchainBlock block, + BulkRequestBuilder bulkRequest, + final int bulkSize, + final boolean flushAll) { + Preconditions.checkNotNull(block); + + return bulkDeleteByBlock(block.getCurrency(), String.valueOf(block.getNumber()), block.getHash(), bulkRequest, bulkSize, flushAll); + } + + /* -- Internal methods -- */ + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movement.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movement.java new file mode 100644 index 00000000..d6c30f54 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movement.java @@ -0,0 +1,314 @@ +package org.duniter.elasticsearch.model; + +/* + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.local.LocalEntity; +import static org.duniter.core.util.Preconditions.*; +import org.duniter.elasticsearch.dao.BlockDao; + +import java.io.Serializable; + +/** + * Created by blavenie on 29/11/16. + */ +public class Movement implements LocalEntity<String>, Serializable { + + + public static Builder newBuilder() { + return new Builder(); + } + + public static Builder newBuilder(BlockchainBlock block) { + return new Builder(block); + } + + public static final String PROPERTY_CURRENCY = "currency"; + public static final String PROPERTY_MEDIAN_TIME = "medianTime"; + + public static final String PROPERTY_VERSION = "version"; + public static final String PROPERTY_ISSUER = "issuer"; + public static final String PROPERTY_RECIPIENT = "recipient"; + public static final String PROPERTY_AMOUNT = "amount"; + public static final String PROPERTY_UNITBASE = "unitbase"; + public static final String PROPERTY_COMMENT = "comment"; + + public static final String PROPERTY_IS_UD = "isUD"; + public static final String PROPERTY_REFERENCE = "reference"; + + // ES identifier + private String id; + + // Property copied from Block + private String currency; + private Long medianTime; + + // Property copied from Tx + private int version; + private String issuer; + private String recipient; + private Long amount; + private Integer unitbase; + private String comment; + + // Specific properties + private boolean isUD; + private Reference reference; + + public Movement() { + super(); + } + + @Override + @JsonIgnore + public String getId() { + return id; + } + + @Override + @JsonIgnore + public void setId(String id) { + this.id = id; + } + + public String getCurrency() { + return currency; + } + + public void setCurrency(String currency) { + this.currency = currency; + } + + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + + public String getIssuer() { + return issuer; + } + + public void setIssuer(String issuer) { + this.issuer = issuer; + } + + public Long getMedianTime() { + return medianTime; + } + + public void setMedianTime(Long medianTime) { + this.medianTime = medianTime; + } + + public String getRecipient() { + return recipient; + } + + public void setRecipient(String recipient) { + this.recipient = recipient; + } + + public Long getAmount() { + return amount; + } + + public void setAmount(Long amount) { + this.amount = amount; + } + + public Integer getUnitbase() { + return unitbase; + } + + public void setUnitbase(Integer unitbase) { + this.unitbase = unitbase; + } + + public String getComment() { + return comment; + } + + public void setComment(String comment) { + this.comment = comment; + } + + public boolean isUD() { + return isUD; + } + + public void setIsUD(boolean isUD) { + this.isUD = isUD; + } + + public Reference getReference() { + return reference; + } + + public void setReference(Reference reference) { + this.reference = reference; + } + + public static class Builder { + + private Movement result; + + private Builder() { + result = new Movement(); + } + + public Builder(BlockchainBlock block) { + this(); + setBlock(block); + } + + public Builder setBlock(BlockchainBlock block) { + result.setCurrency(block.getCurrency()); + result.setMedianTime(block.getMedianTime()); + result.setReference(new Reference(block.getCurrency(), BlockDao.TYPE, String.valueOf(block.getNumber()))); + setReferenceHash(block.getHash()); + return this; + } + + public Builder setReferenceHash(String hash) { + checkNotNull(result.getReference(), "No reference set. Please call setReference() first"); + result.getReference().setHash(hash); + return this; + } + + public Builder setRecipient(String recipient) { + result.setRecipient(recipient); + return this; + } + + public Builder setIssuer(String issuer) { + result.setIssuer(issuer); + return this; + } + + public Builder setVersion(int version) { + result.setVersion(version); + return this; + } + + public Builder setComment(String comment) { + result.setComment(comment); + return this; + } + + public Builder setAmount(long amount, int unitbase) { + result.setAmount(amount); + result.setUnitbase(unitbase); + return this; + } + + public Builder setIsUD(boolean isUD) { + result.setIsUD(isUD); + return this; + } + + public Movement build() { + checkNotNull(result); + checkNotNull(result.getAmount()); + checkNotNull(result.getUnitbase()); + checkNotNull(result.getRecipient()); + checkNotNull(result.getIssuer()); + checkNotNull(result.getCurrency()); + checkNotNull(result.getVersion()); + + return result; + } + } + + public static class Reference { + + public static final String PROPERTY_INDEX="index"; + public static final String PROPERTY_TYPE="type"; + public static final String PROPERTY_ID="id"; + public static final String PROPERTY_ANCHOR="anchor"; + public static final String PROPERTY_HASH="hash"; + + private String index; + + private String type; + + private String id; + + private String anchor; + + private String hash; + + public Reference() { + } + + public Reference(String index, String type, String id) { + this(index, type, id, null); + } + + public Reference(String index, String type, String id, String anchor) { + this.index = index; + this.type = type; + this.id = id; + this.anchor = anchor; + } + + public Reference(Reference another) { + this.index = another.getIndex(); + this.type = another.getType(); + this.id = another.getId(); + this.hash = another.getHash(); + this.anchor = another.getAnchor(); + } + + public String getIndex() { + return index; + } + + public String getType() { + return type; + } + + public String getId() { + return id; + } + + public String getAnchor() { + return anchor; + } + + public void setAnchor(String anchor) { + this.anchor = anchor; + } + + public String getHash() { + return hash; + } + + public void setHash(String hash) { + this.hash = hash; + } + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movements.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movements.java new file mode 100644 index 00000000..24cae9ae --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/Movements.java @@ -0,0 +1,95 @@ +package org.duniter.elasticsearch.model; + +/*- + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.google.common.base.Preconditions; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.util.CollectionUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.duniter.core.client.model.bma.BlockchainBlocks.*; + +/** + * Created by blavenie on 26/04/17. + */ +public final class Movements { + + private Movements() { + // helper class + } + + public static Stream<Movement> stream(final BlockchainBlock block) { + Preconditions.checkNotNull(block); + + // No Tx + if (CollectionUtils.isEmpty(block.getTransactions())) { + return Stream.empty(); + } + + return Arrays.stream(block.getTransactions()) + .flatMap(tx -> Movements.streamFromTx(block, tx)); + } + + public static List<Movement> getMovements(final BlockchainBlock block) { + return stream(block).collect(Collectors.toList()); + } + + + /* -- Internal methods -- */ + + private static Stream<Movement> streamFromTx(final BlockchainBlock block, final BlockchainBlock.Transaction tx) { + + final List<TxInput> inputs = getTxInputs(tx); + final List<TxOutput> outputs = getTxOutputs(tx); + final Set<String> recipients = getTxRecipients(outputs); + + final long totalAmount = inputs.stream().mapToLong(input -> powBase(input.amount, input.unitbase)).sum(); + + return Arrays.stream(tx.getIssuers()) + .flatMap(issuer -> { + long issuerInputsAmount = getTxInputAmountByIssuer(inputs, issuer); + double issuerInputRatio = issuerInputsAmount / totalAmount; + + return recipients.stream() + // Compute the recipient amount + .map(recipient -> { + Double recipientAmount = getTxOutputAmountByIssuerAndRecipient(outputs, issuer, recipient) * issuerInputRatio; + return Movement.newBuilder(block) + .setAmount(recipientAmount.longValue(), 0/*unitbase*/) + .setIssuer(issuer) + .setRecipient(recipient) + .setVersion(tx.getVersion()) + .setComment(tx.getComment()) + .build(); + }) + // Exclude movements to itself (e.g. changes) + .filter(movement -> movement.getAmount() != 0); + }); + } + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java index 722b3aec..39515f50 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -136,8 +136,6 @@ public abstract class AbstractBlockchainListenerService extends AbstractService protected abstract void processBlockDelete(ChangeEvent change); - protected abstract void beforeFlush(); - protected void flushBulkRequestOrSchedule() { if (flushing || bulkRequest.numberOfActions() == 0) return; @@ -150,7 +148,6 @@ public abstract class AbstractBlockchainListenerService extends AbstractService flushing = true; threadPool.schedule(() -> { synchronized (threadLock) { - beforeFlush(); client.flushBulk(bulkRequest); bulkRequest = client.prepareBulk(); flushing = false; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java new file mode 100644 index 00000000..d013524c --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java @@ -0,0 +1,134 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.dao.MovementDao; +import org.duniter.elasticsearch.model.Movement; +import org.duniter.elasticsearch.model.BlockchainBlockStat; +import org.duniter.elasticsearch.model.Movements; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; + +import java.util.concurrent.TimeUnit; + +/** + * Created by Benoit on 26/04/2017. + */ +public class BlockchainListenerService extends AbstractBlockchainListenerService { + + private final BlockStatDao blockStatDao; + private final MovementDao movementDao; + + @Inject + public BlockchainListenerService(Duniter4jClient client, + PluginSettings settings, + CryptoService cryptoService, + ThreadPool threadPool, + BlockStatDao blockStatDao, + MovementDao movementDao) { + super("duniter.blockchain.listener", client, settings, cryptoService, threadPool, + new TimeValue(500, TimeUnit.MILLISECONDS)); + this.blockStatDao = blockStatDao; + this.movementDao = movementDao; + } + + @Override + protected void processBlockIndex(ChangeEvent change) { + + BlockchainBlock block = readBlock(change); + + // Block stat + { + BlockchainBlockStat stat = blockStatDao.toBlockStat(block); + + // Add a delete to bulk + bulkRequest.add(client.prepareDelete(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) + .setRefresh(false)); + flushBulkRequestOrSchedule(); + + // Add a insert to bulk + try { + bulkRequest.add(client.prepareIndex(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) + .setRefresh(false) // recommended for heavy indexing + .setSource(objectMapper.writeValueAsBytes(stat))); + flushBulkRequestOrSchedule(); + } catch (JsonProcessingException e) { + logger.error("Could not serialize BlockStat into JSON: " + e.getMessage(), e); + } + } + + // Movements + { + // Delete previous indexation + bulkRequest = movementDao.bulkDeleteByBlock(block.getCurrency(), + String.valueOf(block.getNumber()), + null, /*do NOT filter on hash = delete by block number*/ + bulkRequest, bulkSize, false); + + // Add a insert to bulk + Movements.stream(block) + .forEach(movement -> { + try { + bulkRequest.add(client.prepareIndex(block.getCurrency(), MovementDao.TYPE) + .setRefresh(false) // recommended for heavy indexing + .setSource(objectMapper.writeValueAsBytes(movement))); + flushBulkRequestOrSchedule(); + } catch (JsonProcessingException e) { + logger.error("Could not serialize BlockOperation into JSON: " + e.getMessage(), e); + } + }); + } + } + + protected void processBlockDelete(ChangeEvent change) { + // blockStat + { + // Add delete to bulk + bulkRequest.add(client.prepareDelete(change.getIndex(), BlockStatDao.TYPE, change.getId()) + .setRefresh(false)); + } + + // Operation + { + // Add delete to bulk + bulkRequest = movementDao.bulkDeleteByBlock( + change.getIndex(), + change.getId(), + null/*do kwown the hash*/, + bulkRequest, bulkSize, false); + flushBulkRequestOrSchedule(); + } + } + + /* -- internal method -- */ + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java deleted file mode 100644 index 2bae7a1d..00000000 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java +++ /dev/null @@ -1,142 +0,0 @@ -package org.duniter.elasticsearch.service; - -/* - * #%L - * UCoin Java Client :: Core API - * %% - * Copyright (C) 2014 - 2015 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - - -import com.fasterxml.jackson.core.JsonProcessingException; -import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.dao.BlockStatDao; -import org.duniter.elasticsearch.model.BlockchainBlockStat; -import org.duniter.elasticsearch.service.changes.ChangeEvent; -import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.metrics.CounterMetric; -import org.elasticsearch.common.unit.TimeValue; - -import java.math.BigInteger; -import java.util.Arrays; -import java.util.concurrent.TimeUnit; - -/** - * Created by Benoit on 26/04/2017. - */ -public class BlockchainStatsService extends AbstractBlockchainListenerService { - - @Inject - public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool) { - super("duniter.blockchain.stats", client, settings, cryptoService, threadPool, - new TimeValue(500, TimeUnit.MILLISECONDS)); - } - - @Override - protected void processBlockIndex(ChangeEvent change) { - - BlockchainBlock block = readBlock(change); - BlockchainBlockStat stat = toBlockStat(block); - - // Add a delete to bulk - bulkRequest.add(client.prepareDelete(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) - .setRefresh(false)); - flushBulkRequestOrSchedule(); - - // Add a insert to bulk - try { - bulkRequest.add(client.prepareIndex(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) - .setRefresh(false) // recommended for heavy indexing - .setSource(objectMapper.writeValueAsString(stat))); - flushBulkRequestOrSchedule(); - } - catch(JsonProcessingException e) { - logger.error("Could not serialize BlockchainBlockStat into JSON: " + e.getMessage(), e); - } - } - - protected void processBlockDelete(ChangeEvent change) { - // Add delete to bulk - bulkRequest.add(client.prepareDelete(change.getIndex(), BlockStatDao.TYPE, change.getId()) - .setRefresh(false)); - flushBulkRequestOrSchedule(); - } - - protected void beforeFlush() { - // Nothing to do - } - - protected BlockchainBlockStat toBlockStat(BlockchainBlock block) { - - BlockchainBlockStat result = newBlockStat(block); - - // Tx - if (CollectionUtils.isNotEmpty(block.getTransactions())) { - CounterMetric txChangeCounter = new CounterMetric(); - CounterMetric txAmountCounter = new CounterMetric(); - Arrays.stream(block.getTransactions()) - .forEach(tx -> { - long txAmount = BlockchainBlockUtils.getTxAmount(tx); - if (txAmount == 0l) { - txChangeCounter.inc(); - } - else { - txAmountCounter.inc(txAmount); - } - }); - result.setTxAmount(BigInteger.valueOf(txAmountCounter.count())); - result.setTxChangeCount((int)txChangeCounter.count()); - result.setTxCount(block.getTransactions().length); - } - else { - result.setTxAmount(BigInteger.valueOf(0)); - result.setTxChangeCount(0); - result.setTxCount(0); - } - - return result; - } - - /* -- internal method -- */ - - private BlockchainBlockStat newBlockStat(BlockchainBlock block) { - BlockchainBlockStat stat = new BlockchainBlockStat(); - - stat.setNumber(block.getNumber()); - stat.setCurrency(block.getCurrency()); - stat.setHash(block.getHash()); - stat.setIssuer(block.getIssuer()); - stat.setMedianTime(block.getMedianTime()); - stat.setMembersCount(block.getMembersCount()); - stat.setMonetaryMass(block.getMonetaryMass()); - stat.setUnitbase(block.getUnitbase()); - stat.setVersion(block.getVersion()); - stat.setDividend(block.getDividend()); - - return stat; - } - - -} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 07fdae83..dbe90837 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -28,7 +28,6 @@ import org.duniter.core.client.dao.CurrencyDao; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.bma.BlockchainRemoteService; @@ -41,23 +40,12 @@ import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.*; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; -import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; -import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; -import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.Objects; /** * Created by Benoit on 30/03/2015. @@ -218,6 +206,10 @@ public class CurrencyService extends AbstractService { BlockStatDao blockStatDao = injector.getInstance(BlockStatDao.class); createIndexRequestBuilder.addMapping(blockStatDao.getType(), blockStatDao.createTypeMapping()); + // Add operation type + MovementDao operationDao = ServiceLocator.instance().getBean(MovementDao.class); + createIndexRequestBuilder.addMapping(operationDao.getType(), operationDao.createTypeMapping()); + createIndexRequestBuilder.execute().actionGet(); } }; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index 711c2dc3..13a771ef 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -23,8 +23,6 @@ package org.duniter.elasticsearch.service; */ import org.duniter.core.beans.Bean; -import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.service.DataContext; import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.bma.BlockchainRemoteService; @@ -40,7 +38,6 @@ import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; -import org.elasticsearch.script.ScriptModule; public class ServiceModule extends AbstractModule implements Module { @@ -55,7 +52,7 @@ public class ServiceModule extends AbstractModule implements Module { // blockchain indexation services bind(BlockchainService.class).asEagerSingleton(); - bind(BlockchainStatsService.class).asEagerSingleton(); + bind(BlockchainListenerService.class).asEagerSingleton(); bind(PeerService.class).asEagerSingleton(); // Duniter Client API beans diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index ada0fbce..738864c7 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -24,38 +24,26 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Multimap; import org.duniter.core.client.model.ModelUtils; import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.dao.BlockStatDao; -import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.duniter.elasticsearch.service.AbstractBlockchainListenerService; import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.changes.ChangeEvent; -import org.duniter.elasticsearch.service.changes.ChangeService; -import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; -import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.nuiton.i18n.I18n; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.BlockingDeque; -import java.util.concurrent.CompletableFuture; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -150,11 +138,6 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic flushBulkRequestOrSchedule(); } - - protected void beforeFlush() { - - } - /* -- internal method -- */ /** @@ -261,11 +244,15 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic event = userEventService.fillUserEvent(event); - bulkRequest.add(client.prepareIndex(UserEventService.INDEX, UserEventService.EVENT_TYPE) - .setSource(userEventService.toJson(event)) - .setRefresh(false)); - - flushBulkRequestOrSchedule(); + try { + bulkRequest.add(client.prepareIndex(UserEventService.INDEX, UserEventService.EVENT_TYPE) + .setSource(objectMapper.writeValueAsBytes(event)) + .setRefresh(false)); + flushBulkRequestOrSchedule(); + } + catch(JsonProcessingException e) { + logger.error("Could not serialize UserEvent into JSON: " + e.getMessage(), e); + } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index 7f9379c4..0cabefd7 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -420,53 +420,7 @@ public class UserEventService extends AbstractService implements ChangeService.C searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); // Execute query, while there is some data - try { - - int counter = 0; - boolean loop = true; - searchRequest.setSize(bulkSize); - SearchResponse response = searchRequest.execute().actionGet(); - do { - - // Read response - SearchHit[] searchHits = response.getHits().getHits(); - for (SearchHit searchHit : searchHits) { - - // Add deletion to bulk - bulkRequest.add( - client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId()) - ); - counter++; - - // Flush the bulk if not empty - if ((bulkRequest.numberOfActions() % bulkSize) == 0) { - client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); - bulkRequest = client.prepareBulk(); - } - } - - // Prepare next iteration - if (counter == 0 || counter >= response.getHits().getTotalHits()) { - loop = false; - } - // Prepare next iteration - else { - searchRequest.setFrom(counter); - response = searchRequest.execute().actionGet(); - } - } while(loop); - - // last flush - if (flushAll && (bulkRequest.numberOfActions() % bulkSize) != 0) { - client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); - } - - } catch (SearchPhaseExecutionException e) { - // Failed or no item on index - logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e); - } - - return bulkRequest; + return client.bulkDeleteFromSearch(INDEX, EVENT_TYPE, searchRequest, bulkRequest, bulkSize, flushAll); } -- GitLab