...
 
Commits (2)
......@@ -373,11 +373,8 @@ public class SynchroService extends AbstractService {
// Prepare a map of actions by index/type
final ArrayListMultimap<String, SynchroAction> actionsBySource = ArrayListMultimap.create(actions.size(), 2);
actions.stream()
.forEach(a -> {
if (a.getChangeSource() != null) {
actionsBySource.put(a.getChangeSource().toString(), a);
}
});
.filter(a -> a.getChangeSource() != null)
.forEach(a -> actionsBySource.put(a.getChangeSource().toString(), a));
// Get (or create) the websocket endpoint
WebsocketClientEndpoint wsClientEndPoint = httpService.getWebsocketClientEndpoint(peer, WS_CHANGES_URL, false);
......
......@@ -134,7 +134,12 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
@Override
public void onClose(CloseReason reason) {
if (logger.isDebugEnabled()) logger.debug(String.format("Closing websocket session, id {%s}: %s", sessionId, reason));
if (logger.isDebugEnabled()) {
if (reason != null && reason.getCloseCode() != CloseReason.CloseCodes.GOING_AWAY)
logger.debug(String.format("Closing websocket session, id {%s} - reason {%s}: %s", sessionId, reason.getCloseCode(), reason.getReasonPhrase()));
else
logger.debug(String.format("Closing websocket session, id {%s}"));
}
synchronized (this) {
ChangeService.unregisterListener(this);
}
......@@ -152,12 +157,13 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp
private void checkHasSourceOrClose() {
synchronized (this) {
if (session != null && MapUtils.isEmpty(sources)) {
CloseReason reason = new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Missing source filter (must be send < 20s after connection)");
try {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Missing changes sources to listen (must be given before 10s)"));
session.close(reason);
}
catch (IOException e) {
logger.error(String.format("Failed to close Web socket session, id {%s}", sessionId), e);
ChangeService.unregisterListener(this); // Make sure to unregistrer anyway
onClose(reason);
}
}
}
......
......@@ -131,7 +131,12 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{
@OnClose
public void onClose(CloseReason reason) {
if (logger.isDebugEnabled()) logger.debug(String.format("Closing websocket session, id {%s}: %s", sessionId, reason));
if (logger.isDebugEnabled()) {
if (reason != null && reason.getCloseCode() != CloseReason.CloseCodes.GOING_AWAY)
logger.debug(String.format("Closing websocket session, id {%s} - reason {%s}: %s", sessionId, reason.getCloseCode(), reason.getReasonPhrase()));
else
logger.debug(String.format("Closing websocket session, id {%s}"));
}
synchronized (this) {
ChangeService.unregisterListener(this);
}
......@@ -149,12 +154,13 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{
private void checkHasSourceOrClose() {
synchronized (this) {
if (session != null && MapUtils.isEmpty(sources)) {
CloseReason reason = new CloseReason(CloseReason.CloseCodes.PROTOCOL_ERROR, "Missing source filter (must be send < 20s after connection)");
try {
session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Missing changes sources to listen (must be given before 10s)"));
session.close(reason);
}
catch (IOException e) {
logger.error(String.format("Failed to close Web socket session, id {%s}", sessionId), e);
ChangeService.unregisterListener(this); // Make sure to unregistrer anyway
onClose(reason);
}
}
}
......