diff --git a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml index ecddee4df3d7e601939d2c99cc4968fc0553de1a..2a1f079bc8adafd00dd79f8ed7e8262db465106f 100644 --- a/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/main/assembly/config/elasticsearch.yml @@ -111,10 +111,10 @@ security.manager.enabled: false # # duniter.enabled: false # -# Reset and reload all Duniter4j data at startup - DO SET to true in production +# Reset and reload all Duniter4j data at startup - DO NOT SET to true in production # # duniter.indices.reload: true -#: +# # Default string analyzer # duniter.string.analyzer: french @@ -123,12 +123,12 @@ duniter.string.analyzer: french # duniter.blockchain.sync.enable: true # -# Duniter node to synchronize +# Duniter node address # duniter.host: gtest.duniter.org duniter.port: 10900 #duniter.useSsl: true -#duniter4j.network.timeout +#duniter.network.timeout: 5000 # # ---------------------------------- Duniter4j security ------------------------- # @@ -154,8 +154,8 @@ duniter.security.enable: true # Should synchronize data using P2P # duniter.data.sync.enable: true -duniter.data.sync.host: data.gtest.duniter.fr -duniter.data.sync.port: 80 +duniter.data.sync.host: g1.data.duniter.fr +duniter.data.sync.port: 443 # ---------------------------------- Duniter4j Mail module ----------------------- # @@ -213,4 +213,4 @@ duniter.subscription.enable: true # # Email subscription: URL to a Cesium site (for link in the email content) # -#duniter.subscription.email.cesium.url: 'https://g1.duniter.fr' \ No newline at end of file +#duniter.subscription.email.cesium.url: 'http://domain.com/cesium' \ No newline at end of file diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 36a53740f78b601eaf9dba499231788e7e01abe4..7950b7cb20dcfc79b45ac6cfb5c1e573790ce88b 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -112,7 +112,7 @@ security.manager.enabled: false # # duniter.enabled: false # -# Reset and reload all Duniter4j data at startup - DO SET to true in production +# Reset all data and re-create all Duniter4j inidices, at startup - DO SET to true in production # # duniter.indices.reload: true # @@ -122,13 +122,17 @@ duniter.string.analyzer: french # # Enabling node blockchain synchronization # -duniter.blockchain.sync.enable: true +duniter.blockchain.enable: true +# +# Force to reset and reload the blockchain, at startup - DO SET to true in production +# +#duniter.blockchain.reload: true # # Duniter node to synchronize # duniter.host: g1.duniter.org -duniter.port: 443 -duniter.useSsl: true +duniter.port: 10901 +#duniter.useSsl: true duniter4j.network.timeout: 10000 # # ---------------------------------- Duniter4j security module ------------------- diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index 6607ca13cb4cca01df149bbf4fc5eafc0f583dac..0e7988f75c9dec3715f8503db0297d616b2e9255 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -29,7 +29,6 @@ import org.duniter.elasticsearch.dao.BlockStatDao; import org.duniter.elasticsearch.dao.PeerDao; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.service.BlockchainService; -import org.duniter.elasticsearch.service.BlockchainStatsService; import org.duniter.elasticsearch.service.CurrencyService; import org.duniter.elasticsearch.service.PeerService; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -83,11 +82,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { protected void createIndices() { - boolean reloadIndices = pluginSettings.reloadIndices(); - - if (reloadIndices) { - if (logger.isInfoEnabled()) { - logger.info("Reloading all Duniter core indices..."); + // Reload All indices + if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) { + if (logger.isWarnEnabled()) { + logger.warn("Reloading [core-plugin] indices..."); } injector.getInstance(CurrencyService.class) @@ -95,19 +93,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Reloading all Duniter indices... [OK]"); + logger.info("Reloading [core-plugin] indices. [OK]"); } } + else { + if (logger.isInfoEnabled()) { - logger.info("Checking Duniter core indices..."); + logger.info("Checking if [core-plugin] indices exists..."); } injector.getInstance(CurrencyService.class) .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Checking Duniter core indices... [OK]"); + logger.info("Checking if [core-plugin] indices exists. [OK]"); } } } @@ -115,7 +115,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { protected void doAfterStart() { // Synchronize blockchain - if (pluginSettings.enableBlockchainSync()) { + if (pluginSettings.enableBlockchain()) { Peer peer = pluginSettings.checkAndGetPeer(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 2157b179e7390494e231d4f7817327cc717615d3..9b218b7005fb0ea8a9eabff21274d7e7ddc25f35 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -188,12 +188,16 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.get("duniter.string.analyzer", "english"); } - public boolean reloadIndices() { + public boolean reloadAllIndices() { return settings.getAsBoolean("duniter.indices.reload", false); } - public boolean enableBlockchainSync() { - return settings.getAsBoolean("duniter.blockchain.sync.enable", false); + public boolean enableBlockchain() { + return settings.getAsBoolean("duniter.blockchain.enable", false); + } + + public boolean reloadBlockchainIndices() { + return settings.getAsBoolean("duniter.blockchain.indices.reload", false); } public File getTempDirectory() { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java new file mode 100644 index 0000000000000000000000000000000000000000..5f95a2a5124b89c2ab2938b639b3e1f937d6b4de --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractBlockchainListenerService.java @@ -0,0 +1,130 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.collect.ImmutableList; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockStatDao; +import org.duniter.elasticsearch.model.BlockchainBlockStat; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; + +import java.io.IOException; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Created by Benoit on 26/04/2017. + */ +public abstract class AbstractBlockchainListenerService extends AbstractService implements ChangeService.ChangeListener { + + private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); + + protected final boolean enable; + protected final String listenerId; + protected final ThreadPool threadPool; + + @Inject + public AbstractBlockchainListenerService(String loggerName, + Duniter4jClient client, + PluginSettings settings, + CryptoService cryptoService, + ThreadPool threadPool) { + super(loggerName, client, settings, cryptoService); + this.listenerId = loggerName; + this.enable = pluginSettings.enableBlockchain(); + this.threadPool = threadPool; + + if (this.enable) { + ChangeService.registerListener(this); + } + } + + + @Override + public String getId() { + return listenerId; + } + + @Override + public final void onChange(ChangeEvent change) { + + // Skip _id=current + if("current".equals(change.getId())) return; + + switch (change.getOperation()) { + // on create + case CREATE: // create + if (change.getSource() != null) { + CompletableFuture.runAsync(() -> { + processCreateBlock(change); + }, threadPool.scheduler()); + } + break; + + // on update + case INDEX: + if (change.getSource() != null) { + // Delete existing stat + CompletableFuture.runAsync(() -> processBlockDelete(change, true), threadPool.scheduler()) + // Then process block + .thenAcceptAsync(aVoid -> processCreateBlock(change)); + } + break; + + // on DELETE : remove user event on block (using link + case DELETE: + // Delete existing stat + CompletableFuture.runAsync(() -> processBlockDelete(change, false)); + break; + } + + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return CHANGE_LISTEN_SOURCES; + } + + /* -- internal method -- */ + + protected abstract void processCreateBlock(final ChangeEvent change); + + protected abstract void processBlockDelete(ChangeEvent change, boolean wait); + + +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java index 908d87e49b34e94a6d76006f81576238b0e42607..e233e42c5e6b81f1d4527e3caf837b2548c74242 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainStatsService.java @@ -23,20 +23,16 @@ package org.duniter.elasticsearch.service; */ -import com.google.common.collect.ImmutableList; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.util.BlockchainBlockUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.Preconditions; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.BlockStatDao; import org.duniter.elasticsearch.model.BlockchainBlockStat; import org.duniter.elasticsearch.service.changes.ChangeEvent; -import org.duniter.elasticsearch.service.changes.ChangeService; -import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; @@ -44,82 +40,36 @@ import org.elasticsearch.common.metrics.CounterMetric; import java.io.IOException; import java.math.BigInteger; import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.CompletableFuture; /** * Created by Benoit on 26/04/2017. */ -public class BlockchainStatsService extends AbstractService implements ChangeService.ChangeListener { +public class BlockchainStatsService extends AbstractBlockchainListenerService { - private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); - - private final boolean enable; private final BlockStatDao blockStatDao; - private final ThreadPool threadPool; @Inject public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, BlockStatDao blockStatDao, ThreadPool threadPool) { - super("duniter.blockchain.stats", client, settings, cryptoService); - this.enable = pluginSettings.enableBlockchainSync(); + super("duniter.blockchain.stats", client, settings, cryptoService, threadPool); this.blockStatDao = blockStatDao; - this.threadPool = threadPool; - - if (this.enable) { - ChangeService.registerListener(this); - } } - @Override - public String getId() { - return "duniter.blockchain.stats"; - } - - @Override - public void onChange(ChangeEvent change) { - - // Skip _id=current - if(change.getId() == "current") return; - + protected void processCreateBlock(final ChangeEvent change) { try { - - switch (change.getOperation()) { - // on create - case CREATE: // create - if (change.getSource() != null) { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processCreateBlock(block); - } - break; - - // on update - case INDEX: - if (change.getSource() != null) { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processUpdateBlock(block); - } - break; - - // on DELETE : remove user event on block (using link - case DELETE: - processBlockDelete(change); - - break; - } - - } - catch(IOException e) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processCreateBlock(block); + } catch (IOException e) { throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); } - } - @Override - public Collection<ChangeSource> getChangeSources() { - return CHANGE_LISTEN_SOURCES; + protected void processBlockDelete(ChangeEvent change, boolean wait) { + if (change.getId() == null) return; + + // Delete existing stat + blockStatDao.delete(change.getIndex(), change.getId(), wait); } /* -- internal method -- */ @@ -157,24 +107,7 @@ public class BlockchainStatsService extends AbstractService implements ChangeSer blockStatDao.create(stat, false/*wait*/); } - private void processUpdateBlock(final BlockchainBlock block) { - Preconditions.checkNotNull(block); - Preconditions.checkNotNull(block.getNumber()); - - // Delete existing stat - CompletableFuture.runAsync(() -> blockStatDao.delete(block.getCurrency(), block.getNumber().toString(), true /*wait*/), threadPool.scheduler()) - // Then process block - .thenAccept(aVoid -> processCreateBlock(block)); - } - - private void processBlockDelete(ChangeEvent change) { - if (change.getId() == null) return; - - // Delete existing stat - blockStatDao.delete(change.getIndex(), change.getId(), false /*wait*/); - } - - protected BlockchainBlockStat newBlockStat(BlockchainBlock block) { + private BlockchainBlockStat newBlockStat(BlockchainBlock block) { BlockchainBlockStat stat = new BlockchainBlockStat(); stat.setNumber(block.getNumber()); diff --git a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml index b9d916475573ca46815ff863b2268b8386cb1941..e1c3d91b428c0be5c12cb0ab28ddec3efa8cd14c 100644 --- a/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-core/src/test/es-home/config/elasticsearch.yml @@ -121,7 +121,7 @@ duniter.string.analyzer: french # # Enabling node blockchain synchronization # -duniter.blockchain.sync.enable: false +duniter.blockchain.enable: false # # Duniter node to synchronize # diff --git a/duniter4j-es-subscription/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-subscription/src/test/es-home/config/elasticsearch.yml index c0d0fd118f5894a353c0a84149a37a1f5b69332c..abd6812e68fb27a937c3278129b9adbde89b5a2e 100644 --- a/duniter4j-es-subscription/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-subscription/src/test/es-home/config/elasticsearch.yml @@ -121,7 +121,7 @@ duniter.string.analyzer: french # # Enabling node blockchain synchronization # -duniter.blockchain.sync.enable: false +duniter.blockchain.enable: false # # Duniter node to synchronize # diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 0e554a0d8d79146bf6adf2ad1dc2facc7166697d..2833d2c0aab1d008f6bf6aba4c9114828398da1d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -22,10 +22,8 @@ package org.duniter.elasticsearch.user; * #L% */ -import org.duniter.core.util.StringUtils; -import org.duniter.core.util.crypto.CryptoUtils; -import org.duniter.core.util.crypto.KeyPair; import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.service.*; @@ -82,11 +80,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { protected void createIndices() { - boolean reloadIndices = pluginSettings.reloadIndices(); - - if (reloadIndices) { + // Reload all indices + if (pluginSettings.reloadAllIndices()) { if (logger.isInfoEnabled()) { - logger.info("Reloading all User indices..."); + logger.info("Reloading [user-plugin] indices..."); } injector.getInstance(HistoryService.class) .deleteIndex() @@ -105,12 +102,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Reloading all Duniter User indices... [OK]"); + logger.info("Reloading [user-plugin] indices. [OK]"); } } + else { if (logger.isInfoEnabled()) { - logger.info("Checking Duniter User indices..."); + logger.info("Checking [user-plugin] indices..."); } injector.getInstance(HistoryService.class).createIndexIfNotExists(); injector.getInstance(UserService.class).createIndexIfNotExists(); @@ -119,9 +117,23 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { injector.getInstance(UserInvitationService.class).createIndexIfNotExists(); if (logger.isInfoEnabled()) { - logger.info("Checking Duniter User indices... [OK]"); + logger.info("Checking [user-plugin] indices. [OK]"); + } + + // Reload blockchain indices : user/event + if (pluginSettings.reloadBlockchainIndices()) { + if (logger.isInfoEnabled()) { + logger.info("Deleting existing user event, referencing a block..."); + } + // Delete events that reference a block + injector.getInstance(UserEventService.class) + .deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/)); + if (logger.isInfoEnabled()) { + logger.info("Deleting existing user event, referencing a block. [OK]"); + } } } + } protected void doAfterStart() { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index a73b8f1afb9ccc478a8e5804ab4b06fc22fd4330..7e54777d4a9c3d45a3a051d987a74ad593a08b54 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -28,9 +28,7 @@ import org.duniter.core.util.StringUtils; import org.duniter.core.util.crypto.CryptoUtils; import org.duniter.core.util.crypto.KeyPair; import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Injector; import org.elasticsearch.common.settings.Settings; import java.util.Locale; @@ -85,7 +83,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean reloadIndices() { - return delegate.reloadIndices(); + return delegate.reloadAllIndices(); } public boolean enableDataSync() { @@ -151,7 +149,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public boolean enableBlockchainSync() { - return delegate.enableBlockchainSync(); + return delegate.enableBlockchain(); } public String getKeyringSalt() { diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 0cf01c62f3a05726a55e4585d9e5f6b9ccc3b599..057796749cc02f8384f30396b591d74525ef3540 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -32,6 +32,7 @@ import org.duniter.core.service.CryptoService; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.service.AbstractBlockchainListenerService; import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeService; @@ -40,6 +41,7 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.common.inject.Inject; import org.nuiton.i18n.I18n; @@ -53,89 +55,52 @@ import java.util.concurrent.CompletableFuture; /** * Created by Benoit on 30/03/2015. */ -public class BlockchainUserEventService extends AbstractService implements ChangeService.ChangeListener { +public class BlockchainUserEventService extends AbstractBlockchainListenerService { public static final String DEFAULT_PUBKEYS_SEPARATOR = ", "; - private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); - private final UserService userService; private final UserEventService userEventService; private final AdminService adminService; - private final boolean enable; - private final ThreadPool threadPool; - @Inject public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService, + ThreadPool threadPool, BlockchainService blockchainService, UserService userService, AdminService adminService, - UserEventService userEventService, - ThreadPool threadPool) { - super("duniter.user.event.blockchain", client, settings, cryptoService); + UserEventService userEventService) { + super("duniter.user.event.blockchain", client, settings.getDelegate(), cryptoService, threadPool); this.userService = userService; this.adminService = adminService; this.userEventService = userEventService; - this.threadPool = threadPool; - this.enable = pluginSettings.enableBlockchainSync(); - if (this.enable) { - ChangeService.registerListener(this); blockchainService.registerConnectionListener(createConnectionListeners()); } } - @Override - public String getId() { - return "duniter.user.event.blockchain"; - } @Override - public void onChange(ChangeEvent change) { - - // Skip _id=current - if(change.getId() == "current") return; - + protected void processCreateBlock(final ChangeEvent change) { try { - - switch (change.getOperation()) { - // on create - case CREATE: // create - if (change.getSource() != null) { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processCreateBlock(block); - } - break; - - // on update - case INDEX: - if (change.getSource() != null) { - BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processUpdateBlock(block); - } - break; - - // on DELETE : remove user event on block (using link - case DELETE: - processBlockDelete(change); - - break; - } - - } - catch(IOException e) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processCreateBlock(block); + } catch (IOException e) { throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); } - - //logger.info("receiveing block change: " + change.toJson()); } @Override - public Collection<ChangeSource> getChangeSources() { - return CHANGE_LISTEN_SOURCES; + protected void processBlockDelete(ChangeEvent change, boolean wait) { + if (change.getId() == null) return; + + // Delete events that reference this block + ActionFuture<?> actionFuture = userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId())); + if (wait) { + actionFuture.actionGet(); + } } /* -- internal method -- */ @@ -182,6 +147,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang }; } + private void processCreateBlock(BlockchainBlock block) { // Joiners if (CollectionUtils.isNotEmpty(block.getJoiners())) { @@ -219,15 +185,6 @@ public class BlockchainUserEventService extends AbstractService implements Chang } } - private void processUpdateBlock(final BlockchainBlock block) { - - // Delete events that reference this block - CompletableFuture.runAsync(() -> userEventService.deleteEventsByReference(new UserEvent.Reference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber()))) - .actionGet(), threadPool.scheduler()) - // Then process the block - .thenAccept(aVoid -> processCreateBlock(block)); - } - private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) { Set<String> senders = ImmutableSet.copyOf(tx.getIssuers()); @@ -289,13 +246,5 @@ public class BlockchainUserEventService extends AbstractService implements Chang userEventService.notifyUser(event); } - private void processBlockDelete(ChangeEvent change) { - if (change.getId() == null) return; - - // Delete events that reference this block - userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId())); - } - - } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index 0860736cc5268a319e2cf3fc72dab8a006151071..ae33dccf7e950ec9c9eac3b91e345f3f7383ed8f 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -196,8 +196,6 @@ public class UserEventService extends AbstractService implements ChangeService.C public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) { Preconditions.checkNotNull(reference); - Preconditions.checkNotNull(reference.getIndex()); - Preconditions.checkNotNull(reference.getType()); return threadPool.schedule(() -> doDeleteEventsByReference(reference)); } @@ -370,9 +368,13 @@ public class UserEventService extends AbstractService implements ChangeService.C .setSearchType(SearchType.QUERY_AND_FETCH); // Query = filter on reference - BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex())) - .filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType())); + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + if (StringUtils.isNotBlank(reference.getIndex())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex())); + } + if (StringUtils.isNotBlank(reference.getType())) { + boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType())); + } if (StringUtils.isNotBlank(reference.getId())) { boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ID, reference.getId())); } @@ -385,33 +387,51 @@ public class UserEventService extends AbstractService implements ChangeService.C searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); - // Execute query + // Execute query, while there is some data try { - SearchResponse response = searchRequest.execute().actionGet(); - int bulkSize = pluginSettings.getIndexBulkSize(); BulkRequestBuilder bulkRequest = client.prepareBulk(); - // Read query result - long counter = 0; - SearchHit[] searchHits = response.getHits().getHits(); - for (SearchHit searchHit : searchHits) { - bulkRequest.add( - client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId()) - ); - counter++; - - // Flush the bulk if not empty - if ((counter % bulkSize) == 0) { - client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); - bulkRequest = client.prepareBulk(); + int counter = 0; + boolean loop = true; + searchRequest.setSize(bulkSize); + SearchResponse response = searchRequest.execute().actionGet(); + do { + + // Read response + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + + // Add deletion to bulk + bulkRequest.add( + client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId()) + ); + counter++; + + // Flush the bulk if not empty + if ((counter % bulkSize) == 0) { + client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); + bulkRequest = client.prepareBulk(); + } } - } + + // Prepare next iteration + if (counter == 0 || counter >= response.getHits().getTotalHits()) { + loop = false; + } + // Prepare next iteration + else { + searchRequest.setFrom(counter); + response = searchRequest.execute().actionGet(); + } + } while(loop); // last flush - client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); - } - catch(SearchPhaseExecutionException e) { + if ((counter % bulkSize) != 0) { + client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest); + } + + } catch (SearchPhaseExecutionException e) { // Failed or no item on index logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e); } diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties index 3a0282233313932b6adb7e15c3add4f6d56ab3b7..7110fb3bdd703cea3d7c23989bfb0313a29920b6 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties @@ -8,7 +8,7 @@ duniter.user.event.MEMBER_ACTIVE=Your membership to %1$s has been renewed succes duniter.user.event.MEMBER_JOIN=You are now a member of currency %1$s\! duniter.user.event.MEMBER_LEAVE=You are not a member anymore of currency %1$s\! duniter.user.event.MESSAGE_RECEIVED=You received a message from %2$s. -duniter.user.event.NODE_BMA_DOWN=Duniter node [%1$s\:%2$s] is DOWN\: no access from ES node [%3$s]. Last connexion at %4$d. Blockchain indexation waiting. +duniter.user.event.NODE_BMA_DOWN=Duniter node [%1$s\:%2$s] is DOWN\: no access from ES node [%3$s]. Last connexion at %4$s. Blockchain indexation waiting. duniter.user.event.NODE_BMA_UP=Duniter node [%1$s\:%2$s] is UP again. duniter.user.event.NODE_STARTED=Your node ES API [%1$s] is UP. duniter.user.event.TX_RECEIVED=You received a payment from %2$s. diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties index 9c45f1707a484acc722a683f1bde38c0488e9d24..be2471dfe68a8b57fe958b79b6410933fbe396e2 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties @@ -8,7 +8,7 @@ duniter.user.event.MEMBER_ACTIVE=Votre adhésion comme membre a bien été renou duniter.user.event.MEMBER_JOIN=Vous êtes maintenant membre de la monnaie %1$s \! duniter.user.event.MEMBER_LEAVE=Votre adhésion comme membre à expirée. duniter.user.event.MESSAGE_RECEIVED=Vous avez reçu un message de %2$s. -duniter.user.event.NODE_BMA_DOWN=Noeud Duniter [%1$s\:%2$s] non joignable, depuis le noeud ES API [%3$s]. Dernière connexion à %4$d. Indexation de blockchain en attente. +duniter.user.event.NODE_BMA_DOWN=Noeud Duniter [%1$s\:%2$s] non joignable, depuis le noeud ES API [%3$s]. Dernière connexion à %4$s. Indexation de blockchain en attente. duniter.user.event.NODE_BMA_UP=Noeud Duniter [%1$s\:%2$s] à nouveau accessible. duniter.user.event.NODE_STARTED=Noeud ES API [%1$s] est démarré. duniter.user.event.TX_RECEIVED=Vous avez recu un paiement de %2$s. diff --git a/pom.xml b/pom.xml index fd768c7ca446153bd7026e5029534da76b08b026..29bcc9ba774a7f0063c7a6acab5f1401bd6246f6 100644 --- a/pom.xml +++ b/pom.xml @@ -48,7 +48,7 @@ <!-- release config --> <autoVersionSubmodules>true</autoVersionSubmodules> <goals>deploy</goals> - <arguments /> + <arguments>-DperformFullRelease</arguments> <preparationGoals>verify</preparationGoals> <projectInfoReportsPluginVersion>2.7</projectInfoReportsPluginVersion>