From 770d2cb07ef33a99372ea33c6fd27bf482d9272e Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Tue, 12 Sep 2017 21:02:45 +0200 Subject: [PATCH] Lib upgrade : guava 22 Upgrade to java 1.8 [core] Add config option to reload as part of the BC [core] Fix unit test config file [core] Peer: mark peers as DOWN is not alive since X time --- .../main/java/org/duniter/client/Main.java | 2 +- .../duniter/client/actions/NetworkAction.java | 5 +- .../client/actions/TransactionAction.java | 2 +- .../core/client/config/Configuration.java | 4 + .../client/config/ConfigurationOption.java | 8 +- .../org/duniter/core/client/dao/PeerDao.java | 2 +- .../client/dao/mem/MemoryPeerDaoImpl.java | 5 +- .../client/model/bma/BlockchainBlocks.java | 7 +- .../model/bma/jackson/JacksonUtils.java | 11 + .../duniter/core/client/model/local/Peer.java | 10 +- .../core/client/model/local/Peers.java | 27 ++ .../core/client/model/local/Wallet.java | 15 +- .../core/client/service/HttpServiceImpl.java | 5 +- .../core/client/service/ServiceLocator.java | 5 + .../service/bma/NetworkRemoteServiceImpl.java | 6 +- .../client/service/local/NetworkService.java | 5 +- .../service/local/NetworkServiceImpl.java | 283 ++++++++---------- .../client/service/local/PeerService.java | 9 +- .../client/service/local/PeerServiceImpl.java | 58 +++- .../duniter4j-core-client_en_GB.properties | 1 + .../duniter4j-core-client_fr_FR.properties | 1 + .../core/client/model/BlockFileUtils.java | 3 +- .../bma/BlockchainRemoteServiceTest.java | 4 +- .../org/duniter/core/util/LockManager.java | 104 +++++++ .../src/test/es-home/config/elasticsearch.yml | 4 +- .../src/test/es-home/config/logging.yml | 1 + .../duniter/elasticsearch/PluginSettings.java | 2 +- .../client/Duniter4jClientImpl.java | 8 +- .../elasticsearch/dao/AbstractDao.java | 5 +- .../elasticsearch/dao/impl/BlockDaoImpl.java | 6 +- .../dao/impl/BlockStatDaoImpl.java | 4 +- .../dao/impl/CurrencyDaoImpl.java | 4 +- .../dao/impl/MovementDaoImpl.java | 4 +- .../elasticsearch/dao/impl/PeerDaoImpl.java | 16 +- .../AbstractBlockchainListenerService.java | 2 +- .../service/AbstractService.java | 12 +- .../service/AbstractSynchroService.java | 15 +- .../service/BlockchainListenerService.java | 4 +- .../service/BlockchainService.java | 2 +- .../elasticsearch/service/PeerService.java | 125 +------- .../LoggingScheduledThreadPoolExecutor.java | 1 - .../service/BlockchainServiceTest.java | 26 +- .../service/PeerServiceTest.java | 3 +- .../service/SubscriptionServiceTest.java | 6 +- .../service/BlockchainUserEventService.java | 2 +- .../user/service/UserEventService.java | 12 +- 46 files changed, 439 insertions(+), 407 deletions(-) create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java create mode 100644 duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java diff --git a/duniter4j-client/src/main/java/org/duniter/client/Main.java b/duniter4j-client/src/main/java/org/duniter/client/Main.java index a03b93a6..b86bdf48 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/Main.java +++ b/duniter4j-client/src/main/java/org/duniter/client/Main.java @@ -76,7 +76,7 @@ public class Main { // Parsing args JCommander jc = new JCommander(this); - actions.entrySet().stream().forEach(entry -> jc.addCommand(entry.getKey(), entry.getValue())); + actions.entrySet().forEach(entry -> jc.addCommand(entry.getKey(), entry.getValue())); try { jc.parse(args); diff --git a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java index 2903072d..ccf311b3 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java +++ b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java @@ -43,6 +43,7 @@ import org.nuiton.i18n.I18n; import java.io.*; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -113,7 +114,7 @@ public class NetworkAction extends AbstractAction { /* -- protected methods -- */ - public void showPeersTable(List<Peer> peers, boolean clearConsole) { + public void showPeersTable(Collection<Peer> peers, boolean clearConsole) { // Clearing console if (clearConsole) { @@ -125,7 +126,7 @@ public class NetworkAction extends AbstractAction { return; } - Peer mainConsensusPeer = peers.get(0); + Peer mainConsensusPeer = peers.iterator().next(); Peer.Stats mainConsensusStats = mainConsensusPeer.getStats(); if (mainConsensusStats.isMainConsensus()) { Long mediantTime = mainConsensusStats.getMedianTime(); diff --git a/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java b/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java index 7dc04b26..8415a1f2 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java +++ b/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java @@ -105,7 +105,7 @@ public class TransactionAction extends AbstractAction { // Compute keypair and wallet - KeyPair keypair = null; + KeyPair keypair; if (authParameters.authScrypt) { CryptoService cryptoService = ServiceLocator.instance().getCryptoService(); keypair = cryptoService.getKeyPairFromSeed( diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java index 6837a793..cb95c70b 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java @@ -262,6 +262,10 @@ public class Configuration { return Integer.parseInt(ConfigurationOption.NETWORK_CACHE_TIME_IN_MILLIS.getDefaultValue()); } + public int getPeerUpMaxAge() { + return applicationConfig.getOptionAsInt(ConfigurationOption.NETWORK_PEER_UP_MAX_AGE.getKey()); + } + public String getNodeElasticSearchHost() { return applicationConfig.getOption(ConfigurationOption.NODE_ELASTICSEARCH_HOST.getKey()); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java index 9c28f7ea..ea5d6ef0 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java @@ -182,7 +182,6 @@ public enum ConfigurationOption implements ConfigOptionDef { Integer.class, false), - NETWORK_CACHE_TIME_IN_MILLIS ( "duniter4j.network.cacheTimeInMillis", "duniter4j.config.option.network.cacheTimeInMillis.description", @@ -190,6 +189,13 @@ public enum ConfigurationOption implements ConfigOptionDef { Integer.class, false), + NETWORK_PEER_UP_MAX_AGE ( + "duniter.network.peerUpMaxAge", + "duniter.config.option.network.peerUpMaxAge.description", + "600000", // = 10 min + Integer.class, + false), + NODE_ELASTICSEARCH_PROTOCOL( "duniter4j.node.elasticsearch.protocol", n("duniter4j.config.option.node.elasticsearch.protocol.description"), diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 199d817a..2e6b031e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -37,5 +37,5 @@ public interface PeerDao extends EntityDao<String, Peer> { Long getMaxLastUpTime(String currencyId); - void updatePeersAsDown(String currencyId, long lastUpTimeTimeout); + void updatePeersAsDown(String currencyId, long maxUpTime); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index 544c4e69..2ed9eed2 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -90,11 +90,10 @@ public class MemoryPeerDaoImpl implements PeerDao { } @Override - public void updatePeersAsDown(String currencyId, long lastUpTimeTimeout) { - long maxLastUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + public void updatePeersAsDown(String currencyId, long upTimeLimit) { getPeersByCurrencyId(currencyId).stream() - .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= maxLastUpTime) + .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= upTimeLimit) .forEach(peer -> { peer.getStats().setStatus(Peer.PeerStatus.DOWN); }); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java index 6bf2ba01..a834a8d6 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java @@ -40,8 +40,6 @@ import java.util.stream.IntStream; */ public final class BlockchainBlocks { - private static final Logger log = LoggerFactory.getLogger(BlockchainBlocks.class); - public static final Pattern SIG_PUBKEY_PATTERN = Pattern.compile("SIG\\(([^)]+)\\)"); public static final Pattern TX_INPUT_CONDITION_FUNCTION = Pattern.compile("(SIG|XHX)\\(([^)]+)\\)"); @@ -208,6 +206,11 @@ public final class BlockchainBlocks { .distinct().collect(Collectors.toSet()); } + public static String buid(BlockchainBlock block) { + if (block == null || block.getNumber() == null || block.getHash() == null) return null; + return block.getNumber() + "-" + block.getHash(); + } + public static class TxInput { public long amount; public int unitbase; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java index d0289195..3ec81df5 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java @@ -42,6 +42,17 @@ public abstract class JacksonUtils extends SimpleModule { public static final String REGEX_ATTRIBUTE_REPLACE = "[,]?(?:\"%s\"|%s)[\\s\\n\\r]*:[\\s\\n\\r]*(?:\"[^\"]+\"|null)"; + private static final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() { + @Override + protected ObjectMapper initialValue() { + return newObjectMapper(); + } + }; + + public static ObjectMapper getThreadObjectMapper() { + return mapper.get(); + } + public static ObjectMapper newObjectMapper() { ObjectMapper objectMapper = new ObjectMapper(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index e4d69e63..0069509b 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -25,19 +25,23 @@ package org.duniter.core.client.model.local; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Joiner; +import javafx.beans.Observable; +import org.duniter.core.beans.Bean; +import org.duniter.core.beans.ObservableBean; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.bma.NetworkPeering; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.http.InetAddressUtils; +import java.beans.PropertyChangeListener; +import java.beans.PropertyChangeListenerProxy; +import java.beans.PropertyChangeSupport; import java.io.Serializable; import java.util.StringJoiner; public class Peer implements LocalEntity<String>, Serializable { - - public static Builder newBuilder() { return new Builder(); } @@ -364,6 +368,8 @@ public class Peer implements LocalEntity<String>, Serializable { return joiner.toString(); } + + public enum PeerStatus { UP, DOWN, diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java new file mode 100644 index 00000000..8d0723c2 --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java @@ -0,0 +1,27 @@ +package org.duniter.core.client.model.local; + +import org.duniter.core.client.model.bma.EndpointApi; + +/** + * Created by blavenie on 12/09/17. + */ +public final class Peers { + + private Peers() { + // helper class + } + + public static boolean hasEndPointAPI(Peer peer, EndpointApi api) { + return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api.name()); + } + + public static String buid(Peer peer) { + return buid(peer.getStats()); + } + + public static String buid(Peer.Stats stats) { + return stats.getStatus() == Peer.PeerStatus.UP + ? stats.getBlockNumber() + "-" + stats.getBlockHash() + : null; + } +} diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java index c22dbf12..0473a118 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java @@ -25,6 +25,7 @@ package org.duniter.core.client.model.local; import java.io.Serializable; import java.util.Collection; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.duniter.core.client.model.bma.WotCertification; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.crypto.CryptoUtils; @@ -36,7 +37,6 @@ import org.duniter.core.util.crypto.KeyPair; */ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { - private Long id; private Long accountId; private String currency; @@ -48,11 +48,6 @@ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { private long txBlockNumber = -1; private Collection<WotCertification> certifications; - /** - * Use for UI, when some properties has not been displayed yet - */ - private boolean isDirty = false; - public Wallet() { super(null, null); this.identity = new Identity(); @@ -178,14 +173,6 @@ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { return identity.getIsMember(); } - public boolean isDirty() { - return isDirty; - } - - public void setDirty(boolean isDirty) { - this.isDirty = isDirty; - } - public Double getCreditAsUD() { return creditAsUD; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index 892557ee..1a888190 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -45,6 +45,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.duniter.core.beans.InitializingBean; import org.duniter.core.client.config.Configuration; +import org.duniter.core.client.config.ConfigurationOption; import org.duniter.core.client.model.bma.Constants; import org.duniter.core.client.model.bma.Error; import org.duniter.core.client.model.bma.jackson.JacksonUtils; @@ -111,7 +112,9 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean protected void initCaches() { Configuration config = Configuration.instance(); int cacheTimeInMillis = config.getNetworkCacheTimeInMillis(); - final int defaultTimeout = config.getNetworkTimeout(); + final int defaultTimeout = config.getNetworkTimeout() > 0 ? + config.getNetworkTimeout() : + Integer.parseInt(ConfigurationOption.NETWORK_TIMEOUT.getDefaultValue()); requestConfigCache = new SimpleCache<Integer, RequestConfig>(cacheTimeInMillis*100) { @Override diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java index 38107551..711b7324 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java @@ -25,6 +25,7 @@ package org.duniter.core.client.service; import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanFactory; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.service.bma.BlockchainRemoteService; import org.duniter.core.client.service.bma.NetworkRemoteService; import org.duniter.core.client.service.bma.TransactionRemoteService; @@ -152,6 +153,10 @@ public class ServiceLocator implements Closeable { return getBean(NetworkService.class); } + public PeerDao getPeerDao() { + return getBean(PeerDao.class); + } + public <S extends Bean> S getBean(Class<S> clazz) { if (beanFactory == null) { initBeanFactory(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java index 7cfafe68..14c0f0e8 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java @@ -62,13 +62,8 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N public static final String URL_WS_PEER = "/ws/peer"; - protected ObjectMapper objectMapper; - - public NetworkRemoteServiceImpl() { super(); - - objectMapper = JacksonUtils.newObjectMapper(); } public NetworkPeering getPeering(Peer peer) { @@ -104,6 +99,7 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N if (jsonNode.has("leaf")) { jsonNode = jsonNode.get("leaf"); if (jsonNode.has("value")) { + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); jsonNode = jsonNode.get("value"); String json = objectMapper.writeValueAsString(jsonNode); result = objectMapper.readValue(json, NetworkPeers.Peer.class); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java index 69667304..942fd002 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java @@ -25,6 +25,7 @@ package org.duniter.core.client.service.local; import org.duniter.core.beans.Service; import org.duniter.core.client.model.local.Peer; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -38,7 +39,8 @@ import java.util.function.Predicate; public interface NetworkService extends Service { interface PeersChangeListener { - void onChanged(List<Peer> peers); + void onChanges(Collection<Peer> peers); + } class Sort { @@ -51,6 +53,7 @@ public interface NetworkService extends Service { public Peer.PeerStatus filterStatus; public Boolean filterSsl; public List<String> filterEndpoints; + public String currency; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java index 9c3f66bb..9d016080 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java @@ -23,11 +23,13 @@ package org.duniter.core.client.service.local; */ import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.duniter.core.client.config.Configuration; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.*; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.model.local.Peers; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.client.service.bma.BaseRemoteServiceImpl; import org.duniter.core.client.service.bma.BlockchainRemoteService; @@ -36,13 +38,9 @@ import org.duniter.core.client.service.bma.WotRemoteService; import org.duniter.core.client.service.exception.HttpConnectException; import org.duniter.core.client.service.exception.HttpNotFoundException; import org.duniter.core.exception.TechnicalException; -import org.duniter.core.service.CryptoService; +import org.duniter.core.util.*; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.ObjectUtils; -import org.duniter.core.util.Preconditions; -import org.duniter.core.util.StringUtils; import org.duniter.core.util.concurrent.CompletableFutures; -import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +50,6 @@ import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -61,43 +58,32 @@ import java.util.stream.Collectors; public class NetworkServiceImpl extends BaseRemoteServiceImpl implements NetworkService { private static final Logger log = LoggerFactory.getLogger(NetworkServiceImpl.class); + private static final String PEERS_UPDATE_LOCK_NAME = "Peers update"; private final static String BMA_URL_STATUS = "/node/summary"; private final static String BMA_URL_BLOCKCHAIN_CURRENT = "/blockchain/current"; private final static String BMA_URL_BLOCKCHAIN_HARDSHIP = "/blockchain/hardship/"; private NetworkRemoteService networkRemoteService; - private CryptoService cryptoService; private WotRemoteService wotRemoteService; private BlockchainRemoteService blockchainRemoteService; + private Configuration config; + private final LockManager lockManager = new LockManager(4, 10); - protected class Locker { - private boolean lock = false; - public boolean isLocked() { - return this.lock; - } - public void lock() { - Preconditions.checkArgument(!this.lock); - this.lock = !this.lock; - } - public void unlock() { - Preconditions.checkArgument(this.lock); - this.lock = !this.lock; - } - } + private PeerService peerService; public NetworkServiceImpl() { } public NetworkServiceImpl(NetworkRemoteService networkRemoteService, WotRemoteService wotRemoteService, - CryptoService cryptoService, - BlockchainRemoteService blockchainRemoteService) { + BlockchainRemoteService blockchainRemoteService, + PeerService peerService) { this(); this.networkRemoteService = networkRemoteService; this.wotRemoteService = wotRemoteService; - this.cryptoService = cryptoService; this.blockchainRemoteService = blockchainRemoteService; + this.peerService = peerService; } @Override @@ -105,8 +91,9 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network super.afterPropertiesSet(); this.networkRemoteService = ServiceLocator.instance().getNetworkRemoteService(); this.wotRemoteService = ServiceLocator.instance().getWotRemoteService(); - this.cryptoService = ServiceLocator.instance().getCryptoService(); this.blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService(); + this.config = Configuration.instance(); + this.peerService = ServiceLocator.instance().getPeerService(); } @Override @@ -215,9 +202,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network peer.getStats().setUid(uid); if (peer.getStats().isReacheable()) { - // Last UP time - peer.getStats().setLastUpTime(System.currentTimeMillis()/1000 /*unix timestamp*/ ); - // Hardship if (StringUtils.isNotBlank(uid)) { getHardship(peer); @@ -235,7 +219,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Map<String,Long> peerCountByBuid = peers.stream() .filter(peer -> peer.getStats().getStatus() == Peer.PeerStatus.UP) - .map(this::buid) + .map(Peers::buid) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); // Compute main consensus buid @@ -257,7 +241,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network // Set consensus stats peers.forEach(peer -> { Peer.Stats stats = peer.getStats(); - String buid = buid(stats); + String buid = Peers.buid(stats); // Set consensus stats on each peers if (buid != null) { @@ -276,11 +260,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network public void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener) { + BlockchainParameters parameters = blockchainRemoteService.getParameters(mainPeer); + // Default filter Filter filterDef = new Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.currency = parameters.getCurrency(); // Default sort Sort sortDef = new Sort(); @@ -294,126 +281,130 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Filter filter, final Sort sort, final boolean autoreconnect, final ExecutorService executor) { + final String currency = filter != null && filter.currency != null ? filter.currency : + blockchainRemoteService.getParameters(mainPeer).getCurrency(); - final Locker threadLock = new Locker(); - final List<Peer> result = new ArrayList<>(); final List<String> knownBlocks = new ArrayList<>(); - final Map<String, Peer> knownPeers = new HashMap<>(); - final Predicate<Peer> peerFilter = peerFilter(filter); final Comparator<Peer> peerComparator = peerComparator(sort); final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool(); - Runnable getPeersRunnable = () -> { - if (threadLock.isLocked()) { - log.debug("Rejected getPeersRunnable() call. Another refresh is already running..."); - return; - } - synchronized (threadLock) { - threadLock.lock(); - } - try { - List<Peer> updatedPeers = getPeers(mainPeer, filter, sort, pool); + // Refreshing one peer (e.g. received from WS) + Consumer<List<Peer>> updateKnownBlocks = (updatedPeers) -> + updatedPeers.forEach(peer -> { + String buid = Peers.buid(peer); + if (!knownBlocks.contains(buid)) { + knownBlocks.add(buid); + } + }); - knownPeers.clear(); - updatedPeers.forEach(peer -> { - String buid = buid(peer.getStats()); - if (!knownBlocks.contains(buid)) { - knownBlocks.add(buid); + // Load all peers + Runnable loadAllPeers = () -> { + try { + if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME, 1, TimeUnit.MINUTES)) { + try { + List<Peer> result = getPeers(mainPeer, filter, sort, pool); + + knownBlocks.clear(); + updateKnownBlocks.accept(result); + + // Save update peers + peerService.save(currency, result, false/*not the full UP list*/); + + // Send full list listener + listener.onChanges(result); + } catch (Exception e) { + log.error("Error while loading all peers: " + e.getMessage(), e); + } finally { + lockManager.unlock(PEERS_UPDATE_LOCK_NAME); } - knownPeers.put(peer.toString(), peer); - }); - - result.clear(); - result.addAll(updatedPeers); - listener.onChanged(result); - } - catch(Exception e) { - log.error("Error while loading all peers: " + e.getMessage(), e); - } - finally { - synchronized (threadLock) { - threadLock.unlock(); } + } catch (InterruptedException e) { + log.warn("Could not acquire lock for reloading all peers. Skipping."); } }; + // Refreshing one peer (e.g. received from WS) Consumer<NetworkPeers.Peer> refreshPeerConsumer = (bmaPeer) -> { - if (threadLock.isLocked()) { - log.debug("Rejected refreshPeerConsumer() call. Another refresh is already running..."); - return; - } - synchronized (threadLock) { - threadLock.lock(); - } - try { - final List<Peer> newPeers = new ArrayList<>(); - addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints); - - CompletableFuture<List<CompletableFuture<Peer>>> jobs = - CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool) - .thenApply(memberUids -> - newPeers.stream().map(peer -> - asyncRefreshPeer(peer, memberUids, pool)) - .collect(Collectors.toList()) - ); - jobs.thenCompose(refreshedPeersFuture -> CompletableFutures.allOfToList(refreshedPeersFuture, refreshedPeer -> { - boolean exists = knownPeers.containsKey(refreshedPeer.toString()); - boolean include = peerFilter.test(refreshedPeer); - if (!include && exists) { - Peer removedPeer = knownPeers.remove(refreshedPeer.toString()); - result.remove(removedPeer); - } - else if (include && exists) { - result.remove(knownPeers.get(refreshedPeer.toString())); - } - return include; - })) - .thenApply(addedPeers -> { - result.addAll(addedPeers); - fillPeerStatsConsensus(result); - result.sort(peerComparator); - - result.forEach(peer -> { - String buid = buid(peer.getStats()); - if (!knownBlocks.contains(buid)) { - knownBlocks.add(buid); + if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME)) { + try { + final List<Peer> newPeers = new ArrayList<>(); + addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints); + + CompletableFuture<List<CompletableFuture<Peer>>> jobs = + CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool) + + // Refresh all endpoints + .thenApply(memberUids -> + newPeers.stream().map(peer -> + asyncRefreshPeer(peer, memberUids, pool)) + .collect(Collectors.toList()) + ); + + jobs.thenCompose(CompletableFutures::allOfToList) + .thenAccept(refreshedPeers -> { + if (CollectionUtils.isEmpty(refreshedPeers)) return; + + // Get the full list + final Map<String, Peer> knownPeers = peerService.getPeersByCurrencyId(currency) + .stream() + .filter(peerFilter) + .collect(Collectors.toMap(Peer::toString, Function.identity())); + + // filter, to keep only existing peer, or expected by filter + List<Peer> changedPeers = refreshedPeers.stream() + .filter(refreshedPeer -> { + String peerId = refreshedPeer.toString(); + boolean exists = knownPeers.containsKey(peerId); + if (exists){ + knownPeers.remove(peerId); + } + // If include, add it to full list + boolean include = peerFilter.test(refreshedPeer); + if (include) { + knownPeers.put(peerId, refreshedPeer); + } + return include; + }).collect(Collectors.toList()); + + // If something changes + if (CollectionUtils.isNotEmpty(changedPeers)) { + List<Peer> result = Lists.newArrayList(knownPeers.values()); + fillPeerStatsConsensus(result); + result.sort(peerComparator); + + updateKnownBlocks.accept(changedPeers); + + // Save update peers + peerService.save(currency, changedPeers, false/*not the full UP list*/); + + listener.onChanges(result); } - knownPeers.put(peer.toString(), peer); }); - - listener.onChanged(result); - return result; - }); - } - catch(Exception e) { - log.error("Error while refreshing a peer: " + e.getMessage(), e); - } - finally { - synchronized (threadLock) { - threadLock.unlock(); + } catch (Exception e) { + log.error("Error while refreshing a peer: " + e.getMessage(), e); + } finally { + lockManager.unlock(PEERS_UPDATE_LOCK_NAME); } } }; - // Load all peers - pool.submit(getPeersRunnable); - // Manage new block event blockchainRemoteService.addBlockListener(mainPeer, json -> { - log.debug("Received new block event"); try { BlockchainBlock block = readValue(json, BlockchainBlock.class); - String blockBuid = buid(block); + String blockBuid = BlockchainBlocks.buid(block); boolean isNewBlock = (blockBuid != null && !knownBlocks.contains(blockBuid)); + // If new block + wait 3s for network propagation - if (!isNewBlock) return; + if (isNewBlock) { + schedule(loadAllPeers, pool, 3000/*waiting block propagation*/); + } + } catch(IOException e) { log.error("Could not parse peer received by WS: " + e.getMessage(), e); } - - schedule(getPeersRunnable, pool, 3000/*waiting block propagation*/); }, autoreconnect); // Manage new peer event @@ -422,11 +413,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network log.debug("Received new peer event"); try { final NetworkPeers.Peer bmaPeer = readValue(json, NetworkPeers.Peer.class); - pool.submit(() -> refreshPeerConsumer.accept(bmaPeer)); + if (!lockManager.isLocked(PEERS_UPDATE_LOCK_NAME)) { + pool.submit(() -> refreshPeerConsumer.accept(bmaPeer)); + } } catch(IOException e) { log.error("Could not parse peer received by WS: " + e.getMessage(), e); } }, autoreconnect); + + // Default action: Load all peers + pool.submit(loadAllPeers); + } @@ -525,7 +522,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network } // Filter on SSL - if (filter.filterSsl != null && filter.filterSsl.booleanValue() != peer.isUseSsl()) { + if (filter.filterSsl != null && filter.filterSsl != peer.isUseSsl()) { return false; } @@ -574,18 +571,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return peer; } - protected String computeUniqueId(Peer peer) { - return cryptoService.hash( - new StringJoiner("|") - .add(peer.getPubkey()) - .add(peer.getDns()) - .add(peer.getIpv4()) - .add(peer.getIpv6()) - .add(String.valueOf(peer.getPort())) - .add(Boolean.toString(peer.isUseSsl())) - .toString()); - } - protected JsonNode get(final Peer peer, String path) { return executeRequest(peer, path, JsonNode.class); } @@ -633,10 +618,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network specScore += (sort.sortType == SortType.PUBKEY ? computeScoreAlphaValue(peer.getPubkey(), 3, sort.sortAsc) : 0); specScore += (sort.sortType == SortType.API ? (peer.isUseSsl() ? (sort.sortAsc ? 1 : -1) : - (hasEndPointAPI(peer, EndpointApi.ES_USER_API) ? (sort.sortAsc ? 0.5 : -0.5) : 0)) : 0); + (Peers.hasEndPointAPI(peer, EndpointApi.ES_USER_API) ? (sort.sortAsc ? 0.5 : -0.5) : 0)) : 0); specScore += (sort.sortType == SortType.HARDSHIP ? (stats.getHardshipLevel() != null ? (sort.sortAsc ? (10000-stats.getHardshipLevel()) : stats.getHardshipLevel()): 0) : 0); specScore += (sort.sortType == SortType.BLOCK_NUMBER ? (stats.getBlockNumber() != null ? (sort.sortAsc ? (1000000000 - stats.getBlockNumber()) : stats.getBlockNumber()) : 0) : 0); - score += (10000000000l * specScore); + score += (10000000000L * specScore); } score += (1000000000 * (stats.getStatus() == Peer.PeerStatus.UP ? 1 : 0)); score += (100000000 * (stats.isMainConsensus() ? 1 : 0)); @@ -662,29 +647,15 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return asc ? (1000 - score) : score; } - protected boolean hasEndPointAPI(Peer peer, EndpointApi api) { - return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api.name()); - } - protected String buid(Peer peer) { - return buid(peer.getStats()); - } - - protected String buid(Peer.Stats stats) { - return stats.getStatus() == Peer.PeerStatus.UP - ? stats.getBlockNumber() + "-" + stats.getBlockHash() - : null; - } - - protected String buid(BlockchainBlock block) { - if (block == null || block.getNumber() == null || block.getHash() == null) return null; - return block.getNumber() + "-" + block.getHash(); - } protected void schedule(Runnable command, ExecutorService pool, long delayInMs) { if (pool instanceof ScheduledExecutorService) { ((ScheduledExecutorService)pool).schedule(command, delayInMs, TimeUnit.MILLISECONDS); } + else if (delayInMs <= 0) { + pool.submit(command); + } else { pool.submit(() -> { try { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java index f800a829..f4aa8419 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java @@ -48,10 +48,7 @@ public interface PeerService extends Service { */ List<Peer> getPeersByCurrencyId(String currencyId); - /** - * Fill all cache need for currencies - * @param context - * @param accountId - */ - void loadCache(long accountId); + void save(String currencyId, List<Peer> peers, boolean isFullUpList); + + boolean isExists(String currencyId, String peerId); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java index ad9f7b73..4b12796e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java @@ -23,20 +23,25 @@ package org.duniter.core.client.service.local; */ import org.duniter.core.beans.InitializingBean; +import org.duniter.core.client.config.Configuration; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.local.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.cache.Cache; import org.duniter.core.util.cache.SimpleCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; import java.util.List; /** @@ -44,11 +49,14 @@ import java.util.List; */ public class PeerServiceImpl implements PeerService, InitializingBean { + private static final Logger log = LoggerFactory.getLogger(PeerServiceImpl.class); private Cache<String, List<Peer>> peersByCurrencyIdCache; private Cache<String, Peer> activePeerByCurrencyIdCache; private CurrencyService currencyService; + private CryptoService cryptoService; private PeerDao peerDao; + private Configuration config; public PeerServiceImpl() { super(); @@ -56,8 +64,10 @@ public class PeerServiceImpl implements PeerService, InitializingBean { @Override public void afterPropertiesSet() throws Exception { - currencyService = ServiceLocator.instance().getCurrencyService(); - peerDao = ServiceLocator.instance().getBean(PeerDao.class); + this.currencyService = ServiceLocator.instance().getCurrencyService(); + this.peerDao = ServiceLocator.instance().getBean(PeerDao.class); + this.config = Configuration.instance(); + this.cryptoService = ServiceLocator.instance().getCryptoService(); } @Override @@ -66,17 +76,25 @@ public class PeerServiceImpl implements PeerService, InitializingBean { peerDao = null; peersByCurrencyIdCache = null; activePeerByCurrencyIdCache = null; + cryptoService = null; } + + public Peer save(final Peer peer) { Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer.getCurrency()); Preconditions.checkArgument(StringUtils.isNotBlank(peer.getHost())); Preconditions.checkArgument(peer.getPort() >= 0); + + String peerId = cryptoService.hash(peer.computeKey()); + boolean exists = isExists(peer.getCurrency(), peerId); + peer.setId(peerId); + Peer result; // Create - if (peer.getId() == null) { + if (!exists) { result = peerDao.create(peer); } @@ -107,7 +125,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { * @param currencyId * @return */ - public Peer getActivePeerByCurrencyId(String currency) { + public Peer getActivePeerByCurrencyId(String currencyId) { // Check if cache as been loaded if (activePeerByCurrencyIdCache == null) { @@ -127,7 +145,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { }; } - return activePeerByCurrencyIdCache.get(currency); + return activePeerByCurrencyIdCache.get(currencyId); } /** @@ -173,4 +191,34 @@ public class PeerServiceImpl implements PeerService, InitializingBean { } } + @Override + public void save(String currencyId, List<Peer> peers, boolean isFullUpList) { + + int peerDownTimeoutMs = config.getPeerUpMaxAge(); + final long nowInSec = System.currentTimeMillis()/1000; + + if (CollectionUtils.isNotEmpty(peers)) { + if (log.isDebugEnabled()) { + log.debug(String.format("[%s] Updating peers (%s endpoints found)", currencyId, peers.size())); + } + + // On each UP peers: set last UP time + peers.stream().map(Peer::getStats) + .filter(Peer.Stats::isReacheable) + .forEach(stats -> stats.setLastUpTime(nowInSec)); + + peers.forEach(this::save); + } + + // Mark old peers as DOWN + if (isFullUpList && peerDownTimeoutMs > 0) { + Date oldDate = new Date(nowInSec * 1000 - peerDownTimeoutMs); + peerDao.updatePeersAsDown(currencyId, oldDate.getTime() / 1000); + } + } + + @Override + public boolean isExists(String currencyId, String peerId) { + return peerDao.isExists(currencyId, peerId); + } } diff --git a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties index b9d77141..c750c958 100644 --- a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties +++ b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties @@ -13,6 +13,7 @@ duniter4j.config.option.data.directory.description= duniter4j.config.option.i18n.directory.description= duniter4j.config.option.i18n.locale.description= duniter4j.config.option.inceptionYear.description= +duniter4j.config.option.network.loadPeers.maxDuration.description= duniter4j.config.option.network.maxConnections.description= duniter4j.config.option.network.maxConnectionsPerHost.description= duniter4j.config.option.network.timeout.description= diff --git a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties index 202c0342..390d4e90 100644 --- a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties +++ b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties @@ -13,6 +13,7 @@ duniter4j.config.option.data.directory.description= duniter4j.config.option.i18n.directory.description= duniter4j.config.option.i18n.locale.description= duniter4j.config.option.inceptionYear.description= +duniter4j.config.option.network.loadPeers.maxDuration.description= duniter4j.config.option.network.maxConnections.description= duniter4j.config.option.network.maxConnectionsPerHost.description= duniter4j.config.option.network.timeout.description= diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java index d4362a34..6a5f252f 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java @@ -2,6 +2,7 @@ package org.duniter.core.client.model; import com.fasterxml.jackson.databind.ObjectMapper; import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.junit.Assume; import java.io.File; @@ -14,7 +15,7 @@ public class BlockFileUtils { public static BlockchainBlock readBlockFile(String jsonFileName) { try { - ObjectMapper om = org.duniter.core.client.model.bma.jackson.JacksonUtils.newObjectMapper(); + ObjectMapper om = JacksonUtils.getThreadObjectMapper(); BlockchainBlock block = om.readValue(Files.readAllBytes(new File("src/test/resources" , jsonFileName).toPath()), BlockchainBlock.class); Assume.assumeNotNull(block); return block; diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java index ac731273..9e56a40d 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java @@ -115,7 +115,7 @@ public class BlockchainRemoteServiceTest { Assert.assertEquals(10, result.length); // Make sure allOfToList json are valid blocks - ObjectMapper objectMapper = JacksonUtils.newObjectMapper(); + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); int number = 0; for (String jsonBlock: result) { @@ -148,7 +148,7 @@ public class BlockchainRemoteServiceTest { service.addBlockListener(createTestPeer(), (message) -> { try { - BlockchainBlock block = JacksonUtils.newObjectMapper().readValue(message, BlockchainBlock.class); + BlockchainBlock block = JacksonUtils.getThreadObjectMapper().readValue(message, BlockchainBlock.class); log.debug("Received block #" + block.getNumber()); isWebSocketNewBlockReceived = true; } diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java new file mode 100644 index 00000000..b35a9bbf --- /dev/null +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java @@ -0,0 +1,104 @@ +package org.duniter.core.util; + +import com.google.common.collect.MapMaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A concurrent lock manager + */ +public class LockManager { + + private static final Logger log = LoggerFactory.getLogger(LockManager.class); + private static int DEFAULT_UNSUCCESSFUL_TRY_LOCK_WARN = 4; + private static int DEFAULT_CONCURRENCY_LEVEL = 4; + + private final int unsuccessfulLockCountForWarn; + private final Map<String, Lock> lockMap; + private final Map<String, Integer> tryLockCounter; + + public LockManager(int concurrencyLevel, int maxLockCounterForLog) { + this.unsuccessfulLockCountForWarn = maxLockCounterForLog; + this.lockMap = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap(); + this.tryLockCounter = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap(); + } + + public LockManager(int concurrencyLevel) { + this(concurrencyLevel, concurrencyLevel); + } + + public LockManager() { + this(DEFAULT_CONCURRENCY_LEVEL, DEFAULT_UNSUCCESSFUL_TRY_LOCK_WARN); + } + + /** + * Acquires the lock. + * + */ + public void lock(String name) { + Lock lock = computeIfAbsent(name); + lock.lock(); + } + + public boolean tryLock(final String name, long time, TimeUnit unit) throws InterruptedException { + Lock lock = computeIfAbsent(name); + boolean locked = lock.tryLock(time, unit); + logTryLock(name, locked); + return locked; + } + + public boolean tryLock(final String name) { + Lock lock = computeIfAbsent(name); + boolean locked = lock.tryLock(); + logTryLock(name, locked); + return locked; + } + + public void unlock(final String name) { + Lock lock = lockMap.get(name); + if (lock != null) { + lock.unlock(); + // Reset counter + tryLockCounter.computeIfPresent(name, (input, counter) -> 0); + } + } + + public boolean isLocked(String name) { + Integer tryLockCount = tryLockCounter.get(name); + return tryLockCount != null && tryLockCount.intValue() > 0; + } + + /* -- protected method -- */ + + protected Lock computeIfAbsent(final String name) { + return lockMap.computeIfAbsent(name, input -> new ReentrantLock()); + } + + protected void logTryLock(final String name, final boolean locked) { + // Counter unsuccessful lock + if (locked) { + // Reset counter + tryLockCounter.computeIfPresent(name, (input, counter) -> 1); + } + else { + if (!tryLockCounter.containsKey(name)) { + tryLockCounter.computeIfAbsent(name, input -> 2); + } + else { + tryLockCounter.computeIfPresent(name, (input, counter) -> { + if (counter < unsuccessfulLockCountForWarn) return counter + 1; + if (log.isDebugEnabled()) { + log.debug(String.format("Unable to acquire lock [%s] - after %s unsuccessful attempts", name, counter + 1)); + } + return 1; // reset log counter + }); + + } + } + } +} \ No newline at end of file diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 696d6b28..385488bb 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -125,8 +125,8 @@ duniter.blockchain.enable: true # # Force blockchain reload - WARNING: all user events will be resetted to 'unread' # -duniter.blockchain.reload: true -duniter.blockchain.reload.from: 50999 +#duniter.blockchain.reload: true +#duniter.blockchain.reload.from: 50999 # # Duniter node address # diff --git a/duniter4j-es-assembly/src/test/es-home/config/logging.yml b/duniter4j-es-assembly/src/test/es-home/config/logging.yml index b9122182..0676ba19 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/logging.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/logging.yml @@ -26,6 +26,7 @@ logger: security: DEBUG org.duniter: INFO + org.duniter.core.util.LockManager: DEBUG #org.duniter.core.beans: DEBUG #org.duniter.core.client.service: DEBUG #org.duniter.elasticsearch: DEBUG diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 5483ad8e..8590207e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -207,7 +207,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public int getNetworkTimeout() { - return settings.getAsInt("duniter.network.timeout", 300000 /*300s*/); + return settings.getAsInt("duniter.network.timeout", 30000 /*30s*/); } public int getPeerDownTimeout() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java index 525bab92..0432e2b9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java @@ -112,14 +112,12 @@ public class Duniter4jClientImpl implements Duniter4jClient { private final Client client; private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool; - private final ObjectMapper objectMapper; @Inject public Duniter4jClientImpl(Client client, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) { super(); this.client = client; this.threadPool = threadPool; - this.objectMapper = JacksonUtils.newObjectMapper(); } @Override @@ -202,7 +200,10 @@ public class Duniter4jClientImpl implements Duniter4jClient { /** * Retrieve some field from a document id, and check if all field not null + * @param index + * @param type * @param docId + * @param fieldNames * @return */ @Override @@ -362,6 +363,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { // Read query result SearchHit[] searchHits = response.getHits().getHits(); + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); for (SearchHit searchHit : searchHits) { if (searchHit.source() != null) { @@ -382,7 +384,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { @Override public <C extends LocalEntity<String>> C readSourceOrNull(SearchHit searchHit, Class<? extends C> clazz) { try { - C value = objectMapper.readValue(searchHit.getSourceRef().streamInput(), clazz); + C value = JacksonUtils.getThreadObjectMapper().readValue(searchHit.getSourceRef().streamInput(), clazz); value.setId(searchHit.getId()); return value; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java index 1dea7013..ee933207 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java @@ -40,7 +40,6 @@ public abstract class AbstractDao implements Bean { protected final ESLogger logger; - protected final ObjectMapper objectMapper; protected Duniter4jClient client; protected CryptoService cryptoService; @@ -49,7 +48,6 @@ public abstract class AbstractDao implements Bean { public AbstractDao(String loggerName) { super(); this.logger = Loggers.getLogger(loggerName); - this.objectMapper = JacksonUtils.newObjectMapper(); } @Inject @@ -69,4 +67,7 @@ public abstract class AbstractDao implements Bean { /* -- protected methods -- */ + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 867a5757..48aeb290 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -76,7 +76,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) @@ -126,7 +126,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) @@ -384,7 +384,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { if (searchHit.source() != null) { String jsonString = new String(searchHit.source()); try { - block = objectMapper.readValue(jsonString, BlockchainBlock.class); + block = getObjectMapper().readValue(jsonString, BlockchainBlock.class); } catch(Exception e) { if (logger.isDebugEnabled()) { logger.debug("Error while parsing block from JSON:\n" + jsonString); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java index f79a6527..f8c7a62a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -67,7 +67,7 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) @@ -112,7 +112,7 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index 357f4187..df9e7cd0 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -61,7 +61,7 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } // Serialize into JSON - byte[] json = objectMapper.writeValueAsBytes(currency); + byte[] json = getObjectMapper().writeValueAsBytes(currency); // Preparing indexBlocksFromNode IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) @@ -89,7 +89,7 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } // Serialize into JSON - byte[] json = objectMapper.writeValueAsBytes(currency); + byte[] json = getObjectMapper().writeValueAsBytes(currency); UpdateRequestBuilder updateRequest = client.prepareUpdate(INDEX, RECORD_TYPE, currency.getId()) .setDoc(json); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java index 4160f437..cd270b50 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java @@ -68,7 +68,7 @@ public class MovementDaoImpl extends AbstractDao implements MovementDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(operation); + String json = getObjectMapper().writeValueAsString(operation); // Preparing IndexRequestBuilder request = client.prepareIndex(operation.getCurrency(), TYPE) @@ -96,7 +96,7 @@ public class MovementDaoImpl extends AbstractDao implements MovementDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(operation); + String json = getObjectMapper().writeValueAsString(operation); // Preparing UpdateRequestBuilder request = client.prepareUpdate(operation.getCurrency(), TYPE, operation.getId()) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 57a6f8b9..530fc899 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -29,7 +29,6 @@ import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.PeerDao; -import org.duniter.elasticsearch.model.Movement; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -47,6 +46,7 @@ import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.metrics.max.Max; import java.io.IOException; +import java.util.Date; import java.util.List; /** @@ -75,7 +75,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(peer); + String json = getObjectMapper().writeValueAsString(peer); // Preparing indexBlocksFromNode IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), TYPE) @@ -104,7 +104,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(peer); + String json = getObjectMapper().writeValueAsString(peer); // Preparing indexBlocksFromNode UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), TYPE, peer.getId()) @@ -183,9 +183,11 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { } @Override - public void updatePeersAsDown(String currencyName, long lastUpTimeTimeout) { + public void updatePeersAsDown(String currencyName, long upTimeLimit) { - long minUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] Setting peers as DOWN, if older than [%s]...", currencyName, new Date(upTimeLimit*1000))); + } SearchRequestBuilder searchRequest = client.prepareSearch(currencyName) .setFetchSource(false) @@ -193,8 +195,8 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Query = filter on lastUpTime BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() - // where lastUpTime < minUpTime - .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(minUpTime)) + // where lastUpTime < upTimeLimit + .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimit)) // AND status = UP .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name())); searchRequest.setQuery(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.constantScoreQuery(boolQuery))); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java index 39515f50..a7d84512 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -161,7 +161,7 @@ public abstract class AbstractBlockchainListenerService extends AbstractService Preconditions.checkNotNull(change.getSource()); try { - return objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + return getObjectMapper().readValue(change.getSource().streamInput(), BlockchainBlock.class); } catch (IOException e) { throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 19a1d760..d2b15d95 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -29,13 +29,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.jackson.JacksonUtils; -import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.elasticsearch.Records; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.elasticsearch.ElasticsearchException; @@ -52,7 +50,6 @@ import java.util.Set; public abstract class AbstractService implements Bean { protected final ESLogger logger; - protected final ObjectMapper objectMapper; protected Duniter4jClient client; protected PluginSettings pluginSettings; @@ -77,7 +74,6 @@ public abstract class AbstractService implements Bean { super(); this.logger = Loggers.getLogger(loggerName); this.client = client; - this.objectMapper = JacksonUtils.newObjectMapper(); this.pluginSettings = pluginSettings; this.cryptoService = cryptoService; this.retryCount = pluginSettings.getNodeRetryCount(); @@ -103,6 +99,10 @@ public abstract class AbstractService implements Bean { } } + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } + protected <T> T executeWithRetry(RetryFunction<T> retryFunction) throws TechnicalException{ int retry = 0; while (retry < retryCount) { @@ -137,7 +137,7 @@ public abstract class AbstractService implements Bean { protected JsonNode readAndVerifyIssuerSignature(String recordJson, String issuerFieldName) throws ElasticsearchException { try { - JsonNode recordObj = objectMapper.readTree(recordJson); + JsonNode recordObj = getObjectMapper().readTree(recordJson); readAndVerifyIssuerSignature(recordJson, recordObj, issuerFieldName); return recordObj; } @@ -149,7 +149,7 @@ public abstract class AbstractService implements Bean { protected void readAndVerifyIssuerSignature(JsonNode actualObj, String issuerFieldName) throws ElasticsearchException, JsonProcessingException { // Remove hash and signature - String recordJson = objectMapper.writeValueAsString(actualObj); + String recordJson = getObjectMapper().writeValueAsString(actualObj); readAndVerifyIssuerSignature(recordJson, actualObj, issuerFieldName); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java index 133ebf81..6e144b82 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java @@ -22,13 +22,8 @@ package org.duniter.elasticsearch.service; * #L% */ -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import org.duniter.core.util.Preconditions; -import org.apache.commons.io.IOUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.duniter.core.client.model.elasticsearch.Record; @@ -37,6 +32,7 @@ import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; @@ -44,21 +40,17 @@ import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.model.SynchroResult; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import java.io.IOException; -import java.io.InputStream; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Objects; @@ -212,6 +204,7 @@ public abstract class AbstractSynchroService extends AbstractService { long insertHits = 0; long updateHits = 0; long invalidSignatureHits = 0; + ObjectMapper objectMapper = getObjectMapper(); if (offset < total) { @@ -274,7 +267,7 @@ public abstract class AbstractSynchroService extends AbstractService { // Check version Long existingVersion = ((Number)existingDoc.getFields().get(versionFieldName).getValue()).longValue(); - boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); + boolean doUpdate = (existingVersion == null || version > existingVersion); if (doUpdate) { if (debug) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java index d013524c..58f72541 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java @@ -79,7 +79,7 @@ public class BlockchainListenerService extends AbstractBlockchainListenerService try { bulkRequest.add(client.prepareIndex(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) .setRefresh(false) // recommended for heavy indexing - .setSource(objectMapper.writeValueAsBytes(stat))); + .setSource(getObjectMapper().writeValueAsBytes(stat))); flushBulkRequestOrSchedule(); } catch (JsonProcessingException e) { logger.error("Could not serialize BlockStat into JSON: " + e.getMessage(), e); @@ -100,7 +100,7 @@ public class BlockchainListenerService extends AbstractBlockchainListenerService try { bulkRequest.add(client.prepareIndex(block.getCurrency(), MovementDao.TYPE) .setRefresh(false) // recommended for heavy indexing - .setSource(objectMapper.writeValueAsBytes(movement))); + .setSource(getObjectMapper().writeValueAsBytes(movement))); flushBulkRequestOrSchedule(); } catch (JsonProcessingException e) { logger.error("Could not serialize BlockOperation into JSON: " + e.getMessage(), e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 6cc334e4..b634a9ba 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -356,7 +356,7 @@ public class BlockchainService extends AbstractService { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(currentBlock); + String json = getObjectMapper().writeValueAsString(currentBlock); indexCurrentBlockFromJson(currentBlock.getCurrency(), json, wait); } catch(IOException e) { throw new TechnicalException(e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 0dc761b5..ba34bf5b 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -24,59 +24,40 @@ package org.duniter.elasticsearch.service; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.local.NetworkService; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.model.NullProgressionModel; -import org.duniter.core.model.ProgressionModel; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.ObjectUtils; -import org.duniter.core.util.Preconditions; -import org.duniter.core.util.concurrent.CompletableFutures; -import org.duniter.core.util.json.JsonSyntaxException; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.search.highlight.HighlightField; import org.nuiton.i18n.I18n; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; /** * Created by Benoit on 30/03/2015. */ -public class PeerService extends AbstractService { - - private final ProgressionModel nullProgressionModel = new NullProgressionModel(); +public class PeerService extends AbstractService { private org.duniter.core.client.service.bma.BlockchainRemoteService blockchainRemoteService; private org.duniter.core.client.service.local.NetworkService networkService; + private org.duniter.core.client.service.local.PeerService delegate; private ThreadPool threadPool; - private PeerDao peerDao; @Inject public PeerService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool, CryptoService cryptoService, - PeerDao peerDao, final ServiceLocator serviceLocator){ super("duniter.network.peer", client, settings, cryptoService); - this.peerDao = peerDao; this.threadPool = threadPool; threadPool.scheduleOnStarted(() -> { this.blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); this.networkService = serviceLocator.getNetworkService(); + this.delegate = serviceLocator.getPeerService(); setIsReady(true); }); } @@ -118,7 +99,7 @@ public class PeerService extends AbstractService { sortDef.sortType = null; List<Peer> peers = networkService.getPeers(firstPeer, filterDef, sortDef, threadPool.scheduler()); - savePeers(currencyName, peers); + delegate.save(currencyName, peers, true); logger.info(I18n.t("duniter4j.es.networkService.indexPeers.succeed", currencyName, firstPeer, peers.size(), (System.currentTimeMillis() - timeStart))); } catch(Exception e) { logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); @@ -141,108 +122,14 @@ public class PeerService extends AbstractService { filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.currency = currencyName; // Default sort NetworkService.Sort sortDef = new NetworkService.Sort(); sortDef.sortType = null; networkService.addPeersChangeListener(mainPeer, - peers -> savePeers(currencyName, peers), + peers -> logger.debug(String.format("[%s] Update peers: %s found", currencyName, CollectionUtils.size(peers))), filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler()); } - - public Long getMaxLastUpTime(String currencyName) { - return peerDao.getMaxLastUpTime(currencyName); - } - - /** - * Create or update a peer, depending on its existence and hash - * @param peer - * @throws DuplicateIndexIdException - */ - public Peer savePeer(final Peer peer) throws DuplicateIndexIdException { - Preconditions.checkNotNull(peer, "peer could not be null") ; - Preconditions.checkNotNull(peer.getCurrency(), "peer attribute 'currency' could not be null"); - Preconditions.checkNotNull(peer.getPubkey(), "peer attribute 'pubkey' could not be null"); - Preconditions.checkNotNull(peer.getHost(), "peer attribute 'host' could not be null"); - Preconditions.checkNotNull(peer.getApi(), "peer 'api' could not be null"); - - String id = cryptoService.hash(peer.computeKey()); - peer.setId(id); - - boolean exists = peerDao.isExists(peer.getCurrency(), id); - - // Currency not exists, or has changed, so create it - if (!exists) { - if (logger.isTraceEnabled()) { - logger.trace(String.format("Insert new peer [%s]", peer)); - } - - // Index new peer - peer.setId(id); - peerDao.create(peer); - } - - // Update existing peer - else { - if (logger.isTraceEnabled()) { - logger.trace(String.format("Update peer [%s]", peer)); - } - peerDao.update(peer); - } - return peer; - } - - - /* -- protected methods -- */ - - protected void savePeers(String currencyName, List<Peer> peers) { - if (CollectionUtils.isNotEmpty(peers)) { - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size())); - } - peers.forEach(this::savePeer); - } - - // Mark old peers as DOWN - peerDao.updatePeersAsDown(currencyName, pluginSettings.getPeerDownTimeout()); - } - - protected List<Peer> toPeers(SearchResponse response, boolean withHighlight) { - // Read query result - List<Peer> result = Lists.newArrayList(); - response.getHits().forEach(searchHit -> { - Peer peer; - if (searchHit.source() != null) { - String jsonString = new String(searchHit.source()); - try { - peer = objectMapper.readValue(jsonString, Peer.class); - } catch(Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Error while parsing peer from JSON:\n" + jsonString); - } - throw new JsonSyntaxException("Error while read peer from JSON: " + e.getMessage(), e); - } - } - else { - peer = new Peer(); - SearchHitField field = searchHit.getFields().get("hash"); - peer.setHash(field.getValue()); - } - result.add(peer); - - // If possible, use highlights - if (withHighlight) { - Map<String, HighlightField> fields = searchHit.getHighlightFields(); - for (HighlightField field : fields.values()) { - String blockNameHighLight = field.getFragments()[0].string(); - peer.setHash(blockNameHighLight); - } - } - }); - - return result; - } - - } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java index 5f2d4270..4fb5efd9 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java @@ -22,7 +22,6 @@ package org.duniter.elasticsearch.threadpool; * #L% */ -import org.duniter.core.exception.TechnicalException; import org.elasticsearch.common.logging.ESLogger; import java.util.concurrent.*; diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java index ab95a1a6..f4238c0e 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java @@ -50,15 +50,13 @@ public class BlockchainServiceTest { private BlockchainRemoteService remoteService; private Configuration config; private Peer peer; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { service = ServiceLocator.instance().getBean(BlockchainService.class); remoteService = ServiceLocator.instance().getBlockchainRemoteService(); config = Configuration.instance(); - peer = createTestPeer(config); - objectMapper = JacksonUtils.newObjectMapper(); + peer = Peer.newBuilder().setHost(config.getNodeHost()).setPort(config.getNodePort()).build(); // Init the currency CurrencyService currencyService = ServiceLocator.instance().getBean(CurrencyService.class); @@ -77,7 +75,7 @@ public class BlockchainServiceTest { service.indexCurrentBlock(current, true/*wait*/); try { - String blockStr = objectMapper.writeValueAsString(current); + String blockStr = JacksonUtils.getThreadObjectMapper().writeValueAsString(current); service.indexBlockFromJson(peer, blockStr, true/*is current*/, false/*detected fork*/, true/*wait*/); } @@ -88,29 +86,11 @@ public class BlockchainServiceTest { Thread.sleep(1000); // Try to get the indexed block - BlockchainBlock retrievedBlock = service.getBlockById(current.getCurrency(), current.getNumber().intValue()); + BlockchainBlock retrievedBlock = service.getBlockById(current.getCurrency(), current.getNumber()); Assert.assertNotNull(retrievedBlock); - } /* -- internal methods */ - protected void assertResults(String queryText, List<BlockchainBlock> result) { - log.info(String.format("Results for a search on [%s]", queryText)); - Assert.assertNotNull(result); - Assert.assertTrue(result.size() > 0); - for (BlockchainBlock block: result) { - log.info(" - " + block.getNumber()); - } - } - - protected Peer createTestPeer(Configuration config) { - Peer peer = new Peer( - config.getNodeHost(), - config.getNodePort()); - - return peer; - } - } diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java index f932e97e..790a9d2e 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java @@ -56,7 +56,6 @@ public class PeerServiceTest { private NetworkRemoteService remoteService; private Configuration config; private Peer peer; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { @@ -67,7 +66,6 @@ public class PeerServiceTest { peer = new Peer.Builder() .setHost(config.getNodeHost()) .setPort(config.getNodePort()).build(); - objectMapper = JacksonUtils.newObjectMapper(); // Waiting services started while(!service.isReady() || !currencyService.isReady()) { @@ -109,5 +107,6 @@ public class PeerServiceTest { Long maxLastUpTime = service.getMaxLastUpTime(peer1.getCurrency()); Assert.assertNotNull(maxLastUpTime); Assert.assertEquals(peer1.getStats().getLastUpTime().longValue(), maxLastUpTime.longValue()); + } } diff --git a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java index c8f9bf4a..7493b374 100644 --- a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java +++ b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java @@ -64,14 +64,12 @@ public class SubscriptionServiceTest { private SubscriptionService service; private UserEventService userEventService; private CryptoService cryptoService; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { service = ServiceLocator.instance().getBean(SubscriptionService.class); cryptoService = ServiceLocator.instance().getCryptoService(); userEventService = ServiceLocator.instance().getBean(UserEventService.class); - objectMapper = JacksonUtils.newObjectMapper(); } @Test @@ -125,7 +123,7 @@ public class SubscriptionServiceTest { EmailSubscription subscription = createEmailSubscription(wallet); // Compute full JSON (with hash + signature) - String json = objectMapper.writeValueAsString(subscription); + String json = JacksonUtils.getThreadObjectMapper().writeValueAsString(subscription); String id = service.create(json); Assert.assertNotNull(id); @@ -136,6 +134,8 @@ public class SubscriptionServiceTest { protected EmailSubscription createEmailSubscription(Wallet wallet) throws JsonProcessingException { + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); + EmailSubscription subscription = new EmailSubscription(); subscription.setIssuer(wallet.getPubKeyHash()); subscription.setTime(System.currentTimeMillis()/1000); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 626fbbbd..01b6a09d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -260,7 +260,7 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic try { bulkRequest.add(client.prepareIndex(UserEventService.INDEX, UserEventService.EVENT_TYPE) - .setSource(objectMapper.writeValueAsBytes(event)) + .setSource(getObjectMapper().writeValueAsBytes(event)) .setRefresh(false)); flushBulkRequestOrSchedule(); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index e2514c35..a7dadf25 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -30,7 +30,6 @@ import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; -import org.duniter.core.service.MailService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; @@ -47,7 +46,6 @@ import org.duniter.elasticsearch.user.model.UserProfile; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -57,7 +55,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; @@ -95,7 +92,6 @@ public class UserEventService extends AbstractService implements ChangeService.C } } - private final MailService mailService; private final ThreadPool threadPool; public final boolean trace; @@ -103,10 +99,8 @@ public class UserEventService extends AbstractService implements ChangeService.C public UserEventService(final Duniter4jClient client, final PluginSettings pluginSettings, final CryptoService cryptoService, - final MailService mailService, final ThreadPool threadPool) { super("duniter.user.event", client, pluginSettings, cryptoService); - this.mailService = mailService; this.threadPool = threadPool; this.trace = logger.isTraceEnabled(); @@ -454,7 +448,7 @@ public class UserEventService extends AbstractService implements ChangeService.C private String toJson(UserEvent userEvent, boolean cleanHashAndSignature) { try { - String json = objectMapper.writeValueAsString(userEvent); + String json = getObjectMapper().writeValueAsString(userEvent); if (cleanHashAndSignature) { json = JacksonUtils.removeAttribute(json, Record.PROPERTY_SIGNATURE); json = JacksonUtils.removeAttribute(json, Record.PROPERTY_HASH); @@ -484,7 +478,7 @@ public class UserEventService extends AbstractService implements ChangeService.C // on create case CREATE: if (change.getSource() != null) { - UserEvent event = objectMapper.readValue(change.getSource().streamInput(), UserEvent.class); + UserEvent event = getObjectMapper().readValue(change.getSource().streamInput(), UserEvent.class); processEventCreate(change.getId(), event); } break; @@ -492,7 +486,7 @@ public class UserEventService extends AbstractService implements ChangeService.C // on update case INDEX: if (change.getSource() != null) { - UserEvent event = objectMapper.readValue(change.getSource().streamInput(), UserEvent.class); + UserEvent event = getObjectMapper().readValue(change.getSource().streamInput(), UserEvent.class); processEventUpdate(change.getId(), event); } break; -- GitLab