Skip to content
Snippets Groups Projects
Commit 30f03309 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

Market: notify user when new comments

parent 30041649
No related branches found
No related tags found
No related merge requests found
Showing with 175 additions and 33 deletions
package org.duniter.core.client.model.elasticsearch;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
/**
* Created by blavenie on 01/03/16.
*/
public class RecordComment extends Record {
public static final String PROPERTY_MESSAGE="message";
public static final String PROPERTY_RECORD="record";
private String message;
private String record;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getRecord() {
return record;
}
public void setRecord(String record) {
this.record = record;
}
}
......@@ -52,7 +52,7 @@ cluster.name: duniter4j-elasticsearch-TEST
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
# network.host: 192.168.233.1
#network.host: 192.168.233.118
#
# Set a custom port for HTTP:
#
......@@ -157,6 +157,7 @@ duniter.data.sync.enable: false
#
# SMTP server configuration (host and port)
#
duniter.mail.enable: false
#duniter.mail.smtp.host: localhost
#duniter.mail.smtp.port: 25
#
......@@ -174,5 +175,5 @@ duniter.mail.admin: blavenie@EIS-DEV
#
#duniter.mail.subject.prefix: [Duniter4j ES]
duniter.changes.listenSource: */block
duniter.changes.listenSource: '*/block'
duniter.ws.port: 9400
......@@ -241,6 +241,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.getAsInt("duniter.data.sync.port", 80);
}
public boolean getMailEnable() {
return settings.getAsBoolean("duniter.mail.enable", Boolean.TRUE);
}
public String getMailSmtpHost() {
return settings.get("duniter.mail.smtp.host", "localhost");
}
......@@ -269,14 +273,14 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.get("duniter.mail.subject.prefix", "[Duniter4j ES]");
}
public int getWebSocketPort() {
return settings.getAsInt("duniter.ws.port", 9200);
}
public String getWebSocketHost() {
return settings.get("network.host", "locahost");
}
public int getWebSocketPort() {
return settings.getAsInt("duniter.ws.port", 9200);
}
/* protected methods */
protected void initI18n() throws IOException {
......
......@@ -128,14 +128,16 @@ public abstract class AbstractService implements Bean {
logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8)));
}
return indexDocumentFromJson(index, type, json);
}
protected String indexDocumentFromJson(String index, String type, String json) {
IndexResponse response = client.prepareIndex(index, type)
.setSource(json)
.setRefresh(false)
.execute().actionGet();
String id = response.getId();
return id;
return response.getId();
}
protected void checkIssuerAndUpdateDocumentFromJson(String index, String type, String json, String id) {
JsonNode actualObj = readAndVerifyIssuerSignature(json);
......@@ -174,8 +176,8 @@ public abstract class AbstractService implements Bean {
|| !fieldNames.contains(Record.PROPERTY_SIGNATURE)) {
throw new InvalidFormatException(String.format("Invalid record JSON format. Required fields [%s,%s]", Record.PROPERTY_ISSUER, Record.PROPERTY_SIGNATURE));
}
String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText();
String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText();
String issuer = getMandatoryField(actualObj, Record.PROPERTY_ISSUER).asText();
String signature = getMandatoryField(actualObj, Record.PROPERTY_SIGNATURE).asText();
String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "")
.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), "");
......@@ -216,6 +218,14 @@ public abstract class AbstractService implements Bean {
return actualObj.get(Record.PROPERTY_ISSUER).asText();
}
protected JsonNode getMandatoryField(JsonNode actualObj, String fieldName) {
JsonNode value = actualObj.get(fieldName);
if (value.isMissingNode()) {
throw new InvalidFormatException(String.format("Invalid format. Expected field '%s'", fieldName));
}
return value;
}
protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) {
bulkFromClasspathFile(classpathFile, indexName, indexType, null);
}
......
......@@ -47,8 +47,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
private final ThreadPool threadPool;
private final Injector injector;
private final static ESLogger logger = Loggers.getLogger("gchange");
private final Client client;
private final String clusterName;
@Inject
public PluginInit(Client client, Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
......@@ -56,8 +54,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
this.pluginSettings = pluginSettings;
this.threadPool = threadPool;
this.injector = injector;
this.client = client;
this.clusterName = settings.get("cluster.name");
}
@Override
......@@ -70,16 +66,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
synchronize();
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
// When started
threadPool.scheduleOnStarted(() -> {
// Notify admin
injector.getInstance(UserEventService.class)
.notifyAdmin(new UserEvent(
UserEvent.EventType.INFO,
UserEventCodes.NODE_STARTED.name(),
new String[]{clusterName}));
});
}
@Override
......
......@@ -24,21 +24,39 @@ package org.duniter.elasticsearch.gchange.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.gson.JsonSyntaxException;
import org.duniter.core.client.model.elasticsearch.Currency;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.client.model.elasticsearch.RecordComment;
import org.duniter.core.client.service.bma.WotRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import org.duniter.elasticsearch.gchange.PluginSettings;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.service.event.UserEvent;
import org.duniter.elasticsearch.user.service.event.UserEventCodes;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
/**
* Created by Benoit on 30/03/2015.
......@@ -53,12 +71,16 @@ public class MarketService extends AbstractService {
private static final String CATEGORIES_BULK_CLASSPATH_FILE = "market-categories-bulk-insert.json";
private WotRemoteService wotRemoteService;
private UserEventService userEventService;
@Inject
public MarketService(Client client, PluginSettings settings,
CryptoService cryptoService, WotRemoteService wotRemoteService) {
CryptoService cryptoService,
WotRemoteService wotRemoteService,
UserEventService userEventService) {
super("gchange." + INDEX, client, settings, cryptoService);
this.wotRemoteService = wotRemoteService;
this.userEventService = userEventService;
}
/**
......@@ -147,7 +169,21 @@ public class MarketService extends AbstractService {
}
public String indexCommentFromJson(String json) {
return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
JsonNode actualObj = readAndVerifyIssuerSignature(json);
String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText();
String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText();
String recordIssuer = getRecordIssuerById(recordId);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8)));
}
String commentId = indexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
// Notify record issuer
if (!issuer.equals(recordIssuer)) {
userEventService.notifyUser(recordIssuer,
new UserEvent(UserEvent.EventType.INFO, /*TODO*/ "NEW_COMMENT"));
}
}
public void updateCommentFromJson(String json, String id) {
......@@ -373,6 +409,45 @@ public class MarketService extends AbstractService {
}
}
/**
* Retrieve a blockchain from its name
* @param recordId
* @return
*/
protected String getRecordIssuerById(String recordId) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(INDEX)
.setTypes(RECORD_TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.matchQuery("_id", recordId));
searchRequest.addFields(Record.PROPERTY_ISSUER);
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
if (searchHit.source() != null) {
JsonNode source = objectMapper.readTree(searchHit.source());
return source.get(Record.PROPERTY_ISSUER).asText();
}
else {
SearchHitField field = searchHit.getFields().get(Record.PROPERTY_ISSUER);
return field.getValue().toString();
}
}
}
catch(SearchPhaseExecutionException | JsonSyntaxException | IOException | UnsupportedEncodingException e) {
// Failed or no item on index
if (logger.isDebugEnabled()) {
logger.error(String.format("Unable to retrieve issuer of record [%s]", recordId), e);
}
}
return null;
}
}
......@@ -28,6 +28,8 @@ import org.duniter.elasticsearch.user.service.HistoryService;
import org.duniter.elasticsearch.user.service.MessageService;
import org.duniter.elasticsearch.user.service.SynchroService;
import org.duniter.elasticsearch.user.service.UserService;
import org.duniter.elasticsearch.user.service.event.UserEvent;
import org.duniter.elasticsearch.user.service.event.UserEventCodes;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
......@@ -46,6 +48,7 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse
private final ThreadPool threadPool;
private final Injector injector;
private final static ESLogger logger = Loggers.getLogger("node");
private final String clusterName;
@Inject
public PluginInit(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
......@@ -53,6 +56,7 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse
this.pluginSettings = pluginSettings;
this.threadPool = threadPool;
this.injector = injector;
this.clusterName = settings.get("cluster.name");
}
@Override
......@@ -66,6 +70,15 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
// When started
threadPool.scheduleOnStarted(() -> {
// Notify admin
injector.getInstance(UserEventService.class)
.notifyAdmin(new UserEvent(
UserEvent.EventType.INFO,
UserEventCodes.NODE_STARTED.name(),
new String[]{clusterName}));
});
}
@Override
......
......@@ -75,6 +75,7 @@ public class UserEventService extends AbstractService implements ChangeListener
private final ThreadPool threadPool;
public final KeyPair nodeKeyPair;
public final String nodePubkey;
public final boolean mailEnable;
@Inject
public UserEventService(Client client, PluginSettings settings, CryptoService cryptoService, MailService mailService,
......@@ -84,6 +85,10 @@ public class UserEventService extends AbstractService implements ChangeListener
this.threadPool = threadPool;
this.nodeKeyPair = getNodeKeyPairOrNull(pluginSettings);
this.nodePubkey = getNodePubKey(this.nodeKeyPair);
this.mailEnable = pluginSettings.getMailEnable();
if (!this.mailEnable && logger.isTraceEnabled()) {
logger.trace("Mail disable");
}
ChangeService.registerListener(this);
}
......@@ -321,6 +326,8 @@ public class UserEventService extends AbstractService implements ChangeListener
* Send email
*/
private void sendEmail(String recipients, String subject, String textContent) {
if (!this.mailEnable) return;
String smtpHost = pluginSettings.getMailSmtpHost();
int smtpPort = pluginSettings.getMailSmtpPort();
String smtpUsername = pluginSettings.getMailSmtpUsername();
......@@ -331,12 +338,7 @@ public class UserEventService extends AbstractService implements ChangeListener
mailService.sendTextEmail(smtpHost, smtpPort, smtpUsername, smtpPassword, from, recipients, subject, textContent);
}
catch(TechnicalException e) {
if (logger.isDebugEnabled()) {
logger.error(String.format("Error while trying to send email: %s", e.getMessage()), e);
}
else {
logger.error(String.format("Error while trying to send email: %s", e.getMessage()));
}
logger.error(String.format("Could not send email: %s", e.getMessage())/*, e*/);
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment