diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java index b46fa5977432107d5126b4ae32319a173ea85c55..06f2cdd8cfa1a15acf871e277564f4ba13f9c5b7 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/ModelUtils.java @@ -22,13 +22,12 @@ package org.duniter.core.client.model; * #L% */ -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import com.google.common.base.Preconditions; import org.duniter.core.client.model.local.Certification; import org.duniter.core.client.model.local.Movement; +import org.duniter.core.util.CollectionUtils; /** * Helper class on model entities @@ -145,4 +144,17 @@ public class ModelUtils { } return pubkey.substring(0, 8); } + + public static String joinPubkeys(Collection<String> pubkeys, boolean minify, String separator) { + Preconditions.checkArgument(CollectionUtils.isNotEmpty(pubkeys)); + Preconditions.checkNotNull(separator); + + StringBuilder sb = new StringBuilder(); + for (String pubkey : pubkeys) { + sb.append(separator) + .append(minify ? ModelUtils.minifyPubkey(pubkey) : pubkey); + } + + return sb.toString().substring(separator.length()); + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteService.java index d3ccd7554b883ab769a6d4a4b7c63748a99b7262..03948649044d08af36e3d76e105f4ca17d62083b 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteService.java @@ -222,9 +222,8 @@ public interface BlockchainRemoteService extends Service { */ Map<Integer, Long> getUDs(long currencyId, long startOffset); - void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler); - - void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler); + WebsocketClientEndpoint addBlockListener(long currencyId, WebsocketClientEndpoint.MessageListener listener); + WebsocketClientEndpoint addBlockListener(Peer peer, WebsocketClientEndpoint.MessageListener listener); } \ No newline at end of file diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java index 966c112ecd571eca3cf913f57ec55b735b797e27..4eaed7bbd5f8e472b2b3bc20e8dae1b5915d6cca 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceImpl.java @@ -22,6 +22,7 @@ package org.duniter.core.client.service.bma; * #L% */ +import com.google.common.base.Preconditions; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.model.bma.*; import org.duniter.core.client.model.bma.gson.JsonArrayParser; @@ -560,35 +561,23 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement } @Override - public void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler) { + public WebsocketClientEndpoint addBlockListener(long currencyId, WebsocketClientEndpoint.MessageListener listener) { Peer peer = peerService.getActivePeerByCurrencyId(currencyId); - addNewBlockListener(peer, messageHandler); + return addBlockListener(peer, listener); } @Override - public void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler) { + public WebsocketClientEndpoint addBlockListener(Peer peer, WebsocketClientEndpoint.MessageListener listener) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(listener); - try { - URI wsBlockURI = new URI(String.format("ws://%s:%s/ws/block", - peer.getHost(), - peer.getPort())); + // Get the websocket endpoint + WebsocketClientEndpoint wsClientEndPoint = getWebsocketClientEndpoint(peer, "/ws/block"); - log.info(String.format("Starting to listen block from [%s]...", wsBlockURI.toString())); - - // Get the websocket, or open new one if not exists - WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI); - if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) { - wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI, true/*autoReconnect*/); - blockWsEndPoints.put(wsBlockURI, wsClientEndPoint); - } - - // add listener - wsClientEndPoint.addMessageHandler(messageHandler); - - } catch (URISyntaxException | ServiceConfigurationError ex) { - throw new TechnicalException("could not create URI need for web socket on block: " + ex.getMessage()); - } + // add listener + wsClientEndPoint.registerListener(listener); + return wsClientEndPoint; } /* -- Internal methods -- */ @@ -804,4 +793,27 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement return Long.parseLong(dividendStr); } + public WebsocketClientEndpoint getWebsocketClientEndpoint(Peer peer, String path) { + + try { + URI wsBlockURI = new URI(String.format("ws://%s:%s%s", + peer.getHost(), + peer.getPort(), + path)); + + // Get the websocket, or open new one if not exists + WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI); + if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) { + log.info(String.format("Starting to listen block from [%s]...", wsBlockURI.toString())); + wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI); + blockWsEndPoints.put(wsBlockURI, wsClientEndPoint); + } + + return wsClientEndPoint; + + } catch (URISyntaxException | ServiceConfigurationError ex) { + throw new TechnicalException("could not create URI need for web socket on block: " + ex.getMessage()); + } + + } } diff --git a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java index 67ac086e694aaf64a0142add41c13bd103db70fc..dd096011524a5412e8e894e3beaa04b650331780 100644 --- a/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java +++ b/duniter4j-core-client/src/test/java/org/duniter/core/client/service/bma/BlockchainRemoteServiceTest.java @@ -36,7 +36,6 @@ import org.duniter.core.client.model.local.Wallet; import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.client.service.exception.HttpBadRequestException; import org.duniter.core.util.crypto.CryptoUtils; -import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,7 +133,7 @@ public class BlockchainRemoteServiceTest { isWebSocketNewBlockReceived = false; - service.addNewBlockListener(createTestPeer(), (message) -> { + service.addBlockListener(createTestPeer(), (message) -> { BlockchainBlock block = GsonUtils.newBuilder().create().fromJson(message, BlockchainBlock.class); log.debug("Received block #" + block.getNumber()); isWebSocketNewBlockReceived = true; diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java index 9e1c68e48651f8563a4b053873f4dc32fce37a85..277334276ee0930a87ad5efbfc6adcdf6e368779 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/websocket/WebsocketClientEndpoint.java @@ -23,6 +23,7 @@ package org.duniter.core.util.websocket; */ import com.google.common.collect.Lists; +import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +45,11 @@ public class WebsocketClientEndpoint implements Closeable { private static final Logger log = LoggerFactory.getLogger(WebsocketClientEndpoint.class); private Session userSession = null; - private List<MessageHandler> messageHandlers = Lists.newArrayList(); + private List<MessageListener> messageListeners = Lists.newArrayList(); + private List<ConnectionListener> connectionListeners = Lists.newArrayList(); private final URI endpointURI; private final boolean autoReconnect; + private long lastTimeUp = -1; public WebsocketClientEndpoint(URI endpointURI) { this(endpointURI, true); @@ -55,7 +58,7 @@ public class WebsocketClientEndpoint implements Closeable { public WebsocketClientEndpoint(URI endpointURI, boolean autoReconnect) { this.endpointURI = endpointURI; this.autoReconnect = autoReconnect; - connect(true); + connect(); } @@ -95,8 +98,8 @@ public class WebsocketClientEndpoint implements Closeable { this.userSession = null; // abnormal close : try to reconnect - if (reason.getCloseCode() == CloseReason.CloseCodes.CLOSED_ABNORMALLY) { - connect(false); + if (reason.getCloseCode() == CloseReason.CloseCodes.CLOSED_ABNORMALLY && autoReconnect) { + connect(); } } @@ -106,32 +109,41 @@ public class WebsocketClientEndpoint implements Closeable { * @param message The text message */ @OnMessage - public void onMessage(String message) { - synchronized (messageHandlers) { - if (CollectionUtils.isNotEmpty(messageHandlers)) { - if (log.isDebugEnabled()) { - log.debug("[%s] Received message: " + message); - } + public void onMessage(final String message) { + if (CollectionUtils.isNotEmpty(messageListeners)) { + if (log.isDebugEnabled()) { + log.debug("[%s] Received message: " + message); + } - for (MessageHandler messageHandler : messageHandlers) { - try { - messageHandler.handleMessage(message); - } catch (Exception e) { - log.error(String.format("[%s] Error during message handling: %s", endpointURI, e.getMessage()), e); - } + messageListeners.stream().forEach(messageListener -> { + try { + messageListener.onMessage(message); + } catch (Exception e) { + log.error(String.format("[%s] Error during message handling: %s", endpointURI, e.getMessage()), e); } - } + }); + } + } + + /** + * register message listener + * + * @param listener + */ + public void registerListener(MessageListener listener) { + synchronized (messageListeners) { + this.messageListeners.add(listener); } } /** - * register message handler + * register connection listener * - * @param msgHandler + * @param listener */ - public void addMessageHandler(MessageHandler msgHandler) { - synchronized (messageHandlers) { - this.messageHandlers.add(msgHandler); + public void registerListener(ConnectionListener listener) { + synchronized (connectionListeners) { + this.connectionListeners.add(listener); } } @@ -153,29 +165,40 @@ public class WebsocketClientEndpoint implements Closeable { } /** - * Message handler. - * - * @author Jiji_Sasidharan + * Message listener. + */ + public interface MessageListener { + + void onMessage(String message); + } + + /** + * Connection listener. */ - public static interface MessageHandler { + public interface ConnectionListener { - public void handleMessage(String message); + void onSuccess(); + + void onError(Exception e, long lastTimeUp); } /* -- Internal method */ - private void connect(boolean throwErrorIfFailed) { + private void connect() { while(isClosed()) { try { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, endpointURI); - return; // stop + lastTimeUp = System.currentTimeMillis() / 1000; + notifyConnectionSuccess(); + return; // stop while } catch (Exception e) { - if (throwErrorIfFailed) throw new RuntimeException(e); + notifyConnectionError(e); + if (!this.autoReconnect) throw new TechnicalException(e); log.warn(String.format("[%s] Unable to connect. Retrying in 10s...", endpointURI.toString())); } - // wait 20s, then try again + // wait 10s, then try again try { Thread.sleep(10 * 1000); } @@ -184,4 +207,28 @@ public class WebsocketClientEndpoint implements Closeable { } } } + + private void notifyConnectionSuccess() { + synchronized (connectionListeners) { + connectionListeners.stream().forEach(connectionListener -> { + try { + connectionListener.onSuccess(); + } catch (Exception e) { + log.error(String.format("[%s] Error during ConnectionListener.onSuccess(): %s", endpointURI, e.getMessage()), e); + } + }); + } + } + + private void notifyConnectionError(final Exception error) { + synchronized (connectionListeners) { + connectionListeners.stream().forEach(connectionListener -> { + try { + connectionListener.onError(error, lastTimeUp); + } catch (Exception e) { + log.error(String.format("[%s] Error during ConnectionListener.onError(): %s", endpointURI, e.getMessage()), e); + } + }); + } + } } \ No newline at end of file diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 7aafcdd43691cdadd92ca3070930ba6d3ef36505..bc8c9dae0aad45a7b7e154ca8a0871972614314d 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -121,12 +121,16 @@ duniter.string.analyzer: french # # Enabling node blockchain synchronization # -duniter.blockchain.sync.enable: false +duniter.blockchain.sync.enable: true # # Duniter node to synchronize # -duniter.host: cgeek.fr -duniter.port: 9330 +#duniter.host: cgeek.fr +#duniter.port: 9330 +#duniter.host: test-net.duniter.fr +#duniter.port: 9201 +duniter.host: 192.168.0.28 +duniter.port: 21378 # # ---------------------------------- Duniter4j security ------------------------- # @@ -157,7 +161,7 @@ duniter.data.sync.enable: false # # SMTP server configuration (host and port) # -duniter.mail.enable: false +#duniter.mail.enable: false #duniter.mail.smtp.host: localhost #duniter.mail.smtp.port: 25 # diff --git a/duniter4j-es-assembly/src/test/es-home/config/logging.yml b/duniter4j-es-assembly/src/test/es-home/config/logging.yml index 15cfa3e195cb46a62c7536f118d1684acfcc2ecf..5b11ce00392d57693f5af433ffb128a21424cdb1 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/logging.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/logging.yml @@ -21,6 +21,7 @@ logger: org.duniter.elasticsearch: DEBUG duniter : DEBUG + duniter.user.event : INFO duniter.network.p2p: TRACE security: DEBUG diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 1931d5416375118b87349c0199eddef61066781e..8996b5f96a8d108e3f16bac32a199641b410b499 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -145,6 +145,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } + public String getClusterName() { + return settings.get("cluster.name", "?"); + } + public String getNodeBmaHost() { return settings.get("duniter.host", "cgeek.fr"); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 905328c98dcee2f9e646374b8e6ccd2d40349280..a82a7a192672a25c3ee8a82c64adddf898804f9e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -46,7 +46,10 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; +import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -298,6 +301,20 @@ public abstract class AbstractService implements Bean { return result.get(fieldName); } + /** + * Retrieve a document by id (safe mode) + * @param docId + * @return + */ + public <T extends Object> T getSourceByIdOrNull(String index, String type, String docId, Class<T> classOfT, String... fieldNames) { + try { + return getSourceById(index, type, docId, classOfT, fieldNames); + } + catch(TechnicalException e) { + return null; // not found + } + } + /** * Retrieve a document by id * @param docId @@ -308,10 +325,9 @@ public abstract class AbstractService implements Bean { // Prepare request SearchRequestBuilder searchRequest = client .prepareSearch(index) - .setTypes(type) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + .setSearchType(SearchType.QUERY_AND_FETCH); - searchRequest.setQuery(QueryBuilders.matchQuery("_id", docId)); + searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docId)); if (CollectionUtils.isNotEmpty(fieldNames)) { searchRequest.setFetchSource(fieldNames, null); } @@ -323,8 +339,11 @@ public abstract class AbstractService implements Bean { try { SearchResponse response = searchRequest.execute().actionGet(); + if (response.getHits().getTotalHits() == 0) return null; + // Read query result SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { if (searchHit.source() != null) { return objectMapper.readValue(searchHit.source(), classOfT); @@ -494,6 +513,22 @@ public abstract class AbstractService implements Bean { } } + protected void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest) { + if (bulkRequest.numberOfActions() > 0) { + BulkResponse bulkResponse = bulkRequest.get(); + // If failures, continue but save missing blocks + if (bulkResponse.hasFailures()) { + // process failures by iterating through each bulk response item + for (BulkItemResponse itemResponse : bulkResponse) { + boolean skip = !itemResponse.isFailed(); + if (!skip) { + logger.debug(String.format("[%s/%s] Error while deleting doc [%s]: %s. Skipping this deletion.", index, type, itemResponse.getId(), itemResponse.getFailureMessage())); + } + } + } + } + } + public interface StringReaderHandler { String onReadLine(String line); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 77fee625b794822dca61a20966abad991721f725..5ee621cf4cbbf64e4cac8b4b70400fa6f7196300 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -44,6 +44,7 @@ import org.duniter.core.model.ProgressionModelImpl; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.StringUtils; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -89,11 +90,13 @@ public class BlockchainService extends AbstractService { private BlockchainRemoteService blockchainRemoteService; private CurrencyService currencyService; private ThreadPool threadPool; + private List<WebsocketClientEndpoint.ConnectionListener> connectionListeners = new ArrayList<>(); + private final WebsocketClientEndpoint.ConnectionListener dispatchConnectionListener; - private JsonAttributeParser blockNumberParser = new JsonAttributeParser("number"); - private JsonAttributeParser blockCurrencyParser = new JsonAttributeParser("currency"); - private JsonAttributeParser blockHashParser = new JsonAttributeParser("hash"); - private JsonAttributeParser blockPreviousHashParser = new JsonAttributeParser("previousHash"); + private final JsonAttributeParser blockNumberParser = new JsonAttributeParser("number"); + private final JsonAttributeParser blockCurrencyParser = new JsonAttributeParser("currency"); + private final JsonAttributeParser blockHashParser = new JsonAttributeParser("hash"); + private final JsonAttributeParser blockPreviousHashParser = new JsonAttributeParser("previousHash"); private Gson gson; @@ -106,6 +109,20 @@ public class BlockchainService extends AbstractService { threadPool.scheduleOnStarted(() -> { blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); }); + dispatchConnectionListener = new WebsocketClientEndpoint.ConnectionListener() { + @Override + public void onSuccess() { + synchronized (connectionListeners) { + connectionListeners.stream().forEach(connectionListener -> connectionListener.onSuccess()); + } + } + @Override + public void onError(Exception e, long lastTimeUp) { + synchronized (connectionListeners) { + connectionListeners.stream().forEach(connectionListener -> connectionListener.onError(e, lastTimeUp)); + } + } + }; } @Inject @@ -113,10 +130,16 @@ public class BlockchainService extends AbstractService { this.currencyService = currencyService; } - public BlockchainService listenAndIndexNewBlock(Peer peer){ - blockchainRemoteService.addNewBlockListener(peer, message -> { - indexLastBlockFromJson(peer, message); - }); + + public void registerConnectionListener(WebsocketClientEndpoint.ConnectionListener listener) { + synchronized (connectionListeners) { + connectionListeners.add(listener); + } + } + + public BlockchainService listenAndIndexNewBlock(final Peer peer){ + WebsocketClientEndpoint wsEndPoint = blockchainRemoteService.addBlockListener(peer, message -> indexLastBlockFromJson(peer, message)); + wsEndPoint.registerListener(dispatchConnectionListener); return this; } @@ -1036,42 +1059,27 @@ public class BlockchainService extends AbstractService { * @param currencyName * @param fromNumber */ - protected void deleteBlocksFromNumber(String currencyName, int fromNumber, int toNumber) { + protected void deleteBlocksFromNumber(final String currencyName, final int fromNumber, final int toNumber) { int bulkSize = pluginSettings.getIndexBulkSize(); BulkRequestBuilder bulkRequest = client.prepareBulk(); - for (int i=fromNumber; i<=toNumber; i++) { + for (int number=fromNumber; number<=toNumber; number++) { bulkRequest.add( - client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(i)) + client.prepareDelete(currencyName, BLOCK_TYPE, String.valueOf(number)) ); // Flush the bulk if not empty - if ((fromNumber - i % bulkSize) == 0) { - flushDeleteBulk(bulkRequest); + if ((fromNumber - number % bulkSize) == 0) { + flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest); bulkRequest = client.prepareBulk(); } } // last flush - flushDeleteBulk(bulkRequest); + flushDeleteBulk(currencyName, BLOCK_TYPE, bulkRequest); } - protected void flushDeleteBulk(BulkRequestBuilder bulkRequest) { - if (bulkRequest.numberOfActions() > 0) { - BulkResponse bulkResponse = bulkRequest.get(); - // If failures, continue but save missing blocks - if (bulkResponse.hasFailures()) { - // process failures by iterating through each bulk response item - for (BulkItemResponse itemResponse : bulkResponse) { - boolean skip = !itemResponse.isFailed(); - if (!skip) { - int itemNumber = Integer.parseInt(itemResponse.getId()); - logger.debug(String.format("Error while deleting block #%s: %s. Skipping this deletion.", itemNumber, itemResponse.getFailureMessage())); - } - } - } - } - } + } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 1ab515e3034e1cbdb0c748129dd2d33d19041724..5d584caa765eb4a365e68d874ed150cc2d0e08a8 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -54,6 +54,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; @@ -366,10 +367,10 @@ public class CurrencyService extends AbstractService { SearchRequestBuilder searchRequest = client .prepareSearch(INDEX) .setTypes(CURRENCY_TYPE) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + .setSearchType(SearchType.QUERY_AND_FETCH); // If more than a word, search on terms match - searchRequest.setQuery(QueryBuilders.matchQuery("_id", currencyId)); + searchRequest.setQuery(new IdsQueryBuilder().addIds(currencyId)); // Execute query try { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEvent.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEvent.java index 601c819d62b03f8a70bcff8f7ebeb98d7173ee8c..f69e6fdd2b9390bcc08a71d676444adaefab4071 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEvent.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEvent.java @@ -59,7 +59,7 @@ public class UserEvent extends Record { public static final String PROPERTY_CODE="code"; public static final String PROPERTY_MESSAGE="message"; public static final String PROPERTY_PARAMS="params"; - public static final String PROPERTY_LINK="reference"; + public static final String PROPERTY_REFERENCE="reference"; public static final String PROPERTY_RECIPIENT="recipient"; @@ -233,6 +233,12 @@ public class UserEvent extends Record { public static class Reference { + public static final String PROPERTY_INDEX="index"; + public static final String PROPERTY_TYPE="type"; + public static final String PROPERTY_ID="id"; + public static final String PROPERTY_ANCHOR="anchor"; + public static final String PROPERTY_HASH="hash"; + private String index; private String type; diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java index fbb06fab01d97cb879c0b4383f2f1c2b2edeece8..e1a5053e4e7c490f9c89d71b62a9dd5aa39157b0 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java @@ -28,7 +28,8 @@ package org.duniter.elasticsearch.user.model; public enum UserEventCodes { NODE_STARTED, - CREATE_DOC, + NODE_BMA_UP, + NODE_BMA_DOWN, // Membership state MEMBER_JOIN, diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 1b2976c2ff26f1ed24afb857465531414a5113b8..96c1078c753d6d5379894f46e4b522f6fc140658 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -24,14 +24,15 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import org.duniter.core.client.model.ModelUtils; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.AbstractService; import org.duniter.elasticsearch.service.BlockchainService; @@ -45,32 +46,38 @@ import org.elasticsearch.common.inject.Inject; import org.nuiton.i18n.I18n; import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; /** * Created by Benoit on 30/03/2015. */ public class BlockchainUserEventService extends AbstractService implements ChangeService.ChangeListener { + public static final String DEFAULT_PUBKEYS_SEPARATOR = ", "; + public final UserEventService userEventService; public final ObjectMapper objectMapper; public final List<ChangeSource> changeListenSources; - public final Joiner simpleJoiner = Joiner.on(','); + public final boolean enable; @Inject public BlockchainUserEventService(Client client, PluginSettings settings, CryptoService cryptoService, + BlockchainService blockchainService, UserEventService userEventService) { super("duniter.user.event.blockchain", client, settings, cryptoService); this.userEventService = userEventService; this.objectMapper = JacksonUtils.newObjectMapper(); this.changeListenSources = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); ChangeService.registerListener(this); + + this.enable = pluginSettings.enableBlockchainSync(); + + if (this.enable) { + blockchainService.registerConnectionListener(createConnectionListeners()); + } } @Override @@ -80,19 +87,23 @@ public class BlockchainUserEventService extends AbstractService implements Chang @Override public void onChange(ChangeEvent change) { - if (change.getSource() == null) return; + try { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + switch (change.getOperation()) { case INDEX: - processBlockIndex(block); + if (change.getSource() != null) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processBlockIndex(block); + } break; // on DELETE : remove user event on block (using link case DELETE: - processBlockDelete(block); + processBlockDelete(change); + break; } @@ -111,6 +122,48 @@ public class BlockchainUserEventService extends AbstractService implements Chang /* -- internal method -- */ + /** + * Create a listener that notify admin when the Duniter node connection is lost or retrieve + */ + private WebsocketClientEndpoint.ConnectionListener createConnectionListeners() { + return new WebsocketClientEndpoint.ConnectionListener() { + private boolean errorNotified = false; + + @Override + public void onSuccess() { + // Send notify on reconnection + if (errorNotified) { + errorNotified = false; + userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.NODE_BMA_UP.name()) + .setMessage(I18n.n("duniter.event.NODE_BMA_UP"), + pluginSettings.getNodeBmaHost(), + String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getClusterName()) + .build()); + } + } + + @Override + public void onError(Exception e, long lastTimeUp) { + if (errorNotified) return; // already notify + + // Wait 1 min, then notify admin (once) + long now = System.currentTimeMillis() / 1000; + boolean wait = now - lastTimeUp < 60; + if (!wait) { + errorNotified = true; + userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.ERROR, UserEventCodes.NODE_BMA_DOWN.name()) + .setMessage(I18n.n("duniter.event.NODE_BMA_DOWN"), + pluginSettings.getNodeBmaHost(), + String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getClusterName(), + String.valueOf(lastTimeUp)) + .build()); + } + } + }; + } + private void processBlockIndex(BlockchainBlock block) { // Joiners if (CollectionUtils.isNotEmpty(block.getJoiners())) { @@ -147,28 +200,29 @@ public class BlockchainUserEventService extends AbstractService implements Chang // Received // TODO get profile name - String sendersStr = simpleJoiner.join(senders); + String sendersString = ModelUtils.joinPubkeys(senders, true, DEFAULT_PUBKEYS_SEPARATOR); Set<String> receivers = new HashSet<>(); for (String output : tx.getOutputs()) { String[] parts = output.split(":"); if (parts.length >= 3 && parts[2].startsWith("SIG(")) { String receiver = parts[2].substring(4, parts[2].length() - 1); if (!senders.contains(receiver) && !receivers.contains(receiver)) { - notifyUserEvent(block, receiver, UserEventCodes.TX_RECEIVED, I18n.n("duniter.user.event.tx.received"), sendersStr); + notifyUserEvent(block, receiver, UserEventCodes.TX_RECEIVED, I18n.n("duniter.user.event.tx.received"), sendersString); receivers.add(receiver); } } } - // Sent - // TODO get profile name - String receiverStr = simpleJoiner.join(receivers); - for (String sender:senders) { - notifyUserEvent(block, sender, UserEventCodes.TX_SENT, I18n.n("duniter.user.event.tx.sent"), receiverStr); + if (CollectionUtils.isNotEmpty(receivers)) { + // TODO get profile name + String receiverStr = ModelUtils.joinPubkeys(receivers, true, DEFAULT_PUBKEYS_SEPARATOR); + for (String sender : senders) { + notifyUserEvent(block, sender, UserEventCodes.TX_SENT, I18n.n("duniter.user.event.tx.sent"), receiverStr); + } } - // TODO : index this TX in a special index ? + // TODO : indexer la TX dans un index/type spécifique ? } private void notifyUserEvent(BlockchainBlock block, String pubkey, UserEventCodes code, String message, String... params) { @@ -179,12 +233,17 @@ public class BlockchainUserEventService extends AbstractService implements Chang .setReference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber())) .setReferenceHash(block.getHash()) .build(); + userEventService.notifyUser(event); } - private void processBlockDelete(BlockchainBlock block) { + private void processBlockDelete(ChangeEvent change) { + if (change.getId() == null) return; + + // Delete events that reference this block + userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId())); - //userEventService.deleteUserEventByReference() } + } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index c961281e4b4be2268b6f775423affdbd4e55bb1f..8104eaf73ed51f15a499d66aef15efa3873391b8 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -26,23 +26,34 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import com.google.gson.JsonSyntaxException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.service.MailService; import org.duniter.core.util.StringUtils; import org.duniter.core.util.crypto.CryptoUtils; import org.duniter.core.util.crypto.KeyPair; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; +import org.duniter.elasticsearch.user.model.UserEventCodes; import org.duniter.elasticsearch.user.model.UserProfile; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.*; +import org.elasticsearch.search.SearchHit; import org.nuiton.i18n.I18n; import java.io.IOException; @@ -84,18 +95,23 @@ public class UserEventService extends AbstractService { public final boolean mailEnable; @Inject - public UserEventService(Client client, PluginSettings settings, CryptoService cryptoService, - MailService mailService, - ThreadPool threadPool) { - super("duniter.user.event", client, settings, cryptoService); + public UserEventService(final Client client, + final PluginSettings pluginSettings, + final CryptoService cryptoService, + final MailService mailService, + final BlockchainService blockchainService, + final ThreadPool threadPool) { + super("duniter.user.event", client, pluginSettings, cryptoService); this.mailService = mailService; this.threadPool = threadPool; this.nodeKeyPair = getNodeKeyPairOrNull(pluginSettings); - this.nodePubkey = getNodePubKey(this.nodeKeyPair); + this.nodePubkey = getNodePubKey(nodeKeyPair); this.mailEnable = pluginSettings.getMailEnable(); if (!this.mailEnable && logger.isTraceEnabled()) { logger.trace("Mail disable"); } + + } /** @@ -175,6 +191,16 @@ public class UserEventService extends AbstractService { return response.getId(); } + public void deleteEventsByReference(final UserEvent.Reference reference) { + Preconditions.checkNotNull(reference); + Preconditions.checkNotNull(reference.getIndex()); + Preconditions.checkNotNull(reference.getType()); + + threadPool.schedule(() -> { + doDeleteEventsByReference(reference); + }); + } + /* -- Internal methods -- */ public static XContentBuilder createEventType() { @@ -217,8 +243,8 @@ public class UserEventService extends AbstractService { .field("dynamic", "false") .startObject("properties") .startObject("index") - .field("type", "string") - .field("index", "not_analyzed") + .field("type", "string") + .field("index", "not_analyzed") .endObject() .startObject("type") .field("type", "string") @@ -232,6 +258,10 @@ public class UserEventService extends AbstractService { .field("type", "string") .field("index", "not_analyzed") .endObject() + .startObject("anchor") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() .endObject() .endObject() @@ -323,7 +353,7 @@ public class UserEventService extends AbstractService { I18n.getDefaultLocale(); if (StringUtils.isNotBlank(nodePubkey)) { event.setRecipient(nodePubkey); - indexEvent(locale, event); + indexEventAndNotifyListener(locale, event); } // Send email to admin @@ -350,29 +380,91 @@ public class UserEventService extends AbstractService { Locale locale = userProfile.getLocale() != null ? new Locale(userProfile.getLocale()) : null; + // Add new event to index + indexEventAndNotifyListener(locale, event); + } + + private void indexEventAndNotifyListener(Locale locale, UserEvent event) { // Add new event to index indexEvent(locale, event); // Notify listeners threadPool.schedule(() -> { synchronized (LISTENERS) { - for (UserEventListener listener : LISTENERS.values()) { + LISTENERS.values().stream().forEach(listener -> { if (event.getRecipient().equals(listener.getPubkey())) { listener.onEvent(event); } - } + }); } }); } + private void doDeleteEventsByReference(final UserEvent.Reference reference) { + + // Prepare search request + SearchRequestBuilder searchRequest = client + .prepareSearch(INDEX) + .setTypes(EVENT_TYPE) + .setFetchSource(false) + .setSearchType(SearchType.QUERY_AND_FETCH); + + // Query = filter on reference + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex())) + .filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType())); + if (StringUtils.isNotBlank(reference.getId())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ID, reference.getId())); + } + if (StringUtils.isNotBlank(reference.getHash())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_HASH, reference.getHash())); + } + if (StringUtils.isNotBlank(reference.getAnchor())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ANCHOR, reference.getAnchor())); + } + + searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); + + // Execute query + try { + SearchResponse response = searchRequest.execute().actionGet(); + + int bulkSize = pluginSettings.getIndexBulkSize(); + BulkRequestBuilder bulkRequest = client.prepareBulk(); + + // Read query result + long counter = 0; + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + bulkRequest.add( + client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId()) + ); + counter++; + + // Flush the bulk if not empty + if ((counter % bulkSize) == 0) { + flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // last flush + flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); + } + catch(SearchPhaseExecutionException | JsonSyntaxException e) { + // Failed or no item on index + logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e); + } + } + private UserProfile getUserProfile(String pubkey, String... fieldnames) { - UserProfile result = getSourceById(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames); + UserProfile result = getSourceByIdOrNull(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames); if (result == null) result = new UserProfile(); return result; } private UserProfile getUserProfileOrNull(String pubkey, String... fieldnames) { - return getSourceById(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames); + return getSourceByIdOrNull(UserService.INDEX, UserService.PROFILE_TYPE, pubkey, UserProfile.class, fieldnames); } private String toJson(UserEvent userEvent) { diff --git a/duniter4j-es-user/src/main/misc/curl_test.sh b/duniter4j-es-user/src/main/misc/curl_test.sh index 4f62377a7b20ee747e1a99f1c6ab627e9caa8b35..01b7056e4391fdaf8920d82c4bb5a9826acbf4af 100755 --- a/duniter4j-es-user/src/main/misc/curl_test.sh +++ b/duniter4j-es-user/src/main/misc/curl_test.sh @@ -1,16 +1,25 @@ #!/bin/sh -curl -XPOST "http://data.duniter.fr/market/comment/_search?pretty" -d' +curl -XPOST "http://127.0.0.1:9200/user/event/_search?pretty" -d' { - "query": { - "bool":{ - "filter": [ - {"term":{ - "record":"AVbieTIAup9uzWgKipsC" - } - } - ] + query: { + nested: { + path: "reference", + query: { + constant_score: { + filter: + [ + {term: { "reference.index": "test_net"}}, + {term: { "reference.type": "block"}}, + {term: { "reference.id": "10862"}} + ] + + } } - } + } + + }, + from: 0, + size: 100 }' diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties index f5143c946bb6a5ce5b47b0bab0e1cdc1bca4da67..4917ad91be8c20d95f6a655b90487d15f1721ef0 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties @@ -1,3 +1,5 @@ +duniter.event.NODE_BMA_DOWN= +duniter.event.NODE_BMA_UP= duniter.event.NODE_STARTED=Node started on cluster Duniter4j ES [%s] duniter.user.event.active= duniter.user.event.join= diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties index 57a43dcebc0aa9a807915238dededd5dd014b6cc..be08e46fca2fadf30534855569db23e859f98626 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties @@ -1,4 +1,6 @@ -duniter.event.NODE_STARTED=Noeud démarré sur le cluster Duniter4j ES [%s] +duniter.event.NODE_BMA_DOWN=Noeud Duniter [%s\:%s] non joignable, depuis le noeud ES API [%s]. Dernière connexion à %d. Indexation de blockchain en attente. +duniter.event.NODE_BMA_UP=Noeud Duniter [%s\:%s] à nouveau accessible. +duniter.event.NODE_STARTED=Noeud ES API démarré sur le cluster Duniter [%s] duniter.user.event.ms.active=Votre adhésion comme membre a bien été renouvellée duniter.user.event.ms.join=Vous êtes maintenant membre de la monnaie duniter.user.event.ms.leave=Votre adhésion comme membre à expirée