diff --git a/duniter4j-client/src/main/assembly/min/duniter4j.sh b/duniter4j-client/src/main/assembly/min/duniter4j.sh index 4ddc4538646a981abccbe62782343c39302d7ccc..2c0064775a1ab2c82ed83031903f356e094e1647 100644 --- a/duniter4j-client/src/main/assembly/min/duniter4j.sh +++ b/duniter4j-client/src/main/assembly/min/duniter4j.sh @@ -1,7 +1,7 @@ #!/bin/bash READLINK=`which readlink` -if [ -z "$READLINK" ]; then +if [[ -z "$READLINK" ]]; then message "Required tool 'readlink' is missing. Please install before launch \"$0\" file." exit 1 fi @@ -10,8 +10,8 @@ fi # Ensure BASEDIR points to the directory where the soft is installed. # ------------------------------------------------------------------ SCRIPT_LOCATION=$0 -if [ -x "$READLINK" ]; then - while [ -L "$SCRIPT_LOCATION" ]; do +if [[ -x "$READLINK" ]]; then + while [[ -L "$SCRIPT_LOCATION" ]]; do SCRIPT_LOCATION=`"$READLINK" -e "$SCRIPT_LOCATION"` done fi @@ -22,11 +22,11 @@ export JAR="$JARDIR/${project.build.finalName}.${project.packaging}" export I18N_DIR="$APPDIR/i18n" # Retrieve the JAVA installation -if [ "$JAVA_HOME~" == "~" ]; then +if [[ "$JAVA_HOME~" == "~" ]]; then export JAVA_HOME="$APPDIR/jre" export JAVA_COMMAND="$JAVA_HOME/bin/java" - if [ -f "$JAVA_HOME/bin/java" ]; then + if [[ -f "$JAVA_HOME/bin/java" ]]; then # If embedded JRE exists, make sure java is executable chmod +x "$JAVA_COMMAND" else @@ -37,7 +37,7 @@ else export JAVA_COMMAND="$JAVA_HOME/bin/java" fi -if [ -d "$HOME" ]; then +if [[ -d "$HOME" ]]; then export BASEDIR="$HOME/.config/duniter4j" export CONFIG_DIR="$BASEDIR/config" export CONFIG_FILE="$CONFIG_DIR/duniter4j-client.config" @@ -48,14 +48,21 @@ else export CONFIG_FILE="$CONFIG_DIR/config/duniter4j-client.config" export LOG_FILE="$APPDIR/logs/${project.build.finalName}.log" - echo "Using base" + echo "Using basedir: $APPDIR" fi +if [[ "$JAVA_OPTS~" == "~" ]]; then + # Configuring apache simplelog to use Log4j + JAVA_OPTS="-Dorg.apache.commons.logging.Log=org.apache.commons.logging.impl.Log4JLogger" + exprt ${JAVA_OPTS} +fi + + # Create the config dir if need mkdir -p "$CONFIG_DIR" # Create the config file (if need) -if [ ! -f "$CONFIG_FILE" ]; then +if [[ ! -f "$CONFIG_FILE" ]]; then echo "INFO - Initialized configuration file: $CONFIG_FILE" cp -u $JARDIR/duniter4j-client.config $CONFIG_FILE fi @@ -68,12 +75,12 @@ while true; do $JAVA_COMMAND $JAVA_OPTS -Dduniter4j.log.file=$LOG_FILE -Dduniter4j.i18n.directory=$I18N_DIR -jar $JAR --basedir $BASEDIR --config $CONFIG_FILE $* exitcode=$? - if [ ! "$exitcode" -eq "130" ]; then + if [[ ! "$exitcode" -eq "130" ]]; then echo "INFO - Application stopped with exitcode: $exitcode" fi ## Continue only if exitcode=88 (will restart the application) - if [ ! "$exitcode" -eq "88" ]; then + if [[ ! "$exitcode" -eq "88" ]]; then # quit now! exit $exitcode fi diff --git a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java index 51574c1e584c3f2621eabcf8ac51f8673302afdc..2c033bea52db11eed0baa83bcc6fc490b8729e3f 100644 --- a/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java +++ b/duniter4j-client/src/main/java/org/duniter/client/actions/NetworkAction.java @@ -32,7 +32,9 @@ import org.duniter.client.actions.utils.Formatters; import org.apache.commons.io.IOUtils; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.config.ConfigurationOption; +import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.model.local.Peers; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.client.service.local.NetworkService; import org.duniter.core.util.CollectionUtils; @@ -97,7 +99,7 @@ public class NetworkAction extends AbstractAction { Long now = System.currentTimeMillis(); List<Peer> peers = service.getPeers(mainPeer); showPeersTable(peers, false); - log.info(I18n.t("duniter4j.client.network.executionTime", -System.currentTimeMillis() - now)); + log.info(I18n.t("duniter4j.client.network.executionTime", System.currentTimeMillis() - now)); } else { service.addPeersChangeListener(mainPeer, peers -> showPeersTable(peers, true)); @@ -171,7 +173,7 @@ public class NetworkAction extends AbstractAction { Formatters.formatPubkey(peer.getPubkey()), peer.getHost() + ":" + peer.getPort(), peer.getStats().getStatus().name(), - isUp && peer.isUseSsl() ? I18n.t("duniter4j.client.network.ssl") : "", + isUp ? formatApi(peer) : "", isUp ? peer.getStats().getVersion() : "", (isUp && peer.getStats().getHardshipLevel() != null) ? peer.getStats().getHardshipLevel() : I18n.t("duniter4j.client.network.mirror"), isUp ? formatBuid(peer.getStats()) : "" @@ -237,6 +239,18 @@ public class NetworkAction extends AbstractAction { } protected String formatBuid(Peer.Stats stats) { + if (stats.getBlockNumber() == null) return ""; return Formatters.formatBuid(stats.getBlockNumber() + "-" + stats.getBlockHash()); } + + protected String formatApi(Peer peer) { + if (Peers.hasBmaEndpoint(peer)) { + return peer.isUseSsl() ? I18n.t("duniter4j.client.network.ssl") : ""; + } + if (Peers.hasWs2pEndpoint(peer)) { + return "WS2P"; + } + + return peer.getApi(); + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java index 09400bab17e60857a9cd221c6d24357c8ec5ec63..1a9ade97b36494d46f8881b53fd0ea24d2f5fbe5 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/PeerDao.java @@ -26,6 +26,7 @@ import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.bma.NetworkPeers; import org.duniter.core.client.model.local.Peer; +import java.util.Collection; import java.util.List; import java.util.Set; @@ -59,7 +60,7 @@ public interface PeerDao extends EntityDao<String, Peer> { Long getMaxLastUpTime(String currencyId); - void updatePeersAsDown(String currencyId, long upTimeLimitInSec); + void updatePeersAsDown(String currencyId, long upTimeLimitInSec, Collection<String> endpointApis); - boolean hasPeersUpWithApi(String currencyId, Set<EndpointApi> api); + boolean hasPeersUpWithApi(String currencyId, Set<EndpointApi> endpointApis); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java index 057de46c3320be95f5a11bc183ddd04add37a7bb..9fd7dbe0919457331596f13197d15ef7ea02c6c3 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/dao/mem/MemoryPeerDaoImpl.java @@ -143,10 +143,14 @@ public class MemoryPeerDaoImpl implements PeerDao { } @Override - public void updatePeersAsDown(String currencyId, long upTimeLimitInSec) { + public void updatePeersAsDown(String currencyId, long upTimeLimitInSec, Collection<String> endpointApis) { getPeersByCurrencyId(currencyId).stream() - .filter(peer -> peer.getStats() != null && peer.getStats().getLastUpTime() <= upTimeLimitInSec) + .filter(peer -> + peer.getStats() != null + && peer.getStats().getLastUpTime() < upTimeLimitInSec + && (endpointApis == null || endpointApis.contains(peer.getApi())) + ) .forEach(peer -> peer.getStats().setStatus(Peer.PeerStatus.DOWN)); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java index 05ebf72de465a41ad23a8da7de9e349d965e3d53..29125edd43837b2dd53859311ced47005b704324 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/local/Peer.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.base.Joiner; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.bma.NetworkPeering; +import org.duniter.core.client.model.bma.NetworkPeers; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.http.InetAddressUtils; @@ -54,6 +55,8 @@ public class Peer implements LocalEntity<String>, Serializable { private String currency; private String path; + private Peering peering; + public Builder() { } @@ -127,6 +130,9 @@ public class Peer implements LocalEntity<String>, Serializable { if (source.api != null) { setApi(source.api.name()); } + if (StringUtils.isNotBlank(source.id)) { + setEpId(source.id); + } if (StringUtils.isNotBlank(source.dns)) { setDns(source.dns); } @@ -142,12 +148,30 @@ public class Peer implements LocalEntity<String>, Serializable { if (source.port != null) { setPort(source.port); } - if (StringUtils.isNotBlank(source.id)) { - setEpId(source.id); + if (StringUtils.isNotBlank(source.path)) { + setPath(source.path); } return this; } + public Builder setPeering(NetworkPeers.Peer remotePeer) { + this.peering = this.peering != null ? this.peering : new Peering(); + + this.peering.setVersion(remotePeer.getVersion()); + this.peering.setSignature(remotePeer.getSignature()); + + // Block number+hash + if (remotePeer.getBlock() != null) { + String[] blockParts = remotePeer.getBlock().split("-"); + if (blockParts.length == 2) { + this.peering.setBlockNumber(Integer.parseInt(blockParts[0])); + this.peering.setBlockHash(blockParts[1]); + } + } + + return this; + } + public void setPath(String path) { this.path = path; } @@ -173,6 +197,10 @@ public class Peer implements LocalEntity<String>, Serializable { if (StringUtils.isNotBlank(this.path)) { ep.setPath(this.path); } + // Peering + if (this.peering != null) { + ep.setPeering(this.peering); + } return ep; } @@ -187,6 +215,7 @@ public class Peer implements LocalEntity<String>, Serializable { public static final String PROPERTY_IPV6 = "ipv6"; public static final String PROPERTY_EP_ID = "epId"; public static final String PROPERTY_STATS = "stats"; + public static final String PROPERTY_PEERING = "peering"; private String id; @@ -205,6 +234,7 @@ public class Peer implements LocalEntity<String>, Serializable { private String currency; private Stats stats = new Stats(); + private Peering peering = new Peering(); private int port; private boolean useSsl; @@ -388,6 +418,14 @@ public class Peer implements LocalEntity<String>, Serializable { this.stats = stats; } + public Peering getPeering() { + return peering; + } + + public void setPeering(Peering peering) { + this.peering = peering; + } + public String toString() { StringJoiner joiner = new StringJoiner(" "); if (api != null) { @@ -422,6 +460,60 @@ public class Peer implements LocalEntity<String>, Serializable { ERROR } + public static class Peering { + public static final String PROPERTY_VERSION = "version"; + public static final String PROPERTY_SIGNATURE = "signature"; + public static final String PROPERTY_BLOCK_NUMBER = "blockNumber"; + public static final String PROPERTY_BLOCK_HASH = "blockHash"; + public static final String PROPERTY_RAW = "raw"; + + private String version; + private String signature; + private Integer blockNumber; + private String blockHash; + private String raw; + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } + + public String getSignature() { + return signature; + } + + public void setSignature(String signature) { + this.signature = signature; + } + + public Integer getBlockNumber() { + return blockNumber; + } + + public void setBlockNumber(Integer blockNumber) { + this.blockNumber = blockNumber; + } + + public String getBlockHash() { + return blockHash; + } + + public void setBlockHash(String blockHash) { + this.blockHash = blockHash; + } + + public String getRaw() { + return raw; + } + + public void setRaw(String raw) { + this.raw = raw; + } + } + public static class Stats { public static final String PROPERTY_SOFTWARE = "software"; public static final String PROPERTY_VERSION = "version"; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java index 87613e29717b375d84ca663489ed5f25006c5d34..12242217c95d0ea92560eebc7492343f90642085 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/NetworkRemoteServiceImpl.java @@ -142,6 +142,7 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N .setCurrency(remotePeer.getCurrency()) .setPubkey(remotePeer.getPubkey()) .setEndpoint(endpoint) + .setPeering(remotePeer) .build(); result.add(childPeer); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java index ac512200a200cb737589cd24673b8b267ce9dbda..bf8651dbde8836a3680bc06d33d67201814d6989 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkService.java @@ -77,7 +77,7 @@ public interface NetworkService extends Service { List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort, ExecutorService pool); - CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(Peer mainPeer, List<String> filterEndpoints, ExecutorService pool) throws ExecutionException, InterruptedException; + CompletableFuture<List<Peer>> getPeersAsync(Peer mainPeer, List<String> filterEndpoints, ExecutorService pool) throws ExecutionException, InterruptedException; List<Peer> fillPeerStatsConsensus(final List<Peer> peers); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java index 0cb61837a14ebfe0592e4b6362a104132eec2d31..0bcd58945c701790cdf8b37a8aaf71bfb96017d4 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/NetworkServiceImpl.java @@ -104,7 +104,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network Filter filterDef = new Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; - filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name()); + filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name(), EndpointApi.WS2P.name()); // Default sort Sort sortDef = new Sort(); @@ -122,17 +122,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort, ExecutorService executor) { try { - return asyncGetPeers(mainPeer, (filter != null ? filter.filterEndpoints : null), executor) - .thenCompose(CompletableFutures::allOfToList) - .thenApply(this::fillPeerStatsConsensus) - .thenApply(peers -> peers.stream() + return getPeersAsync(mainPeer, (filter != null ? filter.filterEndpoints : null), executor) + //.thenComposeAsync(CompletableFutures::allOfToList) + .thenApplyAsync(this::fillPeerStatsConsensus) + .thenApplyAsync(peers -> peers.stream() // Filter on currency .filter(peer -> mainPeer.getCurrency() == null || ObjectUtils.equals(mainPeer.getCurrency(), peer.getCurrency())) // filter, then sort .filter(peerFilter(filter)) .sorted(peerComparator(sort)) .collect(Collectors.toList())) - .thenApply(this::logPeers) + .thenApplyAsync(this::logPeers) .get(); } catch (InterruptedException | ExecutionException e) { throw new TechnicalException("Error while loading peers: " + e.getMessage(), e); @@ -150,7 +150,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network } @Override - public CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(final Peer mainPeer, List<String> filterEndpoints, ExecutorService executor) throws ExecutionException, InterruptedException { + public CompletableFuture<List<Peer>> getPeersAsync(final Peer mainPeer, List<String> filterEndpoints, ExecutorService executor) throws ExecutionException, InterruptedException { Preconditions.checkNotNull(mainPeer); log.debug("Loading network peers..."); @@ -160,9 +160,11 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return CompletableFuture.allOf( new CompletableFuture[] {peersFuture, memberUidsFuture}) - .thenApply(v -> { + .thenComposeAsync(v -> { final Map<String, String> memberUids = memberUidsFuture.join(); - return peersFuture.join().stream() + List<Peer> peers = peersFuture.join(); + + List<CompletableFuture<Peer>> list = peers.stream() .map(peer -> { // For if same as main peer, if (mainPeer.getUrl().equals(peer.getUrl())) { @@ -186,15 +188,16 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return asyncRefreshPeer(peer, memberUids, pool); }) - .filter(Objects::nonNull) .collect(Collectors.toList()); + return CompletableFutures.allOfToList(list); }); } public CompletableFuture<Peer> asyncRefreshPeer(final Peer peer, final Map<String, String> memberUids, final ExecutorService pool) { - return CompletableFuture.supplyAsync(() -> fillVersion(peer), pool) - .thenApply(p -> fillCurrentBlock(p)) + System.out.println("Refreshing peer: " + peer.toString()); + return CompletableFuture.supplyAsync(() -> fillNodeSummary(peer), pool) + .thenApplyAsync(this::fillCurrentBlock) .exceptionally(throwable -> { peer.getStats().setStatus(Peer.PeerStatus.DOWN); if(!(throwable instanceof HttpConnectException)) { @@ -210,7 +213,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network else if (log.isTraceEnabled()) log.debug(String.format("[%s] is DOWN", peer)); return peer; }) - .thenApply(p -> { + .thenApplyAsync(p -> { String uid = StringUtils.isNotBlank(p.getPubkey()) ? memberUids.get(p.getPubkey()) : null; p.getStats().setUid(uid); if (p.getStats().isReacheable() && Peers.hasBmaEndpoint(p)) { @@ -315,6 +318,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network final Predicate<Peer> peerFilter = peerFilter(filter); final Comparator<Peer> peerComparator = peerComparator(sort); final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool(); + final int peerDownTimeoutMs = config.getPeerUpMaxAge(); // Refreshing one peer (e.g. received from WS) Consumer<List<Peer>> updateKnownBlocks = (updatedPeers) -> @@ -330,13 +334,27 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network try { if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME, 1, TimeUnit.MINUTES)) { try { + long now = System.currentTimeMillis(); List<Peer> result = getPeers(mainPeer, filter, sort, pool); + // Mark old peers as DOWN + long maxUpTimeInSec = Math.round((System.currentTimeMillis() - peerDownTimeoutMs) / 1000); + knownBlocks.clear(); updateKnownBlocks.accept(result); // Save update peers - peerService.save(currency, result, false/*not the full UP list*/); + peerService.save(currency, result); + + // Set old peers as DOWN (with a delay) + peerService.updatePeersAsDown(currency, maxUpTimeInSec, filter.filterEndpoints); + + long duration = System.currentTimeMillis() - now; + + // If took more than 2 min => warning + if (duration /1000/60 > 2) { + log.warn(String.format("Refreshing peers took %s seconds", Math.round(duration/1000))); + } // Send full list listener listener.onChanges(result); @@ -381,18 +399,18 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network // filter, to keep only existing peer, or expected by filter List<Peer> changedPeers = refreshedPeers.stream() .filter(refreshedPeer -> { - String peerId = refreshedPeer.toString(); - boolean exists = knownPeers.containsKey(peerId); - if (exists){ - knownPeers.remove(peerId); - } - // If include, add it to full list - boolean include = peerFilter.test(refreshedPeer); - if (include) { - knownPeers.put(peerId, refreshedPeer); - } - return include; - }).collect(Collectors.toList()); + String peerId = refreshedPeer.toString(); + boolean exists = knownPeers.containsKey(peerId); + if (exists){ + knownPeers.remove(peerId); + } + // If include, add it to full list + boolean include = peerFilter.test(refreshedPeer); + if (include) { + knownPeers.put(peerId, refreshedPeer); + } + return include; + }).collect(Collectors.toList()); // If something changes if (CollectionUtils.isNotEmpty(changedPeers)) { @@ -402,8 +420,8 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network updateKnownBlocks.accept(changedPeers); - // Save update peers - peerService.save(currency, changedPeers, false/*not the full UP list*/); + // Save updated peers + peerService.save(currency, changedPeers); listener.onChanges(result); } @@ -596,7 +614,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network return true; } - protected Peer fillVersion(final Peer peer) { + protected Peer fillNodeSummary(final Peer peer) { if (!Peers.hasBmaEndpoint(peer) && !Peers.hasEsCoreEndpoint(peer)) return peer; JsonNode summary = getNodeSummary(peer); peer.getStats().setVersion(getVersion(summary)); diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java index d0631a8ab24763f5c4abd3274f0fbe43f06c110a..2d5ba99f919d7a998572da65fbfe261c8ba09d3d 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerService.java @@ -25,6 +25,7 @@ package org.duniter.core.client.service.local; import org.duniter.core.beans.Service; import org.duniter.core.client.model.local.Peer; +import java.util.Collection; import java.util.List; /** @@ -55,7 +56,11 @@ public interface PeerService extends Service { */ List<Peer> getPeersByCurrencyId(String currencyId); - void save(String currencyId, List<Peer> peers, boolean isFullUpList); + void save(String currencyId, List<Peer> peers); + + void updatePeersAsDown(String currencyId, Collection<String> filterApis); + + void updatePeersAsDown(String currencyId, long maxUpTimeInSec, Collection<String> filterApis); boolean isExists(String currencyId, String peerId); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java index 1998aefebd217d7762f74533277ff792d8dc9a41..a39e51f241382daa6fa4e8a6189d3dbea5bcffac 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/local/PeerServiceImpl.java @@ -40,9 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; +import java.util.*; /** * Created by eis on 07/02/15. @@ -87,7 +85,6 @@ public class PeerServiceImpl implements PeerService, InitializingBean { } - public Peer save(final Peer peer) { Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer.getCurrency()); @@ -118,8 +115,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { peers = new ArrayList<>(); peersByCurrencyIdCache.put(peer.getCurrency(), peers); peers.add(peer); - } - else if (!peers.contains(peer)) { + } else if (!peers.contains(peer)) { peers.add(peer); } } @@ -127,8 +123,9 @@ public class PeerServiceImpl implements PeerService, InitializingBean { return result; } - /** + /** * Return a (cached) active peer, by currency id + * * @param currencyId * @return */ @@ -143,6 +140,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { /** * Return a (cached) peer list, by currency id + * * @param currencyId * @return */ @@ -157,6 +155,7 @@ public class PeerServiceImpl implements PeerService, InitializingBean { /** * Fill allOfToList cache need for currencies + * * @param accountId */ public void loadCache(long accountId) { @@ -185,36 +184,49 @@ public class PeerServiceImpl implements PeerService, InitializingBean { } @Override - public void save(String currencyId, List<Peer> peers, boolean isFullUpList) { + public void save(String currencyId, List<Peer> peers) { - int peerDownTimeoutMs = config.getPeerUpMaxAge(); - final long now = System.currentTimeMillis(); if (CollectionUtils.isNotEmpty(peers)) { if (log.isDebugEnabled()) { log.debug(String.format("[%s] Updating peers (%s endpoints found)", currencyId, peers.size())); } + final long upTime = System.currentTimeMillis() / 1000; + peers.forEach(peer -> { // On each UP peers: set last UP time if (peer.getStats() != null && peer.getStats().isReacheable()) { - peer.getStats().setLastUpTime(now / 1000); + peer.getStats().setLastUpTime(upTime); } // Save save(peer); }); } + } + + @Override + public boolean isExists(String currencyId, String peerId) { + return peerDao.isExists(currencyId, peerId); + } + + @Override + public void updatePeersAsDown(String currencyId, Collection<String> filterApis) { + int peerDownTimeoutMs = config.getPeerUpMaxAge(); // Mark old peers as DOWN - if (isFullUpList && peerDownTimeoutMs > 0) { - long maxUpTimeInMs = now - peerDownTimeoutMs; - peerDao.updatePeersAsDown(currencyId, maxUpTimeInMs / 1000); + if (peerDownTimeoutMs >0) { + long maxUpTimeInSec = Math.round((System.currentTimeMillis() - peerDownTimeoutMs) / 1000); + updatePeersAsDown(currencyId, maxUpTimeInSec, filterApis); } } @Override - public boolean isExists(String currencyId, String peerId) { - return peerDao.isExists(currencyId, peerId); + public void updatePeersAsDown(String currencyId, long maxUpTimeInSec, Collection<String> filterApis) { + if (log.isDebugEnabled()) { + log.debug(String.format("[%s] %s Setting peers as DOWN, if older than [%s]...", currencyId, filterApis, new Date(maxUpTimeInSec*1000))); + } + peerDao.updatePeersAsDown(currencyId, maxUpTimeInSec, filterApis); } protected Peer loadDefaultPeer(String currencyId) { @@ -228,4 +240,5 @@ public class PeerServiceImpl implements PeerService, InitializingBean { return peers.get(0); } + } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/util/http/HttpClients.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/util/http/HttpClients.java index 53b8a5e58cd16236836ed11c84184d1284e7a6a3..2f7dbde745feba96406360c27fb588381f21daa3 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/util/http/HttpClients.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/util/http/HttpClients.java @@ -7,6 +7,7 @@ import org.apache.http.client.HttpRequestRetryHandler; import org.apache.http.client.config.CookieSpecs; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.protocol.HttpClientContext; +import org.apache.http.config.ConnectionConfig; import org.apache.http.config.SocketConfig; import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.conn.HttpHostConnectException; diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/concurrent/CompletableFutures.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/concurrent/CompletableFutures.java index e7700d607ef5e394edf53bcf61d711f76e74df27..3e493e400f9a84331a865cf9644e957d45dc8463 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/concurrent/CompletableFutures.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/concurrent/CompletableFutures.java @@ -22,7 +22,10 @@ package org.duniter.core.util.concurrent; * #L% */ +import org.apache.commons.collections4.CollectionUtils; + import java.util.List; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -37,12 +40,13 @@ public class CompletableFutures { } public static <T> CompletableFuture<List<T>> allOfToList(List<CompletableFuture<T>> futures) { + CollectionUtils.filter(futures, Objects::nonNull); CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); return allDoneFuture.thenApply(v -> - futures.stream() + futures.parallelStream() .map(future -> future.join()) - .filter(peer -> peer != null) // skip + .filter(Objects::nonNull) // skip null peer .collect(Collectors.toList()) ); }