From 80a4386a7ef87204eafb326bad77036aa6f47cfe Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Mon, 1 May 2017 21:56:43 +0200 Subject: [PATCH] [fix] Fix blockstat indeaxtion : use bulk and flush in a async thread --- .../client/model/bma/BlockchainBlock.java | 6 +- .../duniter/core/client/model/local/Peer.java | 8 +- .../core/client/service/HttpServiceImpl.java | 2 +- .../org/duniter/elasticsearch/PluginInit.java | 24 +-- .../duniter/elasticsearch/PluginSettings.java | 2 +- .../elasticsearch/client/Duniter4jClient.java | 10 ++ .../client/Duniter4jClientImpl.java | 49 +++++- .../duniter/elasticsearch/dao/BlockDao.java | 4 + .../elasticsearch/dao/BlockStatDao.java | 4 +- .../elasticsearch/dao/impl/BlockDaoImpl.java | 26 ++++ .../dao/impl/BlockStatDaoImpl.java | 28 +++- .../model/BlockchainBlockStat.java | 6 +- .../AbstractBlockchainListenerService.java | 92 ++++++++---- .../service/BlockchainService.java | 13 -- .../service/BlockchainStatsService.java | 73 +++++---- .../service/changes/ChangeEvent.java | 15 +- .../threadpool/CompletableActionFuture.java | 133 ++++++++++++++++ .../LoggingScheduledThreadPoolExecutor.java | 89 +++++++++++ .../elasticsearch/threadpool/RetryPolicy.java | 43 ++++++ .../elasticsearch/threadpool/ThreadPool.java | 44 ++++-- .../i18n/duniter4j-es-core_en_GB.properties | 2 +- .../i18n/duniter4j-es-core_fr_FR.properties | 2 +- .../service/BlockchainServiceTest.java | 20 ++- .../service/SubscriptionService.java | 10 +- .../service/BlockchainUserEventService.java | 142 +++++++++++------- .../user/service/UserEventService.java | 129 ++++++++++------ 26 files changed, 750 insertions(+), 226 deletions(-) create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/CompletableActionFuture.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/RetryPolicy.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java index 58c92d35..dee971bd 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java @@ -42,7 +42,7 @@ public class BlockchainBlock implements Serializable { private static final long serialVersionUID = -5598140972293452669L; - private String version; + private Integer version; private Long nonce; private Integer number; private Integer powMin; @@ -76,10 +76,10 @@ public class BlockchainBlock implements Serializable { // raw": "Version: 1\nType: Block\nCurrency: zeta_brouzouf\nNonce: 8233\nNumber: 1\nDate: 1416589860\nConfirmedDate: 1416589860\nIssuer: HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk\nPreviousHash: 00006CD96A01378465318E48310118AC6B2F3625\nPreviousIssuer: HnFcSms8jzwngtVomTTnzudZx7SHUQY8sVE1y8yBmULk\nMembersCount: 4\nIdentities:\nJoiners:\nActives:\nLeavers:\nExcluded:\nCertifications:\nTransactions:\n" private String raw; - public String getVersion() { + public Integer getVersion() { return version; } - public void setVersion(String version) { + public void setVersion(Integer version) { this.version = version; } public Long getNonce() { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index 272c1e6c..534d14e2 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -221,12 +221,16 @@ public class Peer implements LocalEntity<String>, Serializable { // else (if define) use ipv4 (if NOT local IP) // else (if define) use dns // else (if define) use ipv6 - this.host = ((port == 443 || useSsl) && dns != null) ? dns : + host = ((port == 443 || useSsl) && dns != null) ? dns : (ipv4 != null && InetAddressUtils.isNotLocalIPv4Address(ipv4) ? ipv4 : (dns != null ? dns : (ipv6 != null ? "[" + ipv6 + "]" : ""))); + // Use local IPv4 if no other host found + if (StringUtils.isBlank(host) && ipv4 != null && InetAddressUtils.isIPv4Address(ipv4)) { + host = ipv4; + } String protocol = (port == 443 || useSsl) ? "https" : "http"; - this.url = protocol + "://" + this.host + (port != 80 ? (":" + port) : ""); + this.url = protocol + "://" + host + (port != 80 ? (":" + port) : ""); } @JsonIgnore diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index ee050425..f91020e3 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -343,7 +343,7 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean catch (SocketTimeoutException | ConnectTimeoutException e) { throw new HttpTimeoutException(I18n.t("duniter4j.client.core.timeout"), e); } - catch (IOException e) { + catch (Throwable e) { throw new TechnicalException(e.getMessage(), e); } finally { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 0e7988f7..7547e363 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -149,15 +149,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { currency.getCurrencyName(), PeerDao.TYPE); - // Index blocks (and listen if new block appear) - injector.getInstance(BlockchainService.class) - .indexLastBlocks(peer) - .listenAndIndexNewBlock(peer); - - // Index peers (and listen if new peer appear) - injector.getInstance(PeerService.class) - //.indexAllPeers(peer) - .listenAndIndexPeers(peer); + // Wait end of currency index creation, then index blocks + threadPool.scheduleOnClusterHealthStatus(() -> { + + // Index blocks (and listen if new block appear) + injector.getInstance(BlockchainService.class) + .indexLastBlocks(peer) + .listenAndIndexNewBlock(peer); + + // Index peers (and listen if new peer appear) + injector.getInstance(PeerService.class) + .listenAndIndexPeers(peer); + + }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); + + } } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 9b218b70..0227fd27 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -169,7 +169,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean getNodeBmaUseSsl() { - return settings.getAsBoolean("duniter.useSsl", null); + return settings.getAsBoolean("duniter.useSsl", getNodeBmaPort() == 443); } public boolean isIndexBulkEnable() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java index 85ee832f..490633b1 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClient.java @@ -25,8 +25,12 @@ package org.duniter.elasticsearch.client; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.local.LocalEntity; import org.duniter.elasticsearch.dao.handler.StringReaderHandler; +import org.duniter.elasticsearch.threadpool.CompletableActionFuture; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.search.SearchHit; @@ -35,6 +39,8 @@ import java.io.File; import java.io.InputStream; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; /** * Created by blavenie on 03/04/17. @@ -97,5 +103,9 @@ public interface Duniter4jClient extends Bean, Client { void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest); + void flushBulk(BulkRequestBuilder bulkRequest); + void safeExecuteRequest(ActionRequestBuilder<?, ?, ?> request, boolean wait); + + ScheduledThreadPoolExecutor scheduler(); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java index 1a157ea1..7fbbb431 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java @@ -25,7 +25,9 @@ package org.duniter.elasticsearch.client; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import com.google.common.util.concurrent.ListenableFuture; import org.apache.commons.collections4.MapUtils; +import org.apache.http.client.methods.RequestBuilder; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.local.LocalEntity; @@ -34,10 +36,13 @@ import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; +import org.duniter.core.util.concurrent.CompletableFutures; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.handler.StringReaderHandler; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.exception.NotFoundException; +import org.duniter.elasticsearch.threadpool.CompletableActionFuture; +import org.duniter.elasticsearch.threadpool.RetryPolicy; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.*; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; @@ -80,6 +85,7 @@ import org.elasticsearch.action.search.*; import org.elasticsearch.action.suggest.SuggestRequest; import org.elasticsearch.action.suggest.SuggestRequestBuilder; import org.elasticsearch.action.suggest.SuggestResponse; +import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.action.termvectors.*; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequestBuilder; @@ -93,8 +99,11 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; @@ -102,6 +111,10 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.*; import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Created by Benoit on 08/04/2015. @@ -111,13 +124,14 @@ public class Duniter4jClientImpl implements Duniter4jClient { private final static ESLogger logger = Loggers.getLogger("duniter.client"); private final Client client; - + private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool; private final ObjectMapper objectMapper; @Inject - public Duniter4jClientImpl(Client client) { + public Duniter4jClientImpl(Client client, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) { super(); this.client = client; + this.threadPool = threadPool; this.objectMapper = JacksonUtils.newObjectMapper(); } @@ -505,7 +519,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { } @Override - public void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest) { + public void flushDeleteBulk(final String index, final String type, final BulkRequestBuilder bulkRequest) { if (bulkRequest.numberOfActions() > 0) { BulkResponse bulkResponse = bulkRequest.execute().actionGet(); @@ -522,6 +536,31 @@ public class Duniter4jClientImpl implements Duniter4jClient { } } + @Override + public void flushBulk(final BulkRequestBuilder bulkRequest) { + if (bulkRequest.numberOfActions() > 0) { + + // Flush the bulk if not empty + BulkResponse bulkResponse = bulkRequest.get(); + + Set<String> missingDocIds = new LinkedHashSet<>(); + + // If failures, continue but save missing blocks + if (bulkResponse.hasFailures()) { + // process failures by iterating through each bulk response item + for (BulkItemResponse itemResponse : bulkResponse) { + boolean skip = !itemResponse.isFailed() + || missingDocIds.contains(itemResponse.getId()); + if (!skip) { + logger.error(String.format("[%s/%s] could not process _id=%s: %s. Skipping.", + itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId(), itemResponse.getFailureMessage())); + missingDocIds.add(itemResponse.getId()); + } + } + } + } + } + /* delegate methods */ @Override @@ -960,6 +999,10 @@ public class Duniter4jClientImpl implements Duniter4jClient { return client.threadPool(); } + public ScheduledThreadPoolExecutor scheduler() { + return (ScheduledThreadPoolExecutor)client.threadPool().scheduler(); + } + public void close() { client.close(); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java index e14e88c9..16d29387 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java @@ -25,7 +25,9 @@ package org.duniter.elasticsearch.dao; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.BlockchainBlock; +import java.util.Collection; import java.util.List; +import java.util.Set; /** * Created by blavenie on 03/04/17. @@ -64,4 +66,6 @@ public interface BlockDao extends Bean, TypeDao<BlockDao> { BlockchainBlock getBlockById(String currencyName, String id); void deleteRange(final String currencyName, final int fromNumber, final int toNumber); + + List<BlockchainBlock> getBlocksByIds(String currencyName, Collection<String> ids); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java index 370e220d..7172a69f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/BlockStatDao.java @@ -33,7 +33,7 @@ import java.util.List; */ public interface BlockStatDao extends Bean, TypeDao<BlockStatDao> { - String TYPE = "blockStat"; + String TYPE = "blockstat"; void create(BlockchainBlockStat block, boolean wait); @@ -59,6 +59,8 @@ public interface BlockStatDao extends Bean, TypeDao<BlockStatDao> { void delete(String currency, String id, boolean wait); + void delete(String currency, String id, String hash, boolean wait); + BlockchainBlockStat getById(String currencyName, String id); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 28858ff0..9d5a47bd 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -48,8 +48,10 @@ import org.elasticsearch.search.highlight.HighlightField; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; /** * Created by Benoit on 30/03/2015. @@ -167,6 +169,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) .setTypes(TYPE) + .setFetchSource(true) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // If only one term, search as prefix @@ -196,6 +199,28 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { return toBlocks(searchResponse, true); } + public List<BlockchainBlock> getBlocksByIds(String currencyName, Collection<String> ids) { + // Prepare request + SearchRequestBuilder searchRequest = client + .prepareSearch(currencyName) + .setTypes(TYPE) + .setSize(ids.size()) + .setFetchSource(true) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + + // If only one term, search as prefix + searchRequest.setQuery(QueryBuilders.idsQuery(TYPE).addIds(ids)); + + // Sort as id + searchRequest.addSort("_id", SortOrder.ASC); + + // Execute query + SearchResponse searchResponse = searchRequest.execute().actionGet(); + + // Read query result + return toBlocks(searchResponse, false); + } + public int getMaxBlockNumber(String currencyName) { // Prepare request SearchRequestBuilder searchRequest = client @@ -345,6 +370,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { protected List<BlockchainBlock> toBlocks(SearchResponse response, boolean withHighlight) { // Read query result List<BlockchainBlock> result = Lists.newArrayList(); + response.getHits().forEach(searchHit -> { BlockchainBlock block; if (searchHit.source() != null) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java index 461d978f..35d1e669 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -29,6 +29,8 @@ import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.exception.DuniterElasticsearchException; +import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; @@ -65,7 +67,8 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Preparing IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) - .setId(block.getNumber().toString()) + .setId(String.valueOf(block.getNumber())) + .setRefresh(false) .setSource(json); // Execute @@ -86,7 +89,7 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Preparing indexBlocksFromNode IndexRequestBuilder request = client.prepareIndex(currencyName, TYPE) .setId(id) - .setRefresh(true) + .setRefresh(false) .setSource(json); // Execute @@ -152,6 +155,27 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { client.safeExecuteRequest(request, wait); } + @Override + public void delete(String currency, String id, String hash, boolean wait) { + Preconditions.checkNotNull(currency); + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(hash); + + try { + // get the current hash + String existingHash = client.getTypedFieldById(currency, TYPE, id, BlockchainBlockStat.PROPERTY_HASH); + + // Execute the delete, only if same hash + if (hash.equals(existingHash)) { + DeleteRequestBuilder request = client.prepareDelete(currency, TYPE, id); + client.safeExecuteRequest(request, wait); + } + } catch(NotFoundException e) { + // Not exists: do not delete + } + + } + @Override public XContentBuilder createTypeMapping() { try { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java index 75560fa7..3a602528 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/BlockchainBlockStat.java @@ -44,7 +44,7 @@ public class BlockchainBlockStat implements Serializable { public static final String PROPERTY_TX_CHANGE_COUNT = "txChangeCount"; // Property copied from Block - private String version; + private int version; private String currency; private Integer number; private String hash; @@ -63,11 +63,11 @@ public class BlockchainBlockStat implements Serializable { super(); } - public String getVersion() { + public int getVersion() { return version; } - public void setVersion(String version) { + public void setVersion(int version) { this.version = version; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java index 5f95a2a5..c565ec9c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -25,27 +25,23 @@ package org.duniter.elasticsearch.service; import com.google.common.collect.ImmutableList; import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.Preconditions; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.dao.BlockStatDao; -import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeService; import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.TimeValue; import java.io.IOException; -import java.math.BigInteger; -import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * Created by Benoit on 26/04/2017. @@ -57,18 +53,31 @@ public abstract class AbstractBlockchainListenerService extends AbstractService protected final boolean enable; protected final String listenerId; protected final ThreadPool threadPool; + protected final int bulkSize; + + private final TimeValue flushInterval; + protected final Object threadLock = Boolean.TRUE; + protected BulkRequestBuilder bulkRequest; + protected boolean flushing; @Inject public AbstractBlockchainListenerService(String loggerName, Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool) { + ThreadPool threadPool, + TimeValue processingInterval) { super(loggerName, client, settings, cryptoService); this.listenerId = loggerName; this.enable = pluginSettings.enableBlockchain(); this.threadPool = threadPool; + this.bulkSize = pluginSettings.getIndexBulkSize(); + this.bulkRequest = client.prepareBulk(); + + this.flushInterval = processingInterval; + this.flushing = false; + if (this.enable) { ChangeService.registerListener(this); } @@ -81,35 +90,24 @@ public abstract class AbstractBlockchainListenerService extends AbstractService } @Override - public final void onChange(ChangeEvent change) { + public void onChange(ChangeEvent change) { // Skip _id=current if("current".equals(change.getId())) return; switch (change.getOperation()) { - // on create - case CREATE: // create - if (change.getSource() != null) { - CompletableFuture.runAsync(() -> { - processCreateBlock(change); - }, threadPool.scheduler()); - } - break; - - // on update + // on INDEX case INDEX: - if (change.getSource() != null) { - // Delete existing stat - CompletableFuture.runAsync(() -> processBlockDelete(change, true), threadPool.scheduler()) - // Then process block - .thenAcceptAsync(aVoid -> processCreateBlock(change)); + synchronized (threadLock) { + processBlockIndex(change); } break; - // on DELETE : remove user event on block (using link + // on DELETE case DELETE: - // Delete existing stat - CompletableFuture.runAsync(() -> processBlockDelete(change, false)); + synchronized (threadLock) { + processBlockDelete(change); + } break; } @@ -122,9 +120,43 @@ public abstract class AbstractBlockchainListenerService extends AbstractService /* -- internal method -- */ - protected abstract void processCreateBlock(final ChangeEvent change); - protected abstract void processBlockDelete(ChangeEvent change, boolean wait); + protected abstract void processBlockIndex(ChangeEvent change); + + protected abstract void processBlockDelete(ChangeEvent change); + + protected abstract void beforeFlush(); + protected void flushBulkRequestOrSchedule() { + if (flushing) return; + + // Flush now, if need or later + if (bulkRequest.numberOfActions() % bulkSize == 0) { + client.flushBulk(bulkRequest); + bulkRequest = client.prepareBulk(); + } + else { + flushing = true; + threadPool.schedule(() -> { + synchronized (threadLock) { + beforeFlush(); + client.flushBulk(bulkRequest); + bulkRequest = client.prepareBulk(); + flushing = false; + } + }, new TimeValue(500, TimeUnit.MILLISECONDS)); + } + } + + protected BlockchainBlock readBlock(ChangeEvent change) { + Preconditions.checkNotNull(change); + Preconditions.checkNotNull(change.getSource()); + + try { + return objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + } catch (IOException e) { + throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); + } + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 72747504..6f70cfec 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -46,8 +46,6 @@ import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.BlockDao; -import org.duniter.elasticsearch.dao.BlockStatDao; -import org.duniter.elasticsearch.dao.PeerDao; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -66,7 +64,6 @@ import java.util.*; public class BlockchainService extends AbstractService { public static final String BLOCK_TYPE = BlockDao.TYPE; - public static final String PEER_TYPE = PeerDao.TYPE; public static final String CURRENT_BLOCK_ID = "current"; private static final int SYNC_MISSING_BLOCK_MAX_RETRY = 5; @@ -74,8 +71,6 @@ public class BlockchainService extends AbstractService { private final ProgressionModel nullProgressionModel = new NullProgressionModel(); private BlockchainRemoteService blockchainRemoteService; - private CurrencyService currencyService; - private ThreadPool threadPool; private List<WebsocketClientEndpoint.ConnectionListener> connectionListeners = new ArrayList<>(); private final WebsocketClientEndpoint.ConnectionListener dispatchConnectionListener; @@ -93,7 +88,6 @@ public class BlockchainService extends AbstractService { BlockDao blockDao, final ServiceLocator serviceLocator){ super("duniter.blockchain", client, settings); - this.threadPool = threadPool; this.client = client; this.blockDao = blockDao; threadPool.scheduleOnStarted(() -> { @@ -115,11 +109,6 @@ public class BlockchainService extends AbstractService { }; } - @Inject - public void setCurrencyService(CurrencyService currencyService) { - this.currencyService = currencyService; - } - public void registerConnectionListener(WebsocketClientEndpoint.ConnectionListener listener) { synchronized (connectionListeners) { @@ -363,8 +352,6 @@ public class BlockchainService extends AbstractService { // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { String json = objectMapper.writeValueAsString(currentBlock); - - indexCurrentBlockFromJson(currentBlock.getCurrency(), json, wait); } catch(IOException e) { throw new TechnicalException(e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java index e233e42c..5f46e386 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java @@ -23,9 +23,9 @@ package org.duniter.elasticsearch.service; */ +import com.fasterxml.jackson.core.JsonProcessingException; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; -import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.elasticsearch.PluginSettings; @@ -34,49 +34,64 @@ import org.duniter.elasticsearch.dao.BlockStatDao; import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.unit.TimeValue; -import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; +import java.util.concurrent.TimeUnit; /** * Created by Benoit on 26/04/2017. */ public class BlockchainStatsService extends AbstractBlockchainListenerService { - private final BlockStatDao blockStatDao; - @Inject public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - BlockStatDao blockStatDao, ThreadPool threadPool) { - super("duniter.blockchain.stats", client, settings, cryptoService, threadPool); - this.blockStatDao = blockStatDao; + super("duniter.blockchain.stats", client, settings, cryptoService, threadPool, + new TimeValue(500, TimeUnit.MILLISECONDS)); } - protected void processCreateBlock(final ChangeEvent change) { + @Override + protected void processBlockIndex(ChangeEvent change) { + + BlockchainBlock block = readBlock(change); + BlockchainBlockStat stat = toBlockStat(block); + + // Add a delete to bulk + bulkRequest.add(client.prepareDelete(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) + .setRefresh(false)); + flushBulkRequestOrSchedule(); + + // Add a insert to bulk try { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processCreateBlock(block); - } catch (IOException e) { - throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); + bulkRequest.add(client.prepareIndex(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) + .setRefresh(false) // recommended for heavy indexing + .setSource(objectMapper.writeValueAsString(stat))); + flushBulkRequestOrSchedule(); + } + catch(JsonProcessingException e) { + logger.error("Could not serialize BlockchainBlockStat into JSON: " + e.getMessage(), e); } } - protected void processBlockDelete(ChangeEvent change, boolean wait) { - if (change.getId() == null) return; - - // Delete existing stat - blockStatDao.delete(change.getIndex(), change.getId(), wait); + protected void processBlockDelete(ChangeEvent change) { + // Add delete to bulk + bulkRequest.add(client.prepareDelete(change.getIndex(), BlockStatDao.TYPE, change.getId()) + .setRefresh(false)); + flushBulkRequestOrSchedule(); } - /* -- internal method -- */ + protected void beforeFlush() { + // Nothing to do + } - private void processCreateBlock(BlockchainBlock block) { + protected BlockchainBlockStat toBlockStat(BlockchainBlock block) { - BlockchainBlockStat stat = newBlockStat(block); + BlockchainBlockStat result = newBlockStat(block); // Tx if (CollectionUtils.isNotEmpty(block.getTransactions())) { @@ -92,21 +107,21 @@ public class BlockchainStatsService extends AbstractBlockchainListenerService { txAmountCounter.inc(txAmount); } }); - - stat.setTxAmount(BigInteger.valueOf(txAmountCounter.count())); - stat.setTxChangeCount((int)txChangeCounter.count()); - stat.setTxCount(block.getTransactions().length); + result.setTxAmount(BigInteger.valueOf(txAmountCounter.count())); + result.setTxChangeCount((int)txChangeCounter.count()); + result.setTxCount(block.getTransactions().length); } else { - stat.setTxAmount(BigInteger.valueOf(0)); - stat.setTxChangeCount(0); - stat.setTxCount(0); + result.setTxAmount(BigInteger.valueOf(0)); + result.setTxChangeCount(0); + result.setTxCount(0); } - // Add to index - blockStatDao.create(stat, false/*wait*/); + return result; } + /* -- internal method -- */ + private BlockchainBlockStat newBlockStat(BlockchainBlock block) { BlockchainBlockStat stat = new BlockchainBlockStat(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java index a3b07217..57b68fc0 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java @@ -73,6 +73,16 @@ public class ChangeEvent { this.source = source; } + protected ChangeEvent(ChangeEvent event, boolean copySource) { + this.id = event.getId(); + this.index = event.getIndex(); + this.type = event.getType(); + this.timestamp = event.getTimestamp(); + this.operation = event.getOperation(); + this.version = event.getVersion(); + this.source = copySource ? event.getSource() : null; + } + public String getId() { return id; } @@ -101,7 +111,6 @@ public class ChangeEvent { return source; } - public String toJson() { try { XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); @@ -148,5 +157,7 @@ public class ChangeEvent { } } - + public ChangeEvent clone(ChangeEvent event, boolean withSource) { + return new ChangeEvent(this, withSource); + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/CompletableActionFuture.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/CompletableActionFuture.java new file mode 100644 index 00000000..07fbb039 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/CompletableActionFuture.java @@ -0,0 +1,133 @@ +package org.duniter.elasticsearch.threadpool; + +/* + * #%L + * Duniter4j :: ElasticSearch Core plugin + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Clase qui offre l'interface de ES, tout en étant compatible avec java.util.concurrent.CompletableFuture + * (poar exemple pour faire des thenCompose()... + * @param <T> + */ +public class CompletableActionFuture<T> implements ActionFuture<T> { + + private final CompletableFuture<T> delegate; + + public CompletableActionFuture(CompletableFuture<T> delegate) { + this.delegate = delegate; + } + + @Override + public T actionGet() { + try { + return get(); + } catch (InterruptedException e) { + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw rethrowExecutionException(e); + } + } + + @Override + public T actionGet(String timeout) { + return actionGet(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".actionGet.timeout")); + } + + @Override + public T actionGet(long timeoutMillis) { + return actionGet(timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public T actionGet(TimeValue timeout) { + return actionGet(timeout.millis(), TimeUnit.MILLISECONDS); + } + + @Override + public T actionGet(long timeout, TimeUnit unit) { + try { + return get(timeout, unit); + } catch (TimeoutException e) { + throw new ElasticsearchTimeoutException(e.getMessage()); + } catch (InterruptedException e) { + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw rethrowExecutionException(e); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + public CompletableFuture<T> getCompletableFuture() { + return delegate; + } + + static RuntimeException rethrowExecutionException(ExecutionException e) { + if (e.getCause() instanceof ElasticsearchException) { + ElasticsearchException esEx = (ElasticsearchException) e.getCause(); + Throwable root = esEx.unwrapCause(); + if (root instanceof ElasticsearchException) { + return (ElasticsearchException) root; + } else if (root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } else if (e.getCause() instanceof RuntimeException) { + return (RuntimeException) e.getCause(); + } else { + return new UncategorizedExecutionException("Failed execution", e); + } + } +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java new file mode 100644 index 00000000..ebd4c9c4 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java @@ -0,0 +1,89 @@ +package org.duniter.elasticsearch.threadpool; + +import org.elasticsearch.common.logging.ESLogger; + +import java.util.concurrent.*; + +public class LoggingScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + + private final ESLogger logger; + + public LoggingScheduledThreadPoolExecutor(ESLogger logger, + int corePoolSize, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + this.logger =logger; + } + + protected <V> RunnableScheduledFuture<V> decorateTask( + Runnable r, RunnableScheduledFuture<V> task) { + return new LoggingTask<V>(logger, task); + } + + protected <V> RunnableScheduledFuture<V> decorateTask( + Callable<V> c, RunnableScheduledFuture<V> task) { + return new LoggingTask<V>(logger, task); + } + + + static class LoggingTask<V> implements RunnableScheduledFuture<V> { + private final RunnableScheduledFuture<V> task; + private final ESLogger logger; + + public LoggingTask(ESLogger logger, RunnableScheduledFuture<V> task) { + this.task = task; + this.logger = logger; + } + + @Override + public void run() { + try { + task.run(); + } catch (Throwable e) { + logger.warn(String.format("Failed to execute a task: %s", e.getMessage()), e); + } + } + + @Override + public boolean isPeriodic() { + return task.isPeriodic(); + } + + @Override + public int compareTo(Delayed o) { + return task.compareTo(o); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return task.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return task.isCancelled(); + } + + @Override + public boolean isDone() { + return task.isDone(); + } + + @Override + public V get() throws InterruptedException, ExecutionException { + return task.get(); + } + + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return task.get(timeout, unit); + } + + @Override + public long getDelay(TimeUnit unit) { + return task.getDelay(unit); + } + } + +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/RetryPolicy.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/RetryPolicy.java new file mode 100644 index 00000000..e8a792f2 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/RetryPolicy.java @@ -0,0 +1,43 @@ +package org.duniter.elasticsearch.threadpool; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.util.concurrent.EsAbortPolicy; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class RetryPolicy extends EsAbortPolicy { + + private final ESLogger logger; + private final long retryDelay; + private final TimeUnit timeUnit; + + public RetryPolicy(long retryDelay, TimeUnit timeUnit) { + this(Loggers.getLogger("duniter.threadpool.policy"), retryDelay, timeUnit); + } + + public RetryPolicy(ESLogger logger, long retryDelay, TimeUnit timeUnit) { + super(); + this.logger = logger; + this.retryDelay = retryDelay; + this.timeUnit = timeUnit; + } + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + + if (executor instanceof ScheduledThreadPoolExecutor) { + ScheduledThreadPoolExecutor scheduledExecutorService = (ScheduledThreadPoolExecutor)executor; + scheduledExecutorService.schedule(r, retryDelay, timeUnit); + logger.warn(String.format("Scheduler queue is full (max pool size = %s). Will retry later...", + executor.getMaximumPoolSize())); + } + else { + logger.error("Scheduler queue is full (max pool size = %s). Operation is rejected !"); + super.rejectedExecution(r, executor); + } + } + +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java index 3025a909..b35a81f4 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java @@ -22,8 +22,8 @@ package org.duniter.elasticsearch.threadpool; * #L% */ -import org.duniter.core.util.Preconditions; import com.google.common.collect.Lists; +import org.duniter.core.util.Preconditions; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsRequestBuilder; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsResponse; import org.elasticsearch.client.Client; @@ -37,14 +37,15 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.EsAbortPolicy; import org.elasticsearch.common.util.concurrent.EsExecutors; -import org.elasticsearch.common.util.concurrent.LoggingRunnable; import org.elasticsearch.transport.TransportService; import org.nuiton.i18n.I18n; import java.util.List; -import java.util.concurrent.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * Manage thread pool, to execute tasks asynchronously. @@ -54,7 +55,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { private ScheduledThreadPoolExecutor scheduler = null; private final Injector injector; - private final ESLogger logger = Loggers.getLogger("threadpool"); + private final ESLogger logger = Loggers.getLogger("duniter.threadpool"); private final org.elasticsearch.threadpool.ThreadPool delegate; @@ -72,7 +73,14 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { this.delegate = esThreadPool; int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); - this.scheduler = new ScheduledThreadPoolExecutor(availableProcessors, EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"), new EsAbortPolicy()); + this.scheduler = new LoggingScheduledThreadPoolExecutor(logger, availableProcessors, + EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"), + new RetryPolicy(1, TimeUnit.SECONDS)); + /*this.scheduler = new ScheduledThreadPoolExecutor(availableProcessors, + EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"), + new RetryPolicy(1, TimeUnit.SECONDS)) { + + };*/ this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.scheduler.setRemoveOnCancelPolicy(true); @@ -146,7 +154,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { */ public ScheduledActionFuture<?> schedule(Runnable command, String name, TimeValue delay) { if (name == null) { - return new ScheduledActionFuture<>(scheduler.schedule(new LoggingRunnable(logger, command), delay.millis(), TimeUnit.MILLISECONDS)); + return new ScheduledActionFuture<>(scheduler.schedule(command, delay.millis(), TimeUnit.MILLISECONDS)); } return new ScheduledActionFuture<>(delegate.schedule(delay, name, @@ -168,11 +176,24 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { * Schedules a periodic rest that always runs on the scheduler thread. * * @param command the rest to take - * @param interval the delay interval + * @param initialDelay the initial delay + * @param period the period + * @param timeUnit the time unit * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled */ - public ScheduledActionFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) { - return new ScheduledActionFuture<>(delegate.scheduleWithFixedDelay(command, interval)); + public ScheduledActionFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit timeUnit) { + return new ScheduledActionFuture<>(scheduler.scheduleAtFixedRate(command, initialDelay, period, timeUnit)); + } + + /** + * Schedules a periodic rest that always runs on the scheduler thread. + * + * @param command the rest to take + * @param initialDelay the initial delay + * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled + */ + public ScheduledActionFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit timeUnit) { + return new ScheduledActionFuture<>(scheduler.scheduleWithFixedDelay(command, initialDelay, delay, timeUnit)); } @@ -235,5 +256,8 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { public ScheduledExecutorService scheduler() { return delegate.scheduler(); + //return scheduler; } + + } diff --git a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties index ef9d3b43..81712787 100644 --- a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties +++ b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_en_GB.properties @@ -46,4 +46,4 @@ duniter4j.job.success= duniter4j.removeServiceUtils.waitThenRetry= duniter4j.task.issuer.system=System duniter4j.task.starting=Starting task... -duniter4j.threadPool.clusterHealthStatus.changed= +duniter4j.threadPool.clusterHealthStatus.changed=Cluster health status changed to [%s] diff --git a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties index efaaec94..d5ed80d4 100644 --- a/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties +++ b/duniter4j-es-core/src/main/resources/i18n/duniter4j-es-core_fr_FR.properties @@ -46,4 +46,4 @@ duniter4j.job.success= duniter4j.removeServiceUtils.waitThenRetry= duniter4j.task.issuer.system=Système duniter4j.task.starting=Démarrage du traitement... -duniter4j.threadPool.clusterHealthStatus.changed= +duniter4j.threadPool.clusterHealthStatus.changed=Cluster health status changed to [%s] diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java index bb17bab8..ab95a1a6 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java @@ -57,7 +57,7 @@ public class BlockchainServiceTest { service = ServiceLocator.instance().getBean(BlockchainService.class); remoteService = ServiceLocator.instance().getBlockchainRemoteService(); config = Configuration.instance(); - peer = createTestPeer(); + peer = createTestPeer(config); objectMapper = JacksonUtils.newObjectMapper(); // Init the currency @@ -72,18 +72,26 @@ public class BlockchainServiceTest { } @Test - public void indexBlock() { + public void indexBlock() throws Exception { BlockchainBlock current = remoteService.getCurrentBlock(peer); service.indexCurrentBlock(current, true/*wait*/); try { String blockStr = objectMapper.writeValueAsString(current); - service.indexBlockFromJson(peer, blockStr, true/*is rurrent*/, false/*detected fork*/, true/*wait*/); + service.indexBlockFromJson(peer, blockStr, true/*is current*/, false/*detected fork*/, true/*wait*/); } catch(Exception e) { Assert.fail(e.getMessage()); } + + Thread.sleep(1000); + + // Try to get the indexed block + BlockchainBlock retrievedBlock = service.getBlockById(current.getCurrency(), current.getNumber().intValue()); + Assert.assertNotNull(retrievedBlock); + + } /* -- internal methods */ @@ -97,10 +105,10 @@ public class BlockchainServiceTest { } } - protected Peer createTestPeer() { + protected Peer createTestPeer(Configuration config) { Peer peer = new Peer( - Configuration.instance().getNodeHost(), - Configuration.instance().getNodePort()); + config.getNodeHost(), + config.getNodePort()); return peer; } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java index caf31984..375073f8 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java @@ -145,20 +145,20 @@ public class SubscriptionService extends AbstractService { // TODO: remove this (DEV lon) - /*long devDuration = 10 * 60 * 1000; threadPool.scheduler().scheduleAtFixedRate( () -> executeEmailSubscriptions(EmailSubscription.Frequency.daily), - 1000 * 2, - devDuration, TimeUnit.MILLISECONDS);*/ + 1000 * 20, // start in 20s + 10 * 60 * 1000, // every 10 min + TimeUnit.MILLISECONDS); // Daily execution - threadPool.scheduler().scheduleAtFixedRate( + threadPool.scheduleAtFixedRate( () -> executeEmailSubscriptions(EmailSubscription.Frequency.daily), DateUtils.delayBeforeHour(pluginSettings.getEmailSubscriptionsExecuteHour()), DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS); // Weekly execution - threadPool.scheduler().scheduleAtFixedRate( + threadPool.scheduleAtFixedRate( () -> executeEmailSubscriptions(EmailSubscription.Frequency.weekly), DateUtils.delayBeforeDayAndHour(pluginSettings.getEmailSubscriptionsExecuteDayOfWeek(), pluginSettings.getEmailSubscriptionsExecuteHour()), 7 * DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 05779674..aef6a33a 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -23,8 +23,10 @@ package org.duniter.elasticsearch.user.service; */ +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; import org.duniter.core.client.model.ModelUtils; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.exception.TechnicalException; @@ -32,6 +34,8 @@ import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.duniter.elasticsearch.service.AbstractBlockchainListenerService; import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.changes.ChangeEvent; @@ -42,15 +46,17 @@ import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.nuiton.i18n.I18n; import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; /** * Created by Benoit on 30/03/2015. @@ -60,10 +66,9 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic public static final String DEFAULT_PUBKEYS_SEPARATOR = ", "; private final UserService userService; - private final UserEventService userEventService; - private final AdminService adminService; + private Queue<UserEvent.Reference> referencesToDelete = ConcurrentCollections.newBlockingQueue(); @Inject public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, @@ -72,10 +77,12 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic UserService userService, AdminService adminService, UserEventService userEventService) { - super("duniter.user.event.blockchain", client, settings.getDelegate(), cryptoService, threadPool); + super("duniter.user.event.blockchain", client, settings.getDelegate(), cryptoService, threadPool, + new TimeValue(500, TimeUnit.MILLISECONDS)); this.userService = userService; this.adminService = adminService; this.userEventService = userEventService; + if (this.enable) { blockchainService.registerConnectionListener(createConnectionListeners()); } @@ -83,23 +90,76 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic @Override - protected void processCreateBlock(final ChangeEvent change) { - try { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processCreateBlock(block); - } catch (IOException e) { - throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); + protected void processBlockIndex(ChangeEvent change) { + + BlockchainBlock block = readBlock(change); + + // First: Delete old events on same block + { + UserEvent.Reference reference = new UserEvent.Reference(change.getIndex(), BlockchainService.BLOCK_TYPE, change.getId()); + this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false); + flushBulkRequestOrSchedule(); + } + + // Joiners + if (CollectionUtils.isNotEmpty(block.getJoiners())) { + for (BlockchainBlock.Joiner joiner: block.getJoiners()) { + notifyUserEvent(block, joiner.getPublicKey(), UserEventCodes.MEMBER_JOIN, I18n.n("duniter.user.event.MEMBER_JOIN"), block.getCurrency()); + } + } + + // Leavers + if (CollectionUtils.isNotEmpty(block.getLeavers())) { + for (BlockchainBlock.Joiner leaver: block.getJoiners()) { + notifyUserEvent(block, leaver.getPublicKey(), UserEventCodes.MEMBER_LEAVE, I18n.n("duniter.user.event.MEMBER_LEAVE"), block.getCurrency()); + } + } + + // Actives + if (CollectionUtils.isNotEmpty(block.getActives())) { + for (BlockchainBlock.Joiner active: block.getActives()) { + notifyUserEvent(block, active.getPublicKey(), UserEventCodes.MEMBER_ACTIVE, I18n.n("duniter.user.event.MEMBER_ACTIVE"), block.getCurrency()); + } + } + + // Tx + if (CollectionUtils.isNotEmpty(block.getTransactions())) { + for (BlockchainBlock.Transaction tx: block.getTransactions()) { + processTx(block, tx); + } + } + + // Certifications + if (CollectionUtils.isNotEmpty(block.getCertifications())) { + for (BlockchainBlock.Certification cert: block.getCertifications()) { + processCertification(block, cert); + } } } @Override - protected void processBlockDelete(ChangeEvent change, boolean wait) { - if (change.getId() == null) return; + protected void processBlockDelete(ChangeEvent change) { + + UserEvent.Reference reference = new UserEvent.Reference(change.getIndex(), BlockchainService.BLOCK_TYPE, change.getId()); + + if (change.getSource() != null) { + BlockchainBlock block = readBlock(change); + reference.setHash(block.getHash()); + } + + // Add to queue + referencesToDelete.add(reference); + + flushBulkRequestOrSchedule(); + } + + + protected void beforeFlush() { - // Delete events that reference this block - ActionFuture<?> actionFuture = userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId())); - if (wait) { - actionFuture.actionGet(); + UserEvent.Reference reference = referencesToDelete.poll(); + while (reference != null) { + this.bulkRequest = userEventService.addDeleteEventsByReferenceToBulk(reference, this.bulkRequest, this.bulkSize, false); + reference = referencesToDelete.poll(); } } @@ -148,42 +208,7 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic } - private void processCreateBlock(BlockchainBlock block) { - // Joiners - if (CollectionUtils.isNotEmpty(block.getJoiners())) { - for (BlockchainBlock.Joiner joiner: block.getJoiners()) { - notifyUserEvent(block, joiner.getPublicKey(), UserEventCodes.MEMBER_JOIN, I18n.n("duniter.user.event.MEMBER_JOIN"), block.getCurrency()); - } - } - - // Leavers - if (CollectionUtils.isNotEmpty(block.getLeavers())) { - for (BlockchainBlock.Joiner leaver: block.getJoiners()) { - notifyUserEvent(block, leaver.getPublicKey(), UserEventCodes.MEMBER_LEAVE, I18n.n("duniter.user.event.MEMBER_LEAVE"), block.getCurrency()); - } - } - - // Actives - if (CollectionUtils.isNotEmpty(block.getActives())) { - for (BlockchainBlock.Joiner active: block.getActives()) { - notifyUserEvent(block, active.getPublicKey(), UserEventCodes.MEMBER_ACTIVE, I18n.n("duniter.user.event.MEMBER_ACTIVE"), block.getCurrency()); - } - } - - // Tx - if (CollectionUtils.isNotEmpty(block.getTransactions())) { - for (BlockchainBlock.Transaction tx: block.getTransactions()) { - processTx(block, tx); - } - } - // Certifications - if (CollectionUtils.isNotEmpty(block.getCertifications())) { - for (BlockchainBlock.Certification cert: block.getCertifications()) { - processCertification(block, cert); - } - } - } private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) { Set<String> senders = ImmutableSet.copyOf(tx.getIssuers()); @@ -212,7 +237,6 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic } } - // TODO : indexer la TX dans un index/type spécifique ? } private void processCertification(BlockchainBlock block, BlockchainBlock.Certification certification) { @@ -243,7 +267,13 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic .setReferenceHash(block.getHash()) .build(); - userEventService.notifyUser(event); + event = userEventService.fillUserEvent(event); + + bulkRequest.add(client.prepareIndex(UserEventService.INDEX, UserEventService.EVENT_TYPE) + .setSource(userEventService.toJson(event)) + .setRefresh(false)); + + flushBulkRequestOrSchedule(); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index ae33dccf..006857bf 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -44,7 +44,6 @@ import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserProfile; import org.elasticsearch.action.ActionFuture; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -56,7 +55,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; @@ -118,7 +116,7 @@ public class UserEventService extends AbstractService implements ChangeService.C /** * Notify a user */ - public ListenableActionFuture<IndexResponse> notifyUser(UserEvent event) { + public ActionFuture<IndexResponse> notifyUser(UserEvent event) { Preconditions.checkNotNull(event); Preconditions.checkNotNull(event.getRecipient()); @@ -132,35 +130,29 @@ public class UserEventService extends AbstractService implements ChangeService.C return indexEvent(locale, event); } - public ListenableActionFuture<IndexResponse> indexEvent(Locale locale, UserEvent event) { + /** + * Notify a user + */ + public UserEvent fillUserEvent(UserEvent event) { + Preconditions.checkNotNull(event); Preconditions.checkNotNull(event.getRecipient()); - Preconditions.checkNotNull(event.getType()); - Preconditions.checkNotNull(event.getCode()); - String nodePubkey = pluginSettings.getNodePubkey(); + // Get user profile locale + UserProfile userProfile = getUserProfile(event.getRecipient(), + UserProfile.PROPERTY_EMAIL, UserProfile.PROPERTY_TITLE, UserProfile.PROPERTY_LOCALE); - // Generate json - String eventJson; - if (StringUtils.isNotBlank(nodePubkey)) { - UserEvent signedEvent = new UserEvent(event); - signedEvent.setMessage(event.getLocalizedMessage(locale)); - // set issuer, hash, signature - signedEvent.setIssuer(nodePubkey); + Locale locale = userProfile.getLocale() != null ? new Locale(userProfile.getLocale()) : null; - // Add hash - String hash = cryptoService.hash(toJson(signedEvent, true)); - signedEvent.setHash(hash); + // Add new event to index + return fillUserEvent(locale, event); + } - // Add signature - String signature = cryptoService.sign(toJson(signedEvent, true), pluginSettings.getNodeKeypair().getSecKey()); - signedEvent.setSignature(signature); + public ActionFuture<IndexResponse> indexEvent(Locale locale, UserEvent event) { - eventJson = toJson(signedEvent); - } else { - logger.warn("Could not generate hash for new user event (no keyring)"); - // Node has not keyring: do NOT sign it - eventJson = event.toJson(locale); - } + UserEvent completeUserEvent = fillUserEvent(locale, event); + + // Generate json + String eventJson = toJson(completeUserEvent); if (logger.isDebugEnabled()) { logger.debug(String.format("Indexing a event to recipient [%s]", event.getRecipient().substring(0, 8))); @@ -171,11 +163,11 @@ public class UserEventService extends AbstractService implements ChangeService.C } - public ListenableActionFuture<IndexResponse> indexEvent(String eventJson) { + public ActionFuture<IndexResponse> indexEvent(String eventJson) { return indexEvent(eventJson, true); } - public ListenableActionFuture<IndexResponse> indexEvent(String eventJson, boolean checkSignature) { + public ActionFuture<IndexResponse> indexEvent(String eventJson, boolean checkSignature) { if (checkSignature) { JsonNode jsonNode = readAndVerifyIssuerSignature(eventJson); @@ -194,13 +186,19 @@ public class UserEventService extends AbstractService implements ChangeService.C .execute(); } + public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) { Preconditions.checkNotNull(reference); - return threadPool.schedule(() -> doDeleteEventsByReference(reference)); + final int bulkSize = pluginSettings.getIndexBulkSize(); + + return threadPool.schedule(() -> { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + addDeleteEventsByReferenceToBulk(reference, bulkRequest, bulkSize, true); + }); } - public ListenableActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { + public ActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { Map<String, Object> fields = client.getMandatoryFieldsById(INDEX, EVENT_TYPE, id, UserEvent.PROPERTY_HASH, UserEvent.PROPERTY_RECIPIENT); String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString(); @@ -249,9 +247,42 @@ public class UserEventService extends AbstractService implements ChangeService.C .collect(Collectors.toList()); } + public String toJson(UserEvent userEvent) { + return toJson(userEvent, false); + } + /* -- Internal methods -- */ + protected UserEvent fillUserEvent(Locale locale, UserEvent event) { + Preconditions.checkNotNull(event.getRecipient()); + Preconditions.checkNotNull(event.getType()); + Preconditions.checkNotNull(event.getCode()); + + String nodePubkey = pluginSettings.getNodePubkey(); + + // Generate json + if (StringUtils.isNotBlank(nodePubkey)) { + UserEvent signedEvent = new UserEvent(event); + signedEvent.setMessage(event.getLocalizedMessage(locale)); + // set issuer, hash, signature + signedEvent.setIssuer(nodePubkey); + + // Add hash + String hash = cryptoService.hash(toJson(signedEvent, true)); + signedEvent.setHash(hash); + + // Add signature + String signature = cryptoService.sign(toJson(signedEvent, true), pluginSettings.getNodeKeypair().getSecKey()); + signedEvent.setSignature(signature); + + return signedEvent; + } else { + logger.warn("Could not generate hash for new user event (no keyring)"); + return event; + } + } + public static XContentBuilder createEventType() { try { XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(EVENT_TYPE) @@ -358,7 +389,10 @@ public class UserEventService extends AbstractService implements ChangeService.C return result; } - private void doDeleteEventsByReference(final UserEvent.Reference reference) { + public BulkRequestBuilder addDeleteEventsByReferenceToBulk(final UserEvent.Reference reference, + BulkRequestBuilder bulkRequest, + final int bulkSize, + final boolean flushAll) { // Prepare search request SearchRequestBuilder searchRequest = client @@ -389,8 +423,6 @@ public class UserEventService extends AbstractService implements ChangeService.C // Execute query, while there is some data try { - int bulkSize = pluginSettings.getIndexBulkSize(); - BulkRequestBuilder bulkRequest = client.prepareBulk(); int counter = 0; boolean loop = true; @@ -409,7 +441,7 @@ public class UserEventService extends AbstractService implements ChangeService.C counter++; // Flush the bulk if not empty - if ((counter % bulkSize) == 0) { + if ((bulkRequest.numberOfActions() % bulkSize) == 0) { client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); bulkRequest = client.prepareBulk(); } @@ -427,7 +459,7 @@ public class UserEventService extends AbstractService implements ChangeService.C } while(loop); // last flush - if ((counter % bulkSize) != 0) { + if (flushAll && (bulkRequest.numberOfActions() % bulkSize) != 0) { client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); } @@ -435,6 +467,8 @@ public class UserEventService extends AbstractService implements ChangeService.C // Failed or no item on index logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e); } + + return bulkRequest; } @@ -442,9 +476,6 @@ public class UserEventService extends AbstractService implements ChangeService.C return client.getSourceByIdOrNull(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames); } - private String toJson(UserEvent userEvent) { - return toJson(userEvent, false); - } private String toJson(UserEvent userEvent, boolean cleanHashAndSignature) { try { @@ -509,16 +540,18 @@ public class UserEventService extends AbstractService implements ChangeService.C event.setId(eventId); - // Notify listeners - threadPool.schedule(() -> { - synchronized (LISTENERS) { - LISTENERS.values().forEach(listener -> { - if (event.getRecipient().equals(listener.getPubkey())) { - listener.onEvent(event); - } - }); - } - }); + if (LISTENERS.size() > 0) { + // Notify listeners + threadPool.schedule(() -> { + synchronized (LISTENERS) { + LISTENERS.values().forEach(listener -> { + if (event.getRecipient().equals(listener.getPubkey())) { + listener.onEvent(event); + } + }); + } + }); + } } -- GitLab