Commit 62d4f269 authored by Benoit Lavenier's avatar Benoit Lavenier

[enh] WS changes: make sure a source filter is given after 20s, otherwise close the session

[enh] WS changes: start to listen changes only after defined a source filter
parent 6f47f97e
Pipeline #7719 failed
......@@ -72,8 +72,8 @@ public class RestNodeStatsGetAction extends BaseRestHandler {
// Listeners by source
if (request.paramAsBoolean("listeners", true)) {
mapping.startArray("listeners");
Map<String, Integer> sourcesListener = changeService.getUsageStatistics();
for (Map.Entry<String, Integer> entry : sourcesListener.entrySet()) {
Map<String, Long> sourcesListener = changeService.getUsageStatistics();
for (Map.Entry<String, Long> entry : sourcesListener.entrySet()) {
mapping.startObject()
.field("source", entry.getKey())
.field("count", entry.getValue())
......
......@@ -54,6 +54,8 @@ import org.joda.time.DateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
public class ChangeService {
......@@ -70,7 +72,7 @@ public class ChangeService {
private static final Map<String, ChangeListener> LISTENERS = new ConcurrentHashMap<>();
private static Map<String, ChangeSource> LISTENERS_SOURCES = new ConcurrentHashMap<>();
private static Map<String, Integer> LISTENERS_SOURCES_USAGE_COUNT = new ConcurrentHashMap<>();
private static Map<String, LongAdder> LISTENERS_SOURCES_USAGE_COUNT = new ConcurrentHashMap<>();
@Inject
public ChangeService(final Settings settings, IndicesService indicesService) {
......@@ -161,7 +163,9 @@ public class ChangeService {
private boolean apply(ChangeListener listener, ChangeEvent change) {
Collection<ChangeSource> sources = listener.getChangeSources();
if (CollectionUtils.isEmpty(sources)) return true;
// Exclude when no source defined
if (CollectionUtils.isEmpty(sources)) return false;
for (ChangeSource source : sources) {
if (source.apply(change.getIndex(), change.getType(), change.getId())) {
......@@ -174,16 +178,14 @@ public class ChangeService {
private void emitChange(final ChangeEvent change) {
LISTENERS.values().parallelStream()
.filter(listener -> apply(listener, change))
.forEach(listener -> {
try {
if (apply(listener, change)) {
listener.onChange(change);
}
} catch (Exception e) {
log.error("Failed to emit change event on listener: " + listener.getClass().getName(), e);
}
});
.filter(listener -> apply(listener, change))
.forEach(listener -> {
try {
listener.onChange(change);
} catch (Exception e) {
log.error("Failed to emit change event on listener: " + listener.getClass().getName(), e);
}
});
}
});
}
......@@ -208,11 +210,10 @@ public class ChangeService {
String sourceKey = source.toString();
if (!LISTENERS_SOURCES.containsKey(sourceKey)) {
LISTENERS_SOURCES.put(sourceKey, source);
LISTENERS_SOURCES_USAGE_COUNT.put(sourceKey, 1);
}
else {
LISTENERS_SOURCES_USAGE_COUNT.put(sourceKey, LISTENERS_SOURCES_USAGE_COUNT.get(sourceKey)+1);
}
LISTENERS_SOURCES_USAGE_COUNT
.computeIfAbsent(sourceKey, k -> new LongAdder())
.increment();
}
}
return listener;
......@@ -236,9 +237,10 @@ public class ChangeService {
for (ChangeSource source: listener.getChangeSources()) {
String sourceKey = source.toString();
if (LISTENERS_SOURCES.containsKey(sourceKey)) {
int usageCount = LISTENERS_SOURCES_USAGE_COUNT.get(sourceKey) - 1;
LongAdder usageCounter = LISTENERS_SOURCES_USAGE_COUNT.get(sourceKey);;
long usageCount = usageCounter != null ? usageCounter.longValue() : 0;
if (usageCount > 0) {
LISTENERS_SOURCES_USAGE_COUNT.put(sourceKey, usageCount);
usageCounter.decrement();
}
else {
LISTENERS_SOURCES.remove(sourceKey);
......@@ -249,8 +251,13 @@ public class ChangeService {
}
}
public Map<String, Integer> getUsageStatistics() {
return ImmutableMap.copyOf(LISTENERS_SOURCES_USAGE_COUNT);
public Map<String, Long> getUsageStatistics() {
return LISTENERS_SOURCES_USAGE_COUNT
.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().longValue()
));
}
}
......@@ -146,7 +146,7 @@ public class NettyWebSocketBlockHandler extends NettyBaseWebSocketEndpoint imple
@Override
public String getId() {
return session == null ? null : session.getId();
return "duniter.ws.block." + (session == null ? null : session.getId());
}
@Override
......
......@@ -48,6 +48,7 @@ import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeEvents;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
......@@ -55,10 +56,12 @@ import org.elasticsearch.common.logging.Loggers;
import javax.websocket.CloseReason;
import javax.websocket.OnError;
import javax.websocket.OnOpen;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint implements ChangeService.ChangeListener{
......@@ -66,14 +69,18 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
public static Collection<ChangeSource> DEFAULT_SOURCES = null;
private static ESLogger logger;
private static ThreadPool threadPool;
private NettyWebSocketSession session;
private Map<String, ChangeSource> sources;
private String sessionId;
public static class Init {
@Inject
public Init(NettyWebSocketServer webSocketServer, PluginSettings pluginSettings) {
public Init(NettyWebSocketServer webSocketServer, PluginSettings pluginSettings,
ThreadPool threadPoolInstance) {
logger = Loggers.getLogger("duniter.ws.changes", pluginSettings.getSettings(), new String[0]);
threadPool = threadPoolInstance;
// Init default sources
final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource();
......@@ -91,10 +98,17 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
@OnOpen
public void onOpen(NettyWebSocketSession session){
logger.debug("Connected ... " + session.getId());
this.session = session;
this.sources = null;
ChangeService.registerListener(this);
if (logger.isDebugEnabled())
logger.debug(String.format("Opening websocket session id {%s}. Waiting for sources...", session.getId()));
synchronized (this) {
this.session = session;
this.sessionId = "duniter.ws.changes#" + session.getId();
this.sources = null;
}
// Wait 10s that sources
threadPool.schedule(() -> checkHasSourceOrClose(), 30, TimeUnit.SECONDS);
}
@Override
......@@ -104,7 +118,7 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
@Override
public String getId() {
return session == null ? null : session.getId();
return sessionId;
}
@Override
......@@ -120,35 +134,59 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
@Override
public void onClose(CloseReason reason) {
logger.debug("Closing websocket: "+reason);
ChangeService.unregisterListener(this);
if (logger.isDebugEnabled()) logger.debug(String.format("Closing websocket session, id {%s}: %s", sessionId, reason));
synchronized (this) {
ChangeService.unregisterListener(this);
}
this.session = null;
}
@OnError
public void onError(Throwable t) {
logger.error("Error on websocket "+(session == null ? null : session.getId()), t);
logger.error(String.format("Error on websocket session, id {%s}", sessionId), t);
}
/* -- internal methods -- */
private void checkHasSourceOrClose() {
synchronized (this) {
if (session != null && MapUtils.isEmpty(sources)) {
try {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Missing changes sources to listen (must be given before 10s)"));
}
catch (IOException e) {
logger.error(String.format("Failed to close Web socket session, id {%s}", sessionId), e);
ChangeService.unregisterListener(this); // Make sure to unregistrer anyway
}
}
}
}
private void addSourceFilter(String filter) {
ChangeSource source = new ChangeSource(filter);
if (source.isEmpty()) {
logger.debug("Rejecting changes filter (seems to be empty): " + filter);
if (logger.isDebugEnabled()) logger.debug("Rejecting changes filter (seems to be empty): " + filter);
return;
}
String sourceKey = source.toString();
if (sources == null || !sources.containsKey(sourceKey)) {
logger.debug("Adding changes filter: " + filter);
if (sources == null) {
sources = Maps.newHashMap();
synchronized (this) {
if (sources == null || !sources.containsKey(sourceKey)) {
if (logger.isDebugEnabled())
logger.debug(String.format("Adding changes {%s}, id {%s}", filter, sessionId));
if (sources == null) {
sources = Maps.newHashMap();
sources.put(sourceKey, source);
ChangeService.registerListener(this);
}
else {
// Replace the sourceKey, then refresh the listener registration
sources.put(sourceKey, source);
ChangeService.refreshListener(this);
}
}
sources.put(sourceKey, source);
ChangeService.refreshListener(this);
}
}
}
......@@ -129,7 +129,7 @@ public class NettyWebSocketPeerHandler extends NettyBaseWebSocketEndpoint implem
@Override
public String getId() {
return session == null ? null : session.getId();
return "duniter.ws.peer." + (session == null ? null : session.getId());
}
@Override
......
......@@ -127,7 +127,7 @@ public class WebSocketBlockEndPoint implements ChangeService.ChangeListener{
@Override
public String getId() {
return session == null ? null : session.getId();
return "duniter.ws.block." + (session == null ? null : session.getId());
}
@Override
......
......@@ -46,48 +46,66 @@ import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeEvents;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ServerEndpoint(value = "/_changes")
public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{
public static Collection<ChangeSource> DEFAULT_SOURCES = null;
private static ESLogger logger;
private static ThreadPool threadPool;
private Session session;
private Map<String, ChangeSource> sources;
private String sessionId;
public static class Init {
@Inject
public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings) {
public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings,
ThreadPool threadPoolInstance) {
logger = Loggers.getLogger("duniter.ws.changes", pluginSettings.getSettings(), new String[0]);
webSocketServer.addEndPoint(WebSocketChangesEndPoint.class);
threadPool = threadPoolInstance;
// Init default sources
final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource();
List<ChangeSource> sources = new ArrayList<>();
for(String sourceStr : sourcesStr) {
sources.add(new ChangeSource(sourceStr));
}
DEFAULT_SOURCES = sources;
// Register endpoint
webSocketServer.addEndPoint(WebSocketChangesEndPoint.class);
}
}
private Session session;
private Map<String, ChangeSource> sources;
@OnOpen
public void onOpen(Session session) {
logger.debug("Connected ... " + session.getId());
this.session = session;
this.sources = null;
ChangeService.registerListener(this);
if (logger.isDebugEnabled())
logger.debug(String.format("Opening websocket session id {%s}. Waiting for sources...", session.getId()));
synchronized (this) {
this.session = session;
this.sessionId = "duniter.ws.changes#" + session.getId();
this.sources = null;
}
// Wait 10s that sources
threadPool.schedule(() -> checkHasSourceOrClose(), 30, TimeUnit.SECONDS);
}
@Override
......@@ -97,7 +115,7 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{
@Override
public String getId() {
return session == null ? null : session.getId();
return sessionId;
}
@Override
......@@ -113,35 +131,59 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{
@OnClose
public void onClose(CloseReason reason) {
logger.debug("Closing websocket: "+reason);
ChangeService.unregisterListener(this);
if (logger.isDebugEnabled()) logger.debug(String.format("Closing websocket session, id {%s}: %s", sessionId, reason));
synchronized (this) {
ChangeService.unregisterListener(this);
}
this.session = null;
}
@OnError
public void onError(Throwable t) {
logger.error("Error on websocket "+(session == null ? null : session.getId()), t);
logger.error(String.format("Error on websocket session, id {%s}", sessionId), t);
}
/* -- internal methods -- */
private void checkHasSourceOrClose() {
synchronized (this) {
if (session != null && MapUtils.isEmpty(sources)) {
try {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Missing changes sources to listen (must be given before 10s)"));
}
catch (IOException e) {
logger.error(String.format("Failed to close Web socket session, id {%s}", sessionId), e);
ChangeService.unregisterListener(this); // Make sure to unregistrer anyway
}
}
}
}
private void addSourceFilter(String filter) {
ChangeSource source = new ChangeSource(filter);
if (source.isEmpty()) {
logger.debug("Rejecting changes filter (seems to be empty): " + filter);
if (logger.isDebugEnabled()) logger.debug("Rejecting changes filter (seems to be empty): " + filter);
return;
}
String sourceKey = source.toString();
if (sources == null || !sources.containsKey(sourceKey)) {
logger.debug("Adding changes filter: " + filter);
if (sources == null) {
sources = Maps.newHashMap();
synchronized (this) {
if (sources == null || !sources.containsKey(sourceKey)) {
if (logger.isDebugEnabled())
logger.debug(String.format("Adding changes {%s}, id {%s}", filter, sessionId));
if (sources == null) {
sources = Maps.newHashMap();
sources.put(sourceKey, source);
ChangeService.registerListener(this);
}
else {
// Replace the sourceKey, then refresh the listener registration
sources.put(sourceKey, source);
ChangeService.refreshListener(this);
}
}
sources.put(sourceKey, source);
ChangeService.refreshListener(this);
}
}
}
......@@ -83,15 +83,11 @@ public class UserEventService extends AbstractService implements ChangeService.C
private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource(INDEX, EVENT_TYPE));
public static void registerListener(UserEventListener listener) {
synchronized (LISTENERS) {
LISTENERS.put(listener.getId(), listener);
}
LISTENERS.put(listener.getId(), listener);
}
public static synchronized void unregisterListener(UserEventListener listener) {
synchronized (LISTENERS) {
LISTENERS.remove(listener.getId());
}
LISTENERS.remove(listener.getId());
}
private final ThreadPool threadPool;
......@@ -510,16 +506,13 @@ public class UserEventService extends AbstractService implements ChangeService.C
event.setId(eventId);
if (LISTENERS.size() > 0) {
if (LISTENERS.size() > 0 && event.getRecipient() != null) {
// Notify listeners
threadPool.schedule(() -> {
synchronized (LISTENERS) {
LISTENERS.values().forEach(listener -> {
if (event.getRecipient().equals(listener.getPubkey())) {
listener.onEvent(event);
}
});
}
LISTENERS.values()
.parallelStream()
.filter(listener -> event.getRecipient().equals(listener.getPubkey()))
.forEach(listener -> listener.onEvent(event));
});
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment