From 30f03309c6aec2205c03142c273f254d3e5a976c Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Thu, 1 Dec 2016 17:09:25 +0100 Subject: [PATCH] Market: notify user when new comments --- .../model/elasticsearch/RecordComment.java | 51 ++++++++++++ .../src/test/es-home/config/elasticsearch.yml | 5 +- .../duniter/elasticsearch/PluginSettings.java | 12 ++- .../service/AbstractService.java | 20 +++-- .../elasticsearch/gchange/PluginInit.java | 14 ---- .../gchange/service/MarketService.java | 79 ++++++++++++++++++- .../elasticsearch/user/PluginInit.java | 13 +++ .../user/service/event/UserEventService.java | 14 ++-- 8 files changed, 175 insertions(+), 33 deletions(-) create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java new file mode 100644 index 00000000..e818e035 --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java @@ -0,0 +1,51 @@ +package org.duniter.core.client.model.elasticsearch; + +/* + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +/** + * Created by blavenie on 01/03/16. + */ +public class RecordComment extends Record { + + public static final String PROPERTY_MESSAGE="message"; + public static final String PROPERTY_RECORD="record"; + + private String message; + private String record; + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public String getRecord() { + return record; + } + + public void setRecord(String record) { + this.record = record; + } +} diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 828ddbd0..7aafcdd4 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -52,7 +52,7 @@ cluster.name: duniter4j-elasticsearch-TEST # # Set the bind address to a specific IP (IPv4 or IPv6): # -# network.host: 192.168.233.1 +#network.host: 192.168.233.118 # # Set a custom port for HTTP: # @@ -157,6 +157,7 @@ duniter.data.sync.enable: false # # SMTP server configuration (host and port) # +duniter.mail.enable: false #duniter.mail.smtp.host: localhost #duniter.mail.smtp.port: 25 # @@ -174,5 +175,5 @@ duniter.mail.admin: blavenie@EIS-DEV # #duniter.mail.subject.prefix: [Duniter4j ES] -duniter.changes.listenSource: */block +duniter.changes.listenSource: '*/block' duniter.ws.port: 9400 diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 5fe14728..e1401e60 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -241,6 +241,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsInt("duniter.data.sync.port", 80); } + public boolean getMailEnable() { + return settings.getAsBoolean("duniter.mail.enable", Boolean.TRUE); + } + public String getMailSmtpHost() { return settings.get("duniter.mail.smtp.host", "localhost"); } @@ -269,14 +273,14 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.get("duniter.mail.subject.prefix", "[Duniter4j ES]"); } - public int getWebSocketPort() { - return settings.getAsInt("duniter.ws.port", 9200); - } - public String getWebSocketHost() { return settings.get("network.host", "locahost"); } + public int getWebSocketPort() { + return settings.getAsInt("duniter.ws.port", 9200); + } + /* protected methods */ protected void initI18n() throws IOException { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 89bc70ad..8e4f408f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -128,14 +128,16 @@ public abstract class AbstractService implements Bean { logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8))); } + return indexDocumentFromJson(index, type, json); + } + + protected String indexDocumentFromJson(String index, String type, String json) { IndexResponse response = client.prepareIndex(index, type) .setSource(json) .setRefresh(false) .execute().actionGet(); - String id = response.getId(); - return id; + return response.getId(); } - protected void checkIssuerAndUpdateDocumentFromJson(String index, String type, String json, String id) { JsonNode actualObj = readAndVerifyIssuerSignature(json); @@ -174,8 +176,8 @@ public abstract class AbstractService implements Bean { || !fieldNames.contains(Record.PROPERTY_SIGNATURE)) { throw new InvalidFormatException(String.format("Invalid record JSON format. Required fields [%s,%s]", Record.PROPERTY_ISSUER, Record.PROPERTY_SIGNATURE)); } - String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText(); - String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText(); + String issuer = getMandatoryField(actualObj, Record.PROPERTY_ISSUER).asText(); + String signature = getMandatoryField(actualObj, Record.PROPERTY_SIGNATURE).asText(); String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "") .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); @@ -216,6 +218,14 @@ public abstract class AbstractService implements Bean { return actualObj.get(Record.PROPERTY_ISSUER).asText(); } + protected JsonNode getMandatoryField(JsonNode actualObj, String fieldName) { + JsonNode value = actualObj.get(fieldName); + if (value.isMissingNode()) { + throw new InvalidFormatException(String.format("Invalid format. Expected field '%s'", fieldName)); + } + return value; + } + protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) { bulkFromClasspathFile(classpathFile, indexName, indexType, null); } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/PluginInit.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/PluginInit.java index 020dda5d..030c42e3 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/PluginInit.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/PluginInit.java @@ -47,8 +47,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { private final ThreadPool threadPool; private final Injector injector; private final static ESLogger logger = Loggers.getLogger("gchange"); - private final Client client; - private final String clusterName; @Inject public PluginInit(Client client, Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) { @@ -56,8 +54,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { this.pluginSettings = pluginSettings; this.threadPool = threadPool; this.injector = injector; - this.client = client; - this.clusterName = settings.get("cluster.name"); } @Override @@ -70,16 +66,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { synchronize(); }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); - - // When started - threadPool.scheduleOnStarted(() -> { - // Notify admin - injector.getInstance(UserEventService.class) - .notifyAdmin(new UserEvent( - UserEvent.EventType.INFO, - UserEventCodes.NODE_STARTED.name(), - new String[]{clusterName})); - }); } @Override diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java index b037fc07..238d0b99 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java @@ -24,21 +24,39 @@ package org.duniter.elasticsearch.gchange.service; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonSyntaxException; +import org.duniter.core.client.model.elasticsearch.Currency; +import org.duniter.core.client.model.elasticsearch.Record; +import org.duniter.core.client.model.elasticsearch.RecordComment; import org.duniter.core.client.service.bma.WotRemoteService; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.gchange.PluginSettings; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.user.service.event.UserEvent; +import org.duniter.elasticsearch.user.service.event.UserEventCodes; +import org.duniter.elasticsearch.user.service.event.UserEventService; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; import java.io.IOException; +import java.io.UnsupportedEncodingException; /** * Created by Benoit on 30/03/2015. @@ -53,12 +71,16 @@ public class MarketService extends AbstractService { private static final String CATEGORIES_BULK_CLASSPATH_FILE = "market-categories-bulk-insert.json"; private WotRemoteService wotRemoteService; + private UserEventService userEventService; @Inject public MarketService(Client client, PluginSettings settings, - CryptoService cryptoService, WotRemoteService wotRemoteService) { + CryptoService cryptoService, + WotRemoteService wotRemoteService, + UserEventService userEventService) { super("gchange." + INDEX, client, settings, cryptoService); this.wotRemoteService = wotRemoteService; + this.userEventService = userEventService; } /** @@ -147,7 +169,21 @@ public class MarketService extends AbstractService { } public String indexCommentFromJson(String json) { - return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json); + JsonNode actualObj = readAndVerifyIssuerSignature(json); + String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText(); + String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText(); + String recordIssuer = getRecordIssuerById(recordId); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8))); + } + String commentId = indexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json); + + // Notify record issuer + if (!issuer.equals(recordIssuer)) { + userEventService.notifyUser(recordIssuer, + new UserEvent(UserEvent.EventType.INFO, /*TODO*/ "NEW_COMMENT")); + } } public void updateCommentFromJson(String json, String id) { @@ -373,6 +409,45 @@ public class MarketService extends AbstractService { } } + /** + * Retrieve a blockchain from its name + * @param recordId + * @return + */ + protected String getRecordIssuerById(String recordId) { + // Prepare request + SearchRequestBuilder searchRequest = client + .prepareSearch(INDEX) + .setTypes(RECORD_TYPE) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + + searchRequest.setQuery(QueryBuilders.matchQuery("_id", recordId)); + searchRequest.addFields(Record.PROPERTY_ISSUER); + // Execute query + try { + SearchResponse response = searchRequest.execute().actionGet(); + + // Read query result + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + if (searchHit.source() != null) { + JsonNode source = objectMapper.readTree(searchHit.source()); + return source.get(Record.PROPERTY_ISSUER).asText(); + } + else { + SearchHitField field = searchHit.getFields().get(Record.PROPERTY_ISSUER); + return field.getValue().toString(); + } + } + } + catch(SearchPhaseExecutionException | JsonSyntaxException | IOException | UnsupportedEncodingException e) { + // Failed or no item on index + if (logger.isDebugEnabled()) { + logger.error(String.format("Unable to retrieve issuer of record [%s]", recordId), e); + } + } + return null; + } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 87f044bf..15bbda25 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -28,6 +28,8 @@ import org.duniter.elasticsearch.user.service.HistoryService; import org.duniter.elasticsearch.user.service.MessageService; import org.duniter.elasticsearch.user.service.SynchroService; import org.duniter.elasticsearch.user.service.UserService; +import org.duniter.elasticsearch.user.service.event.UserEvent; +import org.duniter.elasticsearch.user.service.event.UserEventCodes; import org.duniter.elasticsearch.user.service.event.UserEventService; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -46,6 +48,7 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse private final ThreadPool threadPool; private final Injector injector; private final static ESLogger logger = Loggers.getLogger("node"); + private final String clusterName; @Inject public PluginInit(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) { @@ -53,6 +56,7 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse this.pluginSettings = pluginSettings; this.threadPool = threadPool; this.injector = injector; + this.clusterName = settings.get("cluster.name"); } @Override @@ -66,6 +70,15 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); }, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN); + // When started + threadPool.scheduleOnStarted(() -> { + // Notify admin + injector.getInstance(UserEventService.class) + .notifyAdmin(new UserEvent( + UserEvent.EventType.INFO, + UserEventCodes.NODE_STARTED.name(), + new String[]{clusterName})); + }); } @Override diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/event/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/event/UserEventService.java index 0ce4ede8..8daf4e68 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/event/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/event/UserEventService.java @@ -75,6 +75,7 @@ public class UserEventService extends AbstractService implements ChangeListener private final ThreadPool threadPool; public final KeyPair nodeKeyPair; public final String nodePubkey; + public final boolean mailEnable; @Inject public UserEventService(Client client, PluginSettings settings, CryptoService cryptoService, MailService mailService, @@ -84,6 +85,10 @@ public class UserEventService extends AbstractService implements ChangeListener this.threadPool = threadPool; this.nodeKeyPair = getNodeKeyPairOrNull(pluginSettings); this.nodePubkey = getNodePubKey(this.nodeKeyPair); + this.mailEnable = pluginSettings.getMailEnable(); + if (!this.mailEnable && logger.isTraceEnabled()) { + logger.trace("Mail disable"); + } ChangeService.registerListener(this); } @@ -321,6 +326,8 @@ public class UserEventService extends AbstractService implements ChangeListener * Send email */ private void sendEmail(String recipients, String subject, String textContent) { + if (!this.mailEnable) return; + String smtpHost = pluginSettings.getMailSmtpHost(); int smtpPort = pluginSettings.getMailSmtpPort(); String smtpUsername = pluginSettings.getMailSmtpUsername(); @@ -331,12 +338,7 @@ public class UserEventService extends AbstractService implements ChangeListener mailService.sendTextEmail(smtpHost, smtpPort, smtpUsername, smtpPassword, from, recipients, subject, textContent); } catch(TechnicalException e) { - if (logger.isDebugEnabled()) { - logger.error(String.format("Error while trying to send email: %s", e.getMessage()), e); - } - else { - logger.error(String.format("Error while trying to send email: %s", e.getMessage())); - } + logger.error(String.format("Could not send email: %s", e.getMessage())/*, e*/); } } -- GitLab