Skip to content
Snippets Groups Projects
Commit b51320a5 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] add new index <currency>/blockStat

[fix] fix typo on plugin settings 'startlls'
parent e50da31c
No related branches found
No related tags found
No related merge requests found
Showing
with 957 additions and 83 deletions
package org.duniter.core.client.model.bma.util;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import jnr.ffi.annotations.In;
import org.duniter.core.client.model.bma.BlockchainBlock;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
/**
* Created by blavenie on 26/04/17.
*/
public final class BlockchainBlockUtils {
public static final Pattern TX_UNLOCK_PATTERN = Pattern.compile("([0-9]+):SIG\\(([^)]+)\\)");
public static final Pattern TX_OUTPUT_PATTERN = Pattern.compile("([0-9]+):([0-9]+):SIG\\(([^)]+)\\)");
private BlockchainBlockUtils () {
// helper class
}
public static BigInteger getTxAmount(BlockchainBlock block) {
BigInteger result = BigInteger.valueOf(0l);
Arrays.stream(block.getTransactions())
.forEach(tx -> result.add(BigInteger.valueOf(getTxAmount(tx))));
return result;
}
public static long getTxAmount(final BlockchainBlock.Transaction tx) {
final Map<Integer, Integer> inputsByIssuer = Maps.newHashMap();
Arrays.stream(tx.getUnlocks())
.map(TX_UNLOCK_PATTERN::matcher)
.filter(Matcher::matches)
.forEach(matcher -> inputsByIssuer.put(
Integer.parseInt(matcher.group(1)),
Integer.parseInt(matcher.group(2)))
);
return IntStream.range(0, tx.getIssuers().length)
.mapToLong(i -> {
final String issuer = tx.getIssuers()[i];
long inputSum = IntStream.range(0, tx.getInputs().length)
.filter(j -> i == inputsByIssuer.get(j))
.mapToObj(j -> tx.getInputs()[j])
.map(input -> input.split(":"))
.filter(inputParts -> inputParts.length > 2)
.mapToLong(inputParts -> powBase(Long.parseLong(inputParts[0]), Integer.parseInt(inputParts[1])))
.sum();
long outputSum = Arrays.stream(tx.getOutputs())
.map(TX_OUTPUT_PATTERN::matcher)
.filter(Matcher::matches)
.filter(matcher -> issuer.equals(matcher.group(3)))
.mapToLong(matcher -> powBase(Long.parseLong(matcher.group(1)), Integer.parseInt(matcher.group(2))))
.sum();
return (inputSum - outputSum);
})
.sum();
}
private static long powBase(long amount, int unitbase) {
if (unitbase == 0) return amount;
return amount * (long)Math.pow(10, unitbase);
}
}
......@@ -266,7 +266,7 @@ public class MailServiceImpl implements MailService, Closeable {
props.put("mail.smtp.host", config.getSmtpHost());
props.put("mail.smtp.port", config.getSmtpPort());
if (StringUtils.isNotBlank(config.getSenderAddress())) {
props.put("mail.from.alias", config.getSenderAddress());
props.put("mail.from", config.getSenderAddress());
if (StringUtils.isNotBlank(config.getSenderName())) {
props.put("mail.from.alias", config.getSenderName());
}
......
#!/bin/sh
curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d '
{
"size": 0,
"aggs": {
"blocksByIssuer": {
"terms": {
"field": "issuer",
"size": 0
},
"aggs" : {
"difficulty_stats" : {
"stats" : {
"field" : "powMin"
}
}
}
}
}
}'
......@@ -2,22 +2,49 @@
curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d '
{
"size": 10000,
"query": {
"filtered": {
"filter": {
"bool": {
"must": [
{
"exists": {
"field": "dividend"
"size": 0,
"aggs": {
"txByRange": {
"range": {
"field" : "medianTime",
"ranges" : [
{ "from" : 1491955200, "to" : 1492041600 }
]
},
"aggs" : {
"tx_stats" : {
"stats" : {
"script" : {
"inline" : "txcount",
"lang": "native"
}
}
]
},
"time" : {
"stats" : { "field" : "medianTime" }
}
}
}
}
}'
curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d '
{
"size": 0,
"aggs": {
"blocksByIssuer": {
"terms": {
"field": "issuer",
"size": 0
},
"_source": ["dividend", "monetaryMass", "membersCount"],
sort
"aggs" : {
"difficulty_stats" : {
"stats" : {
"field" : "difficulty"
}
}
}
}
}
}'
......@@ -25,6 +25,7 @@ package org.duniter.elasticsearch;
import com.google.common.collect.Lists;
import org.duniter.elasticsearch.dao.DaoModule;
import org.duniter.elasticsearch.rest.RestModule;
import org.duniter.elasticsearch.script.BlockchainTxCountScriptFactory;
import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.ServiceModule;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -59,6 +60,13 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
return "Duniter ElasticSearch Plugin";
}
@Inject
public void onModule(org.elasticsearch.script.ScriptModule scriptModule) {
// TODO: in ES v5+, see example here :
// https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java
scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class);
}
@Override
public Collection<Module> nodeModules() {
Collection<Module> modules = Lists.newArrayList();
......@@ -71,9 +79,10 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
modules.add(new WebSocketModule());
modules.add(new RestModule());
modules.add(new DaoModule());
modules.add(new ServiceModule());
//modules.add(new ScriptModule());
return modules;
}
......
......@@ -24,12 +24,15 @@ package org.duniter.elasticsearch;
import org.duniter.core.client.model.elasticsearch.Currency;
import org.duniter.core.client.model.local.Peer;
import org.duniter.elasticsearch.dao.BlockDao;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.dao.PeerDao;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.BlockchainStatsService;
import org.duniter.elasticsearch.service.CurrencyService;
import org.duniter.elasticsearch.service.PeerService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
......@@ -120,20 +123,31 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
Currency currency = injector.getInstance(CurrencyService.class)
.indexCurrencyFromPeer(peer, true);
// Add access to currency/block index
injector.getInstance(RestSecurityController.class).allowIndexType(RestRequest.Method.GET,
injector.getInstance(RestSecurityController.class)
// Add access to <currency>/block index
.allowIndexType(RestRequest.Method.GET,
currency.getCurrencyName(),
BlockDao.TYPE)
.allowPostSearchIndexType(
currency.getCurrencyName(),
BlockchainService.BLOCK_TYPE);
injector.getInstance(RestSecurityController.class).allowPostSearchIndexType(
BlockDao.TYPE)
// Add access to <currency>/blockStat index
.allowIndexType(RestRequest.Method.GET,
currency.getCurrencyName(),
BlockchainService.BLOCK_TYPE);
// Add access to currency/peer index
injector.getInstance(RestSecurityController.class).allowIndexType(RestRequest.Method.GET,
BlockStatDao.TYPE)
.allowPostSearchIndexType(
currency.getCurrencyName(),
BlockStatDao.TYPE)
// Add access to <currency>/peer index
.allowIndexType(RestRequest.Method.GET,
currency.getCurrencyName(),
BlockchainService.PEER_TYPE);
injector.getInstance(RestSecurityController.class).allowPostSearchIndexType(
PeerDao.TYPE)
.allowPostSearchIndexType(
currency.getCurrencyName(),
BlockchainService.PEER_TYPE);
PeerDao.TYPE);
// Index blocks (and listen if new block appear)
injector.getInstance(BlockchainService.class)
......
......@@ -32,6 +32,9 @@ import java.util.List;
*/
public interface BlockDao extends Bean, TypeDao<BlockDao> {
String TYPE = "block";
void create(BlockchainBlock block, boolean wait);
/**
......
package org.duniter.elasticsearch.dao;
/*-
* #%L
* Duniter4j :: ElasticSearch Core plugin
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.elasticsearch.model.BlockchainBlockStat;
import java.util.List;
/**
* Created by blavenie on 03/04/17.
*/
public interface BlockStatDao extends Bean, TypeDao<BlockStatDao> {
String TYPE = "blockStat";
void create(BlockchainBlockStat block, boolean wait);
/**
*
* @param currencyName
* @param number the block number
* @param json block as JSON
*/
void create(String currencyName, String id, byte[] json, boolean wait);
boolean isExists(String currencyName, String id);
void update(BlockchainBlockStat block, boolean wait);
/**
*
* @param currencyName
* @param number the block number, or -1 for current
* @param json block as JSON
*/
void update(String currencyName, String id, byte[] json, boolean wait);
void delete(String currency, String id, boolean wait);
BlockchainBlockStat getById(String currencyName, String id);
}
......@@ -28,4 +28,6 @@ import org.duniter.core.client.dao.CurrencyDao;
* Created by blavenie on 03/04/17.
*/
public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExtendDao> {
String INDEX = "currency";
String RECORD_TYPE = "record";
}
......@@ -27,6 +27,7 @@ import org.duniter.core.client.dao.CurrencyDao;
import org.duniter.core.client.dao.PeerDao;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.client.Duniter4jClientImpl;
import org.duniter.elasticsearch.dao.impl.BlockStatDaoImpl;
import org.duniter.elasticsearch.service.ServiceLocator;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
......@@ -36,6 +37,7 @@ public class DaoModule extends AbstractModule implements Module {
@Override protected void configure() {
bind(Duniter4jClient.class).to(Duniter4jClientImpl.class).asEagerSingleton();
bind(BlockStatDao.class).to(BlockStatDaoImpl.class).asEagerSingleton();
bindWithLocator(BlockDao.class);
bindWithLocator(PeerDao.class);
......
package org.duniter.elasticsearch.dao;
import org.duniter.elasticsearch.dao.impl.PeerDaoImpl;
/**
* Created by blavenie on 26/04/17.
*/
public interface PeerDao extends org.duniter.core.client.dao.PeerDao, TypeDao<PeerDaoImpl>{
String TYPE = "peer";
}
......@@ -56,7 +56,6 @@ import java.util.Map;
*/
public class BlockDaoImpl extends AbstractDao implements BlockDao {
public static final String BLOCK_TYPE = "block";
public BlockDaoImpl(){
super("duniter.dao.block");
......@@ -64,7 +63,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
@Override
public String getType() {
return BLOCK_TYPE;
return TYPE;
}
public void create(BlockchainBlock block, boolean wait) {
......@@ -79,7 +78,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
String json = objectMapper.writeValueAsString(block);
// Preparing
IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), BLOCK_TYPE)
IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE)
.setId(block.getNumber().toString())
.setSource(json);
......@@ -104,7 +103,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
Preconditions.checkArgument(json.length > 0);
// Preparing indexBlocksFromNode
IndexRequestBuilder request = client.prepareIndex(currencyName, BLOCK_TYPE)
IndexRequestBuilder request = client.prepareIndex(currencyName, TYPE)
.setId(id)
.setRefresh(true)
.setSource(json);
......@@ -114,7 +113,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
}
public boolean isExists(String currencyName, String id) {
return client.isDocumentExists(currencyName, BLOCK_TYPE, id);
return client.isDocumentExists(currencyName, TYPE, id);
}
public void update(BlockchainBlock block, boolean wait) {
......@@ -129,7 +128,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
String json = objectMapper.writeValueAsString(block);
// Preparing
UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), BLOCK_TYPE, block.getNumber().toString())
UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString())
.setRefresh(true)
.setDoc(json);
......@@ -153,7 +152,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
Preconditions.checkArgument(json.length > 0);
// Preparing indexBlocksFromNode
UpdateRequestBuilder request = client.prepareUpdate(currencyName, BLOCK_TYPE, id)
UpdateRequestBuilder request = client.prepareUpdate(currencyName, TYPE, id)
.setRefresh(true)
.setDoc(json);
......@@ -167,7 +166,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(currencyName)
.setTypes(BLOCK_TYPE)
.setTypes(TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
// If only one term, search as prefix
......@@ -201,7 +200,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(currencyName)
.setTypes(BLOCK_TYPE)
.setTypes(TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
// Get max(number)
......@@ -226,17 +225,43 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
try {
XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject(BLOCK_TYPE)
.startObject(TYPE)
.startObject("properties")
// block number
// currency
.startObject("currency")
.field("type", "string")
.endObject()
// version
.startObject("version")
.field("type", "integer")
.endObject()
// time
.startObject("time")
.field("type", "long")
.endObject()
// medianTime
.startObject("medianTime")
.field("type", "long")
.endObject()
// number
.startObject("number")
.field("type", "integer")
.endObject()
// nonce
.startObject("nonce")
.field("type", "long")
.endObject()
// hash
.startObject("hash")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// issuer
......@@ -250,22 +275,17 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
.field("type", "string")
.endObject()
// membercount
.startObject("memberCount")
// membersCount
.startObject("membersCount")
.field("type", "integer")
.endObject()
// membersChanges
.startObject("membersChanges")
.field("type", "string")
.endObject()
// unitbase
.startObject("unitbase")
.field("type", "integer")
.endObject()
// membersChanges
// monetaryMass
.startObject("monetaryMass")
.field("type", "long")
.endObject()
......@@ -290,7 +310,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
}
public BlockchainBlock getBlockById(String currencyName, String id) {
return client.getSourceById(currencyName, BLOCK_TYPE, id, BlockchainBlock.class);
return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class);
}
/**
......@@ -306,18 +326,18 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
for (int number=fromNumber; number<=toNumber; number++) {
bulkRequest.add(
client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(number))
client.prepareDelete(currencyName, TYPE, String.valueOf(number))
);
// Flush the bulk if not empty
if ((fromNumber - number % bulkSize) == 0) {
client.flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest);
client.flushDeleteBulk(currencyName, TYPE, bulkRequest);
bulkRequest = client.prepareBulk();
}
}
// last flush
client.flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest);
client.flushDeleteBulk(currencyName, TYPE, bulkRequest);
}
/* -- Internal methods -- */
......
package org.duniter.elasticsearch.dao.impl;
/*
* #%L
* UCoin Java Client :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.core.JsonProcessingException;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.AbstractDao;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.model.BlockchainBlockStat;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
/**
* Created by Benoit on 30/03/2015.
*/
public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao {
public BlockStatDaoImpl(){
super("duniter.dao.blockStat");
}
@Override
public String getType() {
return TYPE;
}
public void create(BlockchainBlockStat block, boolean wait) {
Preconditions.checkNotNull(block);
Preconditions.checkArgument(StringUtils.isNotBlank(block.getCurrency()));
Preconditions.checkNotNull(block.getHash());
Preconditions.checkNotNull(block.getNumber());
// Serialize into JSON
// WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String)
try {
String json = objectMapper.writeValueAsString(block);
// Preparing
IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE)
.setId(block.getNumber().toString())
.setSource(json);
// Execute
client.safeExecuteRequest(request, wait);
}
catch(JsonProcessingException e) {
throw new TechnicalException(e);
}
}
@Override
public void create(String currencyName, String id, byte[] json, boolean wait) {
Preconditions.checkNotNull(currencyName);
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(json);
Preconditions.checkArgument(json.length > 0);
// Preparing indexBlocksFromNode
IndexRequestBuilder request = client.prepareIndex(currencyName, TYPE)
.setId(id)
.setRefresh(true)
.setSource(json);
// Execute
client.safeExecuteRequest(request, wait);
}
public boolean isExists(String currencyName, String id) {
return client.isDocumentExists(currencyName, TYPE, id);
}
public void update(BlockchainBlockStat block, boolean wait) {
Preconditions.checkNotNull(block);
Preconditions.checkArgument(StringUtils.isNotBlank(block.getCurrency()));
Preconditions.checkNotNull(block.getNumber());
// Serialize into JSON
// WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String)
try {
String json = objectMapper.writeValueAsString(block);
// Preparing
UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString())
.setRefresh(true)
.setDoc(json);
// Execute
client.safeExecuteRequest(request, wait);
}
catch(JsonProcessingException e) {
throw new TechnicalException(e);
}
}
/**
*
* @param currencyName
* @param id the block id
* @param json block as JSON
*/
public void update(String currencyName, String id, byte[] json, boolean wait) {
Preconditions.checkNotNull(currencyName);
Preconditions.checkNotNull(json);
Preconditions.checkArgument(json.length > 0);
// Preparing index
UpdateRequestBuilder request = client.prepareUpdate(currencyName, TYPE, id)
.setRefresh(true)
.setDoc(json);
// Execute
client.safeExecuteRequest(request, wait);
}
@Override
public void delete(String currency, String id, boolean wait) {
Preconditions.checkNotNull(currency);
Preconditions.checkNotNull(id);
// Preparing request
DeleteRequestBuilder request = client.prepareDelete(currency, TYPE, id);
// Execute
client.safeExecuteRequest(request, wait);
}
@Override
public XContentBuilder createTypeMapping() {
try {
XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject(TYPE)
.startObject("properties")
// currency
.startObject(BlockchainBlockStat.PROPERTY_CURRENCY)
.field("type", "string")
.endObject()
// version
.startObject(BlockchainBlockStat.PROPERTY_VERSION)
.field("type", "integer")
.endObject()
// block number
.startObject(BlockchainBlockStat.PROPERTY_NUMBER)
.field("type", "integer")
.endObject()
// medianTime
.startObject(BlockchainBlockStat.PROPERTY_MEDIAN_TIME)
.field("type", "long")
.endObject()
// hash
.startObject(BlockchainBlockStat.PROPERTY_HASH)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// membersCount
.startObject(BlockchainBlockStat.PROPERTY_MEMBERS_COUNT)
.field("type", "integer")
.endObject()
// unitbase
.startObject(BlockchainBlockStat.PROPERTY_UNITBASE)
.field("type", "integer")
.endObject()
// monetaryMass
.startObject(BlockchainBlockStat.PROPERTY_MONETARY_MASS)
.field("type", "long")
.endObject()
// dividend
.startObject(BlockchainBlockStat.PROPERTY_DIVIDEND)
.field("type", "integer")
.endObject()
// --- STATS properties ---
// txCount
.startObject(BlockchainBlockStat.PROPERTY_TX_COUNT)
.field("type", "integer")
.endObject()
// txAmount
.startObject(BlockchainBlockStat.PROPERTY_TX_AMOUNT)
.field("type", "long")
.endObject()
// txChangeAmount
.startObject(BlockchainBlockStat.PROPERTY_TX_CHANGE_COUNT)
.field("type", "integer")
.endObject()
.endObject()
.endObject().endObject();
return mapping;
}
catch(IOException ioe) {
throw new TechnicalException("Error while getting mapping for block stat index: " + ioe.getMessage(), ioe);
}
}
public BlockchainBlockStat getById(String currencyName, String id) {
return client.getSourceById(currencyName, TYPE, id, BlockchainBlockStat.class);
}
/* -- Internal methods -- */
}
......@@ -24,7 +24,6 @@ package org.duniter.elasticsearch.dao.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.duniter.core.client.dao.CurrencyDao;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.Preconditions;
......@@ -38,7 +37,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.*;
import java.util.List;
import java.util.Map;
/**
* Created by blavenie on 29/12/15.
......@@ -47,9 +47,6 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp
protected static final String REGEX_WORD_SEPARATOR = "[-\\t@# _]+";
public static final String INDEX = "currency";
public static final String RECORD_TYPE = "record";
public CurrencyDaoImpl(){
super(INDEX, RECORD_TYPE);
}
......@@ -66,8 +63,6 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp
// Serialize into JSON
byte[] json = objectMapper.writeValueAsBytes(currency);
System.out.println(objectMapper.writeValueAsString(currency));
// Preparing indexBlocksFromNode
IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE)
.setId(currency.getId())
......@@ -158,7 +153,8 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp
@Override
public XContentBuilder createTypeMapping() {
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE)
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject()
.startObject(RECORD_TYPE)
.startObject("properties")
// currency
......
......@@ -23,12 +23,12 @@ package org.duniter.elasticsearch.dao.impl;
*/
import com.fasterxml.jackson.core.JsonProcessingException;
import org.duniter.core.client.dao.PeerDao;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.AbstractDao;
import org.duniter.elasticsearch.dao.PeerDao;
import org.duniter.elasticsearch.dao.TypeDao;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
......@@ -41,10 +41,7 @@ import java.util.List;
/**
* Created by blavenie on 29/12/15.
*/
public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDaoImpl> {
public static final String PEER_TYPE = "peer";
public class PeerDaoImpl extends AbstractDao implements PeerDao {
public PeerDaoImpl(){
super("duniter.dao.peer");
......@@ -52,7 +49,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
@Override
public String getType() {
return PEER_TYPE;
return TYPE;
}
@Override
......@@ -70,7 +67,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
String json = objectMapper.writeValueAsString(peer);
// Preparing indexBlocksFromNode
IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), PEER_TYPE)
IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), TYPE)
.setId(peer.getId())
.setSource(json);
......@@ -99,7 +96,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
String json = objectMapper.writeValueAsString(peer);
// Preparing indexBlocksFromNode
UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), PEER_TYPE, peer.getId())
UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), TYPE, peer.getId())
.setDoc(json);
// Execute indexBlocksFromNode
......@@ -125,7 +122,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
Preconditions.checkArgument(StringUtils.isNotBlank(peer.getCurrency()));
// Delete the document
client.prepareDelete(peer.getCurrency(), PEER_TYPE, peer.getId()).execute().actionGet();
client.prepareDelete(peer.getCurrency(), TYPE, peer.getId()).execute().actionGet();
}
@Override
......@@ -135,7 +132,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
@Override
public boolean isExists(String currencyId, String peerId) {
return client.isDocumentExists(currencyId, PEER_TYPE, peerId);
return client.isDocumentExists(currencyId, TYPE, peerId);
}
@Override
......@@ -143,7 +140,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao, TypeDao<PeerDao
try {
XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject(PEER_TYPE)
.startObject(TYPE)
.startObject("properties")
// currency
......
package org.duniter.elasticsearch.model;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import java.io.Serializable;
import java.math.BigInteger;
/**
* Created by blavenie on 29/11/16.
*/
public class BlockchainBlockStat implements Serializable {
public static final String PROPERTY_VERSION = "version";
public static final String PROPERTY_CURRENCY = "currency";
public static final String PROPERTY_NUMBER = "number";
public static final String PROPERTY_HASH = "hash";
public static final String PROPERTY_MEDIAN_TIME = "medianTime";
public static final String PROPERTY_MEMBERS_COUNT = "membersCount";
public static final String PROPERTY_MONETARY_MASS = "monetaryMass";
public static final String PROPERTY_UNITBASE= "unitbase";
public static final String PROPERTY_DIVIDEND = "dividend";
public static final String PROPERTY_TX_COUNT = "txCount";
public static final String PROPERTY_TX_AMOUNT = "txAmount";
public static final String PROPERTY_TX_CHANGE_COUNT = "txChangeCount";
// Property copied from Block
private String version;
private String currency;
private Integer number;
private String hash;
private Long medianTime;
private Integer membersCount;
private BigInteger monetaryMass;
private Integer unitbase;
private BigInteger dividend;
// Statistics
private Integer txCount;
private BigInteger txAmount;
private Integer txChangeCount;
public BlockchainBlockStat() {
super();
}
public String getVersion() {
return version;
}
public void setVersion(String version) {
this.version = version;
}
public String getCurrency() {
return currency;
}
public void setCurrency(String currency) {
this.currency = currency;
}
public BigInteger getDividend() {
return dividend;
}
public void setDividend(BigInteger dividend) {
this.dividend = dividend;
}
public Integer getTxCount() {
return txCount;
}
public void setTxCount(Integer txCount) {
this.txCount = txCount;
}
public BigInteger getTxAmount() {
return txAmount;
}
public void setTxAmount(BigInteger txAmount) {
this.txAmount = txAmount;
}
public Integer getTxChangeCount() {
return txChangeCount;
}
public void setTxChangeCount(Integer txChangeCount) {
this.txChangeCount = txChangeCount;
}
public Integer getNumber() {
return number;
}
public void setNumber(Integer number) {
this.number = number;
}
public String getHash() {
return hash;
}
public void setHash(String hash) {
this.hash = hash;
}
public Long getMedianTime() {
return medianTime;
}
public void setMedianTime(Long medianTime) {
this.medianTime = medianTime;
}
public Integer getMembersCount() {
return membersCount;
}
public void setMembersCount(Integer membersCount) {
this.membersCount = membersCount;
}
public BigInteger getMonetaryMass() {
return monetaryMass;
}
public void setMonetaryMass(BigInteger monetaryMass) {
this.monetaryMass = monetaryMass;
}
public Integer getUnitbase() {
return unitbase;
}
public void setUnitbase(Integer unitbase) {
this.unitbase = unitbase;
}
}
package org.duniter.elasticsearch.script;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.script.AbstractFloatSearchScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.NativeScriptFactory;
import java.util.List;
import java.util.Map;
public class BlockchainTxCountScriptFactory implements NativeScriptFactory {
@Override
public ExecutableScript newScript(@Nullable Map<String, Object> params) {
return new BlockchainTxCountScript();
}
@Override
public boolean needsScores() {
return false;
}
public class BlockchainTxCountScript extends AbstractFloatSearchScript {
@Override
public float runAsFloat() {
Object a = source().get("transactions");
return a != null ? ((List)a).size() : 0;
}
}
}
\ No newline at end of file
......@@ -23,7 +23,6 @@ package org.duniter.elasticsearch.service;
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
......@@ -31,9 +30,6 @@ import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.BlockchainParameters;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.json.JsonAttributeParser;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.client.service.bma.NetworkRemoteService;
import org.duniter.core.client.service.exception.BlockNotFoundException;
......@@ -42,12 +38,16 @@ import org.duniter.core.model.NullProgressionModel;
import org.duniter.core.model.ProgressionModel;
import org.duniter.core.model.ProgressionModelImpl;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.json.JsonAttributeParser;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.dao.BlockDao;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.dao.PeerDao;
import org.duniter.elasticsearch.exception.DuplicateIndexIdException;
import org.duniter.elasticsearch.exception.NotFoundException;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -65,8 +65,8 @@ import java.util.*;
*/
public class BlockchainService extends AbstractService {
public static final String BLOCK_TYPE = "block";
public static final String PEER_TYPE = "peer";
public static final String BLOCK_TYPE = BlockDao.TYPE;
public static final String PEER_TYPE = PeerDao.TYPE;
public static final String CURRENT_BLOCK_ID = "current";
private static final int SYNC_MISSING_BLOCK_MAX_RETRY = 5;
......
package org.duniter.elasticsearch.service;
/*
* #%L
* UCoin Java Client :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.google.common.collect.ImmutableList;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.util.BlockchainBlockUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.model.BlockchainBlockStat;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Created by Benoit on 26/04/2017.
*/
public class BlockchainStatsService extends AbstractService implements ChangeService.ChangeListener {
private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
private final boolean enable;
private final BlockStatDao blockStatDao;
private final ThreadPool threadPool;
@Inject
public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService,
BlockStatDao blockStatDao,
ThreadPool threadPool) {
super("duniter.blockchain.stats", client, settings, cryptoService);
this.enable = pluginSettings.enableBlockchainSync();
this.blockStatDao = blockStatDao;
this.threadPool = threadPool;
if (this.enable) {
ChangeService.registerListener(this);
}
}
@Override
public String getId() {
return "duniter.blockchain.stats";
}
@Override
public void onChange(ChangeEvent change) {
// Skip _id=current
if(change.getId() == "current") return;
try {
switch (change.getOperation()) {
// on create
case CREATE: // create
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processCreateBlock(block);
}
break;
// on update
case INDEX:
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processUpdateBlock(block);
}
break;
// on DELETE : remove user event on block (using link
case DELETE:
processBlockDelete(change);
break;
}
}
catch(IOException e) {
throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e);
}
}
@Override
public Collection<ChangeSource> getChangeSources() {
return CHANGE_LISTEN_SOURCES;
}
/* -- internal method -- */
private void processCreateBlock(BlockchainBlock block) {
BlockchainBlockStat stat = newBlockStat(block);
// Tx
if (CollectionUtils.isNotEmpty(block.getTransactions())) {
CounterMetric txChangeCounter = new CounterMetric();
CounterMetric txAmountCounter = new CounterMetric();
Arrays.stream(block.getTransactions())
.forEach(tx -> {
long txAmount = BlockchainBlockUtils.getTxAmount(tx);
if (txAmount == 0l) {
txChangeCounter.inc();
}
else {
txAmountCounter.inc(txAmount);
}
});
stat.setTxAmount(BigInteger.valueOf(txAmountCounter.count()));
stat.setTxChangeCount((int)txChangeCounter.count());
stat.setTxCount(block.getTransactions().length);
}
else {
stat.setTxAmount(BigInteger.valueOf(0));
stat.setTxChangeCount(0);
stat.setTxCount(0);
}
// Add to index
blockStatDao.create(stat, false/*wait*/);
}
private void processUpdateBlock(final BlockchainBlock block) {
Preconditions.checkNotNull(block);
Preconditions.checkNotNull(block.getNumber());
// Delete existing stat
CompletableFuture.runAsync(() -> blockStatDao.delete(block.getCurrency(), block.getNumber().toString(), true /*wait*/), threadPool.scheduler())
// Then process block
.thenAccept(aVoid -> processCreateBlock(block));
}
private void processBlockDelete(ChangeEvent change) {
if (change.getId() == null) return;
// Delete existing stat
blockStatDao.delete(change.getIndex(), change.getId(), false /*wait*/);
}
protected BlockchainBlockStat newBlockStat(BlockchainBlock block) {
BlockchainBlockStat stat = new BlockchainBlockStat();
stat.setNumber(block.getNumber());
stat.setHash(block.getHash());
stat.setCurrency(block.getCurrency());
stat.setMedianTime(block.getMedianTime());
stat.setMembersCount(block.getMembersCount());
stat.setMonetaryMass(block.getMonetaryMass());
stat.setUnitbase(block.getUnitbase());
stat.setVersion(block.getVersion());
stat.setDividend(block.getDividend());
return stat;
}
}
......@@ -64,8 +64,8 @@ import java.util.Objects;
*/
public class CurrencyService extends AbstractService {
public static final String INDEX = "currency";
public static final String RECORD_TYPE = "record";
public static final String INDEX = CurrencyExtendDao.INDEX;
public static final String RECORD_TYPE = CurrencyExtendDao.RECORD_TYPE;
private BlockchainRemoteService blockchainRemoteService;
private CurrencyExtendDao currencyDao;
......@@ -306,6 +306,10 @@ public class CurrencyService extends AbstractService {
BlockDao blockDao = ServiceLocator.instance().getBean(BlockDao.class);
createIndexRequestBuilder.addMapping(blockDao.getType(), blockDao.createTypeMapping());
// Add blockStat type
BlockStatDao blockStatDao = injector.getInstance(BlockStatDao.class);
createIndexRequestBuilder.addMapping(blockStatDao.getType(), blockStatDao.createTypeMapping());
createIndexRequestBuilder.execute().actionGet();
}
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment