Skip to content
Snippets Groups Projects
Commit a9e32707 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] Remote network service: Add request to BMA /network/ws2p/heads

[fix] Remote network service: rename async functions
parent 2b595812
Branches
Tags
No related merge requests found
Pipeline #4494 passed
Showing
with 715 additions and 66 deletions
......@@ -175,7 +175,7 @@ public class NetworkAction extends AbstractAction {
peer.getStats().getStatus().name(),
isUp ? formatApi(peer) : "",
isUp ? peer.getStats().getVersion() : "",
(isUp && peer.getStats().getHardshipLevel() != null) ? peer.getStats().getHardshipLevel() : I18n.t("duniter4j.client.network.mirror"),
(isUp && peer.getStats().getHardshipLevel() != null) ? peer.getStats().getHardshipLevel() : (peer.getStats().getUid() == null ? I18n.t("duniter4j.client.network.mirror") : ""),
isUp ? formatBuid(peer.getStats()) : ""
};
})
......
package org.duniter.core.client.model.bma;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import java.io.Serializable;
/**
* Created by blavenie on 22/01/19.
*/
public class NetworkWs2pHeads {
public NetworkWs2pHeads.Head[] heads;
public String toString() {
StringBuilder sb = new StringBuilder();
for(NetworkWs2pHeads.Head head : heads) {
sb.append(head.toString()).append("\n");
}
return sb.toString();
}
public static class Head implements Serializable {
public Ws2pHead message;
public String sig;
public String messageV2;
public String sigV2;
public Integer step;
public Ws2pHead getMessage() {
return message;
}
public void setMessage(Ws2pHead message) {
this.message = message;
}
public String getSig() {
return sig;
}
public void setSig(String sig) {
this.sig = sig;
}
public String getMessageV2() {
return messageV2;
}
public void setMessageV2(String messageV2) {
this.messageV2 = messageV2;
}
public String getSigV2() {
return sigV2;
}
public void setSigV2(String sigV2) {
this.sigV2 = sigV2;
}
public Integer getStep() {
return step;
}
public void setStep(Integer step) {
this.step = step;
}
@Override
public String toString() {
String s = "message=" + message + "\n" +
"sig=" + sig+ "\n" +
"step=" + step;
return s;
}
}
}
\ No newline at end of file
package org.duniter.core.client.model.bma;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Joiner;
import java.io.Serializable;
/**
* Created by blavenie on 22/01/19.
*/
public class Ws2pHead implements Serializable {
public class AccessConfig {
public boolean useTor;
private String mode;
public boolean isUseTor() {
return useTor;
}
public void setUseTor(boolean useTor) {
this.useTor = useTor;
}
public String getMode() {
return mode;
}
public void setMode(String mode) {
this.mode = mode;
}
}
public Integer version;
public String pubkey;
public String block;
public String ws2pid;
public String software;
public String softwareVersion;
public String powPrefix;
public String signature;
public AccessConfig privateConfig = new AccessConfig();
public AccessConfig publicConfig = new AccessConfig();
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
public String getPubkey() {
return pubkey;
}
public void setPubkey(String pubkey) {
this.pubkey = pubkey;
}
public String getBlock() {
return block;
}
public void setBlock(String block) {
this.block = block;
}
public String getWs2pid() {
return ws2pid;
}
public void setWs2pid(String ws2pid) {
this.ws2pid = ws2pid;
}
public String getSoftware() {
return software;
}
public void setSoftware(String software) {
this.software = software;
}
public String getSoftwareVersion() {
return softwareVersion;
}
public void setSoftwareVersion(String softwareVersion) {
this.softwareVersion = softwareVersion;
}
public String getPowPrefix() {
return powPrefix;
}
public void setPowPrefix(String powPrefix) {
this.powPrefix = powPrefix;
}
public AccessConfig getPrivateConfig() {
return privateConfig;
}
public void setPrivateConfig(AccessConfig privateConfig) {
this.privateConfig = privateConfig;
}
public AccessConfig getPublicConfig() {
return publicConfig;
}
public void setPublicConfig(AccessConfig publicConfig) {
this.publicConfig = publicConfig;
}
public String getSignature() {
return signature;
}
public void setSignature(String signature) {
this.signature = signature;
}
@Override
public String toString() {
return Joiner.on(':').skipNulls().join(new Object[]{
getPrefix(), "HEAD", version, pubkey, block, ws2pid, software, softwareVersion, powPrefix
});
}
@JsonIgnore
protected String getPrefix() {
StringBuilder sb = new StringBuilder();
sb.append("WS2P");
// Private access
if (getPrivateConfig() != null) {
sb.append("O");
if (getPrivateConfig().isUseTor()) {
sb.append("T");
} else {
sb.append("C");
}
if (getPrivateConfig().getMode() != null) {
switch (getPrivateConfig().getMode()) {
case "all":
sb.append("A");
break;
case "mixed":
sb.append("M");
break;
case "strict":
sb.append("S");
break;
}
}
}
// Public access
if (getPublicConfig() != null) {
sb.append("I");
if (getPublicConfig().isUseTor()) {
sb.append("T");
}
else {
sb.append("C");
}
}
return sb.toString();
}
}
\ No newline at end of file
package org.duniter.core.client.model.bma;
import org.duniter.core.client.model.bma.jackson.Ws2pHeadDeserializer;
import org.duniter.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class Ws2pHeads {
private static final Logger log = LoggerFactory.getLogger(Ws2pHeads.class);
public static final String WS2P_PREFIX = "^WS2P(?:O([CT][SAM]))?(?:I([CT]))?$";
public static final Pattern WS2P_PREFIX_PATTERN = Pattern.compile(WS2P_PREFIX);
private Ws2pHeads() {
// helper class
}
public static Ws2pHead parse(String message) throws IOException {
try {
String[] parts = message.split(":");
if (parts.length < 3 || !parts[0].startsWith("WS2P")) {
throw new IOException("Invalid WS2P message format: " + message);
}
// Head message
if ("HEAD".equals(parts[1])) {
if (parts.length < 4) {
throw new IllegalArgumentException("Invalid WS2P message format: " + message);
}
// Duniter version < 1.6.9
if (parts.length == 4) {
Ws2pHead result = new Ws2pHead();
result.setPubkey(parts[2]);
result.setBlock(parts[3]);
} else {
int version = Integer.parseInt(parts[2]);
if (version >= 1) {
Ws2pHead result = new Ws2pHead();
String prefix = parts[0];
// Private/public options
if (prefix.length() > 4) {
Matcher matches = WS2P_PREFIX_PATTERN.matcher(prefix);
if (!matches.matches()) {
throw new IllegalArgumentException("Invalid WS2P message format: " + message);
}
// Private options
String privateOptions = matches.group(1);
if (StringUtils.isNotBlank(privateOptions)) {
Ws2pHead.AccessConfig privateConfig = result.getPrivateConfig();
privateConfig.setUseTor(privateOptions.startsWith("T"));
String mode = privateOptions.substring(1);
switch (mode) {
case "A":
privateConfig.setMode("all");
break;
case "M":
privateConfig.setMode("mixed");
break;
case "S":
privateConfig.setMode("strict");
break;
}
}
// Public options
String publicOptions = matches.group(2);
if (StringUtils.isNotBlank(publicOptions)) {
Ws2pHead.AccessConfig publicConfig = result.getPrivateConfig();
publicConfig.setUseTor(publicOptions.startsWith("T"));
publicConfig.setMode("all");
}
// For DEBUG only:
log.debug(String.format("Parsing WS2P prefix {%s} into: private %s, public %s",
prefix,
((result.getPrivateConfig().isUseTor() ? "TOR " : "" ) + (result.getPrivateConfig().getMode())),
((result.getPublicConfig().isUseTor() ? "TOR " : "" ) + (result.getPublicConfig().getMode()))
));
}
result.setVersion(version);
result.setPubkey(parts[3]);
result.setBlock(parts[4]);
result.setWs2pid(parts[5]);
result.setSoftware(parts[6]);
result.setSoftwareVersion(parts[7]);
result.setPowPrefix(parts[8]);
return result;
}
}
}
return null;
}
catch(Exception e) {
throw new IOException(e.getMessage(), e);
}
}
}
......@@ -28,6 +28,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.NetworkWs2pHeads;
import org.duniter.core.client.model.bma.Ws2pHead;
/**
* Created by blavenie on 07/12/16.
......@@ -62,6 +64,8 @@ public abstract class JacksonUtils extends SimpleModule {
// Network
module.addDeserializer(NetworkPeering.Endpoint.class, new EndpointDeserializer());
module.addSerializer(NetworkPeering.Endpoint.class, new EndpointSerializer());
module.addDeserializer(Ws2pHead.class, new Ws2pHeadDeserializer());
module.addSerializer(Ws2pHead.class, new Ws2pHeadSerializer());
objectMapper.registerModule(module);
......
package org.duniter.core.client.model.bma.jackson;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.duniter.core.client.model.bma.Endpoints;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.Ws2pHead;
import org.duniter.core.client.model.bma.Ws2pHeads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Created by blavenie on 07/12/16.
*/
public class Ws2pHeadDeserializer extends JsonDeserializer<Ws2pHead> {
private static final Logger log = LoggerFactory.getLogger(Ws2pHeadDeserializer.class);
private boolean debug;
public Ws2pHeadDeserializer() {
this.debug = log.isDebugEnabled();
}
@Override
public Ws2pHead deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException {
String ept = jp.getText();
try {
return Ws2pHeads.parse(ept);
} catch(IOException e) {
// Unable to parse endpoint: continue (will skip this endpoint)
if (debug) {
log.warn(e.getMessage(), e); // link the exception
}
else {
log.debug(e.getMessage());
}
return null;
}
}
}
\ No newline at end of file
package org.duniter.core.client.model.bma.jackson;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.NetworkWs2pHeads;
import org.duniter.core.client.model.bma.Ws2pHead;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
* Created by blavenie on 17/10/18.
*/
public class Ws2pHeadSerializer extends JsonSerializer<Ws2pHead> {
private static final Logger log = LoggerFactory.getLogger(Ws2pHeadSerializer.class);
private boolean debug;
public Ws2pHeadSerializer() {
this.debug = log.isDebugEnabled();
}
@Override
public void serialize(Ws2pHead head, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) {
try {
jsonGenerator.writeString(head.toString());
} catch(IOException e) {
// Unable to parse endpoint: continue (will skip this endpoint)
if (debug) {
log.warn(e.getMessage(), e); // link the exception
}
else {
log.debug(e.getMessage());
}
}
}
}
\ No newline at end of file
......@@ -104,7 +104,20 @@ public final class Peers {
result.setCurrency(firstEp.getCurrency());
result.setPubkey(firstEp.getPubkey());
result.setBlock(getBlockStamp(firstEp));
if (firstEp.getPeering() != null) {
result.setBlock(getPeeringBlockStamp(firstEp));
result.setSignature(firstEp.getPeering().getSignature());
result.setVersion(firstEp.getPeering().getVersion());
}
else {
result.setVersion(Protocol.VERSION);
result.setBlock(getStatsBlockStamp(firstEp));
result.setSignature(null);
}
// Default values (not stored yet)
// TODO check if still used by clients
result.setStatusTS(0L);
// Compute status (=UP is at least one endpoint is UP)
String status = endpoints.stream()
......@@ -114,10 +127,6 @@ public final class Peers {
.orElse(Peer.PeerStatus.DOWN).name();
result.setStatus(status);
// Default values (not stored yet)
result.setVersion(Protocol.VERSION); // TODO: get it from the storage (DB, ES, etc.) ?
result.setStatusTS(0L); // TODO make sure this is used by clients ?
// Compute endpoints list
List<NetworkPeering.Endpoint> bmaEps = endpoints.stream()
.map(Peers::toBmaEndpoint)
......@@ -152,7 +161,14 @@ public final class Peers {
}
public static String getBlockStamp(final Peer peer) {
public static String getPeeringBlockStamp(final Peer peer) {
return peer.getPeering() != null &&
peer.getPeering().getBlockNumber() != null &&
peer.getPeering().getBlockHash() != null
? (peer.getPeering().getBlockNumber() + "-" + peer.getPeering().getBlockHash()) : null;
}
public static String getStatsBlockStamp(final Peer peer) {
return peer.getStats() != null &&
peer.getStats().getBlockNumber() != null &&
peer.getStats().getBlockHash() != null
......
......@@ -23,9 +23,7 @@ package org.duniter.core.client.service.bma;
*/
import org.duniter.core.beans.Service;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.NetworkPeers;
import org.duniter.core.client.model.bma.*;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
......@@ -44,6 +42,8 @@ public interface NetworkRemoteService extends Service {
NetworkPeers.Peer getPeerLeaf(Peer peer, String leaf);
List<Ws2pHead> getWs2pHeads(Peer peer);
List<Peer> findPeers(Peer peer, String status, EndpointApi endpointApi, Integer currentBlockNumber, String currentBlockHash);
WebsocketClientEndpoint addPeerListener(String currencyId, WebsocketClientEndpoint.MessageListener listener, boolean autoReconnect);
......
......@@ -30,13 +30,12 @@ import java.util.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.message.BasicNameValuePair;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.NetworkPeers;
import org.duniter.core.client.model.bma.*;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.model.local.Wallet;
......@@ -68,6 +67,10 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N
public static final String URL_WS_PEER = "/ws/peer";
public static final String URL_WS2P = URL_BASE + "/ws2p";
public static final String URL_WS2P_HEADS = URL_WS2P + "/heads";
public NetworkRemoteServiceImpl() {
super();
}
......@@ -154,6 +157,26 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N
return result;
}
@Override
public List<Ws2pHead> getWs2pHeads(Peer peer) {
Preconditions.checkNotNull(peer);
NetworkWs2pHeads remoteResult = httpService.executeRequest(peer, URL_WS2P_HEADS, NetworkWs2pHeads.class);
List<Ws2pHead> result = Lists.newArrayList();
for (NetworkWs2pHeads.Head remoteWs2pHead: remoteResult.heads) {
Ws2pHead head = remoteWs2pHead.getMessage();
if (head != null) {
head.setSignature(remoteWs2pHead.getSig());
result.add(head);
}
}
return result;
}
@Override
public WebsocketClientEndpoint addPeerListener(String currencyId, WebsocketClientEndpoint.MessageListener listener, boolean autoReconnect) {
......
......@@ -91,7 +91,7 @@ public interface NetworkService extends Service {
final Filter filter, final Sort sort, final boolean autoreconnect,
final ExecutorService executor);
CompletableFuture<List<Peer>> asyncRefreshPeers(final Peer mainPeer, final List<Peer> peers, final ExecutorService pool);
CompletableFuture<List<Peer>> refreshPeersAsync(final Peer mainPeer, final List<Peer> peers, final ExecutorService pool);
String getVersion(final Peer peer);
}
......@@ -155,16 +155,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
log.debug("Loading network peers...");
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
CompletableFuture<List<Peer>> peersFuture = CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool);
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
return CompletableFuture.allOf(
new CompletableFuture[] {peersFuture, memberUidsFuture})
.thenComposeAsync(v -> {
final Map<String, String> memberUids = memberUidsFuture.join();
List<Peer> peers = peersFuture.join();
List<CompletableFuture<Peer>> list = peers.stream()
return CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool)
.thenApply(peers ->
peers.stream()
.map(peer -> {
// For if same as main peer,
if (mainPeer.getUrl().equals(peer.getUrl())) {
......@@ -178,20 +172,32 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
// Exclude peer with only a local IPv4 address (or localhost)
else if (InetAddressUtils.isLocalAddress(peer.getHost())) {
return CompletableFuture.<Peer>completedFuture(null);
return null;
}
return asyncRefreshPeer(peer, memberUids, pool);
return peer;
})
.collect(Collectors.toList());
return CompletableFutures.allOfToList(list);
});
.filter(Objects::nonNull)
.collect(Collectors.toList())
)
.thenCompose(peers -> this.refreshPeersAsync(mainPeer, peers, pool));
}
public CompletableFuture<Peer> asyncRefreshPeer(final Peer peer, final Map<String, String> memberUids, final ExecutorService pool) {
public CompletableFuture<Peer> refreshPeerAsync(final Peer peer,
final Map<String, String> memberUids,
final List<Ws2pHead> ws2pHeads,
final ExecutorService pool) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing peer status", peer.toString()));
// WS2P: refresh using heads
if (Peers.hasWs2pEndpoint(peer)) {
return CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads), pool);
}
// BMA or ES_CORE
if (Peers.hasBmaEndpoint(peer) || Peers.hasEsCoreEndpoint(peer)) {
return CompletableFuture.supplyAsync(() -> fillNodeSummary(peer), pool)
.thenApplyAsync(this::fillCurrentBlock)
.exceptionally(throwable -> {
......@@ -227,16 +233,70 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
});
}
// Unknown API: just return the peer
return CompletableFuture.completedFuture(peer);
}
public CompletableFuture<List<Peer>> asyncRefreshPeers(final Peer mainPeer, final List<Peer> peers, final ExecutorService pool) {
return CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool)
// Refresh all endpoints
.thenApply(memberUids ->
peers.stream().map(peer ->
asyncRefreshPeer(peer, memberUids, pool))
.collect(Collectors.toList())
public Peer fillWs2pPeer(final Peer peer, final Map<String, String> memberUids, List<Ws2pHead> ws2pHeads) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing WS2P peer status", peer.toString()));
if (StringUtils.isBlank(peer.getPubkey()) || StringUtils.isBlank(peer.getEpId())) return peer;
Ws2pHead ws2pHead = ws2pHeads.stream().filter(head ->
peer.getPubkey().equals(head.getPubkey())
&& peer.getEpId().equals(head.getWs2pid()
)
).findFirst().orElse(null);
if (ws2pHead != null) {
if (ws2pHead.getBlock() != null) {
String[] blockParts = ws2pHead.getBlock().split("-");
if (blockParts.length == 2) {
peer.getStats().setBlockNumber(Integer.parseInt(blockParts[0]));
peer.getStats().setBlockHash(blockParts[1]);
}
}
peer.getStats().setSoftware(ws2pHead.getSoftware());
peer.getStats().setVersion(ws2pHead.getSoftwareVersion());
}
else {
peer.getStats().setStatus(Peer.PeerStatus.DOWN);
}
// Set uid
String uid = memberUids.get(peer.getPubkey());
peer.getStats().setUid(uid);
if (uid != null) {
// Could not known hardship, so fill 0 if member (=can compute)
peer.getStats().setHardshipLevel(0);
}
else {
peer.getStats().setHardshipLevel(null);
}
return peer;
}
public CompletableFuture<List<Peer>> refreshPeersAsync(final Peer mainPeer,final List<Peer> peers, final ExecutorService pool) {
if (CollectionUtils.isEmpty(peers)) return CompletableFuture.completedFuture(null);
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
CompletableFuture<List<Ws2pHead>> ws2pHeadsFuture = CompletableFuture.supplyAsync(() -> networkRemoteService.getWs2pHeads(mainPeer), pool);
return CompletableFuture.allOf(memberUidsFuture, ws2pHeadsFuture)
// Refresh all endpoints
.thenApply(v -> {
final Map<String, String> memberUids = memberUidsFuture.join();
final List<Ws2pHead> ws2pHeads = ws2pHeadsFuture.join();
return peers.stream().map(peer ->
refreshPeerAsync(peer, memberUids, ws2pHeads, pool))
.collect(Collectors.toList());
})
.thenCompose(CompletableFutures::allOfToList);
}
public List<Peer> fillPeerStatsConsensus(final List<Peer> peers) {
......@@ -245,7 +305,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
final Map<String,Long> peerCountByBuid = peers.stream()
.filter(peer -> Peers.isReacheable(peer) && Peers.hasDuniterEndpoint(peer))
.map(Peers::buid)
.filter(b -> b != null)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
// Compute main consensus buid
......@@ -253,7 +313,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
.sorted(Comparator.comparing(Map.Entry::getValue, Comparator.reverseOrder()))
.findFirst();
final String mainBuid = maxPeerCountEntry.isPresent() ? maxPeerCountEntry.get().getKey() : null;;
final String mainBuid = maxPeerCountEntry.isPresent() ? maxPeerCountEntry.get().getKey() : null;
// Compute total of UP peers
final Long peersUpTotal = peerCountByBuid.values().stream().mapToLong(Long::longValue).sum();
......@@ -372,17 +432,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
final List<Peer> newPeers = new ArrayList<>();
addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints);
CompletableFuture<List<CompletableFuture<Peer>>> jobs =
CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool)
// Refresh all endpoints
.thenApply(memberUids ->
newPeers.stream().map(peer ->
asyncRefreshPeer(peer, memberUids, pool))
.collect(Collectors.toList())
);
jobs.thenCompose(CompletableFutures::allOfToList)
refreshPeersAsync(mainPeer, newPeers, executor)
.thenAccept(refreshedPeers -> {
if (CollectionUtils.isEmpty(refreshedPeers)) return;
......@@ -749,4 +799,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
});
}
}
}
......@@ -27,6 +27,7 @@ import org.duniter.core.client.TestResource;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.bma.Ws2pHead;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.service.ServiceLocator;
import org.junit.Assert;
......@@ -84,6 +85,20 @@ public class NetworkRemoteServiceTest {
Assert.assertTrue(result.size() > 0);
}
@Test
public void getWs2pHeads() throws Exception {
List<Ws2pHead> result = service.getWs2pHeads(peer);
Assert.assertNotNull(result);
Assert.assertTrue(result.size() > 0);
// log
//result.stream().forEach(head ->
// System.out.println(head.toString())
//);
}
/* -- internal methods */
protected Peer createTestPeer() {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment