Mise à jour de GitLab prévue ce samedi 8 mai 2021 à partir de 9h00 CET | GitLab upgrade planned this Saturday May 4th of 2021 from 9:00 AM CET

Commit 125a38bb authored by Benoit Lavenier's avatar Benoit Lavenier

[fix] Serialize 'raw' on BMA Peer

[fix] Make sure to copy peer.peering, when fill mainPeer on NetworkService.getPeersAsync()
[fix] Add IEnpointApi interface, to be able to extends API (e.g. for gchange)
parent d0536e7e
Pipeline #9542 passed with stage
in 24 seconds
......@@ -195,10 +195,10 @@ public class TransactionAction extends AbstractAction {
peersFilter.filterType = NetworkService.FilterType.MEMBER;
peersFilter.filterStatus = Peer.PeerStatus.UP;
if (useSsl) {
peersFilter.filterEndpoints = ImmutableList.of(EndpointApi.BMAS.name());
peersFilter.filterEndpoints = ImmutableList.of(EndpointApi.BMAS.label());
}
else {
peersFilter.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name());
peersFilter.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.label());
}
// Sort by [lesser difficulty first]
NetworkService.Sort sortLesserDifficulty = new NetworkService.Sort();
......
......@@ -23,20 +23,53 @@ package org.duniter.core.client.model.bma;
*/
public enum EndpointApi {
BASIC_MERKLED_API,
BMAS,
BMATOR,
WS2P,
WS2PTOR,
ES_CORE_API,
ES_USER_API,
ES_SUBSCRIPTION_API,
MONIT_API,
UNDEFINED,
// TODO: remove this ?
GCHANGE_API;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Endpoint used by Duniter protocol, and Cesium-plus-pod API.<br/>
* Label can be override using static the method <code>EndpointApi.setLabel()</code>
*/
public enum EndpointApi implements IEndpointApi {
BASIC_MERKLED_API(),
BMAS(),
BMATOR(),
WS2P(),
WS2PTOR(),
ES_CORE_API(),
ES_USER_API(),
ES_SUBSCRIPTION_API(),
MONIT_API(),
UNDEFINED(),
// TODO: remove this
GCHANGE_API();
private static final Logger log = LoggerFactory.getLogger(EndpointApi.class);
private String label;
EndpointApi() {
this.label = this.name();
}
public String label() {
return this.label;
}
/**
* Allow to change the API label.
* Useful for reuse and API enumeration, with a new label (eg: ES_CORE_API => GCHANGE_API)
* @param api
* @param label
*/
public void setLabel(String label) {
if (!this.label.equals(label)) {
log.warn(String.format("Endpoint API '%s' label change to '%s'", this.name(), label));
this.label = label;
}
}
public boolean useHttpProtocol(String api) {
return !useWebSocketProtocol(api);
......
......@@ -65,7 +65,7 @@ public class Endpoints {
// BMA API
Matcher mather = bmaPattern.matcher(raw);
if (mather.matches()) {
endpoint.api = EndpointApi.BASIC_MERKLED_API.name();
endpoint.api = EndpointApi.BASIC_MERKLED_API.label();
parseDefaultFormatEndPoint(mather, endpoint, 1);
return Optional.of(endpoint);
}
......
package org.duniter.core.client.model.bma;
public interface IEndpointApi {
String label();
}
......@@ -24,6 +24,7 @@ package org.duniter.core.client.model.bma;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import java.io.Serializable;
......@@ -95,7 +96,7 @@ public class NetworkPeering implements Serializable {
}
/**
* Unsigned raw
* Unsigned peering document
* @return
*/
public String getRaw() {
......@@ -152,6 +153,14 @@ public class NetworkPeering implements Serializable {
return sb.toString();
}
public String toSignedRaw() {
Preconditions.checkNotNull(this.signature, "Invalid peer document. Missing 'signature'");
return new StringBuilder()
.append(toUnsignedRaw())
.append(signature).append("\n")
.toString();
}
public static class Endpoint implements Serializable {
public String api;
public String dns;
......
......@@ -87,7 +87,6 @@ public class NetworkPeers implements Serializable {
}
@Override
@JsonIgnore
public String getRaw() {
return super.getRaw();
}
......
......@@ -164,7 +164,7 @@ public class Peer implements LocalEntity<String>, Serializable {
String raw = remotePeering.getRaw();
if (StringUtils.isBlank(raw)) {
raw = remotePeering.toString();
raw = remotePeering.toUnsignedRaw();
}
this.peering.setRaw(raw);
......@@ -213,7 +213,7 @@ public class Peer implements LocalEntity<String>, Serializable {
public Peer build() {
int port = this.port != null ? this.port : 80;
String api = this.api != null ? this.api : EndpointApi.BASIC_MERKLED_API.name();
String api = this.api != null ? this.api : EndpointApi.BASIC_MERKLED_API.label();
boolean useSsl = this.useSsl != null ? this.useSsl :
(port == 443 || EndpointApi.BMAS.name().equals(this.api));
Peer ep = new Peer(api, dns, ipv4, ipv6, port, useSsl);
......@@ -289,7 +289,7 @@ public class Peer implements LocalEntity<String>, Serializable {
*/
@Deprecated
public Peer(String host, Integer port) {
this.api = EndpointApi.BASIC_MERKLED_API.name();
this.api = EndpointApi.BASIC_MERKLED_API.label();
if (InetAddressUtils.isIPv4Address(host)) {
this.ipv4 = host;
}
......@@ -544,6 +544,10 @@ public class Peer implements LocalEntity<String>, Serializable {
this.blockHash = blockHash;
}
/**
* The raw peering document (unsigned)
* @return
*/
public String getRaw() {
return raw;
}
......
......@@ -49,8 +49,12 @@ public final class Peers {
// helper class
}
public static boolean hasEndPointAPI(Peer peer, EndpointApi api) {
return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api.name());
public static boolean hasEndPointAPI(Peer peer, IEndpointApi api) {
return hasEndPointAPI(peer, api.label());
}
public static boolean hasEndPointAPI(Peer peer, String api) {
return peer.getApi() != null && peer.getApi().equalsIgnoreCase(api);
}
public static String buid(Peer peer) {
......@@ -91,6 +95,9 @@ public final class Peers {
try {
// Fill BMA peer, using the raw document
NetworkPeerings.parse(endpointAsPeer.getPeering().getRaw(), result);
result.setSignature(endpointAsPeer.getPeering().getSignature());
result.setRaw(endpointAsPeer.getPeering().getRaw());
// Override the status, last_try and first_down, using stats
Peer.PeerStatus status = getStatus(endpointAsPeer).orElse(Peer.PeerStatus.DOWN);
result.setStatus(status.name());
......@@ -221,7 +228,11 @@ public final class Peers {
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peeringDocument);
Peer.Peering peering = (peer.getPeering() != null) ? peer.getPeering() : new Peer.Peering();
Peer.Peering peering = peer.getPeering();
if (peering == null) {
peering = new Peer.Peering();
peer.setPeering(peering);
}
// Copy some fields
peer.setPubkey(peeringDocument.getPubkey());
......@@ -229,6 +240,7 @@ public final class Peers {
peering.setVersion(peeringDocument.getVersion());
peering.setSignature(peeringDocument.getSignature());
peering.setRaw(peeringDocument.getRaw());
// Copy block infos
if (StringUtils.isNotBlank(peeringDocument.getBlock())) {
......
......@@ -145,13 +145,13 @@ public class NetworkRemoteServiceImpl extends BaseRemoteServiceImpl implements N
match = endpointApi == null || (endpoint != null && endpointApi.equals(endpoint.api));
if (match && endpoint != null) {
Peer childPeer = Peer.newBuilder()
Peer peerEp = Peer.newBuilder()
.setCurrency(remotePeer.getCurrency())
.setPubkey(remotePeer.getPubkey())
.setEndpoint(endpoint)
.setPeering(remotePeer)
.build();
result.add(childPeer);
result.add(peerEp);
}
}
......
......@@ -29,7 +29,6 @@ import java.io.Closeable;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
......@@ -42,7 +41,10 @@ public interface NetworkService extends Service {
interface PeersChangeListener {
void onChanges(Collection<Peer> peers);
}
interface RefreshPeerListener {
void onRefresh(Peer peer);
}
class Sort {
......
......@@ -68,7 +68,6 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
private final static String BMA_URL_STATUS = "/node/summary";
private final static String BMA_URL_BLOCKCHAIN_CURRENT = "/blockchain/current";
private final static String BMA_URL_BLOCKCHAIN_HARDSHIP = "/blockchain/hardship/";
private final static String ES_URL_BLOCKCHAIN_CURRENT = "/blockchain/current";
private NetworkRemoteService networkRemoteService;
private WotRemoteService wotRemoteService;
......@@ -77,6 +76,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
private final LockManager lockManager = new LockManager(4, 10);
private PeerService peerService;
private List<RefreshPeerListener> refreshPeerListeners = Lists.newArrayList();
public NetworkServiceImpl() {
}
......@@ -111,7 +111,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
Filter filterDef = new Filter();
filterDef.filterType = null;
filterDef.filterStatus = Peer.PeerStatus.UP;
filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name(), EndpointApi.WS2P.name());
filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.label(), EndpointApi.BMAS.label(), EndpointApi.WS2P.label());
filterDef.minBlockNumber = current.getNumber().intValue() - 100;
// Default sort
......@@ -139,7 +139,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
.filter(peerFilter(filter))
.sorted(peerComparator(sort))
.collect(Collectors.toList()))
//.thenApplyAsync(this::logPeers)
.thenApplyAsync(this::logPeers)
.get();
} catch (InterruptedException | ExecutionException e) {
throw new TechnicalException("Error while loading peers: " + e.getMessage(), e);
......@@ -164,15 +164,15 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
return CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool)
.thenApply(peers ->
peers.stream()
.thenApply(peers -> peers.stream()
// Replace by main peer, if same URL
.map(peer -> {
// Replace by main peer, if same URL
if (mainPeer.getUrl().equals(peer.getUrl())) {
// Update properties
mainPeer.setPubkey(peer.getPubkey());
mainPeer.setHash(peer.getHash());
mainPeer.setCurrency(peer.getCurrency());
mainPeer.setPeering(peer.getPeering());
// reuse instance
return mainPeer;
}
......@@ -194,15 +194,16 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
final ExecutorService pool) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing peer status", peer.toString()));
CompletableFuture<Peer> result;
// WS2P: refresh using heads
if (Peers.hasWs2pEndpoint(peer)) {
return CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads, difficulties), pool);
result = CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads, difficulties), pool);
}
// BMA or ES_CORE
if (Peers.hasBmaEndpoint(peer) || Peers.hasEsCoreEndpoint(peer)) {
else if (Peers.hasBmaEndpoint(peer) || Peers.hasEsCoreEndpoint(peer)) {
return CompletableFuture.allOf(
result = CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> fillNodeSummary(peer), pool),
CompletableFuture.supplyAsync(() -> fillCurrentBlock(peer), pool)
)
......@@ -213,9 +214,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
peer.getStats().setError(cause.getMessage());
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) {
log.debug(String.format("[%s] is DOWN: %s", peer, cause.getMessage()), cause);
}
if (log.isTraceEnabled()) log.debug(String.format("[%s] is DOWN: %s", peer, cause.getMessage()), cause);
else log.debug(String.format("[%s] is DOWN: %s", peer, cause.getMessage()));
}
}
......@@ -241,7 +240,28 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
}
// Unknown API: just return the peer
return CompletableFuture.completedFuture(peer);
else {
result = CompletableFuture.completedFuture(peer);
}
// No listeners: return result
if (CollectionUtils.isEmpty(refreshPeerListeners)) {
return result;
}
// Executing listeners
return result.thenApplyAsync(p -> CompletableFuture.allOf(
refreshPeerListeners.stream()
.map(l -> CompletableFuture.runAsync(() -> l.onRefresh(peer), pool))
.toArray(CompletableFuture[]::new)
)
.exceptionally(e -> {
if (log.isDebugEnabled()) log.error(String.format("[%s] Refresh peer listeners error: %s", peer, e.getMessage()), e);
else log.error(String.format("[%s] Refresh peer listeners error: %s", peer, e.getMessage()));
return null;
}))
// Return the peer, as result
.thenApply(v -> peer);
}
public Peer fillWs2pPeer(final Peer peer,
......@@ -301,7 +321,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
public CompletableFuture<List<Peer>> refreshPeersAsync(final Peer mainPeer,final List<Peer> peers, final ExecutorService pool) {
if (CollectionUtils.isEmpty(peers)) return CompletableFuture.completedFuture(null);
if (CollectionUtils.isEmpty(peers)) return CompletableFuture.completedFuture(ImmutableList.of());
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
CompletableFuture<List<Ws2pHead>> ws2pHeadsFuture = CompletableFuture.supplyAsync(() -> networkRemoteService.getWs2pHeads(mainPeer), pool);
......@@ -576,48 +596,56 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return json.asText();
}
public NetworkService addRefreshPeerListener(RefreshPeerListener listener) {
refreshPeerListeners.add(listener);
return this;
}
public NetworkService removeRefreshPeerListener(RefreshPeerListener listener) {
refreshPeerListeners.remove(listener);
return this;
}
/* -- protected methods -- */
protected List<Peer> loadPeerLeafs(Peer peer, List<String> filterEndpoints) {
List<String> leaves = networkRemoteService.getPeersLeaves(peer);
if (CollectionUtils.isEmpty(leaves)) return Lists.newArrayList(); // should never occur
if (CollectionUtils.isEmpty(leaves)) return ImmutableList.of();
List<Peer> result = Lists.newArrayList();
CryptoService cryptoService = ServiceLocator.instance().getCryptoService();
// If less than 100 node, get it in ONE call
if (leaves.size() <= 2000) {
List<Peer> peers = networkRemoteService.getPeers(peer);
if (CollectionUtils.isNotEmpty(peers)) {
for (Peer peerEp : peers) {
// Filter on endpoints - fix #18
if (CollectionUtils.isEmpty(filterEndpoints)
|| StringUtils.isBlank(peerEp.getApi())
|| filterEndpoints.contains(peerEp.getApi())) {
String hash = cryptoService.hash(peerEp.computeKey()); // compute the hash
peerEp.setHash(hash);
result.add(peerEp);
}
}
}
if (CollectionUtils.isEmpty(peers)) return ImmutableList.of();
return peers.stream()
// Filter on endpoints - fix #18
.filter(peerEp -> CollectionUtils.isEmpty(filterEndpoints)
|| StringUtils.isBlank(peerEp.getApi())
|| filterEndpoints.contains(peerEp.getApi()))
// Compute the hash
.map(peerEp -> {
String hash = cryptoService.hash(peerEp.computeKey());
peerEp.setHash(hash);
return peerEp;
}).collect(Collectors.toList());
}
// Get it by multiple call on /network/peering?leaf=
else {
int offset = 0;
int count = Constants.Config.MAX_SAME_REQUEST_COUNT;
while (offset < leaves.size()) {
if (offset + count > leaves.size()) count = leaves.size() - offset;
loadPeerLeafs(peer, result, leaves, offset, count, filterEndpoints);
offset += count;
try {
Thread.sleep(1000); // wait 1 s
} catch (InterruptedException e) {
// stop
offset = leaves.size();
}
List<Peer> result = Lists.newArrayList();
int offset = 0;
int count = Constants.Config.MAX_SAME_REQUEST_COUNT;
while (offset < leaves.size()) {
if (offset + count > leaves.size()) count = leaves.size() - offset;
loadPeerLeafs(peer, result, leaves, offset, count, filterEndpoints);
offset += count;
try {
Thread.sleep(1000); // wait 1 s
} catch (InterruptedException e) {
// stop
offset = leaves.size();
}
}
......@@ -665,6 +693,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
protected boolean applyPeerFilter(Peer peer, Filter filter) {
if (filter == null) return true;
Peer.Stats stats = peer.getStats();
......
......@@ -11,7 +11,7 @@ public class EndpointsTest {
// Parse valid endpoints
NetworkPeering.Endpoint ep = Endpoints.parse("BASIC_MERKLED_API g1.duniter.fr 81.81.81.81 80").orElse(null);
Assert.assertNotNull(ep);
Assert.assertEquals(EndpointApi.BASIC_MERKLED_API.name(), ep.api);
Assert.assertEquals(EndpointApi.BASIC_MERKLED_API.label(), ep.api);
Assert.assertEquals("g1.duniter.fr", ep.dns);
Assert.assertEquals("81.81.81.81", ep.ipv4);
Assert.assertNotNull(ep.port);
......
......@@ -21,7 +21,7 @@ public class NetworkPeeringTest {
peering.setStatus("UP");
NetworkPeering.Endpoint epBma = new NetworkPeering.Endpoint();
epBma.setApi(EndpointApi.BASIC_MERKLED_API.name());
epBma.setApi(EndpointApi.BASIC_MERKLED_API.label());
epBma.setDns("g1.duniter.fr");
epBma.setPort(80);
......
......@@ -70,7 +70,7 @@ public class NetworkRemoteServiceTest {
@Test
public void findPeers() throws Exception {
List<Peer> result = service.findPeers(peer, null, EndpointApi.BASIC_MERKLED_API.name(), null, null);
List<Peer> result = service.findPeers(peer, null, EndpointApi.BASIC_MERKLED_API.label(), null, null);
Assert.assertNotNull(result);
Assert.assertTrue(result.size() > 0);
......
......@@ -23,14 +23,12 @@ package org.duniter.core.client.service.local;
*/
import com.google.common.collect.ImmutableList;
import org.duniter.core.client.TestResource;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.service.ServiceLocator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -63,6 +61,36 @@ public class NetworkServiceTest {
peers.forEach(p -> log.debug(" Found peer: " + p.toString()));
}
@Test
@Ignore
public void getGchangePeers() throws Exception {
Peer gchangePeer = Peer.newBuilder()
//.setHost("data.gchange.fr")
.setHost("gchange.data.presles.fr")
.setPort(443)
.setApi("GCHANGE_API")
.build();
NetworkService.Filter filterDef = new NetworkService.Filter();
filterDef.filterStatus = Peer.PeerStatus.UP;
filterDef.filterEndpoints = ImmutableList.of("GCHANGE_API");
List<Peer> peers = service.getPeers(gchangePeer, filterDef, null);
Assert.assertNotNull(peers);
Assert.assertTrue(peers.size() > 0);
peers.forEach(p -> log.debug(" Found peer: " + p.toString()));
for (Peer peer: peers) {
Assert.assertNotNull(peer.getPeering());
Assert.assertNotNull(peer.getPeering().getRaw());
}
}
/* -- internal methods */
protected Peer createTestPeer() {
......