From 6d89fb3bb131306c8896d9ba947b38854e2bf4f9 Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Mon, 18 Sep 2017 18:17:38 +0200 Subject: [PATCH] - Synchro: Refactoring SynchroService classes into Synchro***Action classes, for each index/type. - Synchro: generate user event from Synchro***Action --- .../org/duniter/elasticsearch/PluginInit.java | 16 +- .../duniter/elasticsearch/PluginSettings.java | 9 + .../duniter/elasticsearch/dao/DaoModule.java | 5 +- .../duniter/elasticsearch/dao/PeerDao.java | 4 +- .../dao/SynchroExecutionDao.java | 38 ++ .../dao/impl/SynchroExecutionDaoImpl.java | 185 +++++++++ .../elasticsearch/model/SynchroExecution.java | 39 ++ .../elasticsearch/model/SynchroResult.java | 14 +- .../service/CurrencyService.java | 11 +- .../elasticsearch/service/ServiceModule.java | 2 + .../service/changes/ChangeSource.java | 26 +- .../service/synchro/SynchroAction.java | 14 + .../service/synchro/SynchroService.java | 288 ++++++++++++++ .../elasticsearch/subscription/Plugin.java | 2 + .../subscription/PluginInit.java | 15 +- .../subscription/PluginSettings.java | 5 + .../SubscriptionExecutionDaoImpl.java | 14 +- .../subscription/service/ServiceModule.java | 3 - .../subscription/service/SynchroService.java | 102 ----- .../subscription/synchro/SynchroModule.java | 38 ++ ...nchroSubscriptionExecutionIndexAction.java | 26 ++ .../SynchroSubscriptionRecordAction.java | 28 ++ .../duniter/elasticsearch/user/Plugin.java | 4 +- .../elasticsearch/user/PluginInit.java | 13 +- .../elasticsearch/user/PluginSettings.java | 6 +- .../user/service/HistoryService.java | 44 ++- .../user/service/MessageService.java | 33 +- .../user/service/PageService.java | 2 +- .../user/service/ServiceModule.java | 2 +- .../user/service/SynchroService.java | 149 ------- .../user/service/UserInvitationService.java | 21 +- .../user/synchro/AbstractSynchroAction.java | 365 ++++++++---------- .../user/synchro/SynchroModule.java | 62 +++ .../group/SynchroGroupRecordAction.java | 27 ++ .../history/SynchroHistoryIndexAction.java | 45 +++ ...hroInvitationCertificationIndexAction.java | 28 ++ .../SynchroMessageInboxIndexAction.java | 28 ++ .../SynchroMessageOutboxIndexAction.java | 27 ++ .../page/SynchroPageCommentAction.java | 28 ++ .../synchro/page/SynchroPageRecordAction.java | 28 ++ .../user/SynchroUserProfileAction.java | 27 ++ .../user/SynchroUserSettingsAction.java | 27 ++ 42 files changed, 1308 insertions(+), 542 deletions(-) create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/SynchroExecutionDao.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java delete mode 100644 duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java create mode 100644 duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroModule.java create mode 100644 duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java create mode 100644 duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java delete mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java rename duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java => duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java (57%) create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index bbed3fe5..0045a406 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -30,6 +30,7 @@ import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.CurrencyService; import org.duniter.elasticsearch.service.DocStatService; import org.duniter.elasticsearch.service.PeerService; +import org.duniter.elasticsearch.service.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -118,8 +119,8 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { else { - if (logger.isInfoEnabled()) { - logger.info("Checking indices..."); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices..."); } injector.getInstance(CurrencyService.class) @@ -130,8 +131,8 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .createIndexIfNotExists(); } - if (logger.isInfoEnabled()) { - logger.info("Checking indices [OK]"); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices [OK]"); } } } @@ -225,6 +226,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(PeerService.class) .listenAndIndexPeers(peer); + + // Start synchro + if (pluginSettings.enableSynchro()) { + injector.getInstance(SynchroService.class) + .startScheduling(); + } + if (logger.isInfoEnabled()) { logger.info(String.format("[%s] Indexing blockchain [OK]", currencyName)); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index d5f47142..737d5915 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -218,6 +218,15 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.getAsInt("duniter.network.maxConnectionsPerRoute", 5); } + public boolean enableSynchro() { + return settings.getAsBoolean("duniter.synchro.enable", true); + } + + public int getSynchroTimeOffset() { + return settings.getAsInt("duniter.synchro.timeOffset", 60*60/*=1hour*/); + } + + public boolean isDevMode() { return settings.getAsBoolean("duniter.dev.enable", false); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java index d9e609fd..acb7f105 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java @@ -30,6 +30,7 @@ import org.duniter.elasticsearch.client.Duniter4jClientImpl; import org.duniter.elasticsearch.dao.impl.BlockStatDaoImpl; import org.duniter.elasticsearch.dao.impl.DocStatDaoImpl; import org.duniter.elasticsearch.dao.impl.MovementDaoImpl; +import org.duniter.elasticsearch.dao.impl.SynchroExecutionDaoImpl; import org.duniter.elasticsearch.service.ServiceLocator; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -44,10 +45,12 @@ public class DaoModule extends AbstractModule implements Module { bind(Duniter4jClient.class).to(Duniter4jClientImpl.class).asEagerSingleton(); bind(DocStatDao.class).to(DocStatDaoImpl.class).asEagerSingleton(); - // + // Dao defined in module es-core bind(BlockStatDao.class).to(BlockStatDaoImpl.class).asEagerSingleton(); bind(MovementDao.class).to(MovementDaoImpl.class).asEagerSingleton(); + bind(SynchroExecutionDao.class).to(SynchroExecutionDaoImpl.class).asEagerSingleton(); + // Dao defined in module core-client bindWithLocator(BlockDao.class); bindWithLocator(PeerDao.class); bindWithLocator(CurrencyDao.class); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java index 304e4b2e..30dd3026 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/PeerDao.java @@ -22,12 +22,10 @@ package org.duniter.elasticsearch.dao; * #L% */ -import org.duniter.elasticsearch.dao.impl.PeerDaoImpl; - /** * Created by blavenie on 26/04/17. */ -public interface PeerDao extends org.duniter.core.client.dao.PeerDao, TypeDao<PeerDaoImpl>{ +public interface PeerDao extends org.duniter.core.client.dao.PeerDao, TypeDao<PeerDao>{ String TYPE = "peer"; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/SynchroExecutionDao.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/SynchroExecutionDao.java new file mode 100644 index 00000000..c6b3071a --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/SynchroExecutionDao.java @@ -0,0 +1,38 @@ +package org.duniter.elasticsearch.dao; + +/*- + * #%L + * Duniter4j :: ElasticSearch Core plugin + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.client.model.local.Peer; +import org.duniter.elasticsearch.model.SynchroExecution; + +/** + * Created by blavenie on 26/04/17. + */ +public interface SynchroExecutionDao extends TypeDao<SynchroExecutionDao>{ + + String TYPE = "synchro"; + + void save(SynchroExecution execution); + + SynchroExecution getLastExecution(Peer peer); +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java new file mode 100644 index 00000000..fe615af0 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/dao/impl/SynchroExecutionDaoImpl.java @@ -0,0 +1,185 @@ +package org.duniter.elasticsearch.dao.impl; + +/* + * #%L + * UCoin Java :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.AbstractDao; +import org.duniter.elasticsearch.dao.SynchroExecutionDao; +import org.duniter.elasticsearch.model.SynchroExecution; +import org.duniter.elasticsearch.model.SynchroResult; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; + +/** + * Created by blavenie on 29/12/15. + */ +public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecutionDao { + + public SynchroExecutionDaoImpl(){ + super("duniter.dao.peer"); + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public void save(SynchroExecution execution) { + Preconditions.checkNotNull(execution); + Preconditions.checkArgument(StringUtils.isNotBlank(execution.getCurrency())); + Preconditions.checkArgument(StringUtils.isNotBlank(execution.getPeer())); + Preconditions.checkNotNull(execution.getTime()); + Preconditions.checkArgument(execution.getTime() > 0); + + // Serialize into JSON + // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) + try { + String json = getObjectMapper().writeValueAsString(execution); + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(execution.getCurrency(), TYPE) + .setSource(json); + + // Execute indexBlocksFromNode + indexRequest + .setRefresh(true) + .execute(); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + @Override + public SynchroExecution getLastExecution(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getCurrency()); + Preconditions.checkNotNull(peer.getId()); + + BoolQueryBuilder query = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(SynchroExecution.PROPERTY_PEER, peer.getId())); + + SearchResponse response = client.prepareSearch(peer.getCurrency()) + .setTypes(TYPE) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) + .setQuery(query) + .setFetchSource(true) + .setFrom(0).setSize(1) + .addSort(SynchroExecution.PROPERTY_TIME, SortOrder.DESC) + .get(); + + if (response.getHits().getTotalHits() == 0) return null; + + SearchHit hit = response.getHits().getHits()[0]; + return client.readSourceOrNull(hit, SynchroExecution.class); + } + + @Override + public XContentBuilder createTypeMapping() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject(TYPE) + .startObject("properties") + + // currency + .startObject(SynchroExecution.PROPERTY_CURRENCY) + .field("type", "string") + .endObject() + + // peer + .startObject(SynchroExecution.PROPERTY_PEER) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // issuer + .startObject(SynchroExecution.PROPERTY_ISSUER) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // time + .startObject(SynchroExecution.PROPERTY_TIME) + .field("type", "long") + .endObject() + + // hash + .startObject(SynchroExecution.PROPERTY_HASH) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // signature + .startObject(SynchroExecution.PROPERTY_SIGNATURE) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // result + .startObject(SynchroExecution.PROPERTY_RESULT) + .field("type", "nested") + //.field("dynamic", "false") + + // inserts + .startObject(SynchroResult.PROPERTY_INSERTS) + .field("type", "long") + .endObject() + + // updates + .startObject(SynchroResult.PROPERTY_UPDATES) + .field("type", "long") + .endObject() + + // deletes + .startObject(SynchroResult.PROPERTY_DELETES) + .field("type", "long") + .endObject() + + .endObject() + .endObject() + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException("Error while getting mapping for synchro index: " + ioe.getMessage(), ioe); + } + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java new file mode 100644 index 00000000..c17dfc07 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroExecution.java @@ -0,0 +1,39 @@ +package org.duniter.elasticsearch.model; + +import org.duniter.core.client.model.elasticsearch.Record; + +public class SynchroExecution extends Record { + + public static final String PROPERTY_CURRENCY = "currency"; + public static final String PROPERTY_PEER = "peer"; + public static final String PROPERTY_RESULT = "result"; + + + private String currency; + private String peer; + private SynchroResult result; + + public String getCurrency() { + return currency; + } + + public void setCurrency(String currency) { + this.currency = currency; + } + + public String getPeer() { + return peer; + } + + public void setPeer(String peer) { + this.peer = peer; + } + + public SynchroResult getResult() { + return result; + } + + public void setResult(SynchroResult result) { + this.result = result; + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java index b87dcab0..5dfc72e7 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java @@ -22,13 +22,20 @@ package org.duniter.elasticsearch.model; * #L% */ +import com.fasterxml.jackson.annotation.JsonIgnore; + +import java.io.Serializable; import java.util.HashMap; import java.util.Map; /** * Created by blavenie on 30/12/16. */ -public class SynchroResult { +public class SynchroResult implements Serializable { + + public static final String PROPERTY_INSERTS = "inserts"; + public static final String PROPERTY_UPDATES = "updates"; + public static final String PROPERTY_DELETES = "deletes"; private long insertTotal = 0; private long updateTotal = 0; @@ -59,18 +66,22 @@ public class SynchroResult { invalidSignatureTotal += nbHits; } + @JsonIgnore public long getInserts(String index, String type) { return insertHits.getOrDefault(index + "/" + type, 0l); } + @JsonIgnore public long getUpdates(String index, String type) { return updateHits.getOrDefault(index + "/" + type, 0l); } + @JsonIgnore public long getInvalidSignatures(String index, String type) { return invalidSignatureHits.getOrDefault(index + "/" + type, 0l); } + @JsonIgnore public long getDeletes(String index, String type) { return deleteHits.getOrDefault(index + "/" + type, 0l); } @@ -91,6 +102,7 @@ public class SynchroResult { return invalidSignatureTotal; } + @JsonIgnore public long getTotal() { return insertTotal + updateTotal + deleteTotal; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 27956d90..07f32889 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -213,14 +213,19 @@ public class CurrencyService extends AbstractService { BlockDao blockDao = ServiceLocator.instance().getBean(BlockDao.class); createIndexRequestBuilder.addMapping(blockDao.getType(), blockDao.createTypeMapping()); + // Add movement type + MovementDao operationDao = ServiceLocator.instance().getBean(MovementDao.class); + createIndexRequestBuilder.addMapping(operationDao.getType(), operationDao.createTypeMapping()); + // Add blockStat type BlockStatDao blockStatDao = injector.getInstance(BlockStatDao.class); createIndexRequestBuilder.addMapping(blockStatDao.getType(), blockStatDao.createTypeMapping()); - // Add operation type - MovementDao operationDao = ServiceLocator.instance().getBean(MovementDao.class); - createIndexRequestBuilder.addMapping(operationDao.getType(), operationDao.createTypeMapping()); + // Add synchro execution + SynchroExecutionDao synchroExecutionDao = injector.getInstance(SynchroExecutionDao.class); + createIndexRequestBuilder.addMapping(synchroExecutionDao.getType(), synchroExecutionDao.createTypeMapping()); + // Creating the index createIndexRequestBuilder.execute().actionGet(); } }; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index f6b9397a..f5a01631 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -35,6 +35,7 @@ import org.duniter.core.service.MailService; import org.duniter.elasticsearch.PluginInit; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.synchro.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -50,6 +51,7 @@ public class ServiceModule extends AbstractModule implements Module { bind(PluginInit.class).asEagerSingleton(); bind(ChangeService.class).asEagerSingleton(); bind(DocStatService.class).asEagerSingleton(); + bind(SynchroService.class).asEagerSingleton(); // blockchain indexation services bind(BlockchainService.class).asEagerSingleton(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java index 3f9276d4..e806a37c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java @@ -39,6 +39,8 @@ package org.duniter.elasticsearch.service.changes; */ import com.google.common.base.Joiner; +import com.google.common.collect.Sets; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import com.google.common.collect.ImmutableSet; import org.duniter.core.util.StringUtils; @@ -50,6 +52,12 @@ public class ChangeSource { private final Set<String> types; private final Set<String> ids; + public ChangeSource() { + this.indices = Sets.newHashSet(); + this.types = Sets.newHashSet(); + this.ids = Sets.newHashSet(); + } + public ChangeSource(String source) { String[] parts = source.split("/"); @@ -91,12 +99,22 @@ public class ChangeSource { return types; } + public void addIndex(String index){ + this.indices.add(index); + } + public void addType(String type){ + this.types.add(type); + } + public void addId(String id){ + this.ids.add(id); + } + public String toString() { StringBuilder sb = new StringBuilder(); // Add indices Joiner joiner = Joiner.on(','); - if (indices == null) { + if (CollectionUtils.isEmpty(indices)) { sb.append('*'); } else { @@ -104,8 +122,8 @@ public class ChangeSource { } // Add types - if (types == null) { - if (ids != null) { + if (CollectionUtils.isEmpty(types)) { + if (CollectionUtils.isNotEmpty(ids)) { sb.append("/*"); } } @@ -115,7 +133,7 @@ public class ChangeSource { } // Add ids - if (ids != null) { + if (CollectionUtils.isNotEmpty(ids)) { sb.append('/'); joiner.appendTo(sb, ids); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java new file mode 100644 index 00000000..c030ad16 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroAction.java @@ -0,0 +1,14 @@ +package org.duniter.elasticsearch.service.synchro; + +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.client.model.local.Peer; +import org.duniter.elasticsearch.model.SynchroResult; + +public interface SynchroAction { + + EndpointApi getEndPointApi(); + + void handleSynchronize(Peer peer, + long fromTime, + SynchroResult result); +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java new file mode 100644 index 00000000..7037621b --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/synchro/SynchroService.java @@ -0,0 +1,288 @@ +package org.duniter.elasticsearch.service.synchro; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.io.IOUtils; +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.HttpService; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.DateUtils; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.SynchroExecutionDao; +import org.duniter.elasticsearch.model.SynchroExecution; +import org.duniter.elasticsearch.model.SynchroResult; +import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.service.ServiceLocator; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.inject.Inject; + +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Created by blavenie on 27/10/16. + */ +public class SynchroService extends AbstractService { + + private static final String WS_CHANGES_URL = "/ws/_changes"; + + protected HttpService httpService; + protected final Set<EndpointApi> peerApiFilters = Sets.newHashSet(); + protected final ThreadPool threadPool; + protected final PeerDao peerDao; + protected final CurrencyDao currencyDao; + protected final SynchroExecutionDao synchroExecutionDao; + private List<WebsocketClientEndpoint> wsClientEndpoints = Lists.newArrayList(); + private List<SynchroAction> actions = Lists.newArrayList(); + + @Inject + public SynchroService(Duniter4jClient client, + PluginSettings settings, + CryptoService cryptoService, + ThreadPool threadPool, + CurrencyDao currencyDao, + PeerDao peerDao, + SynchroExecutionDao synchroExecutionDao, + final ServiceLocator serviceLocator) { + super("duniter.synchro", client, settings, cryptoService); + this.threadPool = threadPool; + this.currencyDao = currencyDao; + this.peerDao = peerDao; + this.synchroExecutionDao = synchroExecutionDao; + threadPool.scheduleOnStarted(() -> { + httpService = serviceLocator.getHttpService(); + setIsReady(true); + }); + } + + public void register(SynchroAction action) { + Preconditions.checkNotNull(action); + Preconditions.checkNotNull(action.getEndPointApi()); + + if (!peerApiFilters.contains(action.getEndPointApi())) { + peerApiFilters.add(action.getEndPointApi()); + } + actions.add(action); + } + + /** + * Start scheduling doc stats update + * @return + */ + public SynchroService startScheduling() { + long delayBeforeNextHour = DateUtils.delayBeforeNextHour(); + + // Five minute before the hour (to make sure to be ready when computing doc stat - see DocStatService) + delayBeforeNextHour -= 5 * 60 * 1000; + + // If not already scheduling to early (in the next 5 min) then launch it + if (delayBeforeNextHour > 5 * 60 * 1000) { + + // Launch with a delay of 10 sec + threadPool.schedule(this::synchronize, 10 * 1000, TimeUnit.MILLISECONDS); + } + + // Schedule every hour + threadPool.scheduleAtFixedRate( + this::synchronize, + delayBeforeNextHour, + 60 * 60 * 1000 /* every hour */, + TimeUnit.MILLISECONDS); + + return this; + } + + /* -- protected methods -- */ + + protected void synchronize() { + logger.info("Starting synchronization..."); + + // Closing all opened WS + closeWsClientEndpoints(); + + if (CollectionUtils.isNotEmpty(peerApiFilters)) { + + peerApiFilters.forEach(peerApiFilter -> { + + // Get peers + List<Peer> peers = getPeersFromApi(peerApiFilter); + if (CollectionUtils.isNotEmpty(peers)) { + peers.forEach(this::synchronize); + logger.info("Synchronization [OK]"); + } else { + logger.info(String.format("Synchronization [OK] - no endpoint found for API [%s]", peerApiFilter.name())); + } + }); + } + } + + protected List<Peer> getPeersFromApi(final EndpointApi api) { + Preconditions.checkNotNull(api); + + try { + List<String> currencyIds = currencyDao.getCurrencyIds(); + if (CollectionUtils.isEmpty(currencyIds)) return null; + + return currencyIds.stream() + .map(currencyId -> peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name())) + .filter(Objects::nonNull) + .flatMap(List::stream) + .collect(Collectors.toList()); + } + catch (Exception e) { + logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); + return null; + } + } + + protected void synchronize(final Peer peer) { + long now = System.currentTimeMillis(); + SynchroResult result = new SynchroResult(); + + long fromTime = getLastExecutionTime(peer); + + // If not the first synchro, add a delay to last execution time + // to avoid missing data because incorrect clock configuration + if (fromTime > 0) { + fromTime -= Math.abs(pluginSettings.getSynchroTimeOffset()); + } + + ChangeSource changeSourceToListen = new ChangeSource(); + + // insert + for (SynchroAction action: actions) { + + action.handleSynchronize(peer, fromTime, result); + } + //synchronize(peer, fromTime, result, changeSourceToListen); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s] User data imported in %s ms: %s", peer.getCurrency(), peer, System.currentTimeMillis() - now, result.toString())); + } + + saveExecution(peer, result); + + // Listens changes on this peer + //startListenChanges(peer); + + } + + protected long getLastExecutionTime(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getId()); + + try { + SynchroExecution execution = synchroExecutionDao.getLastExecution(peer); + return execution != null ? execution.getTime() : 0; + } + catch (Exception e) { + logger.error(String.format("Error while saving last synchro execution time, for peer [%s]. Will resync all.", peer), e); + return 0; + } + } + + protected void saveExecution(Peer peer, SynchroResult result) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getId()); + Preconditions.checkNotNull(result); + + try { + SynchroExecution execution = new SynchroExecution(); + execution.setCurrency(peer.getCurrency()); + execution.setPeer(peer.getId()); + execution.setResult(result); + + // Last execution time (in seconds) + long executionTime = System.currentTimeMillis()/1000; + execution.setTime(executionTime); + + synchroExecutionDao.save(execution); + } + catch (Exception e) { + logger.error(String.format("Error while saving synchro execution on peer [%s]", peer), e); + } + } + + protected void closeWsClientEndpoints() { + // Closing all opened WS + wsClientEndpoints.forEach(IOUtils::closeQuietly); + wsClientEndpoints.clear(); + } + + protected void listenChanges(final Peer peer, ChangeSource changeSource) { + // Listens changes on this peer + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(changeSource); + + // Get (or create) the websocket endpoint + WebsocketClientEndpoint wsClientEndPoint = httpService.getWebsocketClientEndpoint(peer, WS_CHANGES_URL, false); + + // filter on selected sources + wsClientEndPoint.sendMessage(changeSource.toString()); + + // add listener + wsClientEndPoint.registerListener( message -> { + try { + ChangeEvent changeEvent = getObjectMapper().readValue(message, ChangeEvent.class); + importChangeEvent(peer, changeEvent); + } catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.warn(String.format("[%s] Unable to process changes received by [/ws/_changes]: %s", peer, e.getMessage()), e); + } + else { + logger.warn(String.format("[%s] Unable to process changes received by [/ws/_changes]: %s", peer, e.getMessage())); + } + } + }); + + // Add to list + wsClientEndpoints.add(wsClientEndPoint); + } + + protected void importChangeEvent(final Peer peer, ChangeEvent changeEvent) { + Preconditions.checkNotNull(changeEvent); + Preconditions.checkNotNull(changeEvent.getOperation()); + + // Skip delete operation (imported by history/delete) + if (changeEvent.getOperation() == ChangeEvent.Operation.DELETE) return; + + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s/%s] Received change event", peer, changeEvent.getIndex(), changeEvent.getType())); + } + changeEvent.getSource(); + } +} diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/Plugin.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/Plugin.java index ffb16353..d53caf2c 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/Plugin.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/Plugin.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.duniter.elasticsearch.subscription.dao.DaoModule; import org.duniter.elasticsearch.subscription.rest.RestModule; import org.duniter.elasticsearch.subscription.service.ServiceModule; +import org.duniter.elasticsearch.subscription.synchro.SynchroModule; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; @@ -67,6 +68,7 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { modules.add(new DaoModule()); modules.add(new ServiceModule()); modules.add(new RestModule()); + modules.add(new SynchroModule()); return modules; } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java index 005ff967..67c4904d 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginInit.java @@ -24,7 +24,6 @@ package org.duniter.elasticsearch.subscription; import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; import org.duniter.elasticsearch.subscription.service.SubscriptionService; -import org.duniter.elasticsearch.subscription.service.SynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -89,13 +88,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } } else { - if (logger.isInfoEnabled()) { - logger.info("Checking indices..."); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices..."); } injector.getInstance(SubscriptionIndexDao.class).createIndexIfNotExists(); - if (logger.isInfoEnabled()) { - logger.info("Checking indices [OK]"); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices [OK]"); } } } @@ -107,12 +106,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(SubscriptionService.class) .startScheduling(); } - - // Start synchronization service - if (pluginSettings.enableP2PSync()) { - injector.getInstance(SynchroService.class) - .startScheduling(); - } } } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java index ad01c30c..6b6efbaa 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java @@ -117,6 +117,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { /* -- delegate methods -- */ + public boolean reloadIndices() { return delegate.reloadAllIndices(); } @@ -125,6 +126,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return delegate.enableP2PSync(); } + public int getP2PSyncTimeOffset() { + return delegate.getP2PSyncTimeOffset(); + } + public boolean getMailEnable() { return delegate.getMailEnable(); } diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/dao/execution/SubscriptionExecutionDaoImpl.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/dao/execution/SubscriptionExecutionDaoImpl.java index d21bf52e..52802c01 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/dao/execution/SubscriptionExecutionDaoImpl.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/dao/execution/SubscriptionExecutionDaoImpl.java @@ -137,42 +137,42 @@ public class SubscriptionExecutionDaoImpl extends AbstractSubscriptionIndexTypeD .startObject("properties") // issuer - .startObject("issuer") + .startObject(SubscriptionExecution.PROPERTY_ISSUER) .field("type", "string") .field("index", "not_analyzed") .endObject() // recipient - .startObject("recipient") + .startObject(SubscriptionExecution.PROPERTY_RECIPIENT) .field("type", "string") .field("index", "not_analyzed") .endObject() // record type - .startObject("recordType") + .startObject(SubscriptionExecution.PROPERTY_RECORD_TYPE) .field("type", "string") .field("index", "not_analyzed") .endObject() // record id - .startObject("recordId") + .startObject(SubscriptionExecution.PROPERTY_RECORD_ID) .field("type", "string") .field("index", "not_analyzed") .endObject() // time - .startObject("time") + .startObject(SubscriptionExecution.PROPERTY_TIME) .field("type", "integer") .endObject() // hash - .startObject("hash") + .startObject(SubscriptionExecution.PROPERTY_HASH) .field("type", "string") .field("index", "not_analyzed") .endObject() // signature - .startObject("signature") + .startObject(SubscriptionExecution.PROPERTY_SIGNATURE) .field("type", "string") .field("index", "not_analyzed") .endObject() diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java index 2bb9c7b9..72c4223c 100644 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/ServiceModule.java @@ -30,8 +30,5 @@ public class ServiceModule extends AbstractModule implements Module { @Override protected void configure() { // Subscription services bind(SubscriptionService.class).asEagerSingleton(); - - // Synchro service - bind(SynchroService.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java deleted file mode 100644 index 6050a38a..00000000 --- a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SynchroService.java +++ /dev/null @@ -1,102 +0,0 @@ -package org.duniter.elasticsearch.subscription.service; - -/* - * #%L - * Duniter4j :: ElasticSearch Plugin - * %% - * Copyright (C) 2014 - 2016 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - -import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.dao.PeerDao; -import org.duniter.core.client.model.bma.EndpointApi; -import org.duniter.core.client.model.local.Peer; -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; -import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.model.SynchroResult; -import org.duniter.elasticsearch.service.AbstractSynchroService; -import org.duniter.elasticsearch.service.PeerService; -import org.duniter.elasticsearch.service.ServiceLocator; -import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; -import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao; -import org.duniter.elasticsearch.subscription.dao.record.SubscriptionRecordDao; -import org.duniter.elasticsearch.subscription.model.Protocol; -import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.duniter.elasticsearch.user.PluginSettings; -import org.elasticsearch.common.inject.Inject; - -import java.util.List; - -/** - * Created by blavenie on 27/10/16. - */ -public class SynchroService extends AbstractSynchroService { - - private static final EndpointApi ENDPOINT_API = EndpointApi.ES_SUBSCRIPTION_API; - - @Inject - public SynchroService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, - ThreadPool threadPool, - PeerService peerService, - CurrencyDao currencyDao, - PeerDao peerDao, - final ServiceLocator serviceLocator) { - super(client, settings.getDelegate(), cryptoService, threadPool, currencyDao, peerDao, serviceLocator); - - // Configure peer service to allow API - peerService.addIncludeEndpointApi(ENDPOINT_API.name()); - } - - @Override - protected void synchronize() { - logger.info("Starting subscription data synchronization..."); - - // Get peers - List<Peer> peers = getPeersFromApi(ENDPOINT_API); - if (CollectionUtils.isNotEmpty(peers)) { - peers.forEach(this::synchronize); - logger.info("User subscription synchronization [OK]"); - } - else { - logger.info(String.format("User subscription synchronization [OK] - no endpoint found for API [%s]", ENDPOINT_API.name())); - } - } - - /* -- protected methods -- */ - - protected void synchronize(Peer peer) { - SynchroResult result = new SynchroResult(); - long now = System.currentTimeMillis(); - - long fromTime = 0; // TODO: get last sync time from somewhere ? (e.g. a specific index) - synchronizeSubscriptions(peer, fromTime, result); - - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] Subscription data imported in %s ms: %s", peer, System.currentTimeMillis() - now, result.toString())); - } - } - - protected void synchronizeSubscriptions(Peer peer, long fromTime, SynchroResult result) { - // Workaround to skip data older than june 2017 - long executionFromTime = Math.max(fromTime, 1493743177); - safeSynchronizeIndex(peer, SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, executionFromTime, result); - - safeSynchronizeIndex(peer, SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, fromTime, result); - } -} diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroModule.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroModule.java new file mode 100644 index 00000000..2196d42f --- /dev/null +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroModule.java @@ -0,0 +1,38 @@ +package org.duniter.elasticsearch.subscription.synchro; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; + +public class SynchroModule extends AbstractModule implements Module { + + @Override protected void configure() { + + // Subscription + bind(SynchroSubscriptionRecordAction.class).asEagerSingleton(); + bind(SynchroSubscriptionExecutionIndexAction.class).asEagerSingleton(); + + } + +} \ No newline at end of file diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java new file mode 100644 index 00000000..cd506568 --- /dev/null +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionExecutionIndexAction.java @@ -0,0 +1,26 @@ +package org.duniter.elasticsearch.subscription.synchro; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; +import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.elasticsearch.common.inject.Inject; + +public class SynchroSubscriptionExecutionIndexAction extends AbstractSynchroAction { + + @Inject + public SynchroSubscriptionExecutionIndexAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, client, pluginSettings, cryptoService, threadPool); + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java new file mode 100644 index 00000000..ca7b0684 --- /dev/null +++ b/duniter4j-es-subscription/src/main/java/org/duniter/elasticsearch/subscription/synchro/SynchroSubscriptionRecordAction.java @@ -0,0 +1,28 @@ +package org.duniter.elasticsearch.subscription.synchro; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao; +import org.duniter.elasticsearch.subscription.dao.record.SubscriptionRecordDao; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.elasticsearch.common.inject.Inject; + +public class SynchroSubscriptionRecordAction extends AbstractSynchroAction { + + @Inject + public SynchroSubscriptionRecordAction(Duniter4jClient client, + PluginSettings pluginSettings, + ThreadPool threadPool, + CryptoService cryptoService, + SynchroService synchroService) { + super(SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/Plugin.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/Plugin.java index c649e733..987ab8bc 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/Plugin.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/Plugin.java @@ -26,6 +26,7 @@ import com.google.common.collect.Lists; import org.duniter.elasticsearch.user.dao.DaoModule; import org.duniter.elasticsearch.user.rest.RestModule; import org.duniter.elasticsearch.user.service.ServiceModule; +import org.duniter.elasticsearch.user.synchro.SynchroModule; import org.duniter.elasticsearch.user.websocket.WebSocketModule; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -67,9 +68,10 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { } modules.add(new DaoModule()); - modules.add(new RestModule()); modules.add(new ServiceModule()); modules.add(new WebSocketModule()); + modules.add(new RestModule()); + modules.add(new SynchroModule()); return modules; } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index bb1ed809..02c7acdb 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -112,8 +112,8 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } else { - if (logger.isInfoEnabled()) { - logger.info("Checking indices..."); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices..."); } boolean cleanBlockchainUserEvents = injector.getInstance(UserService.class).isIndexExists() && pluginSettings.reloadBlockchainIndices(); @@ -125,8 +125,8 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(UserInvitationService.class).createIndexIfNotExists(); injector.getInstance(PageService.class).createIndexIfNotExists(); - if (logger.isInfoEnabled()) { - logger.info("Checking indices [OK]"); + if (logger.isDebugEnabled()) { + logger.debug("Checking indices [OK]"); } // Clean user events on blockchain @@ -164,11 +164,6 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } protected void doAfterStart() { - // Synchronize - if (pluginSettings.enableP2PSync()) { - injector.getInstance(SynchroService.class) - .startScheduling(); - } // Notify admin injector.getInstance(AdminService.class) diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index aa607bb3..90bfec49 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -99,7 +99,11 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean enableP2PSync() { - return settings.getAsBoolean("duniter.p2p.enable", false); + return settings.getAsBoolean("duniter.p2p.enable", true); + } + + public int getP2PSyncTimeOffset() { + return settings.getAsInt("duniter.p2p.timeOffsetInSec", 60*60 /*1 hour*/ ); } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java index fc4b3ce7..141102a5 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java @@ -112,9 +112,33 @@ public class HistoryService extends AbstractService { public String indexDeleteFromJson(String recordJson) { - JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); - String issuer = actualObj.get(DeleteRecord.PROPERTY_ISSUER).asText(); + JsonNode source = readAndVerifyIssuerSignature(recordJson); + + // Check if valid deletion + checkIsValidDeletion(source); + + if (logger.isDebugEnabled()) { + String issuer = source.get(DeleteRecord.PROPERTY_ISSUER).asText(); + String index = getMandatoryField(source, DeleteRecord.PROPERTY_INDEX).asText(); + String type = getMandatoryField(source, DeleteRecord.PROPERTY_TYPE).asText(); + String id = getMandatoryField(source, DeleteRecord.PROPERTY_ID).asText(); + logger.debug(String.format("Deleting document [%s/%s/%s] - issuer [%s]", index, type, id, issuer.substring(0, 8))); + } + + // Add deletion to history + IndexResponse response = client.prepareIndex(INDEX, DELETE_TYPE) + .setSource(recordJson) + .setRefresh(false) + .execute().actionGet(); + + // Delete the document + applyDocDelete(source); + + return response.getId(); + } + public void checkIsValidDeletion(JsonNode actualObj) { + String issuer = actualObj.get(DeleteRecord.PROPERTY_ISSUER).asText(); String index = getMandatoryField(actualObj, DeleteRecord.PROPERTY_INDEX).asText(); String type = getMandatoryField(actualObj,DeleteRecord.PROPERTY_TYPE).asText(); String id = getMandatoryField(actualObj,DeleteRecord.PROPERTY_ID).asText(); @@ -135,21 +159,15 @@ public class HistoryService extends AbstractService { // Check same document issuer client.checkSameDocumentIssuer(index, type, id, issuer); } + } - if (logger.isDebugEnabled()) { - logger.debug(String.format("Deleting document [%s/%s/%s] - issuer [%s]", index, type, id, issuer.substring(0, 8))); - } - - // Add deletion to history - IndexResponse response = client.prepareIndex(INDEX, DELETE_TYPE) - .setSource(recordJson) - .setRefresh(false) - .execute().actionGet(); + public void applyDocDelete(JsonNode actualObj) { + String index = getMandatoryField(actualObj, DeleteRecord.PROPERTY_INDEX).asText(); + String type = getMandatoryField(actualObj,DeleteRecord.PROPERTY_TYPE).asText(); + String id = getMandatoryField(actualObj,DeleteRecord.PROPERTY_ID).asText(); // Delete the document client.prepareDelete(index, type, id).execute().actionGet(); - - return response.getId(); } public boolean existsInDeleteHistory(final String index, final String type, final String id) { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java index 9ee9d154..b89fef09 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java @@ -29,16 +29,14 @@ import org.duniter.core.client.model.ModelUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.exception.InvalidSignatureException; -import org.duniter.elasticsearch.user.service.AbstractService; +import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.Message; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateRequestBuilder; -import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -123,8 +121,6 @@ public class MessageService extends AbstractService { JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); String issuer = getIssuer(actualObj); - String recipient = getMandatoryField(actualObj, Message.PROPERTY_RECIPIENT).asText(); - Long time = getMandatoryField(actualObj, Message.PROPERTY_TIME).asLong(); if (logger.isDebugEnabled()) { logger.debug(String.format("Indexing a message from issuer [%s]", issuer.substring(0, 8))); @@ -137,23 +133,18 @@ public class MessageService extends AbstractService { String messageId = response.getId(); - // Notify recipient - userEventService.notifyUser(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.MESSAGE_RECEIVED.name()) - .setRecipient(recipient) - .setMessage(I18n.n("duniter.user.event.MESSAGE_RECEIVED"), issuer, ModelUtils.minifyPubkey(issuer)) - .setTime(time) - .setReference(INDEX, INBOX_TYPE, messageId) - .build()); + // Notify new message + notifyUser(messageId, actualObj); return messageId; } public String indexOuboxFromJson(String recordJson) { - JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); - String issuer = getIssuer(actualObj); + JsonNode source = readAndVerifyIssuerSignature(recordJson); if (logger.isDebugEnabled()) { + String issuer = getMandatoryField(source, Message.PROPERTY_ISSUER).asText(); logger.debug(String.format("Indexing a message from issuer [%s]", issuer.substring(0, 8))); } @@ -165,6 +156,20 @@ public class MessageService extends AbstractService { return response.getId(); } + public void notifyUser(String messageId, JsonNode actualObj) { + String issuer = getMandatoryField(actualObj, Message.PROPERTY_ISSUER).asText(); + String recipient = getMandatoryField(actualObj, Message.PROPERTY_RECIPIENT).asText(); + Long time = getMandatoryField(actualObj, Message.PROPERTY_TIME).asLong(); + + // Notify recipient + userEventService.notifyUser(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.MESSAGE_RECEIVED.name()) + .setRecipient(recipient) + .setMessage(I18n.n("duniter.user.event.MESSAGE_RECEIVED"), issuer, ModelUtils.minifyPubkey(issuer)) + .setTime(time) + .setReference(INDEX, INBOX_TYPE, messageId) + .build()); + } + public void markMessageAsRead(String id, String signature) { Map<String, Object> fields = client.getMandatoryFieldsById(INDEX, INBOX_TYPE, id, Message.PROPERTY_HASH, Message.PROPERTY_RECIPIENT); String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString(); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/PageService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/PageService.java index 91909fe5..8cb1ff3b 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/PageService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/PageService.java @@ -52,7 +52,7 @@ public class PageService extends AbstractService { RegistryIndexDao registryIndexDao, RegistryCommentDao commentDao, RegistryRecordDao recordDao) { - super("gchange.service.page", client, settings, cryptoService); + super("duniter.page", client, settings, cryptoService); this.indexDao = registryIndexDao; this.commentDao = commentDao; this.recordDao = recordDao; diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java index 37b08563..86a95b9a 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java @@ -44,7 +44,7 @@ public class ServiceModule extends AbstractModule implements Module { bind(BlockchainUserEventService.class).asEagerSingleton(); - bind(SynchroService.class).asEagerSingleton(); + //bind(SynchroService.class).asEagerSingleton(); } /* protected methods */ diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java deleted file mode 100644 index 1aa05be8..00000000 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java +++ /dev/null @@ -1,149 +0,0 @@ -package org.duniter.elasticsearch.user.service; - -/* - * #%L - * Duniter4j :: ElasticSearch Plugin - * %% - * Copyright (C) 2014 - 2016 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - -import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.dao.PeerDao; -import org.duniter.core.client.model.bma.EndpointApi; -import org.duniter.core.client.model.elasticsearch.Protocol; -import org.duniter.core.client.model.local.Peer; -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.DateUtils; -import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.model.SynchroResult; -import org.duniter.elasticsearch.service.AbstractSynchroService; -import org.duniter.elasticsearch.service.DocStatService; -import org.duniter.elasticsearch.service.PeerService; -import org.duniter.elasticsearch.service.ServiceLocator; -import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.duniter.elasticsearch.user.PluginSettings; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.nuiton.i18n.I18n; - -import java.util.List; -import java.util.concurrent.TimeUnit; - -/** - * Created by blavenie on 27/10/16. - */ -public class SynchroService extends AbstractSynchroService<SynchroService> { - - private static final EndpointApi ENDPOINT_API = EndpointApi.ES_USER_API; - - @Inject - public SynchroService(Duniter4jClient client, PluginSettings settings, - CryptoService cryptoService, - PeerService peerService, - ThreadPool threadPool, - CurrencyDao currencyDao, - PeerDao peerDao, - final ServiceLocator serviceLocator) { - super(client, settings.getDelegate(), cryptoService, threadPool, currencyDao, peerDao, serviceLocator); - - // Configure peer service to allow API - peerService.addIncludeEndpointApi(ENDPOINT_API.name()); - } - - /* -- protected methods -- */ - - @Override - protected void synchronize() { - logger.info("Starting user data synchronization..."); - - // Get peers - List<Peer> peers = getPeersFromApi(ENDPOINT_API); - if (CollectionUtils.isNotEmpty(peers)) { - peers.forEach(this::synchronize); - logger.info("User data synchronization [OK]"); - } - else { - logger.info(String.format("User data synchronization [OK] - no endpoint found for API [%s]", ENDPOINT_API.name())); - } - - } - - - protected void synchronize(Peer peer) { - long now = System.currentTimeMillis(); - SynchroResult result = new SynchroResult(); - - long fromTime = 0; // TODO: get last sync time from somewhere ? (e.g. a specific index) - synchronize(peer, fromTime, result); - - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] User data imported in %s ms: %s", peer, System.currentTimeMillis() - now, result.toString())); - } - - } - - protected void synchronize(Peer peer, long fromTime, SynchroResult result) { - synchronizeHistory(peer, fromTime, result); - synchronizeUser(peer, fromTime, result); - synchronizeMessage(peer, fromTime, result); - synchronizeGroup(peer, fromTime, result); - synchronizeInvitation(peer, fromTime, result); - } - - protected void synchronizeHistory(Peer peer, long fromTime, SynchroResult result) { - safeSynchronizeIndex(peer, HistoryService.INDEX, HistoryService.DELETE_TYPE, fromTime, result); - } - - protected void synchronizeUser(Peer peer, long fromTime, SynchroResult result) { - safeSynchronizeIndex(peer, UserService.INDEX, UserService.PROFILE_TYPE, fromTime, result); - safeSynchronizeIndex(peer, UserService.INDEX, UserService.SETTINGS_TYPE, fromTime, result); - } - - protected void synchronizeMessage(Peer peer, long fromTime, SynchroResult result) { - safeSynchronizeIndex(peer, MessageService.INDEX, MessageService.INBOX_TYPE, fromTime, result); - safeSynchronizeIndex(peer, MessageService.INDEX, MessageService.OUTBOX_TYPE, fromTime, result); - - // User events, that reference message index - synchronizeUserEventsOnReferenceIndex(peer, MessageService.INDEX, fromTime, result); - } - - protected void synchronizeGroup(Peer peer, long fromTime, SynchroResult result) { - safeSynchronizeIndex(peer, GroupService.INDEX, GroupService.RECORD_TYPE, fromTime, result); - - // User events, that reference invitation index - synchronizeUserEventsOnReferenceIndex(peer, GroupService.INDEX, fromTime, result); - } - - protected void synchronizeInvitation(Peer peer, long fromTime, SynchroResult result) { - safeSynchronizeIndex(peer, UserInvitationService.INDEX, UserInvitationService.CERTIFICATION_TYPE, fromTime, result); - - // User events, that reference invitation index - synchronizeUserEventsOnReferenceIndex(peer, UserInvitationService.INDEX, fromTime, result); - } - - - protected void synchronizeUserEventsOnReferenceIndex(Peer peer, String referenceIndex, long fromTime, SynchroResult result) { - - /*QueryBuilder query = QueryBuilders.boolQuery() - .filter(QueryBuilders.rangeQuery("time").gte(fromTime)); - safeSynchronizeIndex(peer, UserService.INDEX, UserEventService.EVENT_TYPE, query, result);*/ - } - -} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserInvitationService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserInvitationService.java index f5e393b8..b002322e 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserInvitationService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserInvitationService.java @@ -107,12 +107,10 @@ public class UserInvitationService extends AbstractService { public String indexCertificationInvitationFromJson(String recordJson) { - JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); - String issuer = getIssuer(actualObj); - String recipient = getMandatoryField(actualObj, Message.PROPERTY_RECIPIENT).asText(); - Long time = getMandatoryField(actualObj, Message.PROPERTY_TIME).asLong(); + JsonNode source = readAndVerifyIssuerSignature(recordJson); if (logger.isDebugEnabled()) { + String issuer = getMandatoryField(source, Message.PROPERTY_ISSUER).asText(); logger.debug(String.format("Indexing a invitation to certify from issuer [%s]", issuer.substring(0, 8))); } @@ -123,15 +121,24 @@ public class UserInvitationService extends AbstractService { String invitationId = response.getId(); + // Notify user + notifyUser(invitationId, source); + + return invitationId; + } + + public void notifyUser(String id, JsonNode source) { + String issuer = getMandatoryField(source, Message.PROPERTY_ISSUER).asText(); + String recipient = getMandatoryField(source, Message.PROPERTY_RECIPIENT).asText(); + Long time = getMandatoryField(source, Message.PROPERTY_TIME).asLong(); + // Notify recipient userEventService.notifyUser(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.INVITATION_TO_CERTIFY.name()) .setRecipient(recipient) .setMessage(I18n.n("duniter.user.event.INVITATION_TO_CERTIFY"), issuer, ModelUtils.minifyPubkey(issuer)) .setTime(time) - .setReference(INDEX, CERTIFICATION_TYPE, invitationId) + .setReference(INDEX, CERTIFICATION_TYPE, id) .build()); - - return invitationId; } /* -- Internal methods -- */ diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java similarity index 57% rename from duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java rename to duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java index 5e936c8c..7b67d18f 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/AbstractSynchroAction.java @@ -1,209 +1,112 @@ -package org.duniter.elasticsearch.service; - -/* - * #%L - * Duniter4j :: ElasticSearch Plugin - * %% - * Copyright (C) 2014 - 2016 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ +package org.duniter.elasticsearch.service.synchro; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.entity.StringEntity; -import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.HttpService; import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.DateUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; -import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; -import org.duniter.elasticsearch.dao.CurrencyExtendDao; import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.duniter.elasticsearch.exception.InvalidFormatException; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.model.SearchResponse; import org.duniter.elasticsearch.model.SearchScrollResponse; import org.duniter.elasticsearch.model.SynchroResult; +import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.model.UserEvent; +import org.duniter.elasticsearch.user.service.AbstractService; +import org.duniter.elasticsearch.user.service.UserEventService; +import org.duniter.elasticsearch.user.service.UserService; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import java.io.IOException; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -/** - * Created by blavenie on 27/10/16. - */ -public abstract class AbstractSynchroService<T extends AbstractService> extends AbstractService { +public abstract class AbstractSynchroAction extends AbstractService implements SynchroAction { private static final String SCROLL_PARAM_VALUE = "1m"; - protected HttpService httpService; - protected final ThreadPool threadPool; - protected final PeerDao peerDao; - protected final CurrencyDao currencyDao; - - public AbstractSynchroService(Duniter4jClient client, - PluginSettings settings, - CryptoService cryptoService, - ThreadPool threadPool, - CurrencyDao currencyDao, - PeerDao peerDao, - final ServiceLocator serviceLocator) { - super("duniter.network.p2p", client, settings,cryptoService); - this.threadPool = threadPool; - this.currencyDao = currencyDao; - this.peerDao = peerDao; - threadPool.scheduleOnStarted(() -> { - httpService = serviceLocator.getHttpService(); - setIsReady(true); - }); + public interface InsertListener { + void onInsert(String id, JsonNode source); } - /** - * Start scheduling doc stats update - * @return - */ - public T startScheduling() { - long delayBeforeNextHour = DateUtils.delayBeforeNextHour(); + private String fromIndex; + private String fromType; + private String toIndex; + private String toType; + private String issuerFieldName = Record.PROPERTY_ISSUER; + private String versionFieldName = Record.PROPERTY_TIME; - // Five minute before the hour (to make sure to be ready when computing doc stat - see DocStatService) - delayBeforeNextHour -= 5 * 60 * 1000; + private HttpService httpService; - // If not already scheduling to early (in the next 5 min) then launch it - if (delayBeforeNextHour > 5 * 60 * 1000) { + private boolean enableUpdate = false; + private List<InsertListener> insertListeners; - // Launch with a delay of 10 sec - threadPool.schedule(this::synchronize, 10 * 1000, TimeUnit.MILLISECONDS); - } - - // Schedule every hour - threadPool.scheduleAtFixedRate( - this::synchronize, - delayBeforeNextHour, - 60 * 60 * 1000 /* every hour */, - TimeUnit.MILLISECONDS); - - return (T)this; + public AbstractSynchroAction(String index, String type, + Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool) { + this(index, type, index, type, client, pluginSettings, cryptoService, threadPool); } - /* -- protected methods -- */ - - - - protected abstract void synchronize(); - - protected List<Peer> getPeersFromApi(final EndpointApi api) { - Preconditions.checkNotNull(api); - - try { - List<String> currencyIds = currencyDao.getCurrencyIds(); - if (CollectionUtils.isEmpty(currencyIds)) return null; - - return currencyIds.stream() - .map(currencyId -> peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name())) - .filter(Objects::nonNull) - .flatMap(List::stream) - .collect(Collectors.toList()); - } - catch (Exception e) { - logger.error(String.format("Could not get peers for Api [%s]", api.name()), e); - return null; - } + public AbstractSynchroAction(String fromIndex, String fromType, + String toIndex, String toType, + Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool) { + super("duniter.synchro." + toIndex, client, pluginSettings, cryptoService); + this.fromIndex = fromIndex; + this.fromType = fromType; + this.toIndex = toIndex; + this.toType = toType; + threadPool.scheduleOnStarted(() -> httpService = ServiceLocator.instance().getHttpService()); } - protected void safeSynchronizeIndex(Peer peer, String index, String type, long fromTime, SynchroResult result) { - safeSynchronizeIndexRemap(peer, index, type, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, fromTime, result); - } - protected void safeSynchronizeIndexRemap(Peer peer, - String fromIndex, String fromType, - String toIndex, String toType, - long fromTime, - SynchroResult result) { - safeSynchronizeIndexRemap(peer, fromIndex, fromType, toIndex, toType, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, fromTime, result); + @Override + public EndpointApi getEndPointApi() { + return EndpointApi.ES_USER_API; } - protected void safeSynchronizeIndexRemap(Peer peer, - String fromIndex, String fromType, - String toIndex, String toType, - String issuerFieldName, String versionFieldName, - long fromTime, - SynchroResult result) { + @Override + public void handleSynchronize(Peer peer, + long fromTime, + SynchroResult result) { + Preconditions.checkNotNull(peer); Preconditions.checkArgument(fromTime >= 0); + Preconditions.checkNotNull(result); if (logger.isDebugEnabled()) { logger.debug(String.format("[%s] [%s/%s] Synchronizing where [%s > %s]...", peer, toIndex, toType, versionFieldName, fromTime)); } - QueryBuilder fromQuery = createDefaultQuery(fromTime); - safeSynchronizeIndexRemap(peer, fromIndex, fromType, toIndex, toType, issuerFieldName, versionFieldName, fromQuery, result); - } - - protected void safeSynchronizeIndex(Peer peer, - String index, String type, - QueryBuilder query, - SynchroResult result) { - Preconditions.checkNotNull(query); - - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] Synchronizing using query [%s]...", peer, index, type, query.toString())); - } - - safeSynchronizeIndexRemap(peer, index, type, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, query, result); - } - - protected void safeSynchronizeIndexRemap(Peer peer, - String fromIndex, String fromType, - String toIndex, String toType, - String issuerFieldName, String versionFieldName, - QueryBuilder query, - SynchroResult result) { - Preconditions.checkNotNull(peer); - Preconditions.checkNotNull(fromIndex); - Preconditions.checkNotNull(fromType); - Preconditions.checkNotNull(toIndex); - Preconditions.checkNotNull(toType); - Preconditions.checkNotNull(issuerFieldName); - Preconditions.checkNotNull(versionFieldName); - Preconditions.checkNotNull(query); - Preconditions.checkNotNull(result); - try { - synchronizeIndexRemap(peer, fromIndex, fromType, toIndex, toType, issuerFieldName, versionFieldName, query, result); + QueryBuilder query = createQuery(fromTime); + synchronize(peer, query, result); } catch(Exception e1) { // Log the first error @@ -216,53 +119,21 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends } } - private void synchronizeIndexRemap(Peer peer, - String fromIndex, String fromType, - String toIndex, String toType, - String issuerFieldName, - String versionFieldName, - QueryBuilder query, - SynchroResult result) { - - if (!client.existsIndex(toIndex)) { - throw new TechnicalException(String.format("Unable to import changes. Index [%s] not exists", toIndex)); + public void addInsertListener(InsertListener listener) { + if (insertListeners == null) { + insertListeners = Lists.newArrayList(); } + insertListeners.add(listener); + } - ObjectMapper objectMapper = getObjectMapper(); - - long counter = 0; - boolean stop = false; - String scrollId = null; - int total = 0; - while(!stop) { - SearchScrollResponse response; - if (scrollId == null) { - HttpUriRequest request = createScrollRequest(peer, fromIndex, fromType, query); - response = executeAndParseRequest(peer, fromIndex, fromType, request); - if (response != null) { - scrollId = response.getScrollId(); - total = response.getHits().getTotalHits(); - if (total > 0 && logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] %s docs to check...", peer, toIndex, toType, total)); - } - } - } - else { - HttpUriRequest request = createNextScrollRequest(peer, scrollId); - response = executeAndParseRequest(peer, fromIndex, fromType, request); - } + /* -- protected methods -- */ - if (response == null) { - stop = true; - } - else { - counter += fetchAndIndex(peer, toIndex, toType, issuerFieldName, versionFieldName, response, objectMapper, result); - stop = counter >= total; - } - } + protected void notifyInsertListeners(final String id, final JsonNode source) { + if (insertListeners == null) return; + insertListeners.forEach(l -> l.onInsert(id, source)); } - private QueryBuilder createDefaultQuery(long fromTime) { + protected QueryBuilder createQuery(long fromTime) { return QueryBuilders.boolQuery() .should(QueryBuilders.rangeQuery("time").gte(fromTime)); @@ -288,7 +159,7 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends httpPost.setEntity(new StringEntity(content, "UTF-8")); if (logger.isTraceEnabled()) { - logger.trace(String.format("[%s] [%s/%s] Sending POST scroll request: %s", peer, fromIndex, fromType, content)); + logger.trace(String.format("[%s] [%s] [%s/%s] Sending POST scroll request: %s", peer.getCurrency(), peer, fromIndex, fromType, content)); } } catch (IOException e) { @@ -309,23 +180,63 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends return httpPost; } - private SearchScrollResponse executeAndParseRequest(Peer peer, String fromIndex, String fromType, HttpUriRequest request) { + private SearchScrollResponse executeAndParseRequest(Peer peer, HttpUriRequest request) { try { // Execute query & parse response JsonNode node = httpService.executeRequest(request, JsonNode.class, String.class); return node == null ? null : new SearchScrollResponse(node); } catch (HttpUnauthorizeException e) { - throw new TechnicalException(String.format("[%s] [%s/%s] Unable to access (%s).", peer, fromIndex, fromType, e.getMessage()), e); + throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to access (%s).", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); } catch (TechnicalException e) { - throw new TechnicalException(String.format("[%s] [%s/%s] Unable to synchronize: %s", peer, fromIndex, fromType, e.getMessage()), e); + throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to synchronize: %s", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); } catch (Exception e) { - throw new TechnicalException(String.format("[%s] [%s/%s] Unable to parse response: ", peer, fromIndex, fromType, e.getMessage()), e); + throw new TechnicalException(String.format("[%s] [%s] [%s/%s] Unable to parse response: ", peer.getCurrency(), peer, fromIndex, fromType, e.getMessage()), e); + } + } + + private void synchronize(Peer peer, + QueryBuilder query, + SynchroResult result) { + + if (!client.existsIndex(toIndex)) { + throw new TechnicalException(String.format("Unable to import changes. Index [%s] not exists", toIndex)); + } + + ObjectMapper objectMapper = getObjectMapper(); + + long counter = 0; + boolean stop = false; + String scrollId = null; + int total = 0; + while(!stop) { + SearchScrollResponse response; + if (scrollId == null) { + HttpUriRequest request = createScrollRequest(peer, fromIndex, fromType, query); + response = executeAndParseRequest(peer, request); + if (response != null) { + scrollId = response.getScrollId(); + total = response.getHits().getTotalHits(); + if (total > 0 && logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s] [%s/%s] %s docs to check...", peer.getCurrency(), peer, toIndex, toType, total)); + } + } + } + else { + HttpUriRequest request = createNextScrollRequest(peer, scrollId); + response = executeAndParseRequest(peer, request); + } + + if (response == null) { + stop = true; + } + else { + counter += fetchAndIndex(peer, response, objectMapper, result); + stop = counter >= total; + } } } private long fetchAndIndex(final Peer peer, - String toIndex, String toType, - String issuerFieldName, String versionFieldName, SearchScrollResponse response, final ObjectMapper objectMapper, SynchroResult result) { @@ -346,7 +257,8 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends JsonNode source = hit.getSource(); if (source == null) { - logger.error("No source for doc " + id); + logger.error(String.format("[%s] [%s] [%s/%s/%s] No source found. Skipping.", peer.getCurrency(), peer, + toIndex, toType, id)); } else { counter++; @@ -368,7 +280,7 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends if (!exists) { if (debug) { - logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, toIndex, toType, id, source.toString())); + logger.trace(String.format("[%s] [%s] [%s/%s] insert _id=%s\n%s", peer.getCurrency(), peer, toIndex, toType, id, source.toString())); } // FIXME: some user/profile document failed ! - see issue #11 @@ -384,11 +296,15 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends bulkRequest.add(client.prepareIndex(toIndex, toType, id) .setSource(objectMapper.writeValueAsBytes(source)) ); + + // Notify insert listeners + notifyInsertListeners(id, source); + insertHits++; } - // Existing doc - else { + // Existing doc: do update (if enable) + else if (enableUpdate){ // Check same issuer String existingIssuer = (String) existingFields.get(issuerFieldName); @@ -402,7 +318,7 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends if (doUpdate) { if (debug) { - logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, toIndex, toType, id, source.toString())); + logger.trace(String.format("[%s] [%s] [%s/%s] update _id=%s\n%s", peer.getCurrency(), peer, toIndex, toType, id, source.toString())); } // FIXME: some user/profile document failed ! - see issue #11 @@ -424,13 +340,13 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends } catch (DuniterElasticsearchException e) { if (logger.isDebugEnabled()) { - logger.warn(String.format("[%s] [%s/%s/%s] %s. Skipping.\n%s", peer, toIndex, toType, id, e.getMessage(), source.toString())); + logger.warn(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.\n%s", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage(), source.toString())); } else { - logger.warn(String.format("[%s] [%s/%s/%s] %s. Skipping.", peer, toIndex, toType, id, e.getMessage())); + logger.warn(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage())); } // Skipping document (continue) } catch (Exception e) { - logger.error(String.format("[%s] [%s/%s/%s] %s. Skipping.", peer, toIndex, toType, id, e.getMessage()), e); + logger.error(String.format("[%s] [%s] [%s/%s/%s] %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, id, e.getMessage()), e); // Skipping document (continue) } } @@ -450,7 +366,7 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends || missingDocIds.contains(itemResponse.getId()); if (!skip) { if (debug) { - logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); + logger.debug(String.format("[%s] [%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer.getCurrency(), peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); } missingDocIds.add(itemResponse.getId()); } @@ -466,4 +382,39 @@ public abstract class AbstractSynchroService<T extends AbstractService> extends return counter; } + protected void setIssuerFieldName(String issuerFieldName) { + this.issuerFieldName = issuerFieldName; + } + + protected void setVersionFieldName(String versionFieldName) { + this.versionFieldName = versionFieldName; + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } + + protected void setEnableUpdate(boolean enableUpdate) { + this.enableUpdate = enableUpdate; + } + + private void synchronizeUserEventsByReference(Peer peer, UserEvent.Reference reference, long fromTime, SynchroResult result) { + + BoolQueryBuilder query = QueryBuilders.boolQuery() + .filter(QueryBuilders.rangeQuery(Record.PROPERTY_TIME).gte(fromTime)); + + // Query = filter on reference + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + if (StringUtils.isNotBlank(reference.getIndex())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex())); + } + if (StringUtils.isNotBlank(reference.getType())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType())); + } + + query.should(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); + + //safeSynchronizeIndex(peer, UserService.INDEX, UserEventService.EVENT_TYPE, query, result); + } + } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java new file mode 100644 index 00000000..68658ded --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/SynchroModule.java @@ -0,0 +1,62 @@ +package org.duniter.elasticsearch.user.synchro; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.user.synchro.group.SynchroGroupRecordAction; +import org.duniter.elasticsearch.user.synchro.history.SynchroHistoryIndexAction; +import org.duniter.elasticsearch.user.synchro.invitation.SynchroInvitationCertificationIndexAction; +import org.duniter.elasticsearch.user.synchro.message.SynchroMessageInboxIndexAction; +import org.duniter.elasticsearch.user.synchro.message.SynchroMessageOutboxIndexAction; +import org.duniter.elasticsearch.user.synchro.page.SynchroPageCommentAction; +import org.duniter.elasticsearch.user.synchro.page.SynchroPageRecordAction; +import org.duniter.elasticsearch.user.synchro.user.SynchroUserProfileAction; +import org.duniter.elasticsearch.user.synchro.user.SynchroUserSettingsAction; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Module; + +public class SynchroModule extends AbstractModule implements Module { + + @Override protected void configure() { + + // History + bind(SynchroHistoryIndexAction.class).asEagerSingleton(); + + // User + bind(SynchroUserProfileAction.class).asEagerSingleton(); + bind(SynchroUserSettingsAction.class).asEagerSingleton(); + + // Message + bind(SynchroMessageInboxIndexAction.class).asEagerSingleton(); + bind(SynchroMessageOutboxIndexAction.class).asEagerSingleton(); + + // Page and Group + bind(SynchroGroupRecordAction.class).asEagerSingleton(); + bind(SynchroPageRecordAction.class).asEagerSingleton(); + bind(SynchroPageCommentAction.class).asEagerSingleton(); + + // Invitation + bind(SynchroInvitationCertificationIndexAction.class).asEagerSingleton(); + + } + +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java new file mode 100644 index 00000000..c21d4fdf --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/group/SynchroGroupRecordAction.java @@ -0,0 +1,27 @@ +package org.duniter.elasticsearch.user.synchro.group; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.GroupService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroGroupRecordAction extends AbstractSynchroAction { + + @Inject + public SynchroGroupRecordAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(GroupService.INDEX, GroupService.RECORD_TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java new file mode 100644 index 00000000..ce37ed07 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/history/SynchroHistoryIndexAction.java @@ -0,0 +1,45 @@ +package org.duniter.elasticsearch.user.synchro.history; + +import com.fasterxml.jackson.databind.JsonNode; +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.exception.NotFoundException; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.HistoryService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroHistoryIndexAction extends AbstractSynchroAction { + + private HistoryService service; + @Inject + public SynchroHistoryIndexAction(final Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService, + HistoryService service) { + super(service.INDEX, service.DELETE_TYPE, client, pluginSettings, cryptoService, threadPool); + this.service = service; + + addInsertListener(this::onInsertDeletion); + + synchroService.register(this); + } + + protected void onInsertDeletion(String historyId, JsonNode source) { + try { + // Check if valid document + service.checkIsValidDeletion(source); + + // Delete the document + service.applyDocDelete(source); + + } catch(NotFoundException e) { + // doc not exists: continue + logger.debug("Doc to delete could not be found. Skipping deletion"); + } + } +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java new file mode 100644 index 00000000..5f456e10 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/invitation/SynchroInvitationCertificationIndexAction.java @@ -0,0 +1,28 @@ +package org.duniter.elasticsearch.user.synchro.invitation; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.UserInvitationService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroInvitationCertificationIndexAction extends AbstractSynchroAction { + + @Inject + public SynchroInvitationCertificationIndexAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService, + UserInvitationService service) { + super(UserInvitationService.INDEX, UserInvitationService.CERTIFICATION_TYPE, client, pluginSettings, cryptoService, threadPool); + + addInsertListener(service::notifyUser); + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java new file mode 100644 index 00000000..b9b2e0a6 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageInboxIndexAction.java @@ -0,0 +1,28 @@ +package org.duniter.elasticsearch.user.synchro.message; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroMessageInboxIndexAction extends AbstractSynchroAction { + + @Inject + public SynchroMessageInboxIndexAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService, + MessageService service) { + super(MessageService.INDEX, MessageService.INBOX_TYPE, client, pluginSettings, cryptoService, threadPool); + + addInsertListener(service::notifyUser); + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java new file mode 100644 index 00000000..b3c31970 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/message/SynchroMessageOutboxIndexAction.java @@ -0,0 +1,27 @@ +package org.duniter.elasticsearch.user.synchro.message; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroMessageOutboxIndexAction extends AbstractSynchroAction { + + @Inject + public SynchroMessageOutboxIndexAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(MessageService.INDEX, MessageService.OUTBOX_TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(false); // no update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java new file mode 100644 index 00000000..04dbbfc7 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageCommentAction.java @@ -0,0 +1,28 @@ +package org.duniter.elasticsearch.user.synchro.page; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.dao.page.RegistryCommentDao; +import org.duniter.elasticsearch.user.dao.page.RegistryIndexDao; +import org.elasticsearch.common.inject.Inject; + +public class SynchroPageCommentAction extends AbstractSynchroAction { + + @Inject + public SynchroPageCommentAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(RegistryIndexDao.INDEX, RegistryCommentDao.TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java new file mode 100644 index 00000000..ff4038e1 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/page/SynchroPageRecordAction.java @@ -0,0 +1,28 @@ +package org.duniter.elasticsearch.user.synchro.page; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.dao.page.RegistryIndexDao; +import org.duniter.elasticsearch.user.dao.page.RegistryRecordDao; +import org.elasticsearch.common.inject.Inject; + +public class SynchroPageRecordAction extends AbstractSynchroAction { + + @Inject + public SynchroPageRecordAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(RegistryIndexDao.INDEX, RegistryRecordDao.TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java new file mode 100644 index 00000000..dfb4d354 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserProfileAction.java @@ -0,0 +1,27 @@ +package org.duniter.elasticsearch.user.synchro.user; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroUserProfileAction extends AbstractSynchroAction { + + @Inject + public SynchroUserProfileAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(UserService.INDEX, UserService.PROFILE_TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java new file mode 100644 index 00000000..cb07474b --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/synchro/user/SynchroUserSettingsAction.java @@ -0,0 +1,27 @@ +package org.duniter.elasticsearch.user.synchro.user; + +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.synchro.AbstractSynchroAction; +import org.duniter.elasticsearch.service.synchro.SynchroService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.duniter.elasticsearch.user.PluginSettings; +import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.common.inject.Inject; + +public class SynchroUserSettingsAction extends AbstractSynchroAction { + + @Inject + public SynchroUserSettingsAction(Duniter4jClient client, + PluginSettings pluginSettings, + CryptoService cryptoService, + ThreadPool threadPool, + SynchroService synchroService) { + super(UserService.INDEX, UserService.SETTINGS_TYPE, client, pluginSettings, cryptoService, threadPool); + + setEnableUpdate(true); // with update + + synchroService.register(this); + } + +} -- GitLab