diff --git a/duniter4j-client/src/main/java/org/duniter/client/Main.java b/duniter4j-client/src/main/java/org/duniter/client/Main.java index a03b93a68a647db1ba7151220bc61e9dff18a032..b86bdf48140f27e18dc76ed8f7eeb006a688717d 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/Main.java +++ b/duniter4j-client/src/main/java/org/duniter/client/Main.java @@ -76,7 +76,7 @@ public class Main { // Parsing args JCommander jc = new JCommander(this); - actions.entrySet().stream().forEach(entry -> jc.addCommand(entry.getKey(), entry.getValue())); + actions.entrySet().forEach(entry -> jc.addCommand(entry.getKey(), entry.getValue())); try { jc.parse(args); diff --git a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java index 2903072d42764f718353f0572e27f6fbada47aa5..ccf311b3f9fdd32c036db2c93afacefb4a53c085 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java +++ b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java @@ -43,6 +43,7 @@ import org.nuiton.i18n.I18n; import java.io.*; import java.text.DateFormat; import java.text.SimpleDateFormat; +import java.util.Collection; import java.util.Date; import java.util.List; import java.util.stream.Collectors; @@ -113,7 +114,7 @@ public class NetworkAction extends AbstractAction { /* -- protected methods -- */ - public void showPeersTable(List<Peer> peers, boolean clearConsole) { + public void showPeersTable(Collection<Peer> peers, boolean clearConsole) { // Clearing console if (clearConsole) { @@ -125,7 +126,7 @@ public class NetworkAction extends AbstractAction { return; } - Peer mainConsensusPeer = peers.get(0); + Peer mainConsensusPeer = peers.iterator().next(); Peer.Stats mainConsensusStats = mainConsensusPeer.getStats(); if (mainConsensusStats.isMainConsensus()) { Long mediantTime = mainConsensusStats.getMedianTime(); diff --git a/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java b/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java index 7dc04b26644eb32c09a1f1e34483f189f3fb7b53..8415a1f2ba3e7c31e61bfa8f7654e95314ad6b2d 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java +++ b/duniter4j-client/src/main/java/org/duniter/client/actions/TransactionAction.java @@ -105,7 +105,7 @@ public class TransactionAction extends AbstractAction { // Compute keypair and wallet - KeyPair keypair = null; + KeyPair keypair; if (authParameters.authScrypt) { CryptoService cryptoService = ServiceLocator.instance().getCryptoService(); keypair = cryptoService.getKeyPairFromSeed( diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java index 6837a79330cccfe0136b6b2596d13ae372614d26..cb95c70b4f390cc2d2543f327c4ced30fae87c4e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/Configuration.java @@ -262,6 +262,10 @@ public class Configuration { return Integer.parseInt(ConfigurationOption.NETWORK_CACHE_TIME_IN_MILLIS.getDefaultValue()); } + public int getPeerUpMaxAge() { + return applicationConfig.getOptionAsInt(ConfigurationOption.NETWORK_PEER_UP_MAX_AGE.getKey()); + } + public String getNodeElasticSearchHost() { return applicationConfig.getOption(ConfigurationOption.NODE_ELASTICSEARCH_HOST.getKey()); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java index 9c28f7ea4f86f5bab98b1b8c04ee104df043cbf1..ea5d6ef0c1796491c27a6047175b9848bbc41aa3 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/config/ConfigurationOption.java @@ -182,7 +182,6 @@ public enum ConfigurationOption implements ConfigOptionDef { Integer.class, false), - NETWORK_CACHE_TIME_IN_MILLIS ( "duniter4j.network.cacheTimeInMillis", "duniter4j.config.option.network.cacheTimeInMillis.description", @@ -190,6 +189,13 @@ public enum ConfigurationOption implements ConfigOptionDef { Integer.class, false), + NETWORK_PEER_UP_MAX_AGE ( + "duniter.network.peerUpMaxAge", + "duniter.config.option.network.peerUpMaxAge.description", + "600000", // = 10 min + Integer.class, + false), + NODE_ELASTICSEARCH_PROTOCOL( "duniter4j.node.elasticsearch.protocol", n("duniter4j.config.option.node.elasticsearch.protocol.description"), diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 199d817a5279357be4d1a4f629321faf4c4892b1..2e6b031ef24b06029b1f8d91515cf373ffcbd663 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -37,5 +37,5 @@ public interface PeerDao extends EntityDao<String, Peer> { Long getMaxLastUpTime(String currencyId); - void updatePeersAsDown(String currencyId, long lastUpTimeTimeout); + void updatePeersAsDown(String currencyId, long maxUpTime); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index 544c4e69ef330ab191aff57887fc2cb1c5e5d0db..2ed9eed27230d0516d19376df8716fb93882a5b1 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -90,11 +90,10 @@ public class MemoryPeerDaoImpl implements PeerDao { } @Override - public void updatePeersAsDown(String currencyId, long lastUpTimeTimeout) { - long maxLastUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + public void updatePeersAsDown(String currencyId, long upTimeLimit) { getPeersByCurrencyId(currencyId).stream() - .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= maxLastUpTime) + .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= upTimeLimit) .forEach(peer -> { peer.getStats().setStatus(Peer.PeerStatus.DOWN); }); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java index 6bf2ba01a290a7668d547fe0d2b704a157d63d43..a834a8d69cb2f779d8cf93a2decf73e5f464cde9 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlocks.java @@ -40,8 +40,6 @@ import java.util.stream.IntStream; */ public final class BlockchainBlocks { - private static final Logger log = LoggerFactory.getLogger(BlockchainBlocks.class); - public static final Pattern SIG_PUBKEY_PATTERN = Pattern.compile("SIG\\(([^)]+)\\)"); public static final Pattern TX_INPUT_CONDITION_FUNCTION = Pattern.compile("(SIG|XHX)\\(([^)]+)\\)"); @@ -208,6 +206,11 @@ public final class BlockchainBlocks { .distinct().collect(Collectors.toSet()); } + public static String buid(BlockchainBlock block) { + if (block == null || block.getNumber() == null || block.getHash() == null) return null; + return block.getNumber() + "-" + block.getHash(); + } + public static class TxInput { public long amount; public int unitbase; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java index d0289195eb54fc66666194b67221cd0f754b28d4..3ec81df5e02a78bc658ec6fb5fe9a26d01884fcf 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java @@ -42,6 +42,17 @@ public abstract class JacksonUtils extends SimpleModule { public static final String REGEX_ATTRIBUTE_REPLACE = "[,]?(?:\"%s\"|%s)[\\s\\n\\r]*:[\\s\\n\\r]*(?:\"[^\"]+\"|null)"; + private static final ThreadLocal<ObjectMapper> mapper = new ThreadLocal<ObjectMapper>() { + @Override + protected ObjectMapper initialValue() { + return newObjectMapper(); + } + }; + + public static ObjectMapper getThreadObjectMapper() { + return mapper.get(); + } + public static ObjectMapper newObjectMapper() { ObjectMapper objectMapper = new ObjectMapper(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index e4d69e6301c37d6de1ba6d469b003334ae6b2d7f..0069509b23bf392292f150e0c02da79d8203433a 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -25,19 +25,23 @@ package org.duniter.core.client.model.local; import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Joiner; +import javafx.beans.Observable; +import org.duniter.core.beans.Bean; +import org.duniter.core.beans.ObservableBean; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.bma.NetworkPeering; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.http.InetAddressUtils; +import java.beans.PropertyChangeListener; +import java.beans.PropertyChangeListenerProxy; +import java.beans.PropertyChangeSupport; import java.io.Serializable; import java.util.StringJoiner; public class Peer implements LocalEntity<String>, Serializable { - - public static Builder newBuilder() { return new Builder(); } @@ -364,6 +368,8 @@ public class Peer implements LocalEntity<String>, Serializable { return joiner.toString(); } + + public enum PeerStatus { UP, DOWN, diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java new file mode 100644 index 0000000000000000000000000000000000000000..8d0723c29b237979077c12b578d0c70ddfad5ccb --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peers.java @@ -0,0 +1,27 @@ +package org.duniter.core.client.model.local; + +import org.duniter.core.client.model.bma.EndpointApi; + +/** + * Created by blavenie on 12/09/17. + */ +public final class Peers { + + private Peers() { + // helper class + } + + public static boolean hasEndPointAPI(Peer peer, EndpointApi api) { + return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api.name()); + } + + public static String buid(Peer peer) { + return buid(peer.getStats()); + } + + public static String buid(Peer.Stats stats) { + return stats.getStatus() == Peer.PeerStatus.UP + ? stats.getBlockNumber() + "-" + stats.getBlockHash() + : null; + } +} diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java index c22dbf1251a4ff881ff72f0b188c73e94912ef59..0473a118c88a6bd79984deb582d59bb3a7736e82 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Wallet.java @@ -25,6 +25,7 @@ package org.duniter.core.client.model.local; import java.io.Serializable; import java.util.Collection; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.duniter.core.client.model.bma.WotCertification; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.crypto.CryptoUtils; @@ -36,7 +37,6 @@ import org.duniter.core.util.crypto.KeyPair; */ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { - private Long id; private Long accountId; private String currency; @@ -48,11 +48,6 @@ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { private long txBlockNumber = -1; private Collection<WotCertification> certifications; - /** - * Use for UI, when some properties has not been displayed yet - */ - private boolean isDirty = false; - public Wallet() { super(null, null); this.identity = new Identity(); @@ -178,14 +173,6 @@ public class Wallet extends KeyPair implements LocalEntity<Long>, Serializable { return identity.getIsMember(); } - public boolean isDirty() { - return isDirty; - } - - public void setDirty(boolean isDirty) { - this.isDirty = isDirty; - } - public Double getCreditAsUD() { return creditAsUD; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index 892557eeb07fcaa1182993a7b840583026b2455d..1a8881907ebcd372466353ded7a5d542bb3752c7 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -45,6 +45,7 @@ import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.http.util.EntityUtils; import org.duniter.core.beans.InitializingBean; import org.duniter.core.client.config.Configuration; +import org.duniter.core.client.config.ConfigurationOption; import org.duniter.core.client.model.bma.Constants; import org.duniter.core.client.model.bma.Error; import org.duniter.core.client.model.bma.jackson.JacksonUtils; @@ -111,7 +112,9 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean protected void initCaches() { Configuration config = Configuration.instance(); int cacheTimeInMillis = config.getNetworkCacheTimeInMillis(); - final int defaultTimeout = config.getNetworkTimeout(); + final int defaultTimeout = config.getNetworkTimeout() > 0 ? + config.getNetworkTimeout() : + Integer.parseInt(ConfigurationOption.NETWORK_TIMEOUT.getDefaultValue()); requestConfigCache = new SimpleCache<Integer, RequestConfig>(cacheTimeInMillis*100) { @Override diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java index 38107551c7cf74de53406be06771c33864ee5861..711b73246bb9289e9ba11a1044c9360c2ce90124 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java @@ -25,6 +25,7 @@ package org.duniter.core.client.service; import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanFactory; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.service.bma.BlockchainRemoteService; import org.duniter.core.client.service.bma.NetworkRemoteService; import org.duniter.core.client.service.bma.TransactionRemoteService; @@ -152,6 +153,10 @@ public class ServiceLocator implements Closeable { return getBean(NetworkService.class); } + public PeerDao getPeerDao() { + return getBean(PeerDao.class); + } + public <S extends Bean> S getBean(Class<S> clazz) { if (beanFactory == null) { initBeanFactory(); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java index 7cfafe68ee2c9e99390094c550fec9b06e579366..14c0f0e8256e3c7ab9c116066bd1cc1463d10692 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java @@ -62,13 +62,8 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N public static final String URL_WS_PEER = "/ws/peer"; - protected ObjectMapper objectMapper; - - public NetworkRemoteServiceImpl() { super(); - - objectMapper = JacksonUtils.newObjectMapper(); } public NetworkPeering getPeering(Peer peer) { @@ -104,6 +99,7 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N if (jsonNode.has("leaf")) { jsonNode = jsonNode.get("leaf"); if (jsonNode.has("value")) { + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); jsonNode = jsonNode.get("value"); String json = objectMapper.writeValueAsString(jsonNode); result = objectMapper.readValue(json, NetworkPeers.Peer.class); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java index 69667304879a54b5db0939187c6aeaa04bb0dd86..942fd0026ab4965189ebd357d6c2580b9662cb5a 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java @@ -25,6 +25,7 @@ package org.duniter.core.client.service.local; import org.duniter.core.beans.Service; import org.duniter.core.client.model.local.Peer; +import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -38,7 +39,8 @@ import java.util.function.Predicate; public interface NetworkService extends Service { interface PeersChangeListener { - void onChanged(List<Peer> peers); + void onChanges(Collection<Peer> peers); + } class Sort { @@ -51,6 +53,7 @@ public interface NetworkService extends Service { public Peer.PeerStatus filterStatus; public Boolean filterSsl; public List<String> filterEndpoints; + public String currency; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java index 9c3f66bb95f8fc7585eb41674c6ff02cdb56b154..9d0160801b009bae22c05a965b8709e89b2e6636 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java @@ -23,11 +23,13 @@ package org.duniter.core.client.service.local; */ import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.duniter.core.client.config.Configuration; +import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.*; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.model.local.Peers; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.client.service.bma.BaseRemoteServiceImpl; import org.duniter.core.client.service.bma.BlockchainRemoteService; @@ -36,13 +38,9 @@ import org.duniter.core.client.service.bma.WotRemoteService; import org.duniter.core.client.service.exception.HttpConnectException; import org.duniter.core.client.service.exception.HttpNotFoundException; import org.duniter.core.exception.TechnicalException; -import org.duniter.core.service.CryptoService; +import org.duniter.core.util.*; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.ObjectUtils; -import org.duniter.core.util.Preconditions; -import org.duniter.core.util.StringUtils; import org.duniter.core.util.concurrent.CompletableFutures; -import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +50,6 @@ import java.util.concurrent.*; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -61,43 +58,32 @@ import java.util.stream.Collectors; public class NetworkServiceImpl extends BaseRemoteServiceImpl implements NetworkService { private static final Logger log = LoggerFactory.getLogger(NetworkServiceImpl.class); + private static final String PEERS_UPDATE_LOCK_NAME = "Peers update"; private final static String BMA_URL_STATUS = "/node/summary"; private final static String BMA_URL_BLOCKCHAIN_CURRENT = "/blockchain/current"; private final static String BMA_URL_BLOCKCHAIN_HARDSHIP = "/blockchain/hardship/"; private NetworkRemoteService networkRemoteService; - private CryptoService cryptoService; private WotRemoteService wotRemoteService; private BlockchainRemoteService blockchainRemoteService; + private Configuration config; + private final LockManager lockManager = new LockManager(4, 10); - protected class Locker { - private boolean lock = false; - public boolean isLocked() { - return this.lock; - } - public void lock() { - Preconditions.checkArgument(!this.lock); - this.lock = !this.lock; - } - public void unlock() { - Preconditions.checkArgument(this.lock); - this.lock = !this.lock; - } - } + private PeerService peerService; public NetworkServiceImpl() { } public NetworkServiceImpl(NetworkRemoteService networkRemoteService, WotRemoteService wotRemoteService, - CryptoService cryptoService, - BlockchainRemoteService blockchainRemoteService) { + BlockchainRemoteService blockchainRemoteService, + PeerService peerService) { this(); this.networkRemoteService = networkRemoteService; this.wotRemoteService = wotRemoteService; - this.cryptoService = cryptoService; this.blockchainRemoteService = blockchainRemoteService; + this.peerService = peerService; } @Override @@ -105,8 +91,9 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network super.afterPropertiesSet(); this.networkRemoteService = ServiceLocator.instance().getNetworkRemoteService(); this.wotRemoteService = ServiceLocator.instance().getWotRemoteService(); - this.cryptoService = ServiceLocator.instance().getCryptoService(); this.blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService(); + this.config = Configuration.instance(); + this.peerService = ServiceLocator.instance().getPeerService(); } @Override @@ -215,9 +202,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network peer.getStats().setUid(uid); if (peer.getStats().isReacheable()) { - // Last UP time - peer.getStats().setLastUpTime(System.currentTimeMillis()/1000 /*unix timestamp*/ ); - // Hardship if (StringUtils.isNotBlank(uid)) { getHardship(peer); @@ -235,7 +219,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Map<String,Long> peerCountByBuid = peers.stream() .filter(peer -> peer.getStats().getStatus() == Peer.PeerStatus.UP) - .map(this::buid) + .map(Peers::buid) .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); // Compute main consensus buid @@ -257,7 +241,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network // Set consensus stats peers.forEach(peer -> { Peer.Stats stats = peer.getStats(); - String buid = buid(stats); + String buid = Peers.buid(stats); // Set consensus stats on each peers if (buid != null) { @@ -276,11 +260,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network public void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener) { + BlockchainParameters parameters = blockchainRemoteService.getParameters(mainPeer); + // Default filter Filter filterDef = new Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.currency = parameters.getCurrency(); // Default sort Sort sortDef = new Sort(); @@ -294,126 +281,130 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Filter filter, final Sort sort, final boolean autoreconnect, final ExecutorService executor) { + final String currency = filter != null && filter.currency != null ? filter.currency : + blockchainRemoteService.getParameters(mainPeer).getCurrency(); - final Locker threadLock = new Locker(); - final List<Peer> result = new ArrayList<>(); final List<String> knownBlocks = new ArrayList<>(); - final Map<String, Peer> knownPeers = new HashMap<>(); - final Predicate<Peer> peerFilter = peerFilter(filter); final Comparator<Peer> peerComparator = peerComparator(sort); final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool(); - Runnable getPeersRunnable = () -> { - if (threadLock.isLocked()) { - log.debug("Rejected getPeersRunnable() call. Another refresh is already running..."); - return; - } - synchronized (threadLock) { - threadLock.lock(); - } - try { - List<Peer> updatedPeers = getPeers(mainPeer, filter, sort, pool); + // Refreshing one peer (e.g. received from WS) + Consumer<List<Peer>> updateKnownBlocks = (updatedPeers) -> + updatedPeers.forEach(peer -> { + String buid = Peers.buid(peer); + if (!knownBlocks.contains(buid)) { + knownBlocks.add(buid); + } + }); - knownPeers.clear(); - updatedPeers.forEach(peer -> { - String buid = buid(peer.getStats()); - if (!knownBlocks.contains(buid)) { - knownBlocks.add(buid); + // Load all peers + Runnable loadAllPeers = () -> { + try { + if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME, 1, TimeUnit.MINUTES)) { + try { + List<Peer> result = getPeers(mainPeer, filter, sort, pool); + + knownBlocks.clear(); + updateKnownBlocks.accept(result); + + // Save update peers + peerService.save(currency, result, false/*not the full UP list*/); + + // Send full list listener + listener.onChanges(result); + } catch (Exception e) { + log.error("Error while loading all peers: " + e.getMessage(), e); + } finally { + lockManager.unlock(PEERS_UPDATE_LOCK_NAME); } - knownPeers.put(peer.toString(), peer); - }); - - result.clear(); - result.addAll(updatedPeers); - listener.onChanged(result); - } - catch(Exception e) { - log.error("Error while loading all peers: " + e.getMessage(), e); - } - finally { - synchronized (threadLock) { - threadLock.unlock(); } + } catch (InterruptedException e) { + log.warn("Could not acquire lock for reloading all peers. Skipping."); } }; + // Refreshing one peer (e.g. received from WS) Consumer<NetworkPeers.Peer> refreshPeerConsumer = (bmaPeer) -> { - if (threadLock.isLocked()) { - log.debug("Rejected refreshPeerConsumer() call. Another refresh is already running..."); - return; - } - synchronized (threadLock) { - threadLock.lock(); - } - try { - final List<Peer> newPeers = new ArrayList<>(); - addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints); - - CompletableFuture<List<CompletableFuture<Peer>>> jobs = - CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool) - .thenApply(memberUids -> - newPeers.stream().map(peer -> - asyncRefreshPeer(peer, memberUids, pool)) - .collect(Collectors.toList()) - ); - jobs.thenCompose(refreshedPeersFuture -> CompletableFutures.allOfToList(refreshedPeersFuture, refreshedPeer -> { - boolean exists = knownPeers.containsKey(refreshedPeer.toString()); - boolean include = peerFilter.test(refreshedPeer); - if (!include && exists) { - Peer removedPeer = knownPeers.remove(refreshedPeer.toString()); - result.remove(removedPeer); - } - else if (include && exists) { - result.remove(knownPeers.get(refreshedPeer.toString())); - } - return include; - })) - .thenApply(addedPeers -> { - result.addAll(addedPeers); - fillPeerStatsConsensus(result); - result.sort(peerComparator); - - result.forEach(peer -> { - String buid = buid(peer.getStats()); - if (!knownBlocks.contains(buid)) { - knownBlocks.add(buid); + if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME)) { + try { + final List<Peer> newPeers = new ArrayList<>(); + addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints); + + CompletableFuture<List<CompletableFuture<Peer>>> jobs = + CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool) + + // Refresh all endpoints + .thenApply(memberUids -> + newPeers.stream().map(peer -> + asyncRefreshPeer(peer, memberUids, pool)) + .collect(Collectors.toList()) + ); + + jobs.thenCompose(CompletableFutures::allOfToList) + .thenAccept(refreshedPeers -> { + if (CollectionUtils.isEmpty(refreshedPeers)) return; + + // Get the full list + final Map<String, Peer> knownPeers = peerService.getPeersByCurrencyId(currency) + .stream() + .filter(peerFilter) + .collect(Collectors.toMap(Peer::toString, Function.identity())); + + // filter, to keep only existing peer, or expected by filter + List<Peer> changedPeers = refreshedPeers.stream() + .filter(refreshedPeer -> { + String peerId = refreshedPeer.toString(); + boolean exists = knownPeers.containsKey(peerId); + if (exists){ + knownPeers.remove(peerId); + } + // If include, add it to full list + boolean include = peerFilter.test(refreshedPeer); + if (include) { + knownPeers.put(peerId, refreshedPeer); + } + return include; + }).collect(Collectors.toList()); + + // If something changes + if (CollectionUtils.isNotEmpty(changedPeers)) { + List<Peer> result = Lists.newArrayList(knownPeers.values()); + fillPeerStatsConsensus(result); + result.sort(peerComparator); + + updateKnownBlocks.accept(changedPeers); + + // Save update peers + peerService.save(currency, changedPeers, false/*not the full UP list*/); + + listener.onChanges(result); } - knownPeers.put(peer.toString(), peer); }); - - listener.onChanged(result); - return result; - }); - } - catch(Exception e) { - log.error("Error while refreshing a peer: " + e.getMessage(), e); - } - finally { - synchronized (threadLock) { - threadLock.unlock(); + } catch (Exception e) { + log.error("Error while refreshing a peer: " + e.getMessage(), e); + } finally { + lockManager.unlock(PEERS_UPDATE_LOCK_NAME); } } }; - // Load all peers - pool.submit(getPeersRunnable); - // Manage new block event blockchainRemoteService.addBlockListener(mainPeer, json -> { - log.debug("Received new block event"); try { BlockchainBlock block = readValue(json, BlockchainBlock.class); - String blockBuid = buid(block); + String blockBuid = BlockchainBlocks.buid(block); boolean isNewBlock = (blockBuid != null && !knownBlocks.contains(blockBuid)); + // If new block + wait 3s for network propagation - if (!isNewBlock) return; + if (isNewBlock) { + schedule(loadAllPeers, pool, 3000/*waiting block propagation*/); + } + } catch(IOException e) { log.error("Could not parse peer received by WS: " + e.getMessage(), e); } - - schedule(getPeersRunnable, pool, 3000/*waiting block propagation*/); }, autoreconnect); // Manage new peer event @@ -422,11 +413,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network log.debug("Received new peer event"); try { final NetworkPeers.Peer bmaPeer = readValue(json, NetworkPeers.Peer.class); - pool.submit(() -> refreshPeerConsumer.accept(bmaPeer)); + if (!lockManager.isLocked(PEERS_UPDATE_LOCK_NAME)) { + pool.submit(() -> refreshPeerConsumer.accept(bmaPeer)); + } } catch(IOException e) { log.error("Could not parse peer received by WS: " + e.getMessage(), e); } }, autoreconnect); + + // Default action: Load all peers + pool.submit(loadAllPeers); + } @@ -525,7 +522,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network } // Filter on SSL - if (filter.filterSsl != null && filter.filterSsl.booleanValue() != peer.isUseSsl()) { + if (filter.filterSsl != null && filter.filterSsl != peer.isUseSsl()) { return false; } @@ -574,18 +571,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return peer; } - protected String computeUniqueId(Peer peer) { - return cryptoService.hash( - new StringJoiner("|") - .add(peer.getPubkey()) - .add(peer.getDns()) - .add(peer.getIpv4()) - .add(peer.getIpv6()) - .add(String.valueOf(peer.getPort())) - .add(Boolean.toString(peer.isUseSsl())) - .toString()); - } - protected JsonNode get(final Peer peer, String path) { return executeRequest(peer, path, JsonNode.class); } @@ -633,10 +618,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network specScore += (sort.sortType == SortType.PUBKEY ? computeScoreAlphaValue(peer.getPubkey(), 3, sort.sortAsc) : 0); specScore += (sort.sortType == SortType.API ? (peer.isUseSsl() ? (sort.sortAsc ? 1 : -1) : - (hasEndPointAPI(peer, EndpointApi.ES_USER_API) ? (sort.sortAsc ? 0.5 : -0.5) : 0)) : 0); + (Peers.hasEndPointAPI(peer, EndpointApi.ES_USER_API) ? (sort.sortAsc ? 0.5 : -0.5) : 0)) : 0); specScore += (sort.sortType == SortType.HARDSHIP ? (stats.getHardshipLevel() != null ? (sort.sortAsc ? (10000-stats.getHardshipLevel()) : stats.getHardshipLevel()): 0) : 0); specScore += (sort.sortType == SortType.BLOCK_NUMBER ? (stats.getBlockNumber() != null ? (sort.sortAsc ? (1000000000 - stats.getBlockNumber()) : stats.getBlockNumber()) : 0) : 0); - score += (10000000000l * specScore); + score += (10000000000L * specScore); } score += (1000000000 * (stats.getStatus() == Peer.PeerStatus.UP ? 1 : 0)); score += (100000000 * (stats.isMainConsensus() ? 1 : 0)); @@ -662,29 +647,15 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return asc ? (1000 - score) : score; } - protected boolean hasEndPointAPI(Peer peer, EndpointApi api) { - return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api.name()); - } - protected String buid(Peer peer) { - return buid(peer.getStats()); - } - - protected String buid(Peer.Stats stats) { - return stats.getStatus() == Peer.PeerStatus.UP - ? stats.getBlockNumber() + "-" + stats.getBlockHash() - : null; - } - - protected String buid(BlockchainBlock block) { - if (block == null || block.getNumber() == null || block.getHash() == null) return null; - return block.getNumber() + "-" + block.getHash(); - } protected void schedule(Runnable command, ExecutorService pool, long delayInMs) { if (pool instanceof ScheduledExecutorService) { ((ScheduledExecutorService)pool).schedule(command, delayInMs, TimeUnit.MILLISECONDS); } + else if (delayInMs <= 0) { + pool.submit(command); + } else { pool.submit(() -> { try { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java index f800a82916d2ecf4fa69f0391eda6b6f6560ca39..f4aa8419732b4ba85d73db98346e6735d0d5f3c8 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java @@ -48,10 +48,7 @@ public interface PeerService extends Service { */ List<Peer> getPeersByCurrencyId(String currencyId); - /** - * Fill all cache need for currencies - * @param context - * @param accountId - */ - void loadCache(long accountId); + void save(String currencyId, List<Peer> peers, boolean isFullUpList); + + boolean isExists(String currencyId, String peerId); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java index ad9f7b736c4b816fe20058b7a3bf8f8082f28512..4b12796e3b87c515ebdae695cbcd1d7970c2bc77 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java @@ -23,20 +23,25 @@ package org.duniter.core.client.service.local; */ import org.duniter.core.beans.InitializingBean; +import org.duniter.core.client.config.Configuration; import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.local.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.cache.Cache; import org.duniter.core.util.cache.SimpleCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Date; import java.util.List; /** @@ -44,11 +49,14 @@ import java.util.List; */ public class PeerServiceImpl implements PeerService, InitializingBean { + private static final Logger log = LoggerFactory.getLogger(PeerServiceImpl.class); private Cache<String, List<Peer>> peersByCurrencyIdCache; private Cache<String, Peer> activePeerByCurrencyIdCache; private CurrencyService currencyService; + private CryptoService cryptoService; private PeerDao peerDao; + private Configuration config; public PeerServiceImpl() { super(); @@ -56,8 +64,10 @@ public class PeerServiceImpl implements PeerService, InitializingBean { @Override public void afterPropertiesSet() throws Exception { - currencyService = ServiceLocator.instance().getCurrencyService(); - peerDao = ServiceLocator.instance().getBean(PeerDao.class); + this.currencyService = ServiceLocator.instance().getCurrencyService(); + this.peerDao = ServiceLocator.instance().getBean(PeerDao.class); + this.config = Configuration.instance(); + this.cryptoService = ServiceLocator.instance().getCryptoService(); } @Override @@ -66,17 +76,25 @@ public class PeerServiceImpl implements PeerService, InitializingBean { peerDao = null; peersByCurrencyIdCache = null; activePeerByCurrencyIdCache = null; + cryptoService = null; } + + public Peer save(final Peer peer) { Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer.getCurrency()); Preconditions.checkArgument(StringUtils.isNotBlank(peer.getHost())); Preconditions.checkArgument(peer.getPort() >= 0); + + String peerId = cryptoService.hash(peer.computeKey()); + boolean exists = isExists(peer.getCurrency(), peerId); + peer.setId(peerId); + Peer result; // Create - if (peer.getId() == null) { + if (!exists) { result = peerDao.create(peer); } @@ -107,7 +125,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { * @param currencyId * @return */ - public Peer getActivePeerByCurrencyId(String currency) { + public Peer getActivePeerByCurrencyId(String currencyId) { // Check if cache as been loaded if (activePeerByCurrencyIdCache == null) { @@ -127,7 +145,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { }; } - return activePeerByCurrencyIdCache.get(currency); + return activePeerByCurrencyIdCache.get(currencyId); } /** @@ -173,4 +191,34 @@ public class PeerServiceImpl implements PeerService, InitializingBean { } } + @Override + public void save(String currencyId, List<Peer> peers, boolean isFullUpList) { + + int peerDownTimeoutMs = config.getPeerUpMaxAge(); + final long nowInSec = System.currentTimeMillis()/1000; + + if (CollectionUtils.isNotEmpty(peers)) { + if (log.isDebugEnabled()) { + log.debug(String.format("[%s] Updating peers (%s endpoints found)", currencyId, peers.size())); + } + + // On each UP peers: set last UP time + peers.stream().map(Peer::getStats) + .filter(Peer.Stats::isReacheable) + .forEach(stats -> stats.setLastUpTime(nowInSec)); + + peers.forEach(this::save); + } + + // Mark old peers as DOWN + if (isFullUpList && peerDownTimeoutMs > 0) { + Date oldDate = new Date(nowInSec * 1000 - peerDownTimeoutMs); + peerDao.updatePeersAsDown(currencyId, oldDate.getTime() / 1000); + } + } + + @Override + public boolean isExists(String currencyId, String peerId) { + return peerDao.isExists(currencyId, peerId); + } } diff --git a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties index b9d7714130d6d90938ee1538da608f850cbd300c..c750c9582e9885cf8979e70f3a3f1a930eb8d975 100644 --- a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties +++ b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_en_GB.properties @@ -13,6 +13,7 @@ duniter4j.config.option.data.directory.description= duniter4j.config.option.i18n.directory.description= duniter4j.config.option.i18n.locale.description= duniter4j.config.option.inceptionYear.description= +duniter4j.config.option.network.loadPeers.maxDuration.description= duniter4j.config.option.network.maxConnections.description= duniter4j.config.option.network.maxConnectionsPerHost.description= duniter4j.config.option.network.timeout.description= diff --git a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties index 202c034294b756d9a7967944c7038ce286d99fdd..390d4e90c0a84cbc2f85aa281082007dc6bfed7a 100644 --- a/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties +++ b/duniter4j-core-client/src/main/resources/i18n/duniter4j-core-client_fr_FR.properties @@ -13,6 +13,7 @@ duniter4j.config.option.data.directory.description= duniter4j.config.option.i18n.directory.description= duniter4j.config.option.i18n.locale.description= duniter4j.config.option.inceptionYear.description= +duniter4j.config.option.network.loadPeers.maxDuration.description= duniter4j.config.option.network.maxConnections.description= duniter4j.config.option.network.maxConnectionsPerHost.description= duniter4j.config.option.network.timeout.description= diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java index d4362a347aff44c80309f10baf92584b8bbc9bec..6a5f252f1acc91c24f4708b1876831397d1f3b93 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/model/BlockFileUtils.java @@ -2,6 +2,7 @@ package org.duniter.core.client.model; import com.fasterxml.jackson.databind.ObjectMapper; import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.junit.Assume; import java.io.File; @@ -14,7 +15,7 @@ public class BlockFileUtils { public static BlockchainBlock readBlockFile(String jsonFileName) { try { - ObjectMapper om = org.duniter.core.client.model.bma.jackson.JacksonUtils.newObjectMapper(); + ObjectMapper om = JacksonUtils.getThreadObjectMapper(); BlockchainBlock block = om.readValue(Files.readAllBytes(new File("src/test/resources" , jsonFileName).toPath()), BlockchainBlock.class); Assume.assumeNotNull(block); return block; diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java index ac731273679fd95f37694067443ada8718cddee0..9e56a40dcfcc2cdc207780f29682c4f10373a178 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java @@ -115,7 +115,7 @@ public class BlockchainRemoteServiceTest { Assert.assertEquals(10, result.length); // Make sure allOfToList json are valid blocks - ObjectMapper objectMapper = JacksonUtils.newObjectMapper(); + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); int number = 0; for (String jsonBlock: result) { @@ -148,7 +148,7 @@ public class BlockchainRemoteServiceTest { service.addBlockListener(createTestPeer(), (message) -> { try { - BlockchainBlock block = JacksonUtils.newObjectMapper().readValue(message, BlockchainBlock.class); + BlockchainBlock block = JacksonUtils.getThreadObjectMapper().readValue(message, BlockchainBlock.class); log.debug("Received block #" + block.getNumber()); isWebSocketNewBlockReceived = true; } diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java new file mode 100644 index 0000000000000000000000000000000000000000..b35a9bbff5e0190e6844cb5bc9887f57bc9a4820 --- /dev/null +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/LockManager.java @@ -0,0 +1,104 @@ +package org.duniter.core.util; + +import com.google.common.collect.MapMaker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A concurrent lock manager + */ +public class LockManager { + + private static final Logger log = LoggerFactory.getLogger(LockManager.class); + private static int DEFAULT_UNSUCCESSFUL_TRY_LOCK_WARN = 4; + private static int DEFAULT_CONCURRENCY_LEVEL = 4; + + private final int unsuccessfulLockCountForWarn; + private final Map<String, Lock> lockMap; + private final Map<String, Integer> tryLockCounter; + + public LockManager(int concurrencyLevel, int maxLockCounterForLog) { + this.unsuccessfulLockCountForWarn = maxLockCounterForLog; + this.lockMap = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap(); + this.tryLockCounter = new MapMaker().concurrencyLevel(concurrencyLevel).makeMap(); + } + + public LockManager(int concurrencyLevel) { + this(concurrencyLevel, concurrencyLevel); + } + + public LockManager() { + this(DEFAULT_CONCURRENCY_LEVEL, DEFAULT_UNSUCCESSFUL_TRY_LOCK_WARN); + } + + /** + * Acquires the lock. + * + */ + public void lock(String name) { + Lock lock = computeIfAbsent(name); + lock.lock(); + } + + public boolean tryLock(final String name, long time, TimeUnit unit) throws InterruptedException { + Lock lock = computeIfAbsent(name); + boolean locked = lock.tryLock(time, unit); + logTryLock(name, locked); + return locked; + } + + public boolean tryLock(final String name) { + Lock lock = computeIfAbsent(name); + boolean locked = lock.tryLock(); + logTryLock(name, locked); + return locked; + } + + public void unlock(final String name) { + Lock lock = lockMap.get(name); + if (lock != null) { + lock.unlock(); + // Reset counter + tryLockCounter.computeIfPresent(name, (input, counter) -> 0); + } + } + + public boolean isLocked(String name) { + Integer tryLockCount = tryLockCounter.get(name); + return tryLockCount != null && tryLockCount.intValue() > 0; + } + + /* -- protected method -- */ + + protected Lock computeIfAbsent(final String name) { + return lockMap.computeIfAbsent(name, input -> new ReentrantLock()); + } + + protected void logTryLock(final String name, final boolean locked) { + // Counter unsuccessful lock + if (locked) { + // Reset counter + tryLockCounter.computeIfPresent(name, (input, counter) -> 1); + } + else { + if (!tryLockCounter.containsKey(name)) { + tryLockCounter.computeIfAbsent(name, input -> 2); + } + else { + tryLockCounter.computeIfPresent(name, (input, counter) -> { + if (counter < unsuccessfulLockCountForWarn) return counter + 1; + if (log.isDebugEnabled()) { + log.debug(String.format("Unable to acquire lock [%s] - after %s unsuccessful attempts", name, counter + 1)); + } + return 1; // reset log counter + }); + + } + } + } +} \ No newline at end of file diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 696d6b288ec69490d47f5a27a122f109d0f09072..385488bb39982def8708fadb5fba6eff439d0385 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -125,8 +125,8 @@ duniter.blockchain.enable: true # # Force blockchain reload - WARNING: all user events will be resetted to 'unread' # -duniter.blockchain.reload: true -duniter.blockchain.reload.from: 50999 +#duniter.blockchain.reload: true +#duniter.blockchain.reload.from: 50999 # # Duniter node address # diff --git a/duniter4j-es-assembly/src/test/es-home/config/logging.yml b/duniter4j-es-assembly/src/test/es-home/config/logging.yml index b9122182159af75a55cfd869c89af8e2a84d13ca..0676ba194a3bcdbfd93bbce8fcc76fcbd88d19c0 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/logging.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/logging.yml @@ -26,6 +26,7 @@ logger: security: DEBUG org.duniter: INFO + org.duniter.core.util.LockManager: DEBUG #org.duniter.core.beans: DEBUG #org.duniter.core.client.service: DEBUG #org.duniter.elasticsearch: DEBUG diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 5483ad8e8a6738eadfe6f0782e52e1f2b647beda..8590207ef8e960d8a69744894fd07e240a584467 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -207,7 +207,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public int getNetworkTimeout() { - return settings.getAsInt("duniter.network.timeout", 300000 /*300s*/); + return settings.getAsInt("duniter.network.timeout", 30000 /*30s*/); } public int getPeerDownTimeout() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java index 525bab92e0f9d33eb77b6a5fca7532022b92bf43..0432e2b98100e121664c1ae56913a05b3aa7b428 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/client/Duniter4jClientImpl.java @@ -112,14 +112,12 @@ public class Duniter4jClientImpl implements Duniter4jClient { private final Client client; private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool; - private final ObjectMapper objectMapper; @Inject public Duniter4jClientImpl(Client client, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) { super(); this.client = client; this.threadPool = threadPool; - this.objectMapper = JacksonUtils.newObjectMapper(); } @Override @@ -202,7 +200,10 @@ public class Duniter4jClientImpl implements Duniter4jClient { /** * Retrieve some field from a document id, and check if all field not null + * @param index + * @param type * @param docId + * @param fieldNames * @return */ @Override @@ -362,6 +363,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { // Read query result SearchHit[] searchHits = response.getHits().getHits(); + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); for (SearchHit searchHit : searchHits) { if (searchHit.source() != null) { @@ -382,7 +384,7 @@ public class Duniter4jClientImpl implements Duniter4jClient { @Override public <C extends LocalEntity<String>> C readSourceOrNull(SearchHit searchHit, Class<? extends C> clazz) { try { - C value = objectMapper.readValue(searchHit.getSourceRef().streamInput(), clazz); + C value = JacksonUtils.getThreadObjectMapper().readValue(searchHit.getSourceRef().streamInput(), clazz); value.setId(searchHit.getId()); return value; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java index 1dea70134c8b1b2751bd4c550cc43abc71d29976..ee9332075a27f93a41c04a2b3117f5f0935d7447 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java @@ -40,7 +40,6 @@ public abstract class AbstractDao implements Bean { protected final ESLogger logger; - protected final ObjectMapper objectMapper; protected Duniter4jClient client; protected CryptoService cryptoService; @@ -49,7 +48,6 @@ public abstract class AbstractDao implements Bean { public AbstractDao(String loggerName) { super(); this.logger = Loggers.getLogger(loggerName); - this.objectMapper = JacksonUtils.newObjectMapper(); } @Inject @@ -69,4 +67,7 @@ public abstract class AbstractDao implements Bean { /* -- protected methods -- */ + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 867a5757ecceba881bd9d04b0919c33229800d16..48aeb2907c033fd52a67f7fee0586e6672786e91 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -76,7 +76,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) @@ -126,7 +126,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) @@ -384,7 +384,7 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { if (searchHit.source() != null) { String jsonString = new String(searchHit.source()); try { - block = objectMapper.readValue(jsonString, BlockchainBlock.class); + block = getObjectMapper().readValue(jsonString, BlockchainBlock.class); } catch(Exception e) { if (logger.isDebugEnabled()) { logger.debug("Error while parsing block from JSON:\n" + jsonString); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java index f79a6527795bfb483977b111d62509e70087562a..f8c7a62a59aae73c446dd8281cf09307dc0d7a63 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -67,7 +67,7 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing IndexRequestBuilder request = client.prepareIndex(block.getCurrency(), TYPE) @@ -112,7 +112,7 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(block); + String json = getObjectMapper().writeValueAsString(block); // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index 357f4187d57f36ebbb5dffe8152866576e79de37..df9e7cd04432e8e4ae9a03031982892e1502bcbe 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -61,7 +61,7 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } // Serialize into JSON - byte[] json = objectMapper.writeValueAsBytes(currency); + byte[] json = getObjectMapper().writeValueAsBytes(currency); // Preparing indexBlocksFromNode IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) @@ -89,7 +89,7 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } // Serialize into JSON - byte[] json = objectMapper.writeValueAsBytes(currency); + byte[] json = getObjectMapper().writeValueAsBytes(currency); UpdateRequestBuilder updateRequest = client.prepareUpdate(INDEX, RECORD_TYPE, currency.getId()) .setDoc(json); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java index 4160f437438b79f3c04334db34ac308364e13bc1..cd270b502df51eb840c7e665da65b828c98fa425 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/MovementDaoImpl.java @@ -68,7 +68,7 @@ public class MovementDaoImpl extends AbstractDao implements MovementDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(operation); + String json = getObjectMapper().writeValueAsString(operation); // Preparing IndexRequestBuilder request = client.prepareIndex(operation.getCurrency(), TYPE) @@ -96,7 +96,7 @@ public class MovementDaoImpl extends AbstractDao implements MovementDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(operation); + String json = getObjectMapper().writeValueAsString(operation); // Preparing UpdateRequestBuilder request = client.prepareUpdate(operation.getCurrency(), TYPE, operation.getId()) diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 57a6f8b9a0f66512bd3f30b6c286e39746227fcb..530fc8992594ac1d9dd147276173c098fcbec705 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -29,7 +29,6 @@ import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.PeerDao; -import org.duniter.elasticsearch.model.Movement; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -47,6 +46,7 @@ import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation; import org.elasticsearch.search.aggregations.metrics.max.Max; import java.io.IOException; +import java.util.Date; import java.util.List; /** @@ -75,7 +75,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(peer); + String json = getObjectMapper().writeValueAsString(peer); // Preparing indexBlocksFromNode IndexRequestBuilder indexRequest = client.prepareIndex(peer.getCurrency(), TYPE) @@ -104,7 +104,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Serialize into JSON try { - String json = objectMapper.writeValueAsString(peer); + String json = getObjectMapper().writeValueAsString(peer); // Preparing indexBlocksFromNode UpdateRequestBuilder updateRequest = client.prepareUpdate(peer.getCurrency(), TYPE, peer.getId()) @@ -183,9 +183,11 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { } @Override - public void updatePeersAsDown(String currencyName, long lastUpTimeTimeout) { + public void updatePeersAsDown(String currencyName, long upTimeLimit) { - long minUpTime = (System.currentTimeMillis() - lastUpTimeTimeout)/1000; + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] Setting peers as DOWN, if older than [%s]...", currencyName, new Date(upTimeLimit*1000))); + } SearchRequestBuilder searchRequest = client.prepareSearch(currencyName) .setFetchSource(false) @@ -193,8 +195,8 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { // Query = filter on lastUpTime BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() - // where lastUpTime < minUpTime - .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(minUpTime)) + // where lastUpTime < upTimeLimit + .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimit)) // AND status = UP .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name())); searchRequest.setQuery(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.constantScoreQuery(boolQuery))); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java index 39515f50136f007c41022103975d5a0455f8c5b6..a7d84512d4855fb04b5496d099da3e9fdf662b70 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -161,7 +161,7 @@ public abstract class AbstractBlockchainListenerService extends AbstractService Preconditions.checkNotNull(change.getSource()); try { - return objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + return getObjectMapper().readValue(change.getSource().streamInput(), BlockchainBlock.class); } catch (IOException e) { throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 19a1d7608d6d7847bfacd7d8ec5bf5ddfe8ce8b5..d2b15d9527174d0940a7fc87095215998a831547 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -29,13 +29,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.jackson.JacksonUtils; -import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.elasticsearch.Records; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.elasticsearch.ElasticsearchException; @@ -52,7 +50,6 @@ import java.util.Set; public abstract class AbstractService implements Bean { protected final ESLogger logger; - protected final ObjectMapper objectMapper; protected Duniter4jClient client; protected PluginSettings pluginSettings; @@ -77,7 +74,6 @@ public abstract class AbstractService implements Bean { super(); this.logger = Loggers.getLogger(loggerName); this.client = client; - this.objectMapper = JacksonUtils.newObjectMapper(); this.pluginSettings = pluginSettings; this.cryptoService = cryptoService; this.retryCount = pluginSettings.getNodeRetryCount(); @@ -103,6 +99,10 @@ public abstract class AbstractService implements Bean { } } + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } + protected <T> T executeWithRetry(RetryFunction<T> retryFunction) throws TechnicalException{ int retry = 0; while (retry < retryCount) { @@ -137,7 +137,7 @@ public abstract class AbstractService implements Bean { protected JsonNode readAndVerifyIssuerSignature(String recordJson, String issuerFieldName) throws ElasticsearchException { try { - JsonNode recordObj = objectMapper.readTree(recordJson); + JsonNode recordObj = getObjectMapper().readTree(recordJson); readAndVerifyIssuerSignature(recordJson, recordObj, issuerFieldName); return recordObj; } @@ -149,7 +149,7 @@ public abstract class AbstractService implements Bean { protected void readAndVerifyIssuerSignature(JsonNode actualObj, String issuerFieldName) throws ElasticsearchException, JsonProcessingException { // Remove hash and signature - String recordJson = objectMapper.writeValueAsString(actualObj); + String recordJson = getObjectMapper().writeValueAsString(actualObj); readAndVerifyIssuerSignature(recordJson, actualObj, issuerFieldName); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java index 133ebf81a4497445e60b10e3af3b935b797b41d2..6e144b822d45c80bce459d144814303b36899a6c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java @@ -22,13 +22,8 @@ package org.duniter.elasticsearch.service; * #L% */ -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.base.Charsets; -import com.google.common.base.Joiner; -import org.duniter.core.util.Preconditions; -import org.apache.commons.io.IOUtils; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.duniter.core.client.model.elasticsearch.Record; @@ -37,6 +32,7 @@ import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; @@ -44,21 +40,17 @@ import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.model.SynchroResult; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import java.io.IOException; -import java.io.InputStream; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Objects; @@ -212,6 +204,7 @@ public abstract class AbstractSynchroService extends AbstractService { long insertHits = 0; long updateHits = 0; long invalidSignatureHits = 0; + ObjectMapper objectMapper = getObjectMapper(); if (offset < total) { @@ -274,7 +267,7 @@ public abstract class AbstractSynchroService extends AbstractService { // Check version Long existingVersion = ((Number)existingDoc.getFields().get(versionFieldName).getValue()).longValue(); - boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); + boolean doUpdate = (existingVersion == null || version > existingVersion); if (doUpdate) { if (debug) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java index d013524c3e5785491afa2826e3e8dcc393ea0d07..58f725416c46e85cbb867aa022d9f7d7cee10a92 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java @@ -79,7 +79,7 @@ public class BlockchainListenerService extends AbstractBlockchainListenerService try { bulkRequest.add(client.prepareIndex(block.getCurrency(), BlockStatDao.TYPE, String.valueOf(block.getNumber())) .setRefresh(false) // recommended for heavy indexing - .setSource(objectMapper.writeValueAsBytes(stat))); + .setSource(getObjectMapper().writeValueAsBytes(stat))); flushBulkRequestOrSchedule(); } catch (JsonProcessingException e) { logger.error("Could not serialize BlockStat into JSON: " + e.getMessage(), e); @@ -100,7 +100,7 @@ public class BlockchainListenerService extends AbstractBlockchainListenerService try { bulkRequest.add(client.prepareIndex(block.getCurrency(), MovementDao.TYPE) .setRefresh(false) // recommended for heavy indexing - .setSource(objectMapper.writeValueAsBytes(movement))); + .setSource(getObjectMapper().writeValueAsBytes(movement))); flushBulkRequestOrSchedule(); } catch (JsonProcessingException e) { logger.error("Could not serialize BlockOperation into JSON: " + e.getMessage(), e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 6cc334e45cde9fd11d1c5cee67da1a2974d0b82a..b634a9ba18dab1d09e68771418fa1da1fb7b9242 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -356,7 +356,7 @@ public class BlockchainService extends AbstractService { // Serialize into JSON // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = objectMapper.writeValueAsString(currentBlock); + String json = getObjectMapper().writeValueAsString(currentBlock); indexCurrentBlockFromJson(currentBlock.getCurrency(), json, wait); } catch(IOException e) { throw new TechnicalException(e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 0dc761b5a959a2dedda76ce0f7541b0ab5a4b5fa..ba34bf5b40b52e895b54377f82e6e022583aed6e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -24,59 +24,40 @@ package org.duniter.elasticsearch.service; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.local.NetworkService; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.model.NullProgressionModel; -import org.duniter.core.model.ProgressionModel; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.ObjectUtils; -import org.duniter.core.util.Preconditions; -import org.duniter.core.util.concurrent.CompletableFutures; -import org.duniter.core.util.json.JsonSyntaxException; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.search.highlight.HighlightField; import org.nuiton.i18n.I18n; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; /** * Created by Benoit on 30/03/2015. */ -public class PeerService extends AbstractService { - - private final ProgressionModel nullProgressionModel = new NullProgressionModel(); +public class PeerService extends AbstractService { private org.duniter.core.client.service.bma.BlockchainRemoteService blockchainRemoteService; private org.duniter.core.client.service.local.NetworkService networkService; + private org.duniter.core.client.service.local.PeerService delegate; private ThreadPool threadPool; - private PeerDao peerDao; @Inject public PeerService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool, CryptoService cryptoService, - PeerDao peerDao, final ServiceLocator serviceLocator){ super("duniter.network.peer", client, settings, cryptoService); - this.peerDao = peerDao; this.threadPool = threadPool; threadPool.scheduleOnStarted(() -> { this.blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); this.networkService = serviceLocator.getNetworkService(); + this.delegate = serviceLocator.getPeerService(); setIsReady(true); }); } @@ -118,7 +99,7 @@ public class PeerService extends AbstractService { sortDef.sortType = null; List<Peer> peers = networkService.getPeers(firstPeer, filterDef, sortDef, threadPool.scheduler()); - savePeers(currencyName, peers); + delegate.save(currencyName, peers, true); logger.info(I18n.t("duniter4j.es.networkService.indexPeers.succeed", currencyName, firstPeer, peers.size(), (System.currentTimeMillis() - timeStart))); } catch(Exception e) { logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); @@ -141,108 +122,14 @@ public class PeerService extends AbstractService { filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.currency = currencyName; // Default sort NetworkService.Sort sortDef = new NetworkService.Sort(); sortDef.sortType = null; networkService.addPeersChangeListener(mainPeer, - peers -> savePeers(currencyName, peers), + peers -> logger.debug(String.format("[%s] Update peers: %s found", currencyName, CollectionUtils.size(peers))), filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler()); } - - public Long getMaxLastUpTime(String currencyName) { - return peerDao.getMaxLastUpTime(currencyName); - } - - /** - * Create or update a peer, depending on its existence and hash - * @param peer - * @throws DuplicateIndexIdException - */ - public Peer savePeer(final Peer peer) throws DuplicateIndexIdException { - Preconditions.checkNotNull(peer, "peer could not be null") ; - Preconditions.checkNotNull(peer.getCurrency(), "peer attribute 'currency' could not be null"); - Preconditions.checkNotNull(peer.getPubkey(), "peer attribute 'pubkey' could not be null"); - Preconditions.checkNotNull(peer.getHost(), "peer attribute 'host' could not be null"); - Preconditions.checkNotNull(peer.getApi(), "peer 'api' could not be null"); - - String id = cryptoService.hash(peer.computeKey()); - peer.setId(id); - - boolean exists = peerDao.isExists(peer.getCurrency(), id); - - // Currency not exists, or has changed, so create it - if (!exists) { - if (logger.isTraceEnabled()) { - logger.trace(String.format("Insert new peer [%s]", peer)); - } - - // Index new peer - peer.setId(id); - peerDao.create(peer); - } - - // Update existing peer - else { - if (logger.isTraceEnabled()) { - logger.trace(String.format("Update peer [%s]", peer)); - } - peerDao.update(peer); - } - return peer; - } - - - /* -- protected methods -- */ - - protected void savePeers(String currencyName, List<Peer> peers) { - if (CollectionUtils.isNotEmpty(peers)) { - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size())); - } - peers.forEach(this::savePeer); - } - - // Mark old peers as DOWN - peerDao.updatePeersAsDown(currencyName, pluginSettings.getPeerDownTimeout()); - } - - protected List<Peer> toPeers(SearchResponse response, boolean withHighlight) { - // Read query result - List<Peer> result = Lists.newArrayList(); - response.getHits().forEach(searchHit -> { - Peer peer; - if (searchHit.source() != null) { - String jsonString = new String(searchHit.source()); - try { - peer = objectMapper.readValue(jsonString, Peer.class); - } catch(Exception e) { - if (logger.isDebugEnabled()) { - logger.debug("Error while parsing peer from JSON:\n" + jsonString); - } - throw new JsonSyntaxException("Error while read peer from JSON: " + e.getMessage(), e); - } - } - else { - peer = new Peer(); - SearchHitField field = searchHit.getFields().get("hash"); - peer.setHash(field.getValue()); - } - result.add(peer); - - // If possible, use highlights - if (withHighlight) { - Map<String, HighlightField> fields = searchHit.getHighlightFields(); - for (HighlightField field : fields.values()) { - String blockNameHighLight = field.getFragments()[0].string(); - peer.setHash(blockNameHighLight); - } - } - }); - - return result; - } - - } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java index 5f2d4270bcc25d8bea88255a0bc0a5957304f61d..4fb5efd9bc6cd49c9d6158b333d48d38bc79166b 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/LoggingScheduledThreadPoolExecutor.java @@ -22,7 +22,6 @@ package org.duniter.elasticsearch.threadpool; * #L% */ -import org.duniter.core.exception.TechnicalException; import org.elasticsearch.common.logging.ESLogger; import java.util.concurrent.*; diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java index ab95a1a65800d619948d1238be13820a5f5521cd..f4238c0e48641e297f21b35e5b09a02e95bede1b 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/BlockchainServiceTest.java @@ -50,15 +50,13 @@ public class BlockchainServiceTest { private BlockchainRemoteService remoteService; private Configuration config; private Peer peer; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { service = ServiceLocator.instance().getBean(BlockchainService.class); remoteService = ServiceLocator.instance().getBlockchainRemoteService(); config = Configuration.instance(); - peer = createTestPeer(config); - objectMapper = JacksonUtils.newObjectMapper(); + peer = Peer.newBuilder().setHost(config.getNodeHost()).setPort(config.getNodePort()).build(); // Init the currency CurrencyService currencyService = ServiceLocator.instance().getBean(CurrencyService.class); @@ -77,7 +75,7 @@ public class BlockchainServiceTest { service.indexCurrentBlock(current, true/*wait*/); try { - String blockStr = objectMapper.writeValueAsString(current); + String blockStr = JacksonUtils.getThreadObjectMapper().writeValueAsString(current); service.indexBlockFromJson(peer, blockStr, true/*is current*/, false/*detected fork*/, true/*wait*/); } @@ -88,29 +86,11 @@ public class BlockchainServiceTest { Thread.sleep(1000); // Try to get the indexed block - BlockchainBlock retrievedBlock = service.getBlockById(current.getCurrency(), current.getNumber().intValue()); + BlockchainBlock retrievedBlock = service.getBlockById(current.getCurrency(), current.getNumber()); Assert.assertNotNull(retrievedBlock); - } /* -- internal methods */ - protected void assertResults(String queryText, List<BlockchainBlock> result) { - log.info(String.format("Results for a search on [%s]", queryText)); - Assert.assertNotNull(result); - Assert.assertTrue(result.size() > 0); - for (BlockchainBlock block: result) { - log.info(" - " + block.getNumber()); - } - } - - protected Peer createTestPeer(Configuration config) { - Peer peer = new Peer( - config.getNodeHost(), - config.getNodePort()); - - return peer; - } - } diff --git a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java index f932e97e04e826898b39fb93fece07995a9e14ba..790a9d2e00d3a34b2b04b19c9c97e3f7b944669e 100644 --- a/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java +++ b/duniter4j-es-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java @@ -56,7 +56,6 @@ public class PeerServiceTest { private NetworkRemoteService remoteService; private Configuration config; private Peer peer; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { @@ -67,7 +66,6 @@ public class PeerServiceTest { peer = new Peer.Builder() .setHost(config.getNodeHost()) .setPort(config.getNodePort()).build(); - objectMapper = JacksonUtils.newObjectMapper(); // Waiting services started while(!service.isReady() || !currencyService.isReady()) { @@ -109,5 +107,6 @@ public class PeerServiceTest { Long maxLastUpTime = service.getMaxLastUpTime(peer1.getCurrency()); Assert.assertNotNull(maxLastUpTime); Assert.assertEquals(peer1.getStats().getLastUpTime().longValue(), maxLastUpTime.longValue()); + } } diff --git a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java index c8f9bf4a80a41998272c434d60accb3feef8c74f..7493b37471b347a72360e0b1ba63eba6239e5e22 100644 --- a/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java +++ b/duniter4j-es-subscription/src/test/java/org/duniter/elasticsearch/subscription/service/SubscriptionServiceTest.java @@ -64,14 +64,12 @@ public class SubscriptionServiceTest { private SubscriptionService service; private UserEventService userEventService; private CryptoService cryptoService; - private ObjectMapper objectMapper; @Before public void setUp() throws Exception { service = ServiceLocator.instance().getBean(SubscriptionService.class); cryptoService = ServiceLocator.instance().getCryptoService(); userEventService = ServiceLocator.instance().getBean(UserEventService.class); - objectMapper = JacksonUtils.newObjectMapper(); } @Test @@ -125,7 +123,7 @@ public class SubscriptionServiceTest { EmailSubscription subscription = createEmailSubscription(wallet); // Compute full JSON (with hash + signature) - String json = objectMapper.writeValueAsString(subscription); + String json = JacksonUtils.getThreadObjectMapper().writeValueAsString(subscription); String id = service.create(json); Assert.assertNotNull(id); @@ -136,6 +134,8 @@ public class SubscriptionServiceTest { protected EmailSubscription createEmailSubscription(Wallet wallet) throws JsonProcessingException { + ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper(); + EmailSubscription subscription = new EmailSubscription(); subscription.setIssuer(wallet.getPubKeyHash()); subscription.setTime(System.currentTimeMillis()/1000); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 626fbbbde0f9ee5865c68d8ac1e6e262ac726e4e..01b6a09d93b1b62501f1fe7fb0e961c157e07b46 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -260,7 +260,7 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic try { bulkRequest.add(client.prepareIndex(UserEventService.INDEX, UserEventService.EVENT_TYPE) - .setSource(objectMapper.writeValueAsBytes(event)) + .setSource(getObjectMapper().writeValueAsBytes(event)) .setRefresh(false)); flushBulkRequestOrSchedule(); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index e2514c3524f6ec2ba9692bebc488b0175d3bba5f..a7dadf25c878c2dd8b9b2620ca770568ee930525 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -30,7 +30,6 @@ import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; -import org.duniter.core.service.MailService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; @@ -47,7 +46,6 @@ import org.duniter.elasticsearch.user.model.UserProfile; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -57,7 +55,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; @@ -95,7 +92,6 @@ public class UserEventService extends AbstractService implements ChangeService.C } } - private final MailService mailService; private final ThreadPool threadPool; public final boolean trace; @@ -103,10 +99,8 @@ public class UserEventService extends AbstractService implements ChangeService.C public UserEventService(final Duniter4jClient client, final PluginSettings pluginSettings, final CryptoService cryptoService, - final MailService mailService, final ThreadPool threadPool) { super("duniter.user.event", client, pluginSettings, cryptoService); - this.mailService = mailService; this.threadPool = threadPool; this.trace = logger.isTraceEnabled(); @@ -454,7 +448,7 @@ public class UserEventService extends AbstractService implements ChangeService.C private String toJson(UserEvent userEvent, boolean cleanHashAndSignature) { try { - String json = objectMapper.writeValueAsString(userEvent); + String json = getObjectMapper().writeValueAsString(userEvent); if (cleanHashAndSignature) { json = JacksonUtils.removeAttribute(json, Record.PROPERTY_SIGNATURE); json = JacksonUtils.removeAttribute(json, Record.PROPERTY_HASH); @@ -484,7 +478,7 @@ public class UserEventService extends AbstractService implements ChangeService.C // on create case CREATE: if (change.getSource() != null) { - UserEvent event = objectMapper.readValue(change.getSource().streamInput(), UserEvent.class); + UserEvent event = getObjectMapper().readValue(change.getSource().streamInput(), UserEvent.class); processEventCreate(change.getId(), event); } break; @@ -492,7 +486,7 @@ public class UserEventService extends AbstractService implements ChangeService.C // on update case INDEX: if (change.getSource() != null) { - UserEvent event = objectMapper.readValue(change.getSource().streamInput(), UserEvent.class); + UserEvent event = getObjectMapper().readValue(change.getSource().streamInput(), UserEvent.class); processEventUpdate(change.getId(), event); } break;