Skip to content
Snippets Groups Projects
Commit 2b5ebf78 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

- ES: Add notifications on new or updated comment (on record)

parent fe2e5ba5
Branches
Tags
No related merge requests found
Showing
with 348 additions and 175 deletions
......@@ -27,7 +27,7 @@ import org.duniter.elasticsearch.rest.RestModule;
import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.ServiceModule;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.websocket.WebsocketModule;
import org.duniter.elasticsearch.websocket.WebSocketModule;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
......@@ -66,7 +66,7 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
}
modules.add(new SecurityModule());
modules.add(new WebsocketModule());
modules.add(new WebSocketModule());
modules.add(new RestModule());
modules.add(new ServiceModule());
......
......@@ -281,10 +281,14 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.getAsInt("duniter.ws.port", 9200);
}
public boolean getWebSocketEnable() {
return settings.getAsBoolean("duniter.ws.enable", Boolean.TRUE);
}
/* protected methods */
protected void initI18n() throws IOException {
if (I18n.getDefaultLocale() != null) return; // already init
//if (I18n.getDefaultLocale() != null) return; // already init
// --------------------------------------------------------------------//
// init i18n
......
package org.duniter.elasticsearch.exception;
/*
* #%L
* Duniter4j :: ElasticSearch Plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.elasticsearch.rest.RestStatus;
/**
* Created by blavenie on 01/03/16.
*/
public class DocumentNotFoundException extends DuniterElasticsearchException {
public DocumentNotFoundException(Throwable cause) {
super(cause);
}
public DocumentNotFoundException(String msg, Object... args) {
super(msg, args);
}
public DocumentNotFoundException(String msg, Throwable cause, Object... args) {
super(msg, args, cause);
}
@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
......@@ -29,6 +29,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.exception.TechnicalException;
......@@ -147,7 +148,7 @@ public abstract class AbstractService implements Bean {
.execute().actionGet();
return response.getId();
}
protected void checkIssuerAndUpdateDocumentFromJson(String index, String type, String json, String id) {
protected void checkIssuerAndUpdateDocumentFromJson(String index, String type, String id, String json) {
JsonNode actualObj = readAndVerifyIssuerSignature(json);
String issuer = getIssuer(actualObj);
......@@ -159,6 +160,10 @@ public abstract class AbstractService implements Bean {
logger.debug(String.format("Updating %s [%s] from issuer [%s]", type, id, issuer.substring(0, 8)));
}
updateDocumentFromJson(index, type, id, json);
}
protected void updateDocumentFromJson(String index, String type, String id, String json) {
// Execute indexBlocksFromNode
client.prepareUpdate(index, type, id)
.setDoc(json)
......@@ -283,6 +288,20 @@ public abstract class AbstractService implements Bean {
}
}
/**
* Retrieve a field from a document id
* @param docId
* @return
*/
protected Object getFieldById(String index, String type, String docId, String fieldName) {
Map<String, Object> result = getFieldsById(index, type, docId, fieldName);
if (MapUtils.isEmpty(result)) {
return null;
}
return result.get(fieldName);
}
protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) {
bulkFromClasspathFile(classpathFile, indexName, indexType, null);
}
......
......@@ -54,7 +54,7 @@ import java.io.IOException;
public class ServiceLocator
extends org.duniter.core.client.service.ServiceLocator
{
private static final ESLogger logger = ESLoggerFactory.getLogger(ServiceLocator.class.getName());
private static final ESLogger logger = ESLoggerFactory.getLogger("duniter.service");
private static BeanFactory beanFactory = null;
......
......@@ -38,16 +38,13 @@ package org.duniter.elasticsearch.websocket;
limitations under the License.
*/
import org.duniter.elasticsearch.websocket.changes.WebSocketChangeEndPoint;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
public class WebsocketModule extends AbstractModule {
private final ESLogger log = Loggers.getLogger(WebsocketModule.class);
public class WebSocketModule extends AbstractModule {
@Override
protected void configure() {
log.debug("Binding websocket Module");
bind(WebsocketServer.class).asEagerSingleton();
bind(WebSocketServer.class).asEagerSingleton();
bind(WebSocketChangeEndPoint.Init.class).asEagerSingleton();
}
}
......@@ -38,7 +38,10 @@ package org.duniter.elasticsearch.websocket;
limitations under the License.
*/
import org.duniter.core.exception.TechnicalException;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.websocket.changes.WebSocketChangeEndPoint;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
......@@ -47,20 +50,45 @@ import org.glassfish.tyrus.server.Server;
import javax.websocket.DeploymentException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
public class WebsocketServer {
public class WebSocketServer {
private final ESLogger log = Loggers.getLogger(WebsocketServer.class);
private final ESLogger log = Loggers.getLogger("duniter.ws");
private List<Class<?>> endPoints = new ArrayList<>();
@Inject
public WebsocketServer(final PluginSettings pluginSettings) {
final String host = pluginSettings.getWebSocketHost();
final int port = pluginSettings.getWebSocketPort();
public WebSocketServer(final PluginSettings pluginSettings, ThreadPool threadPool) {
// If WS enable
if (pluginSettings.getWebSocketEnable()) {
// When node started
threadPool.scheduleOnStarted(() -> {
// start WS server
startServer(pluginSettings.getWebSocketHost(),
pluginSettings.getWebSocketPort(),
getEndPoints());
});
}
}
public void addEndPoint(Class<?> endPoint) {
endPoints.add(endPoint);
}
final Server server = new Server(host, port, "/ws", null, WebsocketChangeEndPoint.class) ;
/* -- private medthod -- */
private Class[] getEndPoints() {
return endPoints.toArray(new Class<?>[endPoints.size()]);
}
private void startServer(String host, int port, Class<?>[] endPoints) {
final Server server = new Server(host, port, "/ws", null, endPoints) ;
try {
log.info("Starting websocket server");
log.info("Starting Websocket server...");
AccessController.doPrivileged(new PrivilegedAction() {
@Override
public Object run() {
......@@ -70,16 +98,16 @@ public class WebsocketServer {
// This is a workaround for that
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
server.start();
log.info("Websocket server started");
return null;
} catch (DeploymentException e) {
throw new RuntimeException("Failed to start server", e);
}
}
});
log.info("Websocket server started");
} catch (Exception e) {
log.error("Failed to start Websocket server", e);
throw new RuntimeException(e);
throw new TechnicalException(e);
}
}
......
package org.duniter.elasticsearch.websocket;
package org.duniter.elasticsearch.websocket.changes;
/*
* #%L
......@@ -40,6 +40,8 @@ package org.duniter.elasticsearch.websocket;
import org.duniter.elasticsearch.service.changes.ChangeListener;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.websocket.WebSocketServer;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
......@@ -47,14 +49,22 @@ import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint(value = "/_changes")
public class WebsocketChangeEndPoint implements ChangeListener{
public class WebSocketChangeEndPoint implements ChangeListener{
private final ESLogger log = Loggers.getLogger(WebsocketChangeEndPoint.class);
public static class Init {
@Inject
public Init(WebSocketServer webSocketServer) {
webSocketServer.addEndPoint(WebSocketChangeEndPoint.class);
}
}
private final ESLogger log = Loggers.getLogger("duniter.ws.changes");
private Session session;
@OnOpen
public void onOpen(Session session) {
log.info("Connected ... " + session.getId());
log.debug("Connected ... " + session.getId());
this.session = session;
ChangeService.registerListener(this);
}
......@@ -71,12 +81,12 @@ public class WebsocketChangeEndPoint implements ChangeListener{
@OnMessage
public void onMessage(String message) {
log.info("Received message: "+message);
log.debug("Received message: "+message);
}
@OnClose
public void onClose(CloseReason reason) {
log.info("Closing websocket: "+reason);
log.debug("Closing websocket: "+reason);
ChangeService.unregisterListener(this);
this.session = null;
}
......
......@@ -49,7 +49,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
private final static ESLogger logger = Loggers.getLogger("gchange");
@Inject
public PluginInit(Client client, Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
public PluginInit(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
super(settings);
this.pluginSettings = pluginSettings;
this.threadPool = threadPool;
......
......@@ -25,35 +25,28 @@ package org.duniter.elasticsearch.gchange.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Joiner;
import com.google.gson.JsonSyntaxException;
import org.duniter.core.client.model.elasticsearch.Record;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.elasticsearch.RecordComment;
import org.duniter.core.client.service.bma.WotRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.exception.DocumentNotFoundException;
import org.duniter.elasticsearch.gchange.PluginSettings;
import org.duniter.elasticsearch.gchange.model.MarketRecord;
import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.service.UserService;
import org.duniter.elasticsearch.user.service.event.UserEvent;
import org.duniter.elasticsearch.user.service.event.UserEventLink;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.nuiton.i18n.I18n;
import java.io.IOException;
......@@ -73,15 +66,18 @@ public class MarketService extends AbstractService {
private WotRemoteService wotRemoteService;
private UserEventService userEventService;
private UserService userService;
@Inject
public MarketService(Client client, PluginSettings settings,
CryptoService cryptoService,
WotRemoteService wotRemoteService,
UserService userService,
UserEventService userEventService) {
super("gchange." + INDEX, client, settings, cryptoService);
this.wotRemoteService = wotRemoteService;
this.userEventService = userEventService;
this.userService = userService;
}
/**
......@@ -170,38 +166,33 @@ public class MarketService extends AbstractService {
}
public String indexCommentFromJson(String json) {
JsonNode actualObj = readAndVerifyIssuerSignature(json);
String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText();
JsonNode commentObj = readAndVerifyIssuerSignature(json);
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8)));
}
String commentId = indexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
// Notification
{
// Notify issuer of record (is not same as comment writer)
String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText();
Map<String, Object> recordFields = getRecordFieldsById(recordId, MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER);
String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString();
String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString();
if (!issuer.equals(recordIssuer)) {
userEventService.notifyUser(recordIssuer,
new UserEvent(UserEvent.EventType.INFO,
GchangeEventCodes.NEW_COMMENT.name(),
new UserEventLink(INDEX, RECORD_TYPE, recordId),
I18n.n("duniter.market.event.newComment"),
issuer, recordTitle
)
);
}
}
// Notify record issuer
notifyRecordIssuerForComment(commentObj, true);
return commentId;
}
public void updateCommentFromJson(String json, String id) {
checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json, id);
JsonNode commentObj = readAndVerifyIssuerSignature(json);
if (logger.isDebugEnabled()) {
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8)));
}
updateDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, id, json);
// Notify record issuer
notifyRecordIssuerForComment(commentObj, false);
}
public MarketService fillRecordCategories() {
......@@ -423,16 +414,34 @@ public class MarketService extends AbstractService {
}
}
/**
* Retrieve record field's values
* @param recordId
* @param fieldNames
* @return
*/
protected Map<String, Object> getRecordFieldsById(String recordId, String... fieldNames) {
// Notification
private void notifyRecordIssuerForComment(JsonNode actualObj, boolean isNewComment) {
String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText();
return getFieldsById(INDEX, RECORD_TYPE, recordId, fieldNames);
// Notify issuer of record (is not same as comment writer)
String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText();
Map<String, Object> recordFields = getFieldsById(INDEX, RECORD_TYPE, recordId,
MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER);
if (MapUtils.isEmpty(recordFields)) { // record not found
throw new DocumentNotFoundException(I18n.t("duniter.market.error.comment.recordNotFound", recordId));
}
String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString();
// Get user title
String issuerTitle = userService.getProfileTitle(issuer);
String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString();
if (!issuer.equals(recordIssuer)) {
userEventService.notifyUser(recordIssuer,
new UserEvent(UserEvent.EventType.INFO,
GchangeEventCodes.NEW_COMMENT.name(),
new UserEventLink(INDEX, RECORD_TYPE, recordId),
I18n.n(isNewComment ? "duniter.market.event.newComment": "duniter.market.event.updateComment"),
issuerTitle != null ? issuerTitle : issuer.substring(0, 8),
recordTitle
)
);
}
}
}
......@@ -25,12 +25,20 @@ package org.duniter.elasticsearch.gchange.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.bma.gson.GsonUtils;
import org.duniter.core.client.model.elasticsearch.RecordComment;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.exception.DocumentNotFoundException;
import org.duniter.elasticsearch.gchange.PluginSettings;
import org.duniter.elasticsearch.gchange.model.MarketRecord;
import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.service.event.UserEvent;
import org.duniter.elasticsearch.user.service.event.UserEventLink;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
......@@ -38,8 +46,10 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.nuiton.i18n.I18n;
import java.io.IOException;
import java.util.Map;
/**
* Created by Benoit on 30/03/2015.
......@@ -54,15 +64,18 @@ public class RegistryService extends AbstractService {
private final Gson gson;
private BlockchainRemoteService blockchainRemoteService;
private UserEventService userEventService;
@Inject
public RegistryService(Client client,
PluginSettings settings,
CryptoService cryptoService,
BlockchainRemoteService blockchainRemoteService) {
BlockchainRemoteService blockchainRemoteService,
UserEventService userEventService) {
super("gchange." + INDEX, client, settings, cryptoService);
this.gson = GsonUtils.newBuilder().create();
this.blockchainRemoteService = blockchainRemoteService;
this.userEventService = userEventService;
}
/**
......@@ -136,6 +149,7 @@ public class RegistryService extends AbstractService {
public String indexCommentFromJson(String json) {
return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
}
public void updateCommentFromJson(String json, String id) {
......
......@@ -90,7 +90,7 @@
<configuration>
<attach>true</attach>
<appendAssemblyId>false</appendAssemblyId>
<finalName>${bundlePrefix}</finalName>
<finalName>${project.artifactId}-${project.version}</finalName>
<descriptors>
<descriptor>
${basedir}/src/main/assembly/plugin.xml
......
......@@ -25,6 +25,7 @@ package org.duniter.elasticsearch.user;
import com.google.common.collect.Lists;
import org.duniter.elasticsearch.user.rest.RestModule;
import org.duniter.elasticsearch.user.service.ServiceModule;
import org.duniter.elasticsearch.user.websocket.WebSocketModule;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
......@@ -64,6 +65,8 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
modules.add(new RestModule());
modules.add(new ServiceModule());
modules.add(new WebSocketModule());
return modules;
}
......
......@@ -31,6 +31,8 @@ import org.duniter.elasticsearch.user.service.UserService;
import org.duniter.elasticsearch.user.service.event.UserEvent;
import org.duniter.elasticsearch.user.service.event.UserEventCodes;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.duniter.elasticsearch.user.websocket.WebsocketUserEventEndPoint;
import org.duniter.elasticsearch.websocket.WebSocketServer;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
......@@ -110,9 +112,6 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse
injector.getInstance(UserService.class)
.deleteIndex()
.createIndexIfNotExists();
injector.getInstance(UserEventService.class)
.deleteIndex()
.createIndexIfNotExists();
if (logger.isInfoEnabled()) {
......@@ -126,7 +125,6 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse
injector.getInstance(HistoryService.class).createIndexIfNotExists();
injector.getInstance(UserService.class).createIndexIfNotExists();
injector.getInstance(MessageService.class).createIndexIfNotExists();
injector.getInstance(UserEventService.class).createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Checking Duniter indices... [OK]");
......
......@@ -25,12 +25,14 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.client.model.elasticsearch.UserProfile;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.service.MailService;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.exception.AccessDeniedException;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.service.event.UserEventService;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
......@@ -91,6 +93,7 @@ public class UserService extends AbstractService {
createIndexRequestBuilder.setSettings(indexSettings);
createIndexRequestBuilder.addMapping(PROFILE_TYPE, createProfileType());
createIndexRequestBuilder.addMapping(SETTINGS_TYPE, createSettingsType());
createIndexRequestBuilder.addMapping(UserEventService.EVENT_TYPE, UserEventService.createEventType());
createIndexRequestBuilder.execute().actionGet();
return this;
......@@ -194,6 +197,12 @@ public class UserService extends AbstractService {
}
public String getProfileTitle(String issuer) {
Object title = getFieldById(INDEX, PROFILE_TYPE, issuer, UserProfile.PROPERTY_TITLE);
if (title == null) return null;
return title.toString();
}
/* -- Internal methods -- */
......
......@@ -84,6 +84,10 @@ public class UserEvent {
return time;
}
public UserEventLink getLink() {
return link;
}
public enum EventType {
INFO,
WARN,
......
......@@ -2,5 +2,6 @@ package org.duniter.elasticsearch.user.service.event;
public interface UserEventListener {
String getId();
String getPubkey();
void onEvent(UserEvent event);
}
\ No newline at end of file
......@@ -57,7 +57,7 @@ import java.util.Map;
/**
* Created by Benoit on 30/03/2015.
*/
public class UserEventService extends AbstractService implements ChangeListener {
public class UserEventService extends AbstractService {
public static final String INDEX = "user";
public static final String EVENT_TYPE = "event";
......@@ -89,7 +89,6 @@ public class UserEventService extends AbstractService implements ChangeListener
if (!this.mailEnable && logger.isTraceEnabled()) {
logger.trace("Mail disable");
}
//ChangeService.registerListener(this);
}
/**
......@@ -128,87 +127,16 @@ public class UserEventService extends AbstractService implements ChangeListener
}, TimeValue.timeValueMillis(100));
}
@Override
public void onChanges(String json) {
// TODO get doc issuer
/* String issuer = nodePubkey;
ChangeEvent event = ChangeUtils.fromJson(objectMapper, json);
// Skip event itself (avoid recursive call)
if (event.getIndex().equals(INDEX) && event.getType().equals(EVENT_TYPE)) {
return;
}
if (event.getOperation() == ChangeEvent.Operation.CREATE) {
notifyNewDocument(event.getIndex(), event.getType(), event.getId(), issuer);
}*/
}
@Override
public String getId() {
return "UserEventService";
}
/**
* Delete blockchain index, and all data
*/
public UserEventService deleteIndex() {
deleteIndexIfExists(INDEX);
return this;
}
public boolean existsIndex() {
return super.existsIndex(INDEX);
}
/**
* Create index need for blockchain registry, if need
*/
public UserEventService createIndexIfNotExists() {
try {
if (!existsIndex(INDEX)) {
createIndex();
}
}
catch(JsonProcessingException e) {
throw new TechnicalException(String.format("Error while creating index [%s]", INDEX));
}
return this;
}
/**
* Create index need for category registry
* @throws JsonProcessingException
*/
public UserEventService createIndex() throws JsonProcessingException {
logger.info(String.format("Creating index [%s/%s]", INDEX, EVENT_TYPE));
CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX);
Settings indexSettings = Settings.settingsBuilder()
.put("number_of_shards", 2)
.put("number_of_replicas", 1)
//.put("analyzer", createDefaultAnalyzer())
.build();
createIndexRequestBuilder.setSettings(indexSettings);
createIndexRequestBuilder.addMapping(EVENT_TYPE, createEventType());
createIndexRequestBuilder.execute().actionGet();
return this;
}
public String indexEvent(String recipient, Locale locale, UserEvent event) {
// Generate json
String eventJson;
if (StringUtils.isNotBlank(nodePubkey)) {
eventJson = toJson(nodePubkey, recipient, locale, event, null);
eventJson = UserEventUtils.toJson(nodePubkey, recipient, locale, event, null);
String signature = cryptoService.sign(eventJson, nodeKeyPair.getSecKey());
eventJson = toJson(nodePubkey, recipient, locale, event, signature);
eventJson = UserEventUtils.toJson(nodePubkey, recipient, locale, event, signature);
} else {
// Node has not keyring : TODO no issuer ?
eventJson = toJson(recipient, recipient, locale, event, null);
eventJson = UserEventUtils.toJson(recipient, recipient, locale, event, null);
}
if (logger.isDebugEnabled()) {
......@@ -246,7 +174,7 @@ public class UserEventService extends AbstractService implements ChangeListener
/* -- Internal methods -- */
public XContentBuilder createEventType() {
public static XContentBuilder createEventType() {
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(EVENT_TYPE)
.startObject("properties")
......@@ -350,29 +278,7 @@ public class UserEventService extends AbstractService implements ChangeListener
}
}
private String toJson(String issuer, String recipient, Locale locale, UserEvent event, String signature) {
try {
XContentBuilder eventObject = XContentFactory.jsonBuilder().startObject()
.field("type", event.getType().name())
.field("issuer", issuer) // TODO isuer = node pubkey
.field("recipient", recipient)
.field("time", event.getTime())
.field("code", event.getCode())
.field("message", event.getLocalizedMessage(locale));
if (CollectionUtils.isNotEmpty(event.getParams())) {
eventObject.array("params", event.getParams());
}
if (StringUtils.isNotBlank(signature)) {
eventObject.field("signature", signature);
}
eventObject.endObject();
return eventObject.string();
}
catch(IOException e) {
throw new TechnicalException(e);
}
}
private KeyPair getNodeKeyPairOrNull(PluginSettings pluginSettings) {
......@@ -402,6 +308,7 @@ public class UserEventService extends AbstractService implements ChangeListener
indexEvent(recipient, locale, event);
// Send email to user
// TODO : group email by day ?
if (StringUtils.isNotBlank(email)) {
String subjectPrefix = pluginSettings.getMailSubjectPrefix();
sendEmail(email,
......@@ -410,7 +317,9 @@ public class UserEventService extends AbstractService implements ChangeListener
}
for (UserEventListener listener: LISTENERS.values()) {
if (recipient.equals(listener.getPubkey())) {
listener.onEvent(event);
}
}
}
}
package org.duniter.elasticsearch.user.service.event;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.StringUtils;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.Locale;
/**
* Created by blavenie on 02/12/16.
*/
public abstract class UserEventUtils {
public static String toJson(String issuer, String recipient, Locale locale, UserEvent event, String signature) {
try {
XContentBuilder eventObject = XContentFactory.jsonBuilder().startObject()
.field("type", event.getType().name())
.field("issuer", issuer) // TODO isuer = node pubkey
.field("recipient", recipient)
.field("time", event.getTime())
.field("code", event.getCode())
.field("message", event.getLocalizedMessage(locale));
if (CollectionUtils.isNotEmpty(event.getParams())) {
eventObject.array("params", event.getParams());
}
// Link
UserEventLink link = event.getLink();
if (link != null) {
eventObject.startObject("link")
.field("index", link.getIndex())
.field("type", link.getType());
if (StringUtils.isNotBlank(link.getId())) {
eventObject.field("id", link.getId());
}
eventObject.endObject();
}
if (StringUtils.isNotBlank(signature)) {
eventObject.field("signature", signature);
}
eventObject.endObject();
return eventObject.string();
}
catch(IOException e) {
throw new TechnicalException(e);
}
}
public static String toJson(Locale locale, UserEvent event) {
try {
XContentBuilder eventObject = XContentFactory.jsonBuilder().startObject()
.field("type", event.getType().name())
.field("time", event.getTime())
.field("code", event.getCode())
.field("message", event.getLocalizedMessage(locale));
if (CollectionUtils.isNotEmpty(event.getParams())) {
eventObject.array("params", event.getParams());
}
// Link
UserEventLink link = event.getLink();
if (link != null) {
eventObject.startObject("link")
.field("index", link.getIndex())
.field("type", link.getType());
if (StringUtils.isNotBlank(link.getId())) {
eventObject.field("id", link.getId());
}
eventObject.endObject();
}
eventObject.endObject();
return eventObject.string();
}
catch(IOException e) {
throw new TechnicalException(e);
}
}
}
package org.duniter.elasticsearch.user.websocket;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
public class WebSocketModule extends AbstractModule implements Module {
@Override protected void configure() {
bind(WebsocketUserEventEndPoint.Init.class).asEagerSingleton();
}
/* protected methods */
}
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment