Skip to content
Snippets Groups Projects
Commit b59a170f authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

- on index 'event' and 'message' : Add field "read_signature" to mark an event as read

parent 200715fa
No related branches found
No related tags found
No related merge requests found
Showing
with 664 additions and 149 deletions
package org.duniter.elasticsearch.rest;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.core.exception.BusinessException;
import org.duniter.elasticsearch.exception.DuniterElasticsearchException;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.OK;
public abstract class AbstractRestPostMarkAsReadAction extends BaseRestHandler {
private static ESLogger log = null;
private final JsonReadUpdater updater;
public AbstractRestPostMarkAsReadAction(Settings settings, RestController controller, Client client,
RestSecurityController securityController,
String indexName,
String typeName,
JsonReadUpdater updater) {
super(settings, controller, client);
controller.registerHandler(POST,
String.format("/%s/%s/{id}/_read", indexName, typeName),
this);
securityController.allowIndexType(POST, indexName, typeName);
log = ESLoggerFactory.getLogger(String.format("[%s]", indexName));
this.updater = updater;
}
@Override
protected void handleRequest(final RestRequest request, RestChannel restChannel, Client client) throws Exception {
String id = request.param("id");
try {
updater.handleSignature(request.content().toUtf8(), id);
restChannel.sendResponse(new BytesRestResponse(OK, id));
}
catch(DuniterElasticsearchException | BusinessException e) {
log.error(e.getMessage(), e);
restChannel.sendResponse(new XContentThrowableRestResponse(request, e));
}
catch(Exception e) {
log.error(e.getMessage(), e);
}
}
public interface JsonReadUpdater {
void handleSignature(String signature, String id) throws DuniterElasticsearchException, BusinessException;
}
}
\ No newline at end of file
......@@ -60,7 +60,9 @@ public class RestSecurityController extends AbstractLifecycleComponent<RestSecur
allowRules = new TreeSet<>();
allowRulesByMethod.put(method, allowRules);
}
if (!allowRules.contains(regexPath)) {
allowRules.add(regexPath);
}
return this;
}
......
......@@ -69,10 +69,7 @@ import org.elasticsearch.search.SearchHitField;
import org.nuiton.i18n.I18n;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.regex.Pattern;
/**
......@@ -247,6 +244,20 @@ public abstract class AbstractService implements Bean {
return value;
}
/**
* Retrieve some field from a document id, and check if all field not null
* @param docId
* @return
*/
protected Map<String, Object> getMandatoryFieldsById(String index, String type, String docId, String... fieldNames) {
Map<String, Object> fields = getFieldsById(index, type, docId, fieldNames);
if (MapUtils.isEmpty(fields)) throw new NotFoundException(String.format("Document [%s/%s/%s] not exists.", index, type, docId));
Arrays.stream(fieldNames).forEach((fieldName) -> {
if (!fields.containsKey(fieldName)) throw new NotFoundException(String.format("Document [%s/%s/%s] should have the madatory field [%s].", index, type, docId, fieldName));
});
return fields;
}
/**
* Retrieve some field from a document id
* @param docId
......@@ -259,7 +270,7 @@ public abstract class AbstractService implements Bean {
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.matchQuery("_id", docId));
searchRequest.setQuery(QueryBuilders.idsQuery().ids(docId));
searchRequest.addFields(fieldNames);
// Execute query
......@@ -280,13 +291,54 @@ public abstract class AbstractService implements Bean {
}
catch(SearchPhaseExecutionException | JsonSyntaxException e) {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve fields [%s] for document [%s]",
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve fields [%s] for id [%s]",
index, type,
Joiner.on(',').join(fieldNames).toString(),
docId), e);
}
}
/**
* Retrieve some field from a document id
* @param index
* @param type
* @param ids
* @param fieldName
* @return
*/
protected Map<String, Object> getFieldByIds(String index, String type, Set<String> ids, String fieldName) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery().ids(ids));
searchRequest.addFields(fieldName);
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
Map<String, Object> result = new HashMap<>();
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
Map<String, SearchHitField> hitFields = searchHit.getFields();
if (hitFields.get(fieldName) != null) {
result.put(searchHit.getId(), hitFields.get(fieldName).getValue());
}
}
return result;
}
catch(SearchPhaseExecutionException | JsonSyntaxException e) {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve field [%s] for ids [%s]",
index, type, fieldName,
Joiner.on(',').join(ids).toString()), e);
}
}
/**
* Retrieve a field from a document id
* @param docId
......@@ -467,52 +519,6 @@ public abstract class AbstractService implements Bean {
}
}
protected XContentBuilder createRecordCommentType(String index, String type) {
String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer();
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(type)
.startObject("properties")
// issuer
.startObject("issuer")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// time
.startObject("time")
.field("type", "integer")
.endObject()
// message
.startObject("message")
.field("type", "string")
.field("analyzer", stringAnalyzer)
.endObject()
// record
.startObject("record")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// reply to
.startObject("reply_to")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject().endObject();
return mapping;
}
catch(IOException ioe) {
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", index, type, ioe.getMessage()), ioe);
}
}
protected void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest) {
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = bulkRequest.get();
......
package org.duniter.elasticsearch.gchange.service;
/*
* #%L
* UCoin Java Client :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.elasticsearch.RecordComment;
import org.duniter.core.client.service.bma.WotRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.exception.DocumentNotFoundException;
import org.duniter.elasticsearch.gchange.PluginSettings;
import org.duniter.elasticsearch.gchange.model.MarketRecord;
import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.service.UserEventService;
import org.duniter.elasticsearch.user.service.UserService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.nuiton.i18n.I18n;
import java.io.IOException;
import java.util.Map;
/**
* Created by Benoit on 30/03/2015.
*/
public class CommentService extends AbstractService {
private UserEventService userEventService;
private UserService userService;
@Inject
public CommentService(Client client,
PluginSettings pluginSettings,
CryptoService cryptoService,
UserService userService,
UserEventService userEventService) {
super("gchange.comment", client, pluginSettings, cryptoService);
this.userEventService = userEventService;
this.userService = userService;
}
public String indexCommentFromJson(final String index, final String recordType, final String type, final String json) {
JsonNode commentObj = readAndVerifyIssuerSignature(json);
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8)));
}
String commentId = indexDocumentFromJson(index, type, json);
// Notify record issuer
notifyRecordIssuerForComment(index, recordType, commentObj, true, commentId);
return commentId;
}
public void updateCommentFromJson(final String index, final String recordType, final String type, final String json, final String id) {
JsonNode commentObj = readAndVerifyIssuerSignature(json);
if (logger.isDebugEnabled()) {
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8)));
}
updateDocumentFromJson(index, type, id, json);
// Notify record issuer
notifyRecordIssuerForComment(index, recordType, commentObj, false, id);
}
public XContentBuilder createRecordCommentType(String index, String type) {
String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer();
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(type)
.startObject("properties")
// issuer
.startObject("issuer")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// time
.startObject("time")
.field("type", "integer")
.endObject()
// message
.startObject("message")
.field("type", "string")
.field("analyzer", stringAnalyzer)
.endObject()
// record
.startObject("record")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// reply to
.startObject("reply_to")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject().endObject();
return mapping;
}
catch(IOException ioe) {
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", index, type, ioe.getMessage()), ioe);
}
}
/* -- Internal methods -- */
/**
* Notify user when new comment
*/
private void notifyRecordIssuerForComment(final String index, final String recordType, JsonNode actualObj, boolean isNewComment, String commentId) {
String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText();
// Notify issuer of record (is not same as comment writer)
String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText();
Map<String, Object> recordFields = getFieldsById(index, recordType, recordId,
MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER);
if (MapUtils.isEmpty(recordFields)) { // record not found
throw new DocumentNotFoundException(I18n.t("duniter.market.error.comment.recordNotFound", recordId));
}
String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString();
// Get user title
String issuerTitle = userService.getProfileTitle(issuer);
String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString();
if (!issuer.equals(recordIssuer)) {
userEventService.notifyUser(
UserEvent.newBuilder(UserEvent.EventType.INFO, GchangeEventCodes.NEW_COMMENT.name())
.setMessage(
isNewComment ? I18n.n("duniter.market.event.newComment") : I18n.n("duniter.market.event.updateComment"),
issuerTitle != null ? issuerTitle : issuer.substring(0, 8),
recordTitle)
.setRecipient(recordIssuer)
.setReference(index, recordType, recordId)
.setReferenceAnchor(commentId)
.build());
}
}
}
......@@ -65,18 +65,18 @@ public class MarketService extends AbstractService {
private WotRemoteService wotRemoteService;
private UserEventService userEventService;
private UserService userService;
private CommentService commentService;
@Inject
public MarketService(Client client, PluginSettings settings,
CryptoService cryptoService,
CommentService commentService,
WotRemoteService wotRemoteService,
UserService userService,
UserEventService userEventService) {
super("gchange." + INDEX, client, settings, cryptoService);
this.commentService = commentService;
this.wotRemoteService = wotRemoteService;
this.userEventService = userEventService;
this.userService = userService;
}
/**
......@@ -88,7 +88,6 @@ public class MarketService extends AbstractService {
return this;
}
public boolean existsIndex() {
return super.existsIndex(INDEX);
}
......@@ -128,7 +127,7 @@ public class MarketService extends AbstractService {
createIndexRequestBuilder.setSettings(indexSettings);
createIndexRequestBuilder.addMapping(RECORD_CATEGORY_TYPE, createRecordCategoryType());
createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType());
createIndexRequestBuilder.addMapping(RECORD_COMMENT_TYPE, createRecordCommentType(INDEX, RECORD_COMMENT_TYPE));
createIndexRequestBuilder.addMapping(RECORD_COMMENT_TYPE, commentService.createRecordCommentType(INDEX, RECORD_COMMENT_TYPE));
createIndexRequestBuilder.execute().actionGet();
return this;
......@@ -165,33 +164,11 @@ public class MarketService extends AbstractService {
}
public String indexCommentFromJson(String json) {
JsonNode commentObj = readAndVerifyIssuerSignature(json);
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8)));
}
String commentId = indexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
// Notify record issuer
notifyRecordIssuerForComment(commentObj, true);
return commentId;
return commentService.indexCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json);
}
public void updateCommentFromJson(String json, String id) {
JsonNode commentObj = readAndVerifyIssuerSignature(json);
if (logger.isDebugEnabled()) {
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
logger.debug(String.format("Indexing a %s from issuer [%s]", RECORD_COMMENT_TYPE, issuer.substring(0, 8)));
}
updateDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, id, json);
// Notify record issuer
notifyRecordIssuerForComment(commentObj, false);
commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json, id);
}
public MarketService fillRecordCategories() {
......@@ -413,34 +390,4 @@ public class MarketService extends AbstractService {
}
}
// Notification
private void notifyRecordIssuerForComment(JsonNode actualObj, boolean isNewComment) {
String issuer = getMandatoryField(actualObj, RecordComment.PROPERTY_ISSUER).asText();
// Notify issuer of record (is not same as comment writer)
String recordId = getMandatoryField(actualObj, RecordComment.PROPERTY_RECORD).asText();
Map<String, Object> recordFields = getFieldsById(INDEX, RECORD_TYPE, recordId,
MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER);
if (MapUtils.isEmpty(recordFields)) { // record not found
throw new DocumentNotFoundException(I18n.t("duniter.market.error.comment.recordNotFound", recordId));
}
String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString();
// Get user title
String issuerTitle = userService.getProfileTitle(issuer);
String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString();
if (!issuer.equals(recordIssuer)) {
userEventService.notifyUser(
UserEvent.newBuilder(UserEvent.EventType.INFO, GchangeEventCodes.NEW_COMMENT.name())
.setMessage(
isNewComment ? I18n.n("duniter.market.event.newComment") : I18n.n("duniter.market.event.updateComment"),
issuerTitle != null ? issuerTitle : issuer.substring(0, 8),
recordTitle)
.setRecipient(recordIssuer)
.setReference(INDEX, RECORD_TYPE, recordId)
.build());
}
}
}
......@@ -24,8 +24,6 @@ package org.duniter.elasticsearch.gchange.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import org.duniter.core.client.model.bma.gson.GsonUtils;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
......@@ -53,19 +51,17 @@ public class RegistryService extends AbstractService {
public static final String RECORD_COMMENT_TYPE = "comment";
private static final String CATEGORIES_BULK_CLASSPATH_FILE = "registry-categories-bulk-insert.json";
private final Gson gson;
private BlockchainRemoteService blockchainRemoteService;
private CommentService commentService;
private UserEventService userEventService;
@Inject
public RegistryService(Client client,
PluginSettings settings,
CryptoService cryptoService,
BlockchainRemoteService blockchainRemoteService,
CommentService commentService,
UserEventService userEventService) {
super("gchange." + INDEX, client, settings, cryptoService);
this.gson = GsonUtils.newBuilder().create();
this.blockchainRemoteService = blockchainRemoteService;
this.commentService = commentService;
this.userEventService = userEventService;
}
......@@ -102,7 +98,7 @@ public class RegistryService extends AbstractService {
createIndexRequestBuilder.setSettings(indexSettings);
createIndexRequestBuilder.addMapping(RECORD_CATEGORY_TYPE, createRecordCategoryType());
createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType());
createIndexRequestBuilder.addMapping(RECORD_COMMENT_TYPE, createRecordCommentType(INDEX, RECORD_COMMENT_TYPE));
createIndexRequestBuilder.addMapping(RECORD_COMMENT_TYPE, commentService.createRecordCommentType(INDEX, RECORD_COMMENT_TYPE));
createIndexRequestBuilder.execute().actionGet();
return this;
......@@ -139,12 +135,11 @@ public class RegistryService extends AbstractService {
}
public String indexCommentFromJson(String json) {
return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json);
return commentService.indexCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json);
}
public void updateCommentFromJson(String json, String id) {
checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_COMMENT_TYPE, json, id);
commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json, id);
}
/* -- Internal methods -- */
......
......@@ -30,6 +30,7 @@ public class ServiceModule extends AbstractModule implements Module {
@Override protected void configure() {
bind(RegistryService.class).asEagerSingleton();
bind(CitiesRegistryService.class).asEagerSingleton();
bind(CommentService.class).asEagerSingleton();
bind(MarketService.class).asEagerSingleton();
bind(SynchroService.class).asEagerSingleton();
}
......
package org.duniter.elasticsearch.user.model;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.exception.TechnicalException;
import org.nuiton.i18n.I18n;
import java.util.Locale;
/**
* Created by blavenie on 29/11/16.
*/
public class Message extends Record {
public static final String PROPERTY_NONCE="nonce";
public static final String PROPERTY_CONTENT="content";
public static final String PROPERTY_RECIPIENT="recipient";
public static final String PROPERTY_READ_SIGNATURE="readSignature";
private String nonce;
private String recipient;
private String content;
private String readSignature;
public Message() {
super();
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getRecipient() {
return recipient;
}
public void setRecipient(String recipient) {
this.recipient = recipient;
}
@JsonGetter("read_signature")
public String getReadSignature() {
return readSignature;
}
@JsonSetter("read_signature")
public void setReadSignature(String readSignature) {
this.readSignature = readSignature;
}
public String getNonce() {
return nonce;
}
public void setNonce(String nonce) {
this.nonce = nonce;
}
@JsonIgnore
public String toJson() {
try {
ObjectMapper mapper = new ObjectMapper();
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
return mapper.writeValueAsString(this);
} catch(Exception e) {
throw new TechnicalException(e);
}
}
}
......@@ -22,8 +22,10 @@ package org.duniter.elasticsearch.user.model;
* #L%
*/
import com.fasterxml.jackson.annotation.JsonGetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.duniter.core.client.model.elasticsearch.Record;
......@@ -62,6 +64,8 @@ public class UserEvent extends Record {
public static final String PROPERTY_REFERENCE="reference";
public static final String PROPERTY_RECIPIENT="recipient";
public static final String PROPERTY_READ_SIGNATURE="readSignature";
private EventType type;
......@@ -75,6 +79,8 @@ public class UserEvent extends Record {
private Reference reference;
private String readSignature;
public UserEvent() {
super();
}
......@@ -96,6 +102,7 @@ public class UserEvent extends Record {
this.reference = (another.getReference() != null) ? new Reference(another.getReference()) : null;
this.message = another.getMessage();
this.recipient = another.getRecipient();
this.readSignature = another.getReadSignature();
}
public EventType getType() {
......@@ -150,6 +157,16 @@ public class UserEvent extends Record {
this.recipient = recipient;
}
@JsonGetter("read_signature")
public String getReadSignature() {
return readSignature;
}
@JsonSetter("read_signature")
public void setReadSignature(String readSignature) {
this.readSignature = readSignature;
}
@JsonIgnore
public String toJson() {
try {
......@@ -216,6 +233,12 @@ public class UserEvent extends Record {
return this;
}
public Builder setReferenceAnchor(String anchor) {
Preconditions.checkNotNull(result.getReference(), "No reference set. Please call setReference() first");
result.getReference().setAnchor(anchor);
return this;
}
public Builder setTime(long time) {
result.setTime(time);
return this;
......
......@@ -24,11 +24,9 @@ package org.duniter.elasticsearch.user.rest;
import org.duniter.elasticsearch.user.rest.history.RestHistoryDeleteIndexAction;
import org.duniter.elasticsearch.user.rest.message.RestMessageInboxIndexAction;
import org.duniter.elasticsearch.user.rest.message.RestMessageMarkAsReadAction;
import org.duniter.elasticsearch.user.rest.message.RestMessageOutboxIndexAction;
import org.duniter.elasticsearch.user.rest.user.RestUserProfileIndexAction;
import org.duniter.elasticsearch.user.rest.user.RestUserProfileUpdateAction;
import org.duniter.elasticsearch.user.rest.user.RestUserSettingsIndexAction;
import org.duniter.elasticsearch.user.rest.user.RestUserSettingsUpdateAction;
import org.duniter.elasticsearch.user.rest.user.*;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
......@@ -41,6 +39,7 @@ public class RestModule extends AbstractModule implements Module {
bind(RestUserProfileUpdateAction.class).asEagerSingleton();
bind(RestUserSettingsIndexAction.class).asEagerSingleton();
bind(RestUserSettingsUpdateAction.class).asEagerSingleton();
bind(RestUserEventMarkAsReadAction.class).asEagerSingleton();
// History
bind(RestHistoryDeleteIndexAction.class).asEagerSingleton();
......@@ -48,5 +47,6 @@ public class RestModule extends AbstractModule implements Module {
// Message
bind(RestMessageInboxIndexAction.class).asEagerSingleton();
bind(RestMessageOutboxIndexAction.class).asEagerSingleton();
bind(RestMessageMarkAsReadAction.class).asEagerSingleton();
}
}
\ No newline at end of file
package org.duniter.elasticsearch.user.rest.message;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.elasticsearch.rest.AbstractRestPostMarkAsReadAction;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.user.service.MessageService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
public class RestMessageMarkAsReadAction extends AbstractRestPostMarkAsReadAction {
@Inject
public RestMessageMarkAsReadAction(Settings settings, RestController controller, Client client,
RestSecurityController securityController,
MessageService messageService) {
super(settings, controller, client, securityController, MessageService.INDEX, MessageService.RECORD_TYPE,
(signature, id) -> {
messageService.markMessageAsRead(signature, id);
});
}
}
\ No newline at end of file
package org.duniter.elasticsearch.user.rest.user;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.elasticsearch.rest.AbstractRestPostMarkAsReadAction;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.user.service.UserEventService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestController;
public class RestUserEventMarkAsReadAction extends AbstractRestPostMarkAsReadAction {
@Inject
public RestUserEventMarkAsReadAction(Settings settings, RestController controller, Client client,
RestSecurityController securityController,
UserEventService userEventService) {
super(settings, controller, client, securityController, UserEventService.INDEX, UserEventService.EVENT_TYPE,
(signature, id) -> {
userEventService.markEventAsRead(signature, id);
});
}
}
\ No newline at end of file
......@@ -24,8 +24,10 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.ModelUtils;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
......@@ -55,6 +57,8 @@ public class BlockchainUserEventService extends AbstractService implements Chang
public static final String DEFAULT_PUBKEYS_SEPARATOR = ", ";
public final UserService userService;
public final UserEventService userEventService;
public final ObjectMapper objectMapper;
......@@ -66,8 +70,10 @@ public class BlockchainUserEventService extends AbstractService implements Chang
@Inject
public BlockchainUserEventService(Client client, PluginSettings settings, CryptoService cryptoService,
BlockchainService blockchainService,
UserService userService,
UserEventService userEventService) {
super("duniter.user.event.blockchain", client, settings, cryptoService);
this.userService = userService;
this.userEventService = userEventService;
this.objectMapper = JacksonUtils.newObjectMapper();
this.changeListenSources = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
......@@ -197,10 +203,8 @@ public class BlockchainUserEventService extends AbstractService implements Chang
private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) {
Set<String> senders = ImmutableSet.copyOf(tx.getIssuers());
// Received
// TODO get profile name
String sendersString = ModelUtils.joinPubkeys(senders, true, DEFAULT_PUBKEYS_SEPARATOR);
String sendersString = joinPubkeys(senders, true);
Set<String> receivers = new HashSet<>();
for (String output : tx.getOutputs()) {
String[] parts = output.split(":");
......@@ -215,8 +219,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang
// Sent
if (CollectionUtils.isNotEmpty(receivers)) {
// TODO get profile name
String receiverStr = ModelUtils.joinPubkeys(receivers, true, DEFAULT_PUBKEYS_SEPARATOR);
String receiverStr = joinPubkeys(receivers, true);
for (String sender : senders) {
notifyUserEvent(block, sender, UserEventCodes.TX_SENT, I18n.n("duniter.user.event.tx.sent"), receiverStr);
}
......@@ -242,8 +245,27 @@ public class BlockchainUserEventService extends AbstractService implements Chang
// Delete events that reference this block
userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId()));
}
private String joinPubkeys(Set<String> pubkeys, boolean minify) {
Preconditions.checkNotNull(pubkeys);
Preconditions.checkArgument(pubkeys.size()>0);
if (pubkeys.size() == 1) {
String pubkey = pubkeys.iterator().next();
String title = userService.getProfileTitle(pubkey);
return title != null ? title :
(minify ? ModelUtils.minifyPubkey(pubkey) : pubkey);
}
Map<String, String> profileTitles = userService.getProfileTitles(pubkeys);
StringBuilder sb = new StringBuilder();
pubkeys.stream().forEach((pubkey)-> {
String title = profileTitles != null ? profileTitles.get(pubkey) : null;
sb.append(DEFAULT_PUBKEYS_SEPARATOR);
sb.append(title != null ? title :
(minify ? ModelUtils.minifyPubkey(pubkey) : pubkey));
});
return sb.substring(DEFAULT_PUBKEYS_SEPARATOR.length());
}
}
......@@ -25,12 +25,17 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.exception.InvalidSignatureException;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.user.model.Message;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
......@@ -38,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.Map;
/**
* Created by Benoit on 30/03/2015.
......@@ -138,6 +144,22 @@ public class MessageService extends AbstractService {
return response.getId();
}
public void markMessageAsRead(String signature, String id) {
Map<String, Object> fields = getMandatoryFieldsById(INDEX, RECORD_TYPE, id, Message.PROPERTY_HASH, Message.PROPERTY_RECIPIENT);
String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString();
String hash = fields.get(UserEvent.PROPERTY_HASH).toString();
// Check signature
boolean valid = cryptoService.verify(hash, signature, recipient);
if (!valid) {
throw new InvalidSignatureException("Invalid signature: only the recipient can mark an message as read.");
}
UpdateRequestBuilder request = client.prepareUpdate(INDEX, RECORD_TYPE, id)
.setDoc("read_signature", signature);
request.execute();
}
/* -- Internal methods -- */
public XContentBuilder createRecordType() {
......@@ -182,6 +204,12 @@ public class MessageService extends AbstractService {
.field("index", "not_analyzed")
.endObject()
// read_signature
.startObject("read_signature")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject().endObject();
......
......@@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.gson.JsonSyntaxException;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.service.MailService;
......@@ -35,6 +36,9 @@ import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import org.duniter.elasticsearch.exception.InvalidSignatureException;
import org.duniter.elasticsearch.exception.NotFoundException;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -47,6 +51,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
......@@ -201,6 +206,23 @@ public class UserEventService extends AbstractService {
});
}
public void markEventAsRead(String signature, String id) {
Map<String, Object> fields = getMandatoryFieldsById(INDEX, EVENT_TYPE, id, UserEvent.PROPERTY_HASH, UserEvent.PROPERTY_RECIPIENT);
String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString();
String hash = fields.get(UserEvent.PROPERTY_HASH).toString();
// Check signature
boolean valid = cryptoService.verify(hash, signature, recipient);
if (!valid) {
throw new InvalidSignatureException("Invalid signature: only the recipient can mark an event as read.");
}
UpdateRequestBuilder request = client.prepareUpdate(INDEX, EVENT_TYPE, id)
.setDoc("read_signature", signature);
request.execute();
}
/* -- Internal methods -- */
public static XContentBuilder createEventType() {
......@@ -287,6 +309,12 @@ public class UserEventService extends AbstractService {
.field("index", "not_analyzed")
.endObject()
// read_signature
.startObject("read_signature")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.endObject()
.endObject().endObject();
......
......@@ -25,6 +25,7 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.elasticsearch.UserProfile;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
......@@ -41,7 +42,10 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
/**
* Created by Benoit on 30/03/2015.
......@@ -201,6 +205,15 @@ public class UserService extends AbstractService {
return title.toString();
}
public Map<String, String> getProfileTitles(Set<String> issuers) {
Map<String, Object> titles = getFieldByIds(INDEX, PROFILE_TYPE, issuers, UserProfile.PROPERTY_TITLE);
if (MapUtils.isEmpty(titles)) return null;
Map<String, String> result = new HashMap<>();
titles.entrySet().stream().forEach((entry) -> result.put(entry.getKey(), entry.getValue().toString()));
return result;
}
/* -- Internal methods -- */
......
#!/bin/sh
curl -XPOST "http://127.0.0.1:9200/user/event/_search?pretty" -d'
curl -XPOST "http://127.0.0.1:9200/user/event/_count?pretty" -d'
{
query: {
nested: {
path: "reference",
query: {
constant_score: {
filter:
[
{term: { "reference.index": "test_net"}},
{term: { "reference.type": "block"}},
{term: { "reference.id": "10862"}}
]
}
bool: {
filter: [
{term: {recipient: "5ocqzyDMMWf1V8bsoNhWb1iNwax1e9M7VTUN6navs8of"}}
],
must_not: {terms: { "code": ["TX_SENT"]}}
}
}
},
sort : [
{ "time" : {"order" : "desc"}}
],
from: 0,
size: 100
size: 100,
_source: false
}'
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment