From b51320a55b51e935615357792d50845900c7c7b2 Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Wed, 26 Apr 2017 22:41:50 +0200 Subject: [PATCH] [enh] add new index <currency>/blockStat [fix] fix typo on plugin settings 'startlls' --- .../model/bma/util/BlockchainBlockUtils.java | 75 ++++++ .../duniter/core/service/MailServiceImpl.java | 2 +- .../src/test/misc/blocksByIssuer.sh | 21 ++ .../src/test/misc/test_es_query.sh | 59 +++-- .../org/duniter/elasticsearch/Plugin.java | 11 +- .../org/duniter/elasticsearch/PluginInit.java | 44 ++-- .../duniter/elasticsearch/dao/BlockDao.java | 3 + .../elasticsearch/dao/BlockStatDao.java | 64 +++++ .../elasticsearch/dao/CurrencyExtendDao.java | 2 + .../duniter/elasticsearch/dao/DaoModule.java | 2 + .../duniter/elasticsearch/dao/PeerDao.java | 12 + .../elasticsearch/dao/impl/BlockDaoImpl.java | 66 +++-- .../dao/impl/BlockStatDaoImpl.java | 242 ++++++++++++++++++ .../dao/impl/CurrencyDaoImpl.java | 12 +- .../elasticsearch/dao/impl/PeerDaoImpl.java | 19 +- .../model/BlockchainBlockStat.java | 161 ++++++++++++ .../BlockchainTxCountScriptFactory.java | 31 +++ .../service/BlockchainService.java | 12 +- .../service/BlockchainStatsService.java | 194 ++++++++++++++ .../service/CurrencyService.java | 8 +- .../elasticsearch/service/ServiceModule.java | 4 + .../service/SubscriptionService.java | 8 + .../subscription/service/SynchroService.java | 4 +- .../elasticsearch/user/PluginSettings.java | 2 +- .../service/BlockchainUserEventService.java | 35 ++- 25 files changed, 988 insertions(+), 105 deletions(-) create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java create mode 100755 duniter4j-es-assembly/src/test/misc/blocksByIssuer.sh create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/script/BlockchainTxCountScriptFactory.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java new file mode 100644 index 00000000..8ad5793a --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/util/BlockchainBlockUtils.java @@ -0,0 +1,75 @@ +package org.duniter.core.client.model.bma.util; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import jnr.ffi.annotations.In; +import org.duniter.core.client.model.bma.BlockchainBlock; + +import java.math.BigInteger; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.IntStream; + +/** + * Created by blavenie on 26/04/17. + */ +public final class BlockchainBlockUtils { + + public static final Pattern TX_UNLOCK_PATTERN = Pattern.compile("([0-9]+):SIG\\(([^)]+)\\)"); + public static final Pattern TX_OUTPUT_PATTERN = Pattern.compile("([0-9]+):([0-9]+):SIG\\(([^)]+)\\)"); + + private BlockchainBlockUtils () { + // helper class + } + + public static BigInteger getTxAmount(BlockchainBlock block) { + BigInteger result = BigInteger.valueOf(0l); + Arrays.stream(block.getTransactions()) + .forEach(tx -> result.add(BigInteger.valueOf(getTxAmount(tx)))); + return result; + } + + public static long getTxAmount(final BlockchainBlock.Transaction tx) { + + final Map<Integer, Integer> inputsByIssuer = Maps.newHashMap(); + Arrays.stream(tx.getUnlocks()) + .map(TX_UNLOCK_PATTERN::matcher) + .filter(Matcher::matches) + .forEach(matcher -> inputsByIssuer.put( + Integer.parseInt(matcher.group(1)), + Integer.parseInt(matcher.group(2))) + ); + + return IntStream.range(0, tx.getIssuers().length) + .mapToLong(i -> { + final String issuer = tx.getIssuers()[i]; + + long inputSum = IntStream.range(0, tx.getInputs().length) + .filter(j -> i == inputsByIssuer.get(j)) + .mapToObj(j -> tx.getInputs()[j]) + .map(input -> input.split(":")) + .filter(inputParts -> inputParts.length > 2) + .mapToLong(inputParts -> powBase(Long.parseLong(inputParts[0]), Integer.parseInt(inputParts[1]))) + .sum(); + + long outputSum = Arrays.stream(tx.getOutputs()) + .map(TX_OUTPUT_PATTERN::matcher) + .filter(Matcher::matches) + .filter(matcher -> issuer.equals(matcher.group(3))) + .mapToLong(matcher -> powBase(Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2)))) + .sum(); + + return (inputSum - outputSum); + }) + .sum(); + } + + + private static long powBase(long amount, int unitbase) { + if (unitbase == 0) return amount; + return amount * (long)Math.pow(10, unitbase); + } +} diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java b/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java index c53ad574..95a8dc90 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/service/MailServiceImpl.java @@ -266,7 +266,7 @@ public class MailServiceImpl implements MailService, Closeable { props.put("mail.smtp.host", config.getSmtpHost()); props.put("mail.smtp.port", config.getSmtpPort()); if (StringUtils.isNotBlank(config.getSenderAddress())) { - props.put("mail.from.alias", config.getSenderAddress()); + props.put("mail.from", config.getSenderAddress()); if (StringUtils.isNotBlank(config.getSenderName())) { props.put("mail.from.alias", config.getSenderName()); } diff --git a/duniter4j-es-assembly/src/test/misc/blocksByIssuer.sh b/duniter4j-es-assembly/src/test/misc/blocksByIssuer.sh new file mode 100755 index 00000000..26220b28 --- /dev/null +++ b/duniter4j-es-assembly/src/test/misc/blocksByIssuer.sh @@ -0,0 +1,21 @@ +#!/bin/sh + +curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d ' + { + "size": 0, + "aggs": { + "blocksByIssuer": { + "terms": { + "field": "issuer", + "size": 0 + }, + "aggs" : { + "difficulty_stats" : { + "stats" : { + "field" : "powMin" + } + } + } + } + } + }' diff --git a/duniter4j-es-assembly/src/test/misc/test_es_query.sh b/duniter4j-es-assembly/src/test/misc/test_es_query.sh index ea670c76..2151fb3c 100755 --- a/duniter4j-es-assembly/src/test/misc/test_es_query.sh +++ b/duniter4j-es-assembly/src/test/misc/test_es_query.sh @@ -2,22 +2,49 @@ curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d ' { - "size": 10000, - "query": { - "filtered": { - "filter": { - "bool": { - "must": [ - { - "exists": { - "field": "dividend" + "size": 0, + "aggs": { + "txByRange": { + "range": { + "field" : "medianTime", + "ranges" : [ + { "from" : 1491955200, "to" : 1492041600 } + ] + }, + "aggs" : { + "tx_stats" : { + "stats" : { + "script" : { + "inline" : "txcount", + "lang": "native" + } } - } - ] - } + }, + "time" : { + "stats" : { "field" : "medianTime" } + } } - } - }, - "_source": ["dividend", "monetaryMass", "membersCount"], - sort + } + } + }' + + +curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d ' + { + "size": 0, + "aggs": { + "blocksByIssuer": { + "terms": { + "field": "issuer", + "size": 0 + }, + "aggs" : { + "difficulty_stats" : { + "stats" : { + "field" : "difficulty" + } + } + } + } + } }' diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/Plugin.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/Plugin.java index 693feb1b..deaec741 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/Plugin.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/Plugin.java @@ -25,6 +25,7 @@ package org.duniter.elasticsearch; import com.google.common.collect.Lists; import org.duniter.elasticsearch.dao.DaoModule; import org.duniter.elasticsearch.rest.RestModule; +import org.duniter.elasticsearch.script.BlockchainTxCountScriptFactory; import org.duniter.elasticsearch.security.SecurityModule; import org.duniter.elasticsearch.service.ServiceModule; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -59,6 +60,13 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { return "Duniter ElasticSearch Plugin"; } + @Inject + public void onModule(org.elasticsearch.script.ScriptModule scriptModule) { + // TODO: in ES v5+, see example here : + // https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java + scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class); + } + @Override public Collection<Module> nodeModules() { Collection<Module> modules = Lists.newArrayList(); @@ -71,9 +79,10 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { modules.add(new WebSocketModule()); modules.add(new RestModule()); + modules.add(new DaoModule()); modules.add(new ServiceModule()); - + //modules.add(new ScriptModule()); return modules; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 6f285aed..6607ca13 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -24,12 +24,15 @@ package org.duniter.elasticsearch; import org.duniter.core.client.model.elasticsearch.Currency; import org.duniter.core.client.model.local.Peer; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.dao.PeerDao; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.BlockchainStatsService; import org.duniter.elasticsearch.service.CurrencyService; import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -120,20 +123,31 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { Currency currency = injector.getInstance(CurrencyService.class) .indexCurrencyFromPeer(peer, true); - // Add access to currency/block index - injector.getInstance(RestSecurityController.class).allowIndexType(RestRequest.Method.GET, - currency.getCurrencyName(), - BlockchainService.BLOCK_TYPE); - injector.getInstance(RestSecurityController.class).allowPostSearchIndexType( - currency.getCurrencyName(), - BlockchainService.BLOCK_TYPE); - // Add access to currency/peer index - injector.getInstance(RestSecurityController.class).allowIndexType(RestRequest.Method.GET, - currency.getCurrencyName(), - BlockchainService.PEER_TYPE); - injector.getInstance(RestSecurityController.class).allowPostSearchIndexType( - currency.getCurrencyName(), - BlockchainService.PEER_TYPE); + injector.getInstance(RestSecurityController.class) + + // Add access to <currency>/block index + .allowIndexType(RestRequest.Method.GET, + currency.getCurrencyName(), + BlockDao.TYPE) + .allowPostSearchIndexType( + currency.getCurrencyName(), + BlockDao.TYPE) + + // Add access to <currency>/blockStat index + .allowIndexType(RestRequest.Method.GET, + currency.getCurrencyName(), + BlockStatDao.TYPE) + .allowPostSearchIndexType( + currency.getCurrencyName(), + BlockStatDao.TYPE) + + // Add access to <currency>/peer index + .allowIndexType(RestRequest.Method.GET, + currency.getCurrencyName(), + PeerDao.TYPE) + .allowPostSearchIndexType( + currency.getCurrencyName(), + PeerDao.TYPE); // Index blocks (and listen if new block appear) injector.getInstance(BlockchainService.class) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java index 758614f7..e14e88c9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java @@ -32,6 +32,9 @@ import java.util.List; */ public interface BlockDao extends Bean, TypeDao<BlockDao> { + String TYPE = "block"; + + void create(BlockchainBlock block, boolean wait); /** diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java new file mode 100644 index 00000000..370e220d --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java @@ -0,0 +1,64 @@ +package org.duniter.elasticsearch.dao; + +/*- + * #%L + * Duniter4j :: ElasticSearch Core plugin + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.beans.Bean; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.elasticsearch.model.BlockchainBlockStat; + +import java.util.List; + +/** + * Created by blavenie on 03/04/17. + */ +public interface BlockStatDao extends Bean, TypeDao<BlockStatDao> { + + String TYPE = "blockStat"; + + void create(BlockchainBlockStat block, boolean wait); + + /** + * + * @param currencyName + * @param number the block number + * @param json block as JSON + */ + void create(String currencyName, String id, byte[] json, boolean wait); + + boolean isExists(String currencyName, String id); + + void update(BlockchainBlockStat block, boolean wait); + + /** + * + * @param currencyName + * @param number the block number, or -1 for current + * @param json block as JSON + */ + void update(String currencyName, String id, byte[] json, boolean wait); + + void delete(String currency, String id, boolean wait); + + BlockchainBlockStat getById(String currencyName, String id); + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java index 2659bdeb..ad226bf3 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java @@ -28,4 +28,6 @@ import org.duniter.core.client.dao.CurrencyDao; * Created by blavenie on 03/04/17. */ public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExtendDao> { + String INDEX = "currency"; + String RECORD_TYPE = "record"; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java index f1b870dd..4e42309a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java @@ -27,6 +27,7 @@ import org.duniter.core.client.dao.CurrencyDao; import org.duniter.core.client.dao.PeerDao; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.client.Duniter4jClientImpl; +import org.duniter.elasticsearch.dao.impl.BlockStatDaoImpl; import org.duniter.elasticsearch.service.ServiceLocator; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -36,6 +37,7 @@ public class DaoModule extends AbstractModule implements Module { @Override protected void configure() { bind(Duniter4jClient.class).to(Duniter4jClientImpl.class).asEagerSingleton(); + bind(BlockStatDao.class).to(BlockStatDaoImpl.class).asEagerSingleton(); bindWithLocator(BlockDao.class); bindWithLocator(PeerDao.class); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java new file mode 100644 index 00000000..d5405fcd --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java @@ -0,0 +1,12 @@ +package org.duniter.elasticsearch.dao; + +import org.duniter.elasticsearch.dao.impl.PeerDaoImpl; + +/** + * Created by blavenie on 26/04/17. + */ +public interface PeerDao extends org.duniter.core.client.dao.PeerDao, TypeDao<PeerDaoImpl>{ + + String TYPE = "peer"; + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 0520d7b5..28858ff0 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -56,7 +56,6 @@ import java.util.Map; */ public class BlockDaoImpl extends AbstractDao implements BlockDao { - public static final String BLOCK_TYPE = "block"; public BlockDaoImpl(){ super("duniter.dao.block"); @@ -64,7 +63,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { @Override public String getType() { - return BLOCK_TYPE; + return TYPE; } public void create(BlockchainBlock block, boolean wait) { @@ -79,7 +78,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { String json = objectMapper.writeValueAsString(block); // Preparing - IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), BLOCK_TYPE) + IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) .setId(block.getNumber().toString()) .setSource(json); @@ -104,7 +103,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { Preconditions.checkArgument(json.length > 0); // Preparing indexBlocksFromNode - IndexRequestBuilder request = client.prepareIndex(currencyName, BLOCK_TYPE) + IndexRequestBuilder request = client.prepareIndex(currencyName, TYPE) .setId(id) .setRefresh(true) .setSource(json); @@ -114,7 +113,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { } public boolean isExists(String currencyName, String id) { - return client.isDocumentExists(currencyName, BLOCK_TYPE, id); + return client.isDocumentExists(currencyName, TYPE, id); } public void update(BlockchainBlock block, boolean wait) { @@ -129,7 +128,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { String json = objectMapper.writeValueAsString(block); // Preparing - UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), BLOCK_TYPE, block.getNumber().toString()) + UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) .setRefresh(true) .setDoc(json); @@ -153,7 +152,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { Preconditions.checkArgument(json.length > 0); // Preparing indexBlocksFromNode - UpdateRequestBuilder request = client.prepareUpdate(currencyName, BLOCK_TYPE, id) + UpdateRequestBuilder request = client.prepareUpdate(currencyName, TYPE, id) .setRefresh(true) .setDoc(json); @@ -167,7 +166,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Prepare request SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) - .setTypes(BLOCK_TYPE) + .setTypes(TYPE) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // If only one term, search as prefix @@ -201,7 +200,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Prepare request SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) - .setTypes(BLOCK_TYPE) + .setTypes(TYPE) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // Get max(number) @@ -226,17 +225,43 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { try { XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() - .startObject(BLOCK_TYPE) + .startObject(TYPE) .startObject("properties") - // block number + // currency + .startObject("currency") + .field("type", "string") + .endObject() + + // version + .startObject("version") + .field("type", "integer") + .endObject() + + // time + .startObject("time") + .field("type", "long") + .endObject() + + // medianTime + .startObject("medianTime") + .field("type", "long") + .endObject() + + // number .startObject("number") .field("type", "integer") .endObject() + // nonce + .startObject("nonce") + .field("type", "long") + .endObject() + // hash .startObject("hash") .field("type", "string") + .field("index", "not_analyzed") .endObject() // issuer @@ -250,22 +275,17 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { .field("type", "string") .endObject() - // membercount - .startObject("memberCount") + // membersCount + .startObject("membersCount") .field("type", "integer") .endObject() - // membersChanges - .startObject("membersChanges") - .field("type", "string") - .endObject() - // unitbase .startObject("unitbase") .field("type", "integer") .endObject() - // membersChanges + // monetaryMass .startObject("monetaryMass") .field("type", "long") .endObject() @@ -290,7 +310,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { } public BlockchainBlock getBlockById(String currencyName, String id) { - return client.getSourceById(currencyName, BLOCK_TYPE, id, BlockchainBlock.class); + return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class); } /** @@ -306,18 +326,18 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { for (int number=fromNumber; number<=toNumber; number++) { bulkRequest.add( - client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(number)) + client.prepareDelete(currencyName, TYPE, String.valueOf(number)) ); // Flush the bulk if not empty if ((fromNumber - number % bulkSize) == 0) { - client.flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest); + client.flushDeleteBulk(currencyName, TYPE, bulkRequest); bulkRequest = client.prepareBulk(); } } // last flush - client.flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest); + client.flushDeleteBulk(currencyName, TYPE, bulkRequest); } /* -- Internal methods -- */ diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java new file mode 100644 index 00000000..461d978f --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -0,0 +1,242 @@ +package org.duniter.elasticsearch.dao.impl; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.AbstractDao; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.model.BlockchainBlockStat; +import org.elasticsearch.action.delete.DeleteRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; + +/** + * Created by Benoit on 30/03/2015. + */ +public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { + + public BlockStatDaoImpl(){ + super("duniter.dao.blockStat"); + } + + @Override + public String getType() { + return TYPE; + } + + public void create(BlockchainBlockStat block, boolean wait) { + Preconditions.checkNotNull(block); + Preconditions.checkArgument(StringUtils.isNotBlank(block.getCurrency())); + Preconditions.checkNotNull(block.getHash()); + Preconditions.checkNotNull(block.getNumber()); + + // Serialize into JSON + // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) + try { + String json = objectMapper.writeValueAsString(block); + + // Preparing + IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) + .setId(block.getNumber().toString()) + .setSource(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + @Override + public void create(String currencyName, String id, byte[] json, boolean wait) { + Preconditions.checkNotNull(currencyName); + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(json); + Preconditions.checkArgument(json.length > 0); + + // Preparing indexBlocksFromNode + IndexRequestBuilder request = client.prepareIndex(currencyName, TYPE) + .setId(id) + .setRefresh(true) + .setSource(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + + public boolean isExists(String currencyName, String id) { + return client.isDocumentExists(currencyName, TYPE, id); + } + + public void update(BlockchainBlockStat block, boolean wait) { + Preconditions.checkNotNull(block); + Preconditions.checkArgument(StringUtils.isNotBlank(block.getCurrency())); + Preconditions.checkNotNull(block.getNumber()); + + // Serialize into JSON + // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) + try { + String json = objectMapper.writeValueAsString(block); + + // Preparing + UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) + .setRefresh(true) + .setDoc(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + /** + * + * @param currencyName + * @param id the block id + * @param json block as JSON + */ + public void update(String currencyName, String id, byte[] json, boolean wait) { + Preconditions.checkNotNull(currencyName); + Preconditions.checkNotNull(json); + Preconditions.checkArgument(json.length > 0); + + // Preparing index + UpdateRequestBuilder request = client.prepareUpdate(currencyName, TYPE, id) + .setRefresh(true) + .setDoc(json); + + // Execute + client.safeExecuteRequest(request, wait); + } + + @Override + public void delete(String currency, String id, boolean wait) { + Preconditions.checkNotNull(currency); + Preconditions.checkNotNull(id); + + // Preparing request + DeleteRequestBuilder request = client.prepareDelete(currency, TYPE, id); + + // Execute + client.safeExecuteRequest(request, wait); + } + + @Override + public XContentBuilder createTypeMapping() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject(TYPE) + .startObject("properties") + + // currency + .startObject(BlockchainBlockStat.PROPERTY_CURRENCY) + .field("type", "string") + .endObject() + + // version + .startObject(BlockchainBlockStat.PROPERTY_VERSION) + .field("type", "integer") + .endObject() + + // block number + .startObject(BlockchainBlockStat.PROPERTY_NUMBER) + .field("type", "integer") + .endObject() + + // medianTime + .startObject(BlockchainBlockStat.PROPERTY_MEDIAN_TIME) + .field("type", "long") + .endObject() + + // hash + .startObject(BlockchainBlockStat.PROPERTY_HASH) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // membersCount + .startObject(BlockchainBlockStat.PROPERTY_MEMBERS_COUNT) + .field("type", "integer") + .endObject() + + // unitbase + .startObject(BlockchainBlockStat.PROPERTY_UNITBASE) + .field("type", "integer") + .endObject() + + // monetaryMass + .startObject(BlockchainBlockStat.PROPERTY_MONETARY_MASS) + .field("type", "long") + .endObject() + + // dividend + .startObject(BlockchainBlockStat.PROPERTY_DIVIDEND) + .field("type", "integer") + .endObject() + + // --- STATS properties --- + + // txCount + .startObject(BlockchainBlockStat.PROPERTY_TX_COUNT) + .field("type", "integer") + .endObject() + + // txAmount + .startObject(BlockchainBlockStat.PROPERTY_TX_AMOUNT) + .field("type", "long") + .endObject() + + // txChangeAmount + .startObject(BlockchainBlockStat.PROPERTY_TX_CHANGE_COUNT) + .field("type", "integer") + .endObject() + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException("Error while getting mapping for block stat index: " + ioe.getMessage(), ioe); + } + } + + public BlockchainBlockStat getById(String currencyName, String id) { + return client.getSourceById(currencyName, TYPE, id, BlockchainBlockStat.class); + } + + /* -- Internal methods -- */ + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index a077e54d..07f265ec 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -24,7 +24,6 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; -import org.duniter.core.client.dao.CurrencyDao; import org.duniter.core.client.model.local.Currency; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.Preconditions; @@ -38,7 +37,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; -import java.util.*; +import java.util.List; +import java.util.Map; /** * Created by blavenie on 29/12/15. @@ -47,9 +47,6 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp protected static final String REGEX_WORD_SEPARATOR = "[-\\t@# _]+"; - public static final String INDEX = "currency"; - public static final String RECORD_TYPE = "record"; - public CurrencyDaoImpl(){ super(INDEX, RECORD_TYPE); } @@ -66,8 +63,6 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp // Serialize into JSON byte[] json = objectMapper.writeValueAsBytes(currency); - System.out.println(objectMapper.writeValueAsString(currency)); - // Preparing indexBlocksFromNode IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) .setId(currency.getId()) @@ -158,7 +153,8 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp @Override public XContentBuilder createTypeMapping() { try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE) + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject() + .startObject(RECORD_TYPE) .startObject("properties") // currency diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index d80d3f0d..5aaf3427 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -23,12 +23,12 @@ package org.duniter.elasticsearch.dao.impl; */ import com.fasterxml.jackson.core.JsonProcessingException; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.local.Peer; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; +import org.duniter.elasticsearch.dao.PeerDao; import org.duniter.elasticsearch.dao.TypeDao; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder; @@ -41,10 +41,7 @@ import java.util.List; /** * Created by blavenie on 29/12/15. */ -public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDaoImpl> { - - - public static final String PEER_TYPE = "peer"; +public class PeerDaoImpl extends AbstractDao implements PeerDao { public PeerDaoImpl(){ super("duniter.dao.peer"); @@ -52,7 +49,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao @Override public String getType() { - return PEER_TYPE; + return TYPE; } @Override @@ -70,7 +67,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao String json = objectMapper.writeValueAsString(peer); // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), PEER_TYPE) + IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), TYPE) .setId(peer.getId()) .setSource(json); @@ -99,7 +96,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao String json = objectMapper.writeValueAsString(peer); // Preparing indexBlocksFromNode - UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), PEER_TYPE, peer.getId()) + UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), TYPE, peer.getId()) .setDoc(json); // Execute indexBlocksFromNode @@ -125,7 +122,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao Preconditions.checkArgument(StringUtils.isNotBlank(peer.getCurrency())); // Delete the document - client.prepareDelete(peer.getCurrency(), PEER_TYPE, peer.getId()).execute().actionGet(); + client.prepareDelete(peer.getCurrency(), TYPE, peer.getId()).execute().actionGet(); } @Override @@ -135,7 +132,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao @Override public boolean isExists(String currencyId, String peerId) { - return client.isDocumentExists(currencyId, PEER_TYPE, peerId); + return client.isDocumentExists(currencyId, TYPE, peerId); } @Override @@ -143,7 +140,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao try { XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() - .startObject(PEER_TYPE) + .startObject(TYPE) .startObject("properties") // currency diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java new file mode 100644 index 00000000..75560fa7 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java @@ -0,0 +1,161 @@ +package org.duniter.elasticsearch.model; + +/* + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import java.io.Serializable; +import java.math.BigInteger; + +/** + * Created by blavenie on 29/11/16. + */ +public class BlockchainBlockStat implements Serializable { + + public static final String PROPERTY_VERSION = "version"; + public static final String PROPERTY_CURRENCY = "currency"; + public static final String PROPERTY_NUMBER = "number"; + public static final String PROPERTY_HASH = "hash"; + public static final String PROPERTY_MEDIAN_TIME = "medianTime"; + public static final String PROPERTY_MEMBERS_COUNT = "membersCount"; + public static final String PROPERTY_MONETARY_MASS = "monetaryMass"; + public static final String PROPERTY_UNITBASE= "unitbase"; + public static final String PROPERTY_DIVIDEND = "dividend"; + public static final String PROPERTY_TX_COUNT = "txCount"; + public static final String PROPERTY_TX_AMOUNT = "txAmount"; + public static final String PROPERTY_TX_CHANGE_COUNT = "txChangeCount"; + + // Property copied from Block + private String version; + private String currency; + private Integer number; + private String hash; + private Long medianTime; + private Integer membersCount; + private BigInteger monetaryMass; + private Integer unitbase; + private BigInteger dividend; + + // Statistics + private Integer txCount; + private BigInteger txAmount; + private Integer txChangeCount; + + public BlockchainBlockStat() { + super(); + } + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getCurrency() { + return currency; + } + + public void setCurrency(String currency) { + this.currency = currency; + } + + public BigInteger getDividend() { + return dividend; + } + + public void setDividend(BigInteger dividend) { + this.dividend = dividend; + } + + public Integer getTxCount() { + return txCount; + } + + public void setTxCount(Integer txCount) { + this.txCount = txCount; + } + + public BigInteger getTxAmount() { + return txAmount; + } + + public void setTxAmount(BigInteger txAmount) { + this.txAmount = txAmount; + } + + public Integer getTxChangeCount() { + return txChangeCount; + } + + public void setTxChangeCount(Integer txChangeCount) { + this.txChangeCount = txChangeCount; + } + + public Integer getNumber() { + return number; + } + + public void setNumber(Integer number) { + this.number = number; + } + + public String getHash() { + return hash; + } + + public void setHash(String hash) { + this.hash = hash; + } + + public Long getMedianTime() { + return medianTime; + } + + public void setMedianTime(Long medianTime) { + this.medianTime = medianTime; + } + + public Integer getMembersCount() { + return membersCount; + } + + public void setMembersCount(Integer membersCount) { + this.membersCount = membersCount; + } + + public BigInteger getMonetaryMass() { + return monetaryMass; + } + + public void setMonetaryMass(BigInteger monetaryMass) { + this.monetaryMass = monetaryMass; + } + + public Integer getUnitbase() { + return unitbase; + } + + public void setUnitbase(Integer unitbase) { + this.unitbase = unitbase; + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/script/BlockchainTxCountScriptFactory.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/script/BlockchainTxCountScriptFactory.java new file mode 100644 index 00000000..34774866 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/script/BlockchainTxCountScriptFactory.java @@ -0,0 +1,31 @@ +package org.duniter.elasticsearch.script; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.script.AbstractFloatSearchScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.NativeScriptFactory; + +import java.util.List; +import java.util.Map; + +public class BlockchainTxCountScriptFactory implements NativeScriptFactory { + + @Override + public ExecutableScript newScript(@Nullable Map<String, Object> params) { + return new BlockchainTxCountScript(); + } + + @Override + public boolean needsScores() { + return false; + } + + public class BlockchainTxCountScript extends AbstractFloatSearchScript { + + @Override + public float runAsFloat() { + Object a = source().get("transactions"); + return a != null ? ((List)a).size() : 0; + } + } +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 25b2134c..72747504 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -23,7 +23,6 @@ package org.duniter.elasticsearch.service; */ -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -31,9 +30,6 @@ import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; -import org.duniter.core.util.ObjectUtils; -import org.duniter.core.util.json.JsonAttributeParser; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.service.bma.BlockchainRemoteService; import org.duniter.core.client.service.bma.NetworkRemoteService; import org.duniter.core.client.service.exception.BlockNotFoundException; @@ -42,12 +38,16 @@ import org.duniter.core.model.NullProgressionModel; import org.duniter.core.model.ProgressionModel; import org.duniter.core.model.ProgressionModelImpl; import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; +import org.duniter.core.util.json.JsonAttributeParser; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.dao.PeerDao; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -65,8 +65,8 @@ import java.util.*; */ public class BlockchainService extends AbstractService { - public static final String BLOCK_TYPE = "block"; - public static final String PEER_TYPE = "peer"; + public static final String BLOCK_TYPE = BlockDao.TYPE; + public static final String PEER_TYPE = PeerDao.TYPE; public static final String CURRENT_BLOCK_ID = "current"; private static final int SYNC_MISSING_BLOCK_MAX_RETRY = 5; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java new file mode 100644 index 00000000..908d87e4 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java @@ -0,0 +1,194 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.collect.ImmutableList; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.Preconditions; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.model.BlockchainBlockStat; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Created by Benoit on 26/04/2017. + */ +public class BlockchainStatsService extends AbstractService implements ChangeService.ChangeListener { + + private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); + + private final boolean enable; + private final BlockStatDao blockStatDao; + private final ThreadPool threadPool; + + @Inject + public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, + BlockStatDao blockStatDao, + ThreadPool threadPool) { + super("duniter.blockchain.stats", client, settings, cryptoService); + this.enable = pluginSettings.enableBlockchainSync(); + this.blockStatDao = blockStatDao; + this.threadPool = threadPool; + + if (this.enable) { + ChangeService.registerListener(this); + } + } + + @Override + public String getId() { + return "duniter.blockchain.stats"; + } + + @Override + public void onChange(ChangeEvent change) { + + // Skip _id=current + if(change.getId() == "current") return; + + try { + + switch (change.getOperation()) { + // on create + case CREATE: // create + if (change.getSource() != null) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processCreateBlock(block); + } + break; + + // on update + case INDEX: + if (change.getSource() != null) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processUpdateBlock(block); + } + break; + + // on DELETE : remove user event on block (using link + case DELETE: + processBlockDelete(change); + + break; + } + + } + catch(IOException e) { + throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); + } + + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return CHANGE_LISTEN_SOURCES; + } + + /* -- internal method -- */ + + private void processCreateBlock(BlockchainBlock block) { + + BlockchainBlockStat stat = newBlockStat(block); + + // Tx + if (CollectionUtils.isNotEmpty(block.getTransactions())) { + CounterMetric txChangeCounter = new CounterMetric(); + CounterMetric txAmountCounter = new CounterMetric(); + Arrays.stream(block.getTransactions()) + .forEach(tx -> { + long txAmount = BlockchainBlockUtils.getTxAmount(tx); + if (txAmount == 0l) { + txChangeCounter.inc(); + } + else { + txAmountCounter.inc(txAmount); + } + }); + + stat.setTxAmount(BigInteger.valueOf(txAmountCounter.count())); + stat.setTxChangeCount((int)txChangeCounter.count()); + stat.setTxCount(block.getTransactions().length); + } + else { + stat.setTxAmount(BigInteger.valueOf(0)); + stat.setTxChangeCount(0); + stat.setTxCount(0); + } + + // Add to index + blockStatDao.create(stat, false/*wait*/); + } + + private void processUpdateBlock(final BlockchainBlock block) { + Preconditions.checkNotNull(block); + Preconditions.checkNotNull(block.getNumber()); + + // Delete existing stat + CompletableFuture.runAsync(() -> blockStatDao.delete(block.getCurrency(), block.getNumber().toString(), true /*wait*/), threadPool.scheduler()) + // Then process block + .thenAccept(aVoid -> processCreateBlock(block)); + } + + private void processBlockDelete(ChangeEvent change) { + if (change.getId() == null) return; + + // Delete existing stat + blockStatDao.delete(change.getIndex(), change.getId(), false /*wait*/); + } + + protected BlockchainBlockStat newBlockStat(BlockchainBlock block) { + BlockchainBlockStat stat = new BlockchainBlockStat(); + + stat.setNumber(block.getNumber()); + stat.setHash(block.getHash()); + stat.setCurrency(block.getCurrency()); + stat.setMedianTime(block.getMedianTime()); + stat.setMembersCount(block.getMembersCount()); + stat.setMonetaryMass(block.getMonetaryMass()); + stat.setUnitbase(block.getUnitbase()); + stat.setVersion(block.getVersion()); + stat.setDividend(block.getDividend()); + + return stat; + } + + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 1594e65f..a715e5c4 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -64,8 +64,8 @@ import java.util.Objects; */ public class CurrencyService extends AbstractService { - public static final String INDEX = "currency"; - public static final String RECORD_TYPE = "record"; + public static final String INDEX = CurrencyExtendDao.INDEX; + public static final String RECORD_TYPE = CurrencyExtendDao.RECORD_TYPE; private BlockchainRemoteService blockchainRemoteService; private CurrencyExtendDao currencyDao; @@ -306,6 +306,10 @@ public class CurrencyService extends AbstractService { BlockDao blockDao = ServiceLocator.instance().getBean(BlockDao.class); createIndexRequestBuilder.addMapping(blockDao.getType(), blockDao.createTypeMapping()); + // Add blockStat type + BlockStatDao blockStatDao = injector.getInstance(BlockStatDao.class); + createIndexRequestBuilder.addMapping(blockStatDao.getType(), blockStatDao.createTypeMapping()); + createIndexRequestBuilder.execute().actionGet(); } }; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index 305b6251..711c2dc3 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -40,6 +40,7 @@ import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; +import org.elasticsearch.script.ScriptModule; public class ServiceModule extends AbstractModule implements Module { @@ -54,6 +55,7 @@ public class ServiceModule extends AbstractModule implements Module { // blockchain indexation services bind(BlockchainService.class).asEagerSingleton(); + bind(BlockchainStatsService.class).asEagerSingleton(); bind(PeerService.class).asEagerSingleton(); // Duniter Client API beans @@ -73,6 +75,8 @@ public class ServiceModule extends AbstractModule implements Module { bindWithLocator(MailService.class); } + + /* protected methods */ protected <T extends Bean> void bindWithLocator(Class<T> clazz) { diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java index 727c6526..caf31984 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java @@ -143,6 +143,14 @@ public class SubscriptionService extends AbstractService { logger.warn(I18n.t("duniter4j.es.subscription.email.start", pluginSettings.getEmailSubscriptionsExecuteHour(), dayOfWeek)); } + + // TODO: remove this (DEV lon) + /*long devDuration = 10 * 60 * 1000; + threadPool.scheduler().scheduleAtFixedRate( + () -> executeEmailSubscriptions(EmailSubscription.Frequency.daily), + 1000 * 2, + devDuration, TimeUnit.MILLISECONDS);*/ + // Daily execution threadPool.scheduler().scheduleAtFixedRate( () -> executeEmailSubscriptions(EmailSubscription.Frequency.daily), diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java index 94421128..d056cf84 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java @@ -63,13 +63,13 @@ public class SynchroService extends AbstractSynchroService { SynchroResult result = new SynchroResult(); long time = System.currentTimeMillis(); - importMailChanges(result, peer, sinceTime); + importSubscriptionsChanges(result, peer, sinceTime); long duration = System.currentTimeMillis() - time; logger.info(String.format("[%s] Synchronizing subscription data since %s [OK] %s (in %s ms)", peer.toString(), sinceTime, result.toString(), duration)); } - protected void importMailChanges(SynchroResult result, Peer peer, long sinceTime) { + protected void importSubscriptionsChanges(SynchroResult result, Peer peer, long sinceTime) { importChanges(result, peer, SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, sinceTime); } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index 4ff4731a..a73b8f1a 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -113,7 +113,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean isMailSmtpStartTLS() { - return settings.getAsBoolean("duniter.mail.smtp.starttle", false); + return settings.getAsBoolean("duniter.mail.smtp.starttls", false); } public boolean isMailSmtpUseSSL() { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 3c2e5fe8..0cf01c62 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -23,12 +23,10 @@ package org.duniter.elasticsearch.user.service; */ -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.duniter.core.client.model.ModelUtils; import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; @@ -38,6 +36,7 @@ import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; @@ -49,6 +48,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; /** * Created by Benoit on 30/03/2015. @@ -59,33 +59,31 @@ public class BlockchainUserEventService extends AbstractService implements Chang private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); - public final UserService userService; + private final UserService userService; - public final UserEventService userEventService; + private final UserEventService userEventService; - public final AdminService adminService; + private final AdminService adminService; - public final ObjectMapper objectMapper; - - - public final boolean enable; + private final boolean enable; + private final ThreadPool threadPool; @Inject public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, BlockchainService blockchainService, UserService userService, AdminService adminService, - UserEventService userEventService) { + UserEventService userEventService, + ThreadPool threadPool) { super("duniter.user.event.blockchain", client, settings, cryptoService); this.userService = userService; this.adminService = adminService; this.userEventService = userEventService; - this.objectMapper = JacksonUtils.newObjectMapper(); - ChangeService.registerListener(this); - + this.threadPool = threadPool; this.enable = pluginSettings.enableBlockchainSync(); if (this.enable) { + ChangeService.registerListener(this); blockchainService.registerConnectionListener(createConnectionListeners()); } } @@ -221,14 +219,13 @@ public class BlockchainUserEventService extends AbstractService implements Chang } } - private void processUpdateBlock(BlockchainBlock block) { + private void processUpdateBlock(final BlockchainBlock block) { // Delete events that reference this block - userEventService.deleteEventsByReference(new UserEvent.Reference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber()))) - .actionGet(); - - processCreateBlock(block); - + CompletableFuture.runAsync(() -> userEventService.deleteEventsByReference(new UserEvent.Reference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber()))) + .actionGet(), threadPool.scheduler()) + // Then process the block + .thenAccept(aVoid -> processCreateBlock(block)); } private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) { -- GitLab