Skip to content
Snippets Groups Projects
Commit c2871dea authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] Network: Add difficulties on WS2P endpoints

parent b6864356
No related branches found
No related tags found
No related merge requests found
Pipeline #4525 passed
...@@ -22,7 +22,13 @@ package org.duniter.core.client.model.bma; ...@@ -22,7 +22,13 @@ package org.duniter.core.client.model.bma;
* #L% * #L%
*/ */
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.ArrayUtils;
import java.io.Serializable; import java.io.Serializable;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BlockchainDifficulties implements Serializable { public class BlockchainDifficulties implements Serializable {
private static final long serialVersionUID = -5631089862715942431L; private static final long serialVersionUID = -5631089862715942431L;
...@@ -65,4 +71,14 @@ public class BlockchainDifficulties implements Serializable { ...@@ -65,4 +71,14 @@ public class BlockchainDifficulties implements Serializable {
this.level = level; this.level = level;
} }
} }
@JsonIgnore
public Map<String, Integer> toMapByUid() {
return toMapByUid(getLevels());
}
public static Map<String, Integer> toMapByUid(DifficultyLevel[] levels) {
if (ArrayUtils.isEmpty(levels)) return null;
return Stream.of(levels).collect(Collectors.toMap(d -> d.getUid(), d -> d.getLevel()));
}
} }
...@@ -50,6 +50,8 @@ import java.io.IOException; ...@@ -50,6 +50,8 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implements BlockchainRemoteService { public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implements BlockchainRemoteService {
...@@ -572,6 +574,7 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement ...@@ -572,6 +574,7 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
return getDifficulties(peerService.getActivePeerByCurrencyId(currencyId)); return getDifficulties(peerService.getActivePeerByCurrencyId(currencyId));
} }
/* -- Internal methods -- */ /* -- Internal methods -- */
/** /**
......
...@@ -52,6 +52,7 @@ import java.util.function.Consumer; ...@@ -52,6 +52,7 @@ import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* Created by blavenie on 20/03/17. * Created by blavenie on 20/03/17.
...@@ -161,24 +162,20 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -161,24 +162,20 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
.thenApply(peers -> .thenApply(peers ->
peers.stream() peers.stream()
.map(peer -> { .map(peer -> {
// For if same as main peer, // Replace by main peer, if same URL
if (mainPeer.getUrl().equals(peer.getUrl())) { if (mainPeer.getUrl().equals(peer.getUrl())) {
// Update properties // Update properties
mainPeer.setPubkey(peer.getPubkey()); mainPeer.setPubkey(peer.getPubkey());
mainPeer.setHash(peer.getHash()); mainPeer.setHash(peer.getHash());
mainPeer.setCurrency(peer.getCurrency()); mainPeer.setCurrency(peer.getCurrency());
// reuse instance // reuse instance
peer = mainPeer; return mainPeer;
}
// Exclude peer with only a local IPv4 address (or localhost)
else if (InetAddressUtils.isLocalAddress(peer.getHost())) {
return null;
} }
return peer; return peer;
}) })
.filter(Objects::nonNull) // Exclude peer with only a local address
.filter(peer -> InetAddressUtils.isNotLocalAddress(peer.getHost()))
.collect(Collectors.toList()) .collect(Collectors.toList())
) )
.thenCompose(peers -> this.refreshPeersAsync(mainPeer, peers, pool)); .thenCompose(peers -> this.refreshPeersAsync(mainPeer, peers, pool));
...@@ -188,12 +185,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -188,12 +185,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
public CompletableFuture<Peer> refreshPeerAsync(final Peer peer, public CompletableFuture<Peer> refreshPeerAsync(final Peer peer,
final Map<String, String> memberUids, final Map<String, String> memberUids,
final List<Ws2pHead> ws2pHeads, final List<Ws2pHead> ws2pHeads,
final BlockchainDifficulties difficulties,
final ExecutorService pool) { final ExecutorService pool) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing peer status", peer.toString())); if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing peer status", peer.toString()));
// WS2P: refresh using heads // WS2P: refresh using heads
if (Peers.hasWs2pEndpoint(peer)) { if (Peers.hasWs2pEndpoint(peer)) {
return CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads), pool); return CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads, difficulties), pool);
} }
// BMA or ES_CORE // BMA or ES_CORE
...@@ -238,7 +236,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -238,7 +236,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return CompletableFuture.completedFuture(peer); return CompletableFuture.completedFuture(peer);
} }
public Peer fillWs2pPeer(final Peer peer, final Map<String, String> memberUids, List<Ws2pHead> ws2pHeads) { public Peer fillWs2pPeer(final Peer peer,
final Map<String, String> memberUids,
final List<Ws2pHead> ws2pHeads,
final BlockchainDifficulties difficulties) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing WS2P peer status", peer.toString())); if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing WS2P peer status", peer.toString()));
if (StringUtils.isBlank(peer.getPubkey()) || StringUtils.isBlank(peer.getEpId())) return peer; if (StringUtils.isBlank(peer.getPubkey()) || StringUtils.isBlank(peer.getEpId())) return peer;
...@@ -249,31 +250,42 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -249,31 +250,42 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
) )
).findFirst().orElse(null); ).findFirst().orElse(null);
Peer.Stats stats = peer.getStats();
if (ws2pHead != null) { if (ws2pHead != null) {
if (ws2pHead.getBlock() != null) { if (ws2pHead.getBlock() != null) {
String[] blockParts = ws2pHead.getBlock().split("-"); String[] blockParts = ws2pHead.getBlock().split("-");
if (blockParts.length == 2) { if (blockParts.length == 2) {
peer.getStats().setBlockNumber(Integer.parseInt(blockParts[0])); stats.setBlockNumber(Integer.parseInt(blockParts[0]));
peer.getStats().setBlockHash(blockParts[1]); stats.setBlockHash(blockParts[1]);
} }
} }
peer.getStats().setSoftware(ws2pHead.getSoftware()); stats.setSoftware(ws2pHead.getSoftware());
peer.getStats().setVersion(ws2pHead.getSoftwareVersion()); stats.setVersion(ws2pHead.getSoftwareVersion());
} }
else { else {
peer.getStats().setStatus(Peer.PeerStatus.DOWN); stats.setStatus(Peer.PeerStatus.DOWN);
} }
// Set uid // Set uid
String uid = memberUids.get(peer.getPubkey()); String uid = memberUids.get(peer.getPubkey());
peer.getStats().setUid(uid); stats.setUid(uid);
if (uid != null) { if (uid != null) {
Integer difficulty = 0;
if (stats.getBlockNumber() == null || (stats.getBlockNumber().intValue()+1 == difficulties.getBlock().intValue())) {
difficulty = Stream.of(difficulties.getLevels())
.filter(d -> uid.equals(d.getUid()))
.map(d -> d.getLevel())
.filter(Objects::nonNull)
.findFirst()
// Could not known hardship, so fill 0 if member (=can compute) // Could not known hardship, so fill 0 if member (=can compute)
peer.getStats().setHardshipLevel(0); .orElse(new Integer(0));
}
stats.setHardshipLevel(difficulty);
} }
else { else {
peer.getStats().setHardshipLevel(null); stats.setHardshipLevel(null);
} }
return peer; return peer;
...@@ -285,15 +297,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -285,15 +297,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool); CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
CompletableFuture<List<Ws2pHead>> ws2pHeadsFuture = CompletableFuture.supplyAsync(() -> networkRemoteService.getWs2pHeads(mainPeer), pool); CompletableFuture<List<Ws2pHead>> ws2pHeadsFuture = CompletableFuture.supplyAsync(() -> networkRemoteService.getWs2pHeads(mainPeer), pool);
CompletableFuture<BlockchainDifficulties> difficultiesFuture = CompletableFuture.supplyAsync(() -> blockchainRemoteService.getDifficulties(mainPeer), pool);
return CompletableFuture.allOf(memberUidsFuture, ws2pHeadsFuture) return CompletableFuture.allOf(memberUidsFuture, ws2pHeadsFuture, difficultiesFuture)
// Refresh all endpoints // Refresh all endpoints
.thenApply(v -> { .thenApply(v -> {
final Map<String, String> memberUids = memberUidsFuture.join(); final Map<String, String> memberUids = memberUidsFuture.join();
final List<Ws2pHead> ws2pHeads = ws2pHeadsFuture.join(); final List<Ws2pHead> ws2pHeads = ws2pHeadsFuture.join();
final BlockchainDifficulties difficulties = difficultiesFuture.join();
return peers.stream().map(peer -> return peers.stream().map(peer ->
refreshPeerAsync(peer, memberUids, ws2pHeads, pool)) refreshPeerAsync(peer, memberUids, ws2pHeads, difficulties, pool))
.collect(Collectors.toList()); .collect(Collectors.toList());
}) })
.thenCompose(CompletableFutures::allOfToList); .thenCompose(CompletableFutures::allOfToList);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment