Skip to content
Snippets Groups Projects
Commit bfd5f17d authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] Request only BMA or BMAS endpoints - fix #18

parent 19c744e0
No related branches found
No related tags found
No related merge requests found
...@@ -73,7 +73,7 @@ public interface NetworkService extends Service { ...@@ -73,7 +73,7 @@ public interface NetworkService extends Service {
List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort, ExecutorService pool); List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort, ExecutorService pool);
CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(Peer mainPeer, ExecutorService pool) throws ExecutionException, InterruptedException; CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(Peer mainPeer, List<String> filterEndpoints, ExecutorService pool) throws ExecutionException, InterruptedException;
List<Peer> fillPeerStatsConsensus(final List<Peer> peers); List<Peer> fillPeerStatsConsensus(final List<Peer> peers);
......
...@@ -134,7 +134,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -134,7 +134,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort, ExecutorService executor) { public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort, ExecutorService executor) {
try { try {
return asyncGetPeers(mainPeer, executor) return asyncGetPeers(mainPeer, (filter != null ? filter.filterEndpoints : null), executor)
.thenCompose(CompletableFutures::allOfToList) .thenCompose(CompletableFutures::allOfToList)
.thenApply(this::fillPeerStatsConsensus) .thenApply(this::fillPeerStatsConsensus)
.thenApply(peers -> peers.stream() .thenApply(peers -> peers.stream()
...@@ -162,14 +162,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -162,14 +162,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
} }
@Override @Override
public CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(final Peer mainPeer, ExecutorService executor) throws ExecutionException, InterruptedException { public CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(final Peer mainPeer, List<String> filterEndpoints, ExecutorService executor) throws ExecutionException, InterruptedException {
Preconditions.checkNotNull(mainPeer); Preconditions.checkNotNull(mainPeer);
log.debug("Loading network peers..."); log.debug("Loading network peers...");
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool(); final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
CompletableFuture<List<Peer>> peersFuture = CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer), pool); CompletableFuture<List<Peer>> peersFuture = CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool);
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool); CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
return CompletableFuture.allOf( return CompletableFuture.allOf(
...@@ -184,6 +184,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -184,6 +184,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
mainPeer.setCurrency(peer.getCurrency()); mainPeer.setCurrency(peer.getCurrency());
return asyncRefreshPeer(mainPeer, memberUids, pool); return asyncRefreshPeer(mainPeer, memberUids, pool);
} }
return asyncRefreshPeer(peer, memberUids, pool); return asyncRefreshPeer(peer, memberUids, pool);
}) })
.collect(Collectors.toList()); .collect(Collectors.toList());
...@@ -339,7 +340,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -339,7 +340,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
} }
try { try {
final List<Peer> newPeers = new ArrayList<>(); final List<Peer> newPeers = new ArrayList<>();
addEndpointsAsPeers(bmaPeer, newPeers, null); addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints);
CompletableFuture<List<CompletableFuture<Peer>>> jobs = CompletableFuture<List<CompletableFuture<Peer>>> jobs =
CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool) CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool)
...@@ -423,7 +424,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -423,7 +424,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
/* -- protected methods -- */ /* -- protected methods -- */
protected List<Peer> loadPeerLeafs(Peer peer) { protected List<Peer> loadPeerLeafs(Peer peer, List<String> filterEndpoints) {
List<String> leaves = networkRemoteService.getPeersLeaves(peer); List<String> leaves = networkRemoteService.getPeersLeaves(peer);
if (CollectionUtils.isEmpty(leaves)) return new ArrayList<>(); // should never occur if (CollectionUtils.isEmpty(leaves)) return new ArrayList<>(); // should never occur
...@@ -444,7 +445,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -444,7 +445,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
int count = Constants.Config.MAX_SAME_REQUEST_COUNT; int count = Constants.Config.MAX_SAME_REQUEST_COUNT;
while (offset < leaves.size()) { while (offset < leaves.size()) {
if (offset + count > leaves.size()) count = leaves.size() - offset; if (offset + count > leaves.size()) count = leaves.size() - offset;
loadPeerLeafs(peer, result, leaves, offset, count); loadPeerLeafs(peer, result, leaves, offset, count, filterEndpoints);
offset += count; offset += count;
try { try {
Thread.sleep(1000); // wait 1 s Thread.sleep(1000); // wait 1 s
...@@ -458,13 +459,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -458,13 +459,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return result; return result;
} }
protected void loadPeerLeafs(Peer requestedPeer, List<Peer> result, List<String> leaves, int offset, int count) { protected void loadPeerLeafs(Peer requestedPeer, List<Peer> result, List<String> leaves, int offset, int count, List<String> filterEndpoints) {
for (int i = offset; i< offset + count; i++) { for (int i = offset; i< offset + count; i++) {
String leaf = leaves.get(i); String leaf = leaves.get(i);
try { try {
NetworkPeers.Peer peer = networkRemoteService.getPeerLeaf(requestedPeer, leaf); NetworkPeers.Peer peer = networkRemoteService.getPeerLeaf(requestedPeer, leaf);
addEndpointsAsPeers(peer, result, leaf); addEndpointsAsPeers(peer, result, leaf, filterEndpoints);
} catch(HttpNotFoundException | TechnicalException e) { } catch(HttpNotFoundException | TechnicalException e) {
log.warn("Peer not found for leaf=" + leaf); log.warn("Peer not found for leaf=" + leaf);
...@@ -473,7 +474,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -473,7 +474,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
} }
} }
protected void addEndpointsAsPeers(NetworkPeers.Peer peer, List<Peer> result, String hash) { protected void addEndpointsAsPeers(NetworkPeers.Peer peer, List<Peer> result, String hash, List<String> filterEndpoints) {
if (CollectionUtils.isNotEmpty(peer.getEndpoints())) { if (CollectionUtils.isNotEmpty(peer.getEndpoints())) {
for (NetworkPeering.Endpoint ep: peer.getEndpoints()) { for (NetworkPeering.Endpoint ep: peer.getEndpoints()) {
if (ep != null && ep.getApi() != null) { if (ep != null && ep.getApi() != null) {
...@@ -483,7 +484,12 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -483,7 +484,12 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
.setPubkey(peer.getPubkey()) .setPubkey(peer.getPubkey())
.setEndpoint(ep) .setEndpoint(ep)
.build(); .build();
result.add(peerEp); // Filter on endpoints - fix #18
if (CollectionUtils.isEmpty(filterEndpoints)
|| StringUtils.isBlank(peerEp.getApi())
|| filterEndpoints.contains(peerEp.getApi())) {
result.add(peerEp);
}
} }
} }
} }
......
...@@ -129,7 +129,7 @@ public class PeerService extends AbstractService { ...@@ -129,7 +129,7 @@ public class PeerService extends AbstractService {
sortDef.sortType = null; sortDef.sortType = null;
try { try {
networkService.asyncGetPeers(firstPeer, threadPool.scheduler()) networkService.asyncGetPeers(firstPeer, filterDef.filterEndpoints, threadPool.scheduler())
.thenCompose(CompletableFutures::allOfToList) .thenCompose(CompletableFutures::allOfToList)
.thenApply(networkService::fillPeerStatsConsensus) .thenApply(networkService::fillPeerStatsConsensus)
.thenApply(peers -> peers.stream() .thenApply(peers -> peers.stream()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment