Skip to content
Snippets Groups Projects
Commit fe43ccdd authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Allow to close() when listening network peers

parent 4ebadce6
No related branches found
No related tags found
No related merge requests found
Pipeline #7552 canceled
...@@ -25,6 +25,7 @@ package org.duniter.core.client.service.local; ...@@ -25,6 +25,7 @@ package org.duniter.core.client.service.local;
import org.duniter.core.beans.Service; import org.duniter.core.beans.Service;
import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peer;
import java.io.Closeable;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
...@@ -86,9 +87,9 @@ public interface NetworkService extends Service { ...@@ -86,9 +87,9 @@ public interface NetworkService extends Service {
Comparator<Peer> peerComparator(Sort sort); Comparator<Peer> peerComparator(Sort sort);
void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener); Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener);
void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener, Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener,
final Filter filter, final Sort sort, final boolean autoreconnect, final Filter filter, final Sort sort, final boolean autoreconnect,
final ExecutorService executor); final ExecutorService executor);
......
...@@ -43,9 +43,11 @@ import org.duniter.core.util.*; ...@@ -43,9 +43,11 @@ import org.duniter.core.util.*;
import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.concurrent.CompletableFutures; import org.duniter.core.util.concurrent.CompletableFutures;
import org.duniter.core.util.http.InetAddressUtils; import org.duniter.core.util.http.InetAddressUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
...@@ -365,7 +367,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -365,7 +367,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return peers; return peers;
} }
public void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener) { public Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener) {
BlockchainParameters parameters = blockchainRemoteService.getParameters(mainPeer); BlockchainParameters parameters = blockchainRemoteService.getParameters(mainPeer);
fillCurrentBlock(mainPeer); fillCurrentBlock(mainPeer);
...@@ -386,13 +388,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -386,13 +388,13 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
Sort sortDef = new Sort(); Sort sortDef = new Sort();
sortDef.sortType = null; sortDef.sortType = null;
addPeersChangeListener(mainPeer, listener, filterDef, sortDef, true, null); return addPeersChangeListener(mainPeer, listener, filterDef, sortDef, true, null);
} }
public void addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener, public Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener,
final Filter filter, final Sort sort, final boolean autoreconnect, final Filter filter, final Sort sort, final boolean autoreconnect,
final ExecutorService executor) { final ExecutorService executor) {
final String currency = filter != null && filter.currency != null ? filter.currency : final String currency = filter != null && filter.currency != null ? filter.currency :
blockchainRemoteService.getParameters(mainPeer).getCurrency(); blockchainRemoteService.getParameters(mainPeer).getCurrency();
...@@ -507,7 +509,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -507,7 +509,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
}; };
// Manage new block event // Manage new block event
blockchainRemoteService.addBlockListener(mainPeer, json -> { WebsocketClientEndpoint.MessageListener blockListener = json -> {
log.debug("Received new block event"); log.debug("Received new block event");
try { try {
BlockchainBlock block = readValue(json, BlockchainBlock.class); BlockchainBlock block = readValue(json, BlockchainBlock.class);
...@@ -522,10 +524,11 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -522,10 +524,11 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
} catch(IOException e) { } catch(IOException e) {
log.error("Could not parse peer received by WS: " + e.getMessage(), e); log.error("Could not parse peer received by WS: " + e.getMessage(), e);
} }
}, autoreconnect); };
WebsocketClientEndpoint wsBlockEndpoint = blockchainRemoteService.addBlockListener(mainPeer, blockListener, autoreconnect);
// Manage new peer event // Manage new peer event
networkRemoteService.addPeerListener(mainPeer, json -> { WebsocketClientEndpoint.MessageListener peerListsner = json -> {
log.debug("Received new peer event"); log.debug("Received new peer event");
try { try {
...@@ -536,11 +539,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network ...@@ -536,11 +539,17 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
} catch(IOException e) { } catch(IOException e) {
log.error("Could not parse peer received by WS: " + e.getMessage(), e); log.error("Could not parse peer received by WS: " + e.getMessage(), e);
} }
}, autoreconnect); };
WebsocketClientEndpoint wsPeerEndpoint = networkRemoteService.addPeerListener(mainPeer, peerListsner, autoreconnect);
// Default action: Load all peers // Default action: Load all peers
pool.submit(loadAllPeers); pool.submit(loadAllPeers);
// Return the tear down logic
return () -> {
wsBlockEndpoint.unregisterListener(blockListener);
wsPeerEndpoint.unregisterListener(peerListsner);
};
} }
public String getVersion(final Peer peer) { public String getVersion(final Peer peer) {
......
...@@ -148,6 +148,16 @@ public class WebsocketClientEndpoint implements Closeable { ...@@ -148,6 +148,16 @@ public class WebsocketClientEndpoint implements Closeable {
public void unregisterListener(MessageListener listener) { public void unregisterListener(MessageListener listener) {
synchronized (messageListeners) { synchronized (messageListeners) {
this.messageListeners.remove(listener); this.messageListeners.remove(listener);
// If no more listener, close the WS client
if (this.messageListeners.size() == 0) {
try {
close();
}
catch(IOException e) {
// Silent
}
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment