Skip to content
Snippets Groups Projects
Commit c641b197 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Before indexing a document, add a control rule to check time validity - fix #27

[enh] Refactor synchro action result, to be able to store deletes hits
parent f77c64f5
No related branches found
No related tags found
No related merge requests found
Showing
with 265 additions and 49 deletions
......@@ -286,6 +286,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.getAsBoolean("duniter.security.enable", true);
}
public int getDocumentMaxTimeDelta() {
return settings.getAsInt("duniter.documentMaxTimeDelta", 7200); // in seconds = 2h
}
public String getWebSocketHost() {
return settings.get("network.host", "locahost");
}
......
......@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.client.model.elasticsearch.Records;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
......@@ -56,6 +57,7 @@ public abstract class AbstractService implements Bean {
protected CryptoService cryptoService;
protected final int retryCount;
protected final int retryWaitDuration;
protected final int documentMaxTimeDelta;
protected boolean ready = false;
public AbstractService(String loggerName, Duniter4jClient client, PluginSettings pluginSettings) {
......@@ -78,6 +80,7 @@ public abstract class AbstractService implements Bean {
this.cryptoService = cryptoService;
this.retryCount = pluginSettings.getNodeRetryCount();
this.retryWaitDuration = pluginSettings.getNodeRetryWaitDuration();
this.documentMaxTimeDelta = pluginSettings.getDocumentMaxTimeDelta();
}
/* -- protected methods --*/
......@@ -153,6 +156,38 @@ public abstract class AbstractService implements Bean {
readAndVerifyIssuerSignature(recordJson, actualObj, issuerFieldName);
}
protected void verifyTimeForUpdate(String index, String type, String id, JsonNode actualObj) {
verifyTimeForUpdate(index, type, id, actualObj, Record.PROPERTY_TIME);
}
protected void verifyTimeForUpdate(String index, String type, String id, JsonNode actualObj, String timeFieldName) {
// Check time has been increase - fix #27
int actualTime = getMandatoryField(actualObj, timeFieldName).asInt();
int existingTime = client.getTypedFieldById(index, type, id, timeFieldName);
if (actualTime <= existingTime) {
throw new InvalidFormatException(String.format("Invalid '%s' value: can not be less or equal to the previous value.", timeFieldName, timeFieldName));
}
// Check time has been increase - fix #27
if (Math.abs(System.currentTimeMillis()/1000 - actualTime) > documentMaxTimeDelta) {
throw new InvalidFormatException(String.format("Invalid '%s' value: too far from the UTC server time. Check your device's clock.", timeFieldName));
}
}
protected void verifyTimeForInsert(JsonNode actualObj) {
verifyTimeForInsert(actualObj, Record.PROPERTY_TIME);
}
protected void verifyTimeForInsert(JsonNode actualObj, String timeFieldName) {
// Check time has been increase - fix #27
int actualTime = getMandatoryField(actualObj, timeFieldName).asInt();
// Check time has been increase - fix #27
if (Math.abs(System.currentTimeMillis()/1000 - actualTime) > documentMaxTimeDelta) {
throw new InvalidFormatException(String.format("Invalid '%s' value: too far from the UTC server time. Check your device's clock.", timeFieldName));
}
}
protected String getIssuer(JsonNode actualObj) {
return getMandatoryField(actualObj, Records.PROPERTY_ISSUER).asText();
}
......
......@@ -31,8 +31,9 @@ import org.duniter.elasticsearch.service.ServiceLocator;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeEvents;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.synchro.impl.NullSynchroActionResult;
import org.duniter.elasticsearch.synchro.impl.SynchroActionResultImpl;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.util.bytes.BytesJsonNode;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
......@@ -43,20 +44,16 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.joda.time.format.DateTimeFormat;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
public abstract class AbstractSynchroAction extends AbstractService implements SynchroAction {
private static final String SCROLL_PARAM_VALUE = "1m";
public interface SourceConsumer {
void accept(String id, JsonNode source) throws Exception;
}
private static SynchroActionResult NULL_ACTION_RESULT = new NullSynchroActionResult();
private String fromIndex;
private String fromType;
......@@ -210,25 +207,25 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
/* -- protected methods -- */
protected void notifyInsertion(final String id, final JsonNode source) throws Exception {
protected void notifyInsertion(final String id, final JsonNode source, final SynchroActionResult actionResult) throws Exception {
if (CollectionUtils.isNotEmpty(insertionListeners)) {
for (SourceConsumer listener: insertionListeners) {
listener.accept(id, source);
listener.accept(id, source, actionResult);
}
}
}
protected void notifyUpdate(final String id, final JsonNode source) throws Exception {
protected void notifyUpdate(final String id, final JsonNode source, final SynchroActionResult actionResult) throws Exception {
if (CollectionUtils.isNotEmpty(updateListeners)) {
for (SourceConsumer listener: updateListeners) {
listener.accept(id, source);
listener.accept(id, source, actionResult);
}
}
}
protected void notifyValidation(final String id,
final JsonNode source,
final Iterator<Long> invalidSignatureHits,
final SynchroActionResult actionResult,
final String logPrefix) throws Exception {
if (enableSignatureValidation) {
try {
......@@ -236,7 +233,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
} catch (InvalidSignatureException e) {
// FIXME: some user/profile document failed ! - see issue #11
// Il semble que le format JSON ne soit pas le même que celui qui a été signé
invalidSignatureHits.next();
actionResult.addInvalidSignature();
if (trace) {
logger.warn(String.format("%s %s.\n%s", logPrefix, e.getMessage(), source.toString()));
}
......@@ -245,7 +242,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
if (CollectionUtils.isNotEmpty(validationListeners)) {
for (SourceConsumer listener : validationListeners) {
listener.accept(id, source);
listener.accept(id, source, actionResult);
}
}
}
......@@ -257,7 +254,8 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
private HttpPost createScrollRequest(Peer peer,
String fromIndex, String fromType,
String fromIndex,
String fromType,
QueryBuilder query) {
HttpPost httpPost = new HttpPost(httpService.getPath(peer, fromIndex, fromType, "_search?scroll=" + SCROLL_PARAM_VALUE));
httpPost.setHeader("Content-Type", "application/json;charset=UTF-8");
......@@ -321,6 +319,11 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
ObjectMapper objectMapper = getObjectMapper();
// DEV ONLY: skip
if (!"user".equalsIgnoreCase(fromIndex) || !"profile".equalsIgnoreCase(fromType)) {
return;
}
long counter = 0;
boolean stop = false;
String scrollId = null;
......@@ -354,17 +357,14 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
private long fetchAndSave(final Peer peer,
SearchScrollResponse response,
final SearchScrollResponse response,
final ObjectMapper objectMapper,
SynchroResult result) {
final SynchroResult result) {
long counter = 0;
PrimitiveIterators.OfLong insertHits = PrimitiveIterators.newLongSequence();
PrimitiveIterators.OfLong updateHits = PrimitiveIterators.newLongSequence();
PrimitiveIterators.OfLong invalidSignatureHits = PrimitiveIterators.newLongSequence();
SynchroActionResult actionResult = new SynchroActionResultImpl();
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.setRefresh(true);
......@@ -386,9 +386,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
save(id, source,
objectMapper,
bulkRequest,
insertHits,
updateHits,
invalidSignatureHits,
actionResult,
logPrefix);
}
}
......@@ -415,26 +413,24 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
}
// update result stats
result.addInserts(toIndex, toType, insertHits.current());
result.addUpdates(toIndex, toType, updateHits.current());
result.addInvalidSignatures(toIndex, toType, invalidSignatureHits.current());
// update result
result.addInserts(toIndex, toType, actionResult.getInserts());
result.addUpdates(toIndex, toType, actionResult.getUpdates());
result.addDeletes(toIndex, toType, actionResult.getDeletes());
result.addInvalidSignatures(toIndex, toType, actionResult.getInvalidSignatures());
return counter;
}
protected void save(String id, JsonNode source, String logPrefix) {
Iterator<Long> nullSeq = PrimitiveIterators.nullLongSequence();
save(id, source, getObjectMapper(), null, nullSeq, nullSeq, nullSeq, logPrefix);
save(id, source, getObjectMapper(), null, NULL_ACTION_RESULT, logPrefix);
}
protected void save(final String id,
final JsonNode source,
final ObjectMapper objectMapper,
final BulkRequestBuilder bulkRequest,
final Iterator<Long> insertHits,
final Iterator<Long> updateHits,
final Iterator<Long> invalidSignatureHits,
final SynchroActionResult actionResult,
final String logPrefix) {
try {
......@@ -458,7 +454,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
// Validate doc
notifyValidation(id, source, invalidSignatureHits, logPrefix);
notifyValidation(id, source, actionResult, logPrefix);
// Execute insertion
IndexRequestBuilder request = client.prepareIndex(toIndex, toType, id)
......@@ -471,9 +467,9 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
// Notify insert listeners
notifyInsertion(id, source);
notifyInsertion(id, source, actionResult);
insertHits.next();
actionResult.addInsert();
}
// Existing doc: do update (if enable)
......@@ -495,7 +491,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
// Validate source
notifyValidation(id, source, invalidSignatureHits, logPrefix);
notifyValidation(id, source, actionResult, logPrefix);
// Execute update
UpdateRequestBuilder request = client.prepareUpdate(toIndex, toType, id)
......@@ -509,9 +505,9 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
}
// Notify insert listeners
notifyUpdate(id, source);
notifyUpdate(id, source, actionResult);
updateHits.next();
actionResult.addUpdate();
}
}
......
package org.duniter.elasticsearch.synchro;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer;
import org.duniter.elasticsearch.model.SynchroResult;
......@@ -8,6 +10,10 @@ import org.duniter.elasticsearch.service.changes.ChangeSource;
public interface SynchroAction {
interface SourceConsumer {
void accept(String id, JsonNode source, SynchroActionResult result) throws Exception;
}
EndpointApi getEndPointApi();
ChangeSource getChangeSource();
......@@ -17,4 +23,10 @@ public interface SynchroAction {
SynchroResult result);
void handleChange(Peer peer, ChangeEvent changeEvent);
void addInsertionListener(SourceConsumer listener);
void addUpdateListener(SourceConsumer listener);
void addValidationListener(SourceConsumer listener);
}
package org.duniter.elasticsearch.synchro;
public interface SynchroActionResult {
void addInsert();
void addUpdate();
void addDelete();
void addInvalidSignature();
long getInserts();
long getUpdates();
long getDeletes();
long getInvalidSignatures();
}
package org.duniter.elasticsearch.synchro.impl;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
public class NullSynchroActionResult implements SynchroActionResult {
@Override
public void addInsert(){
}
@Override
public void addUpdate() {
}
@Override
public void addDelete() {
}
@Override
public void addInvalidSignature() {
}
@Override
public long getInserts() {
return 0;
}
@Override
public long getUpdates() {
return 0;
}
@Override
public long getDeletes() {
return 0;
}
@Override
public long getInvalidSignatures() {
return 0;
}
}
package org.duniter.elasticsearch.synchro.impl;
import org.duniter.core.util.PrimitiveIterators;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
public class SynchroActionResultImpl implements SynchroActionResult {
private final PrimitiveIterators.OfLong insertHits = PrimitiveIterators.newLongSequence();
private final PrimitiveIterators.OfLong updateHits = PrimitiveIterators.newLongSequence();
private final PrimitiveIterators.OfLong deleteHits = PrimitiveIterators.newLongSequence();
private final PrimitiveIterators.OfLong invalidSignatureHits = PrimitiveIterators.newLongSequence();
@Override
public void addInsert() {
insertHits.nextLong();
}
@Override
public void addUpdate() {
updateHits.nextLong();
}
@Override
public void addDelete() {
deleteHits.nextLong();
}
@Override
public void addInvalidSignature() {
invalidSignatureHits.nextLong();
}
@Override
public long getInserts() {
return insertHits.current();
}
@Override
public long getUpdates() {
return updateHits.current();
}
@Override
public long getDeletes() {
return deleteHits.current();
}
@Override
public long getInvalidSignatures() {
return invalidSignatureHits.current();
}
}
......@@ -106,6 +106,11 @@ public class AbstractCommentDaoImpl<T extends AbstractCommentDaoImpl> extends Ab
.field("index", "not_analyzed")
.endObject()
// creationTime
.startObject("creationTime")
.field("type", "integer")
.endObject()
// time
.startObject("time")
.field("type", "integer")
......
......@@ -120,6 +120,9 @@ public class GroupService extends AbstractService {
String id = computeIdFromTitle(title);
String issuer = getIssuer(actualObj);
// Check time is valid - fix #27
verifyTimeForInsert(actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing group [%s] from issuer [%s]", id, issuer.substring(0, 8)));
}
......@@ -140,6 +143,9 @@ public class GroupService extends AbstractService {
JsonNode actualObj = readAndVerifyIssuerSignature(recordJson);
// Check time is valid - fix #27
verifyTimeForUpdate(INDEX, RECORD_TYPE, id, actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Updating group [%s]", id));
}
......
......@@ -159,6 +159,9 @@ public class HistoryService extends AbstractService {
// Check same document issuer
client.checkSameDocumentIssuer(index, type, id, issuer);
}
// Check time is valid - fix #27
verifyTimeForInsert(actualObj);
}
public void applyDocDelete(JsonNode actualObj) {
......
......@@ -30,6 +30,7 @@ import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.exception.InvalidSignatureException;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
import org.duniter.elasticsearch.user.PluginSettings;
import org.duniter.elasticsearch.user.model.Message;
import org.duniter.elasticsearch.user.model.UserEvent;
......
......@@ -76,6 +76,9 @@ public class PageService extends AbstractService {
JsonNode actualObj = readAndVerifyIssuerSignature(json);
String issuer = getIssuer(actualObj);
// Check time is valid - fix #27
verifyTimeForInsert(actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", recordDao.getType(), issuer.substring(0, 8)));
}
......@@ -90,6 +93,9 @@ public class PageService extends AbstractService {
// Check same document issuer
recordDao.checkSameDocumentIssuer(id, issuer);
// Check time is valid - fix #27
verifyTimeForUpdate(recordDao.getIndex(), recordDao.getType(), id, actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Updating %s [%s] from issuer [%s]", recordDao.getType(), id, issuer.substring(0, 8)));
}
......@@ -105,8 +111,11 @@ public class PageService extends AbstractService {
String recordId = getMandatoryField(commentObj, RecordComment.PROPERTY_RECORD).asText();
checkRecordExistsOrDeleted(recordId);
// Check time is valid - fix #27
verifyTimeForInsert(commentObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a %s from issuer [%s]", commentDao.getType(), issuer.substring(0, 8)));
logger.debug(String.format("[%s] Indexing new %s, issuer {%s}", RegistryIndexDao.INDEX, commentDao.getType(), issuer.substring(0, 8)));
}
return commentDao.create(json);
}
......@@ -118,9 +127,12 @@ public class PageService extends AbstractService {
String recordId = getMandatoryField(commentObj, RecordComment.PROPERTY_RECORD).asText();
checkRecordExistsOrDeleted(recordId);
// Check time is valid - fix #27
verifyTimeForUpdate(commentDao.getIndex(), commentDao.getType(), id, commentObj);
if (logger.isDebugEnabled()) {
String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText();
logger.debug(String.format("[%s] Indexing a %s from issuer [%s] on [%s]", commentDao.getType(), commentDao.getType(), issuer.substring(0, 8)));
logger.debug(String.format("[%s] Updating existing %s {%s}, issuer {%s}", RegistryIndexDao.INDEX, commentDao.getType(), id, issuer.substring(0, 8)));
}
commentDao.update(id, json);
......
......@@ -29,6 +29,8 @@ import org.duniter.core.client.model.ModelUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.model.SynchroResult;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
import org.duniter.elasticsearch.user.PluginSettings;
import org.duniter.elasticsearch.user.model.Message;
import org.duniter.elasticsearch.user.model.UserEvent;
......@@ -109,6 +111,9 @@ public class UserInvitationService extends AbstractService {
JsonNode source = readAndVerifyIssuerSignature(recordJson);
// Check time is valid - fix #27
verifyTimeForInsert(source);
if (logger.isDebugEnabled()) {
String issuer = getMandatoryField(source, Message.PROPERTY_ISSUER).asText();
logger.debug(String.format("Indexing a invitation to certify from issuer [%s]", issuer.substring(0, 8)));
......
......@@ -25,6 +25,7 @@ package org.duniter.elasticsearch.user.service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.util.Preconditions;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.ModelUtils;
......@@ -32,6 +33,7 @@ import org.duniter.core.client.model.elasticsearch.UserProfile;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import org.duniter.elasticsearch.user.PluginSettings;
import org.duniter.elasticsearch.exception.AccessDeniedException;
import org.duniter.elasticsearch.service.AbstractService;
......@@ -126,6 +128,9 @@ public class UserService extends AbstractService {
JsonNode actualObj = readAndVerifyIssuerSignature(profileJson);
String issuer = getIssuer(actualObj);
// Check time is valid - fix #27
verifyTimeForInsert(actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a user profile from issuer [%s]", issuer.substring(0, 8)));
}
......@@ -148,8 +153,12 @@ public class UserService extends AbstractService {
String issuer = getIssuer(actualObj);
if (!Objects.equals(issuer, id)) {
throw new AccessDeniedException(String.format("Could not update this document: not issuer."));
throw new AccessDeniedException(String.format("Could not update this document: only the issuer can update."));
}
// Check time is valid - fix #27
verifyTimeForUpdate(INDEX, PROFILE_TYPE, id, actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Updating a user profile from issuer [%s]", issuer.substring(0, 8)));
}
......@@ -170,6 +179,9 @@ public class UserService extends AbstractService {
JsonNode actualObj = readAndVerifyIssuerSignature(settingsJson);
String issuer = getIssuer(actualObj);
// Check time is valid - fix #27
verifyTimeForInsert(actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a user settings from issuer [%s]", issuer.substring(0, 8)));
}
......@@ -194,6 +206,10 @@ public class UserService extends AbstractService {
if (!Objects.equals(issuer, id)) {
throw new AccessDeniedException(String.format("Could not update this document: not issuer."));
}
// Check time is valid - fix #27
verifyTimeForUpdate(INDEX, SETTINGS_TYPE, id, actualObj);
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a user settings from issuer [%s]", issuer.substring(0, 8)));
}
......
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.exception.NotFoundException;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.PluginSettings;
......@@ -32,24 +33,23 @@ public class SynchroHistoryIndexAction extends AbstractSynchroAction {
/* -- protected method -- */
protected void onValidate(String deleteId, JsonNode source) {
protected void onValidate(String deleteId, JsonNode source, SynchroActionResult result) {
try {
// Check if valid document
service.checkIsValidDeletion(source);
// Delete the document
service.applyDocDelete(source);
} catch(NotFoundException e) {
// doc not exists: continue
}
}
protected void onInsert(String deleteId, JsonNode source) {
protected void onInsert(String deleteId, JsonNode source, SynchroActionResult result) {
try {
// Delete the document
service.applyDocDelete(source);
result.addDelete();
} catch(NotFoundException e) {
// doc not exists: continue
logger.debug("Doc to delete could not be found. Skipping deletion");
......
package org.duniter.elasticsearch.user.synchro.invitation;
import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.PluginSettings;
......@@ -11,6 +13,8 @@ import org.elasticsearch.common.inject.Inject;
public class SynchroInvitationCertificationIndexAction extends AbstractSynchroAction {
private UserInvitationService service;
@Inject
public SynchroInvitationCertificationIndexAction(Duniter4jClient client,
PluginSettings pluginSettings,
......@@ -21,10 +25,14 @@ public class SynchroInvitationCertificationIndexAction extends AbstractSynchroAc
super(UserInvitationService.INDEX, UserInvitationService.CERTIFICATION_TYPE, client,
pluginSettings.getDelegate(), cryptoService, threadPool);
addInsertionListener(service::notifyUser);
this.service = service;
addInsertionListener(this::onInsert);
synchroService.register(this);
}
protected void onInsert(String id, JsonNode source, SynchroActionResult result) {
service.notifyUser(id, source);
}
}
package org.duniter.elasticsearch.user.synchro.message;
import com.fasterxml.jackson.databind.JsonNode;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.synchro.SynchroActionResult;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.PluginSettings;
......@@ -11,6 +13,7 @@ import org.elasticsearch.common.inject.Inject;
public class SynchroMessageInboxIndexAction extends AbstractSynchroAction {
private MessageService service;
@Inject
public SynchroMessageInboxIndexAction(Duniter4jClient client,
PluginSettings pluginSettings,
......@@ -20,9 +23,14 @@ public class SynchroMessageInboxIndexAction extends AbstractSynchroAction {
MessageService service) {
super(MessageService.INDEX, MessageService.INBOX_TYPE, client, pluginSettings.getDelegate(), cryptoService, threadPool);
addInsertionListener(service::notifyUser);
this.service = service;
addInsertionListener(this::onInsert);
synchroService.register(this);
}
protected void onInsert(String id, JsonNode source, SynchroActionResult result) {
service.notifyUser(id, source);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment