Skip to content
Snippets Groups Projects
Commit 200715fa authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

- Notify admin when Duniter node is down

parent 3d777802
No related branches found
No related tags found
No related merge requests found
Showing
with 439 additions and 146 deletions
......@@ -22,13 +22,12 @@ package org.duniter.core.client.model;
* #L%
*/
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import com.google.common.base.Preconditions;
import org.duniter.core.client.model.local.Certification;
import org.duniter.core.client.model.local.Movement;
import org.duniter.core.util.CollectionUtils;
/**
* Helper class on model entities
......@@ -145,4 +144,17 @@ public class ModelUtils {
}
return pubkey.substring(0, 8);
}
public static String joinPubkeys(Collection<String> pubkeys, boolean minify, String separator) {
Preconditions.checkArgument(CollectionUtils.isNotEmpty(pubkeys));
Preconditions.checkNotNull(separator);
StringBuilder sb = new StringBuilder();
for (String pubkey : pubkeys) {
sb.append(separator)
.append(minify ? ModelUtils.minifyPubkey(pubkey) : pubkey);
}
return sb.toString().substring(separator.length());
}
}
......@@ -222,9 +222,8 @@ public interface BlockchainRemoteService extends Service {
*/
Map<Integer, Long> getUDs(long currencyId, long startOffset);
void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler);
void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler);
WebsocketClientEndpoint addBlockListener(long currencyId, WebsocketClientEndpoint.MessageListener listener);
WebsocketClientEndpoint addBlockListener(Peer peer, WebsocketClientEndpoint.MessageListener listener);
}
\ No newline at end of file
......@@ -22,6 +22,7 @@ package org.duniter.core.client.service.bma;
* #L%
*/
import com.google.common.base.Preconditions;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.model.bma.*;
import org.duniter.core.client.model.bma.gson.JsonArrayParser;
......@@ -560,35 +561,23 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
}
@Override
public void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler) {
public WebsocketClientEndpoint addBlockListener(long currencyId, WebsocketClientEndpoint.MessageListener listener) {
Peer peer = peerService.getActivePeerByCurrencyId(currencyId);
addNewBlockListener(peer, messageHandler);
return addBlockListener(peer, listener);
}
@Override
public void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler) {
public WebsocketClientEndpoint addBlockListener(Peer peer, WebsocketClientEndpoint.MessageListener listener) {
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(listener);
try {
URI wsBlockURI = new URI(String.format("ws://%s:%s/ws/block",
peer.getHost(),
peer.getPort()));
// Get the websocket endpoint
WebsocketClientEndpoint wsClientEndPoint = getWebsocketClientEndpoint(peer, "/ws/block");
log.info(String.format("Starting to listen block from [%s]...", wsBlockURI.toString()));
// Get the websocket, or open new one if not exists
WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI);
if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) {
wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI, true/*autoReconnect*/);
blockWsEndPoints.put(wsBlockURI, wsClientEndPoint);
}
// add listener
wsClientEndPoint.addMessageHandler(messageHandler);
} catch (URISyntaxException | ServiceConfigurationError ex) {
throw new TechnicalException("could not create URI need for web socket on block: " + ex.getMessage());
}
// add listener
wsClientEndPoint.registerListener(listener);
return wsClientEndPoint;
}
/* -- Internal methods -- */
......@@ -804,4 +793,27 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement
return Long.parseLong(dividendStr);
}
public WebsocketClientEndpoint getWebsocketClientEndpoint(Peer peer, String path) {
try {
URI wsBlockURI = new URI(String.format("ws://%s:%s%s",
peer.getHost(),
peer.getPort(),
path));
// Get the websocket, or open new one if not exists
WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI);
if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) {
log.info(String.format("Starting to listen block from [%s]...", wsBlockURI.toString()));
wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI);
blockWsEndPoints.put(wsBlockURI, wsClientEndPoint);
}
return wsClientEndPoint;
} catch (URISyntaxException | ServiceConfigurationError ex) {
throw new TechnicalException("could not create URI need for web socket on block: " + ex.getMessage());
}
}
}
......@@ -36,7 +36,6 @@ import org.duniter.core.client.model.local.Wallet;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.client.service.exception.HttpBadRequestException;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -134,7 +133,7 @@ public class BlockchainRemoteServiceTest {
isWebSocketNewBlockReceived = false;
service.addNewBlockListener(createTestPeer(), (message) -> {
service.addBlockListener(createTestPeer(), (message) -> {
BlockchainBlock block = GsonUtils.newBuilder().create().fromJson(message, BlockchainBlock.class);
log.debug("Received block #" + block.getNumber());
isWebSocketNewBlockReceived = true;
......
......@@ -23,6 +23,7 @@ package org.duniter.core.util.websocket;
*/
import com.google.common.collect.Lists;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -44,9 +45,11 @@ public class WebsocketClientEndpoint implements Closeable {
private static final Logger log = LoggerFactory.getLogger(WebsocketClientEndpoint.class);
private Session userSession = null;
private List<MessageHandler> messageHandlers = Lists.newArrayList();
private List<MessageListener> messageListeners = Lists.newArrayList();
private List<ConnectionListener> connectionListeners = Lists.newArrayList();
private final URI endpointURI;
private final boolean autoReconnect;
private long lastTimeUp = -1;
public WebsocketClientEndpoint(URI endpointURI) {
this(endpointURI, true);
......@@ -55,7 +58,7 @@ public class WebsocketClientEndpoint implements Closeable {
public WebsocketClientEndpoint(URI endpointURI, boolean autoReconnect) {
this.endpointURI = endpointURI;
this.autoReconnect = autoReconnect;
connect(true);
connect();
}
......@@ -95,8 +98,8 @@ public class WebsocketClientEndpoint implements Closeable {
this.userSession = null;
// abnormal close : try to reconnect
if (reason.getCloseCode() == CloseReason.CloseCodes.CLOSED_ABNORMALLY) {
connect(false);
if (reason.getCloseCode() == CloseReason.CloseCodes.CLOSED_ABNORMALLY && autoReconnect) {
connect();
}
}
......@@ -106,32 +109,41 @@ public class WebsocketClientEndpoint implements Closeable {
* @param message The text message
*/
@OnMessage
public void onMessage(String message) {
synchronized (messageHandlers) {
if (CollectionUtils.isNotEmpty(messageHandlers)) {
if (log.isDebugEnabled()) {
log.debug("[%s] Received message: " + message);
}
public void onMessage(final String message) {
if (CollectionUtils.isNotEmpty(messageListeners)) {
if (log.isDebugEnabled()) {
log.debug("[%s] Received message: " + message);
}
for (MessageHandler messageHandler : messageHandlers) {
try {
messageHandler.handleMessage(message);
} catch (Exception e) {
log.error(String.format("[%s] Error during message handling: %s", endpointURI, e.getMessage()), e);
}
messageListeners.stream().forEach(messageListener -> {
try {
messageListener.onMessage(message);
} catch (Exception e) {
log.error(String.format("[%s] Error during message handling: %s", endpointURI, e.getMessage()), e);
}
}
});
}
}
/**
* register message listener
*
* @param listener
*/
public void registerListener(MessageListener listener) {
synchronized (messageListeners) {
this.messageListeners.add(listener);
}
}
/**
* register message handler
* register connection listener
*
* @param msgHandler
* @param listener
*/
public void addMessageHandler(MessageHandler msgHandler) {
synchronized (messageHandlers) {
this.messageHandlers.add(msgHandler);
public void registerListener(ConnectionListener listener) {
synchronized (connectionListeners) {
this.connectionListeners.add(listener);
}
}
......@@ -153,29 +165,40 @@ public class WebsocketClientEndpoint implements Closeable {
}
/**
* Message handler.
*
* @author Jiji_Sasidharan
* Message listener.
*/
public interface MessageListener {
void onMessage(String message);
}
/**
* Connection listener.
*/
public static interface MessageHandler {
public interface ConnectionListener {
public void handleMessage(String message);
void onSuccess();
void onError(Exception e, long lastTimeUp);
}
/* -- Internal method */
private void connect(boolean throwErrorIfFailed) {
private void connect() {
while(isClosed()) {
try {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
container.connectToServer(this, endpointURI);
return; // stop
lastTimeUp = System.currentTimeMillis() / 1000;
notifyConnectionSuccess();
return; // stop while
} catch (Exception e) {
if (throwErrorIfFailed) throw new RuntimeException(e);
notifyConnectionError(e);
if (!this.autoReconnect) throw new TechnicalException(e);
log.warn(String.format("[%s] Unable to connect. Retrying in 10s...", endpointURI.toString()));
}
// wait 20s, then try again
// wait 10s, then try again
try {
Thread.sleep(10 * 1000);
}
......@@ -184,4 +207,28 @@ public class WebsocketClientEndpoint implements Closeable {
}
}
}
private void notifyConnectionSuccess() {
synchronized (connectionListeners) {
connectionListeners.stream().forEach(connectionListener -> {
try {
connectionListener.onSuccess();
} catch (Exception e) {
log.error(String.format("[%s] Error during ConnectionListener.onSuccess(): %s", endpointURI, e.getMessage()), e);
}
});
}
}
private void notifyConnectionError(final Exception error) {
synchronized (connectionListeners) {
connectionListeners.stream().forEach(connectionListener -> {
try {
connectionListener.onError(error, lastTimeUp);
} catch (Exception e) {
log.error(String.format("[%s] Error during ConnectionListener.onError(): %s", endpointURI, e.getMessage()), e);
}
});
}
}
}
\ No newline at end of file
......@@ -121,12 +121,16 @@ duniter.string.analyzer: french
#
# Enabling node blockchain synchronization
#
duniter.blockchain.sync.enable: false
duniter.blockchain.sync.enable: true
#
# Duniter node to synchronize
#
duniter.host: cgeek.fr
duniter.port: 9330
#duniter.host: cgeek.fr
#duniter.port: 9330
#duniter.host: test-net.duniter.fr
#duniter.port: 9201
duniter.host: 192.168.0.28
duniter.port: 21378
#
# ---------------------------------- Duniter4j security -------------------------
#
......@@ -157,7 +161,7 @@ duniter.data.sync.enable: false
#
# SMTP server configuration (host and port)
#
duniter.mail.enable: false
#duniter.mail.enable: false
#duniter.mail.smtp.host: localhost
#duniter.mail.smtp.port: 25
#
......
......@@ -21,6 +21,7 @@ logger:
org.duniter.elasticsearch: DEBUG
duniter : DEBUG
duniter.user.event : INFO
duniter.network.p2p: TRACE
security: DEBUG
......
......@@ -145,6 +145,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
public String getClusterName() {
return settings.get("cluster.name", "?");
}
public String getNodeBmaHost() {
return settings.get("duniter.host", "cgeek.fr");
}
......
......@@ -46,7 +46,10 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
......@@ -298,6 +301,20 @@ public abstract class AbstractService implements Bean {
return result.get(fieldName);
}
/**
* Retrieve a document by id (safe mode)
* @param docId
* @return
*/
public <T extends Object> T getSourceByIdOrNull(String index, String type, String docId, Class<T> classOfT, String... fieldNames) {
try {
return getSourceById(index, type, docId, classOfT, fieldNames);
}
catch(TechnicalException e) {
return null; // not found
}
}
/**
* Retrieve a document by id
* @param docId
......@@ -308,10 +325,9 @@ public abstract class AbstractService implements Bean {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
.setSearchType(SearchType.QUERY_AND_FETCH);
searchRequest.setQuery(QueryBuilders.matchQuery("_id", docId));
searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docId));
if (CollectionUtils.isNotEmpty(fieldNames)) {
searchRequest.setFetchSource(fieldNames, null);
}
......@@ -323,8 +339,11 @@ public abstract class AbstractService implements Bean {
try {
SearchResponse response = searchRequest.execute().actionGet();
if (response.getHits().getTotalHits() == 0) return null;
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
if (searchHit.source() != null) {
return objectMapper.readValue(searchHit.source(), classOfT);
......@@ -494,6 +513,22 @@ public abstract class AbstractService implements Bean {
}
}
protected void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest) {
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = bulkRequest.get();
// If failures, continue but save missing blocks
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
for (BulkItemResponse itemResponse : bulkResponse) {
boolean skip = !itemResponse.isFailed();
if (!skip) {
logger.debug(String.format("[%s/%s] Error while deleting doc [%s]: %s. Skipping this deletion.", index, type, itemResponse.getId(), itemResponse.getFailureMessage()));
}
}
}
}
}
public interface StringReaderHandler {
String onReadLine(String line);
......
......@@ -44,6 +44,7 @@ import org.duniter.core.model.ProgressionModelImpl;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.exception.DuplicateIndexIdException;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -89,11 +90,13 @@ public class BlockchainService extends AbstractService {
private BlockchainRemoteService blockchainRemoteService;
private CurrencyService currencyService;
private ThreadPool threadPool;
private List<WebsocketClientEndpoint.ConnectionListener> connectionListeners = new ArrayList<>();
private final WebsocketClientEndpoint.ConnectionListener dispatchConnectionListener;
private JsonAttributeParser blockNumberParser = new JsonAttributeParser("number");
private JsonAttributeParser blockCurrencyParser = new JsonAttributeParser("currency");
private JsonAttributeParser blockHashParser = new JsonAttributeParser("hash");
private JsonAttributeParser blockPreviousHashParser = new JsonAttributeParser("previousHash");
private final JsonAttributeParser blockNumberParser = new JsonAttributeParser("number");
private final JsonAttributeParser blockCurrencyParser = new JsonAttributeParser("currency");
private final JsonAttributeParser blockHashParser = new JsonAttributeParser("hash");
private final JsonAttributeParser blockPreviousHashParser = new JsonAttributeParser("previousHash");
private Gson gson;
......@@ -106,6 +109,20 @@ public class BlockchainService extends AbstractService {
threadPool.scheduleOnStarted(() -> {
blockchainRemoteService = serviceLocator.getBlockchainRemoteService();
});
dispatchConnectionListener = new WebsocketClientEndpoint.ConnectionListener() {
@Override
public void onSuccess() {
synchronized (connectionListeners) {
connectionListeners.stream().forEach(connectionListener -> connectionListener.onSuccess());
}
}
@Override
public void onError(Exception e, long lastTimeUp) {
synchronized (connectionListeners) {
connectionListeners.stream().forEach(connectionListener -> connectionListener.onError(e, lastTimeUp));
}
}
};
}
@Inject
......@@ -113,10 +130,16 @@ public class BlockchainService extends AbstractService {
this.currencyService = currencyService;
}
public BlockchainService listenAndIndexNewBlock(Peer peer){
blockchainRemoteService.addNewBlockListener(peer, message -> {
indexLastBlockFromJson(peer, message);
});
public void registerConnectionListener(WebsocketClientEndpoint.ConnectionListener listener) {
synchronized (connectionListeners) {
connectionListeners.add(listener);
}
}
public BlockchainService listenAndIndexNewBlock(final Peer peer){
WebsocketClientEndpoint wsEndPoint = blockchainRemoteService.addBlockListener(peer, message -> indexLastBlockFromJson(peer, message));
wsEndPoint.registerListener(dispatchConnectionListener);
return this;
}
......@@ -1036,42 +1059,27 @@ public class BlockchainService extends AbstractService {
* @param currencyName
* @param fromNumber
*/
protected void deleteBlocksFromNumber(String currencyName, int fromNumber, int toNumber) {
protected void deleteBlocksFromNumber(final String currencyName, final int fromNumber, final int toNumber) {
int bulkSize = pluginSettings.getIndexBulkSize();
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i=fromNumber; i<=toNumber; i++) {
for (int number=fromNumber; number<=toNumber; number++) {
bulkRequest.add(
client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(i))
client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(number))
);
// Flush the bulk if not empty
if ((fromNumber - i % bulkSize) == 0) {
flushDeleteBulk(bulkRequest);
if ((fromNumber - number % bulkSize) == 0) {
flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest);
bulkRequest = client.prepareBulk();
}
}
// last flush
flushDeleteBulk(bulkRequest);
flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest);
}
protected void flushDeleteBulk(BulkRequestBuilder bulkRequest) {
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = bulkRequest.get();
// If failures, continue but save missing blocks
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
for (BulkItemResponse itemResponse : bulkResponse) {
boolean skip = !itemResponse.isFailed();
if (!skip) {
int itemNumber = Integer.parseInt(itemResponse.getId());
logger.debug(String.format("Error while deleting block #%s: %s. Skipping this deletion.", itemNumber, itemResponse.getFailureMessage()));
}
}
}
}
}
}
......@@ -54,6 +54,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
......@@ -366,10 +367,10 @@ public class CurrencyService extends AbstractService {
SearchRequestBuilder searchRequest = client
.prepareSearch(INDEX)
.setTypes(CURRENCY_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
.setSearchType(SearchType.QUERY_AND_FETCH);
// If more than a word, search on terms match
searchRequest.setQuery(QueryBuilders.matchQuery("_id", currencyId));
searchRequest.setQuery(new IdsQueryBuilder().addIds(currencyId));
// Execute query
try {
......
......@@ -59,7 +59,7 @@ public class UserEvent extends Record {
public static final String PROPERTY_CODE="code";
public static final String PROPERTY_MESSAGE="message";
public static final String PROPERTY_PARAMS="params";
public static final String PROPERTY_LINK="reference";
public static final String PROPERTY_REFERENCE="reference";
public static final String PROPERTY_RECIPIENT="recipient";
......@@ -233,6 +233,12 @@ public class UserEvent extends Record {
public static class Reference {
public static final String PROPERTY_INDEX="index";
public static final String PROPERTY_TYPE="type";
public static final String PROPERTY_ID="id";
public static final String PROPERTY_ANCHOR="anchor";
public static final String PROPERTY_HASH="hash";
private String index;
private String type;
......
......@@ -28,7 +28,8 @@ package org.duniter.elasticsearch.user.model;
public enum UserEventCodes {
NODE_STARTED,
CREATE_DOC,
NODE_BMA_UP,
NODE_BMA_DOWN,
// Membership state
MEMBER_JOIN,
......
......@@ -24,14 +24,15 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.duniter.core.client.model.ModelUtils;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.service.BlockchainService;
......@@ -45,32 +46,38 @@ import org.elasticsearch.common.inject.Inject;
import org.nuiton.i18n.I18n;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* Created by Benoit on 30/03/2015.
*/
public class BlockchainUserEventService extends AbstractService implements ChangeService.ChangeListener {
public static final String DEFAULT_PUBKEYS_SEPARATOR = ", ";
public final UserEventService userEventService;
public final ObjectMapper objectMapper;
public final List<ChangeSource> changeListenSources;
public final Joiner simpleJoiner = Joiner.on(',');
public final boolean enable;
@Inject
public BlockchainUserEventService(Client client, PluginSettings settings, CryptoService cryptoService,
BlockchainService blockchainService,
UserEventService userEventService) {
super("duniter.user.event.blockchain", client, settings, cryptoService);
this.userEventService = userEventService;
this.objectMapper = JacksonUtils.newObjectMapper();
this.changeListenSources = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
ChangeService.registerListener(this);
this.enable = pluginSettings.enableBlockchainSync();
if (this.enable) {
blockchainService.registerConnectionListener(createConnectionListeners());
}
}
@Override
......@@ -80,19 +87,23 @@ public class BlockchainUserEventService extends AbstractService implements Chang
@Override
public void onChange(ChangeEvent change) {
if (change.getSource() == null) return;
try {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
switch (change.getOperation()) {
case INDEX:
processBlockIndex(block);
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processBlockIndex(block);
}
break;
// on DELETE : remove user event on block (using link
case DELETE:
processBlockDelete(block);
processBlockDelete(change);
break;
}
......@@ -111,6 +122,48 @@ public class BlockchainUserEventService extends AbstractService implements Chang
/* -- internal method -- */
/**
* Create a listener that notify admin when the Duniter node connection is lost or retrieve
*/
private WebsocketClientEndpoint.ConnectionListener createConnectionListeners() {
return new WebsocketClientEndpoint.ConnectionListener() {
private boolean errorNotified = false;
@Override
public void onSuccess() {
// Send notify on reconnection
if (errorNotified) {
errorNotified = false;
userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.NODE_BMA_UP.name())
.setMessage(I18n.n("duniter.event.NODE_BMA_UP"),
pluginSettings.getNodeBmaHost(),
String.valueOf(pluginSettings.getNodeBmaPort()),
pluginSettings.getClusterName())
.build());
}
}
@Override
public void onError(Exception e, long lastTimeUp) {
if (errorNotified) return; // already notify
// Wait 1 min, then notify admin (once)
long now = System.currentTimeMillis() / 1000;
boolean wait = now - lastTimeUp < 60;
if (!wait) {
errorNotified = true;
userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.ERROR, UserEventCodes.NODE_BMA_DOWN.name())
.setMessage(I18n.n("duniter.event.NODE_BMA_DOWN"),
pluginSettings.getNodeBmaHost(),
String.valueOf(pluginSettings.getNodeBmaPort()),
pluginSettings.getClusterName(),
String.valueOf(lastTimeUp))
.build());
}
}
};
}
private void processBlockIndex(BlockchainBlock block) {
// Joiners
if (CollectionUtils.isNotEmpty(block.getJoiners())) {
......@@ -147,28 +200,29 @@ public class BlockchainUserEventService extends AbstractService implements Chang
// Received
// TODO get profile name
String sendersStr = simpleJoiner.join(senders);
String sendersString = ModelUtils.joinPubkeys(senders, true, DEFAULT_PUBKEYS_SEPARATOR);
Set<String> receivers = new HashSet<>();
for (String output : tx.getOutputs()) {
String[] parts = output.split(":");
if (parts.length >= 3 && parts[2].startsWith("SIG(")) {
String receiver = parts[2].substring(4, parts[2].length() - 1);
if (!senders.contains(receiver) && !receivers.contains(receiver)) {
notifyUserEvent(block, receiver, UserEventCodes.TX_RECEIVED, I18n.n("duniter.user.event.tx.received"), sendersStr);
notifyUserEvent(block, receiver, UserEventCodes.TX_RECEIVED, I18n.n("duniter.user.event.tx.received"), sendersString);
receivers.add(receiver);
}
}
}
// Sent
// TODO get profile name
String receiverStr = simpleJoiner.join(receivers);
for (String sender:senders) {
notifyUserEvent(block, sender, UserEventCodes.TX_SENT, I18n.n("duniter.user.event.tx.sent"), receiverStr);
if (CollectionUtils.isNotEmpty(receivers)) {
// TODO get profile name
String receiverStr = ModelUtils.joinPubkeys(receivers, true, DEFAULT_PUBKEYS_SEPARATOR);
for (String sender : senders) {
notifyUserEvent(block, sender, UserEventCodes.TX_SENT, I18n.n("duniter.user.event.tx.sent"), receiverStr);
}
}
// TODO : index this TX in a special index ?
// TODO : indexer la TX dans un index/type spécifique ?
}
private void notifyUserEvent(BlockchainBlock block, String pubkey, UserEventCodes code, String message, String... params) {
......@@ -179,12 +233,17 @@ public class BlockchainUserEventService extends AbstractService implements Chang
.setReference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber()))
.setReferenceHash(block.getHash())
.build();
userEventService.notifyUser(event);
}
private void processBlockDelete(BlockchainBlock block) {
private void processBlockDelete(ChangeEvent change) {
if (change.getId() == null) return;
// Delete events that reference this block
userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId()));
//userEventService.deleteUserEventByReference()
}
}
......@@ -26,23 +26,34 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.gson.JsonSyntaxException;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.service.MailService;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.model.UserEventCodes;
import org.duniter.elasticsearch.user.model.UserProfile;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.nuiton.i18n.I18n;
import java.io.IOException;
......@@ -84,18 +95,23 @@ public class UserEventService extends AbstractService {
public final boolean mailEnable;
@Inject
public UserEventService(Client client, PluginSettings settings, CryptoService cryptoService,
MailService mailService,
ThreadPool threadPool) {
super("duniter.user.event", client, settings, cryptoService);
public UserEventService(final Client client,
final PluginSettings pluginSettings,
final CryptoService cryptoService,
final MailService mailService,
final BlockchainService blockchainService,
final ThreadPool threadPool) {
super("duniter.user.event", client, pluginSettings, cryptoService);
this.mailService = mailService;
this.threadPool = threadPool;
this.nodeKeyPair = getNodeKeyPairOrNull(pluginSettings);
this.nodePubkey = getNodePubKey(this.nodeKeyPair);
this.nodePubkey = getNodePubKey(nodeKeyPair);
this.mailEnable = pluginSettings.getMailEnable();
if (!this.mailEnable && logger.isTraceEnabled()) {
logger.trace("Mail disable");
}
}
/**
......@@ -175,6 +191,16 @@ public class UserEventService extends AbstractService {
return response.getId();
}
public void deleteEventsByReference(final UserEvent.Reference reference) {
Preconditions.checkNotNull(reference);
Preconditions.checkNotNull(reference.getIndex());
Preconditions.checkNotNull(reference.getType());
threadPool.schedule(() -> {
doDeleteEventsByReference(reference);
});
}
/* -- Internal methods -- */
public static XContentBuilder createEventType() {
......@@ -217,8 +243,8 @@ public class UserEventService extends AbstractService {
.field("dynamic", "false")
.startObject("properties")
.startObject("index")
.field("type", "string")
.field("index", "not_analyzed")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("type")
.field("type", "string")
......@@ -232,6 +258,10 @@ public class UserEventService extends AbstractService {
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("anchor")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject()
......@@ -323,7 +353,7 @@ public class UserEventService extends AbstractService {
I18n.getDefaultLocale();
if (StringUtils.isNotBlank(nodePubkey)) {
event.setRecipient(nodePubkey);
indexEvent(locale, event);
indexEventAndNotifyListener(locale, event);
}
// Send email to admin
......@@ -350,29 +380,91 @@ public class UserEventService extends AbstractService {
Locale locale = userProfile.getLocale() != null ? new Locale(userProfile.getLocale()) : null;
// Add new event to index
indexEventAndNotifyListener(locale, event);
}
private void indexEventAndNotifyListener(Locale locale, UserEvent event) {
// Add new event to index
indexEvent(locale, event);
// Notify listeners
threadPool.schedule(() -> {
synchronized (LISTENERS) {
for (UserEventListener listener : LISTENERS.values()) {
LISTENERS.values().stream().forEach(listener -> {
if (event.getRecipient().equals(listener.getPubkey())) {
listener.onEvent(event);
}
}
});
}
});
}
private void doDeleteEventsByReference(final UserEvent.Reference reference) {
// Prepare search request
SearchRequestBuilder searchRequest = client
.prepareSearch(INDEX)
.setTypes(EVENT_TYPE)
.setFetchSource(false)
.setSearchType(SearchType.QUERY_AND_FETCH);
// Query = filter on reference
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex()))
.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType()));
if (StringUtils.isNotBlank(reference.getId())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ID, reference.getId()));
}
if (StringUtils.isNotBlank(reference.getHash())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_HASH, reference.getHash()));
}
if (StringUtils.isNotBlank(reference.getAnchor())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ANCHOR, reference.getAnchor()));
}
searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery)));
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
int bulkSize = pluginSettings.getIndexBulkSize();
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Read query result
long counter = 0;
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
bulkRequest.add(
client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId())
);
counter++;
// Flush the bulk if not empty
if ((counter % bulkSize) == 0) {
flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
bulkRequest = client.prepareBulk();
}
}
// last flush
flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
}
catch(SearchPhaseExecutionException | JsonSyntaxException e) {
// Failed or no item on index
logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e);
}
}
private UserProfile getUserProfile(String pubkey, String... fieldnames) {
UserProfile result = getSourceById(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames);
UserProfile result = getSourceByIdOrNull(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames);
if (result == null) result = new UserProfile();
return result;
}
private UserProfile getUserProfileOrNull(String pubkey, String... fieldnames) {
return getSourceById(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames);
return getSourceByIdOrNull(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames);
}
private String toJson(UserEvent userEvent) {
......
#!/bin/sh
curl -XPOST "http://data.duniter.fr/market/comment/_search?pretty" -d'
curl -XPOST "http://127.0.0.1:9200/user/event/_search?pretty" -d'
{
"query": {
"bool":{
"filter": [
{"term":{
"record":"AVbieTIAup9uzWgKipsC"
}
}
]
query: {
nested: {
path: "reference",
query: {
constant_score: {
filter:
[
{term: { "reference.index": "test_net"}},
{term: { "reference.type": "block"}},
{term: { "reference.id": "10862"}}
]
}
}
}
}
},
from: 0,
size: 100
}'
duniter.event.NODE_BMA_DOWN=
duniter.event.NODE_BMA_UP=
duniter.event.NODE_STARTED=Node started on cluster Duniter4j ES [%s]
duniter.user.event.active=
duniter.user.event.join=
......
duniter.event.NODE_STARTED=Noeud démarré sur le cluster Duniter4j ES [%s]
duniter.event.NODE_BMA_DOWN=Noeud Duniter [%s\:%s] non joignable, depuis le noeud ES API [%s]. Dernière connexion à %d. Indexation de blockchain en attente.
duniter.event.NODE_BMA_UP=Noeud Duniter [%s\:%s] à nouveau accessible.
duniter.event.NODE_STARTED=Noeud ES API démarré sur le cluster Duniter [%s]
duniter.user.event.ms.active=Votre adhésion comme membre a bien été renouvellée
duniter.user.event.ms.join=Vous êtes maintenant membre de la monnaie
duniter.user.event.ms.leave=Votre adhésion comme membre à expirée
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment