Skip to content
Snippets Groups Projects
Commit 79cdd2cd authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Rename config option 'duniter.blockchain.sync.enable' into 'duniter.blockchain.sync.enable'

[enh] Add an config option 'duniter.blockchain.reload' to force a clean blockchain reloading
[fix] Fix i18n message, for email content when BMA node is DOWN
parent a2add0f8
No related branches found
No related tags found
No related merge requests found
Showing
with 267 additions and 217 deletions
......@@ -111,10 +111,10 @@ security.manager.enabled: false
#
# duniter.enabled: false
#
# Reset and reload all Duniter4j data at startup - DO SET to true in production
# Reset and reload all Duniter4j data at startup - DO NOT SET to true in production
#
# duniter.indices.reload: true
#:
#
# Default string analyzer
#
duniter.string.analyzer: french
......@@ -123,12 +123,12 @@ duniter.string.analyzer: french
#
duniter.blockchain.sync.enable: true
#
# Duniter node to synchronize
# Duniter node address
#
duniter.host: gtest.duniter.org
duniter.port: 10900
#duniter.useSsl: true
#duniter4j.network.timeout
#duniter.network.timeout: 5000
#
# ---------------------------------- Duniter4j security -------------------------
#
......@@ -154,8 +154,8 @@ duniter.security.enable: true
# Should synchronize data using P2P
#
duniter.data.sync.enable: true
duniter.data.sync.host: data.gtest.duniter.fr
duniter.data.sync.port: 80
duniter.data.sync.host: g1.data.duniter.fr
duniter.data.sync.port: 443
# ---------------------------------- Duniter4j Mail module -----------------------
#
......@@ -213,4 +213,4 @@ duniter.subscription.enable: true
#
# Email subscription: URL to a Cesium site (for link in the email content)
#
#duniter.subscription.email.cesium.url: 'https://g1.duniter.fr'
\ No newline at end of file
#duniter.subscription.email.cesium.url: 'http://domain.com/cesium'
\ No newline at end of file
......@@ -112,7 +112,7 @@ security.manager.enabled: false
#
# duniter.enabled: false
#
# Reset and reload all Duniter4j data at startup - DO SET to true in production
# Reset all data and re-create all Duniter4j inidices, at startup - DO SET to true in production
#
# duniter.indices.reload: true
#
......@@ -122,13 +122,17 @@ duniter.string.analyzer: french
#
# Enabling node blockchain synchronization
#
duniter.blockchain.sync.enable: true
duniter.blockchain.enable: true
#
# Force to reset and reload the blockchain, at startup - DO SET to true in production
#
#duniter.blockchain.reload: true
#
# Duniter node to synchronize
#
duniter.host: g1.duniter.org
duniter.port: 443
duniter.useSsl: true
duniter.port: 10901
#duniter.useSsl: true
duniter4j.network.timeout: 10000
#
# ---------------------------------- Duniter4j security module -------------------
......
......@@ -29,7 +29,6 @@ import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.dao.PeerDao;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.BlockchainStatsService;
import org.duniter.elasticsearch.service.CurrencyService;
import org.duniter.elasticsearch.service.PeerService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -83,11 +82,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
protected void createIndices() {
boolean reloadIndices = pluginSettings.reloadIndices();
if (reloadIndices) {
if (logger.isInfoEnabled()) {
logger.info("Reloading all Duniter core indices...");
// Reload All indices
if (pluginSettings.reloadAllIndices() || pluginSettings.reloadBlockchainIndices()) {
if (logger.isWarnEnabled()) {
logger.warn("Reloading [core-plugin] indices...");
}
injector.getInstance(CurrencyService.class)
......@@ -95,19 +93,21 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
.createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Reloading all Duniter indices... [OK]");
logger.info("Reloading [core-plugin] indices. [OK]");
}
}
else {
if (logger.isInfoEnabled()) {
logger.info("Checking Duniter core indices...");
logger.info("Checking if [core-plugin] indices exists...");
}
injector.getInstance(CurrencyService.class)
.createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Checking Duniter core indices... [OK]");
logger.info("Checking if [core-plugin] indices exists. [OK]");
}
}
}
......@@ -115,7 +115,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
protected void doAfterStart() {
// Synchronize blockchain
if (pluginSettings.enableBlockchainSync()) {
if (pluginSettings.enableBlockchain()) {
Peer peer = pluginSettings.checkAndGetPeer();
......
......@@ -188,12 +188,16 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.get("duniter.string.analyzer", "english");
}
public boolean reloadIndices() {
public boolean reloadAllIndices() {
return settings.getAsBoolean("duniter.indices.reload", false);
}
public boolean enableBlockchainSync() {
return settings.getAsBoolean("duniter.blockchain.sync.enable", false);
public boolean enableBlockchain() {
return settings.getAsBoolean("duniter.blockchain.enable", false);
}
public boolean reloadBlockchainIndices() {
return settings.getAsBoolean("duniter.blockchain.indices.reload", false);
}
public File getTempDirectory() {
......
package org.duniter.elasticsearch.service;
/*
* #%L
* UCoin Java Client :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.google.common.collect.ImmutableList;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.util.BlockchainBlockUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.model.BlockchainBlockStat;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Created by Benoit on 26/04/2017.
*/
public abstract class AbstractBlockchainListenerService extends AbstractService implements ChangeService.ChangeListener {
private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
protected final boolean enable;
protected final String listenerId;
protected final ThreadPool threadPool;
@Inject
public AbstractBlockchainListenerService(String loggerName,
Duniter4jClient client,
PluginSettings settings,
CryptoService cryptoService,
ThreadPool threadPool) {
super(loggerName, client, settings, cryptoService);
this.listenerId = loggerName;
this.enable = pluginSettings.enableBlockchain();
this.threadPool = threadPool;
if (this.enable) {
ChangeService.registerListener(this);
}
}
@Override
public String getId() {
return listenerId;
}
@Override
public final void onChange(ChangeEvent change) {
// Skip _id=current
if("current".equals(change.getId())) return;
switch (change.getOperation()) {
// on create
case CREATE: // create
if (change.getSource() != null) {
CompletableFuture.runAsync(() -> {
processCreateBlock(change);
}, threadPool.scheduler());
}
break;
// on update
case INDEX:
if (change.getSource() != null) {
// Delete existing stat
CompletableFuture.runAsync(() -> processBlockDelete(change, true), threadPool.scheduler())
// Then process block
.thenAcceptAsync(aVoid -> processCreateBlock(change));
}
break;
// on DELETE : remove user event on block (using link
case DELETE:
// Delete existing stat
CompletableFuture.runAsync(() -> processBlockDelete(change, false));
break;
}
}
@Override
public Collection<ChangeSource> getChangeSources() {
return CHANGE_LISTEN_SOURCES;
}
/* -- internal method -- */
protected abstract void processCreateBlock(final ChangeEvent change);
protected abstract void processBlockDelete(ChangeEvent change, boolean wait);
}
......@@ -23,20 +23,16 @@ package org.duniter.elasticsearch.service;
*/
import com.google.common.collect.ImmutableList;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.util.BlockchainBlockUtils;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.dao.BlockStatDao;
import org.duniter.elasticsearch.model.BlockchainBlockStat;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
......@@ -44,82 +40,36 @@ import org.elasticsearch.common.metrics.CounterMetric;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
/**
* Created by Benoit on 26/04/2017.
*/
public class BlockchainStatsService extends AbstractService implements ChangeService.ChangeListener {
public class BlockchainStatsService extends AbstractBlockchainListenerService {
private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
private final boolean enable;
private final BlockStatDao blockStatDao;
private final ThreadPool threadPool;
@Inject
public BlockchainStatsService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService,
BlockStatDao blockStatDao,
ThreadPool threadPool) {
super("duniter.blockchain.stats", client, settings, cryptoService);
this.enable = pluginSettings.enableBlockchainSync();
super("duniter.blockchain.stats", client, settings, cryptoService, threadPool);
this.blockStatDao = blockStatDao;
this.threadPool = threadPool;
if (this.enable) {
ChangeService.registerListener(this);
}
}
@Override
public String getId() {
return "duniter.blockchain.stats";
}
@Override
public void onChange(ChangeEvent change) {
// Skip _id=current
if(change.getId() == "current") return;
protected void processCreateBlock(final ChangeEvent change) {
try {
switch (change.getOperation()) {
// on create
case CREATE: // create
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processCreateBlock(block);
}
break;
// on update
case INDEX:
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processUpdateBlock(block);
}
break;
// on DELETE : remove user event on block (using link
case DELETE:
processBlockDelete(change);
break;
}
}
catch(IOException e) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processCreateBlock(block);
} catch (IOException e) {
throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e);
}
}
@Override
public Collection<ChangeSource> getChangeSources() {
return CHANGE_LISTEN_SOURCES;
protected void processBlockDelete(ChangeEvent change, boolean wait) {
if (change.getId() == null) return;
// Delete existing stat
blockStatDao.delete(change.getIndex(), change.getId(), wait);
}
/* -- internal method -- */
......@@ -157,24 +107,7 @@ public class BlockchainStatsService extends AbstractService implements ChangeSer
blockStatDao.create(stat, false/*wait*/);
}
private void processUpdateBlock(final BlockchainBlock block) {
Preconditions.checkNotNull(block);
Preconditions.checkNotNull(block.getNumber());
// Delete existing stat
CompletableFuture.runAsync(() -> blockStatDao.delete(block.getCurrency(), block.getNumber().toString(), true /*wait*/), threadPool.scheduler())
// Then process block
.thenAccept(aVoid -> processCreateBlock(block));
}
private void processBlockDelete(ChangeEvent change) {
if (change.getId() == null) return;
// Delete existing stat
blockStatDao.delete(change.getIndex(), change.getId(), false /*wait*/);
}
protected BlockchainBlockStat newBlockStat(BlockchainBlock block) {
private BlockchainBlockStat newBlockStat(BlockchainBlock block) {
BlockchainBlockStat stat = new BlockchainBlockStat();
stat.setNumber(block.getNumber());
......
......@@ -121,7 +121,7 @@ duniter.string.analyzer: french
#
# Enabling node blockchain synchronization
#
duniter.blockchain.sync.enable: false
duniter.blockchain.enable: false
#
# Duniter node to synchronize
#
......
......@@ -121,7 +121,7 @@ duniter.string.analyzer: french
#
# Enabling node blockchain synchronization
#
duniter.blockchain.sync.enable: false
duniter.blockchain.enable: false
#
# Duniter node to synchronize
#
......
......@@ -22,10 +22,8 @@ package org.duniter.elasticsearch.user;
* #L%
*/
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.service.*;
......@@ -82,11 +80,10 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
protected void createIndices() {
boolean reloadIndices = pluginSettings.reloadIndices();
if (reloadIndices) {
// Reload all indices
if (pluginSettings.reloadAllIndices()) {
if (logger.isInfoEnabled()) {
logger.info("Reloading all User indices...");
logger.info("Reloading [user-plugin] indices...");
}
injector.getInstance(HistoryService.class)
.deleteIndex()
......@@ -105,12 +102,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
.createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Reloading all Duniter User indices... [OK]");
logger.info("Reloading [user-plugin] indices. [OK]");
}
}
else {
if (logger.isInfoEnabled()) {
logger.info("Checking Duniter User indices...");
logger.info("Checking [user-plugin] indices...");
}
injector.getInstance(HistoryService.class).createIndexIfNotExists();
injector.getInstance(UserService.class).createIndexIfNotExists();
......@@ -119,9 +117,23 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
injector.getInstance(UserInvitationService.class).createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Checking Duniter User indices... [OK]");
logger.info("Checking [user-plugin] indices. [OK]");
}
// Reload blockchain indices : user/event
if (pluginSettings.reloadBlockchainIndices()) {
if (logger.isInfoEnabled()) {
logger.info("Deleting existing user event, referencing a block...");
}
// Delete events that reference a block
injector.getInstance(UserEventService.class)
.deleteEventsByReference(new UserEvent.Reference(null/*all*/, BlockchainService.BLOCK_TYPE, null/*all*/));
if (logger.isInfoEnabled()) {
logger.info("Deleting existing user event, referencing a block. [OK]");
}
}
}
}
protected void doAfterStart() {
......
......@@ -28,9 +28,7 @@ import org.duniter.core.util.StringUtils;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
import java.util.Locale;
......@@ -85,7 +83,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
public boolean reloadIndices() {
return delegate.reloadIndices();
return delegate.reloadAllIndices();
}
public boolean enableDataSync() {
......@@ -151,7 +149,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
public boolean enableBlockchainSync() {
return delegate.enableBlockchainSync();
return delegate.enableBlockchain();
}
public String getKeyringSalt() {
......
......@@ -32,6 +32,7 @@ import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.service.AbstractBlockchainListenerService;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeService;
......@@ -40,6 +41,7 @@ import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.PluginSettings;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.model.UserEventCodes;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.common.inject.Inject;
import org.nuiton.i18n.I18n;
......@@ -53,89 +55,52 @@ import java.util.concurrent.CompletableFuture;
/**
* Created by Benoit on 30/03/2015.
*/
public class BlockchainUserEventService extends AbstractService implements ChangeService.ChangeListener {
public class BlockchainUserEventService extends AbstractBlockchainListenerService {
public static final String DEFAULT_PUBKEYS_SEPARATOR = ", ";
private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE));
private final UserService userService;
private final UserEventService userEventService;
private final AdminService adminService;
private final boolean enable;
private final ThreadPool threadPool;
@Inject
public BlockchainUserEventService(Duniter4jClient client, PluginSettings settings, CryptoService cryptoService,
ThreadPool threadPool,
BlockchainService blockchainService,
UserService userService,
AdminService adminService,
UserEventService userEventService,
ThreadPool threadPool) {
super("duniter.user.event.blockchain", client, settings, cryptoService);
UserEventService userEventService) {
super("duniter.user.event.blockchain", client, settings.getDelegate(), cryptoService, threadPool);
this.userService = userService;
this.adminService = adminService;
this.userEventService = userEventService;
this.threadPool = threadPool;
this.enable = pluginSettings.enableBlockchainSync();
if (this.enable) {
ChangeService.registerListener(this);
blockchainService.registerConnectionListener(createConnectionListeners());
}
}
@Override
public String getId() {
return "duniter.user.event.blockchain";
}
@Override
public void onChange(ChangeEvent change) {
// Skip _id=current
if(change.getId() == "current") return;
protected void processCreateBlock(final ChangeEvent change) {
try {
switch (change.getOperation()) {
// on create
case CREATE: // create
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processCreateBlock(block);
}
break;
// on update
case INDEX:
if (change.getSource() != null) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processUpdateBlock(block);
}
break;
// on DELETE : remove user event on block (using link
case DELETE:
processBlockDelete(change);
break;
}
}
catch(IOException e) {
BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class);
processCreateBlock(block);
} catch (IOException e) {
throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e);
}
//logger.info("receiveing block change: " + change.toJson());
}
@Override
public Collection<ChangeSource> getChangeSources() {
return CHANGE_LISTEN_SOURCES;
protected void processBlockDelete(ChangeEvent change, boolean wait) {
if (change.getId() == null) return;
// Delete events that reference this block
ActionFuture<?> actionFuture = userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId()));
if (wait) {
actionFuture.actionGet();
}
}
/* -- internal method -- */
......@@ -182,6 +147,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang
};
}
private void processCreateBlock(BlockchainBlock block) {
// Joiners
if (CollectionUtils.isNotEmpty(block.getJoiners())) {
......@@ -219,15 +185,6 @@ public class BlockchainUserEventService extends AbstractService implements Chang
}
}
private void processUpdateBlock(final BlockchainBlock block) {
// Delete events that reference this block
CompletableFuture.runAsync(() -> userEventService.deleteEventsByReference(new UserEvent.Reference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber())))
.actionGet(), threadPool.scheduler())
// Then process the block
.thenAccept(aVoid -> processCreateBlock(block));
}
private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) {
Set<String> senders = ImmutableSet.copyOf(tx.getIssuers());
......@@ -289,13 +246,5 @@ public class BlockchainUserEventService extends AbstractService implements Chang
userEventService.notifyUser(event);
}
private void processBlockDelete(ChangeEvent change) {
if (change.getId() == null) return;
// Delete events that reference this block
userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId()));
}
}
......@@ -196,8 +196,6 @@ public class UserEventService extends AbstractService implements ChangeService.C
public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) {
Preconditions.checkNotNull(reference);
Preconditions.checkNotNull(reference.getIndex());
Preconditions.checkNotNull(reference.getType());
return threadPool.schedule(() -> doDeleteEventsByReference(reference));
}
......@@ -370,9 +368,13 @@ public class UserEventService extends AbstractService implements ChangeService.C
.setSearchType(SearchType.QUERY_AND_FETCH);
// Query = filter on reference
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex()))
.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType()));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(reference.getIndex())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_INDEX, reference.getIndex()));
}
if (StringUtils.isNotBlank(reference.getType())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_TYPE, reference.getType()));
}
if (StringUtils.isNotBlank(reference.getId())) {
boolQuery.filter(QueryBuilders.termQuery(UserEvent.PROPERTY_REFERENCE + "." + UserEvent.Reference.PROPERTY_ID, reference.getId()));
}
......@@ -385,33 +387,51 @@ public class UserEventService extends AbstractService implements ChangeService.C
searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery)));
// Execute query
// Execute query, while there is some data
try {
SearchResponse response = searchRequest.execute().actionGet();
int bulkSize = pluginSettings.getIndexBulkSize();
BulkRequestBuilder bulkRequest = client.prepareBulk();
// Read query result
long counter = 0;
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
bulkRequest.add(
client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId())
);
counter++;
// Flush the bulk if not empty
if ((counter % bulkSize) == 0) {
client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
bulkRequest = client.prepareBulk();
int counter = 0;
boolean loop = true;
searchRequest.setSize(bulkSize);
SearchResponse response = searchRequest.execute().actionGet();
do {
// Read response
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
// Add deletion to bulk
bulkRequest.add(
client.prepareDelete(INDEX, EVENT_TYPE, searchHit.getId())
);
counter++;
// Flush the bulk if not empty
if ((counter % bulkSize) == 0) {
client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
bulkRequest = client.prepareBulk();
}
}
}
// Prepare next iteration
if (counter == 0 || counter >= response.getHits().getTotalHits()) {
loop = false;
}
// Prepare next iteration
else {
searchRequest.setFrom(counter);
response = searchRequest.execute().actionGet();
}
} while(loop);
// last flush
client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
}
catch(SearchPhaseExecutionException e) {
if ((counter % bulkSize) != 0) {
client.flushDeleteBulk(INDEX, EVENT_TYPE, bulkRequest);
}
} catch (SearchPhaseExecutionException e) {
// Failed or no item on index
logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e);
}
......
......@@ -8,7 +8,7 @@ duniter.user.event.MEMBER_ACTIVE=Your membership to %1$s has been renewed succes
duniter.user.event.MEMBER_JOIN=You are now a member of currency %1$s\!
duniter.user.event.MEMBER_LEAVE=You are not a member anymore of currency %1$s\!
duniter.user.event.MESSAGE_RECEIVED=You received a message from %2$s.
duniter.user.event.NODE_BMA_DOWN=Duniter node [%1$s\:%2$s] is DOWN\: no access from ES node [%3$s]. Last connexion at %4$d. Blockchain indexation waiting.
duniter.user.event.NODE_BMA_DOWN=Duniter node [%1$s\:%2$s] is DOWN\: no access from ES node [%3$s]. Last connexion at %4$s. Blockchain indexation waiting.
duniter.user.event.NODE_BMA_UP=Duniter node [%1$s\:%2$s] is UP again.
duniter.user.event.NODE_STARTED=Your node ES API [%1$s] is UP.
duniter.user.event.TX_RECEIVED=You received a payment from %2$s.
......
......@@ -8,7 +8,7 @@ duniter.user.event.MEMBER_ACTIVE=Votre adhésion comme membre a bien été renou
duniter.user.event.MEMBER_JOIN=Vous êtes maintenant membre de la monnaie %1$s \!
duniter.user.event.MEMBER_LEAVE=Votre adhésion comme membre à expirée.
duniter.user.event.MESSAGE_RECEIVED=Vous avez reçu un message de %2$s.
duniter.user.event.NODE_BMA_DOWN=Noeud Duniter [%1$s\:%2$s] non joignable, depuis le noeud ES API [%3$s]. Dernière connexion à %4$d. Indexation de blockchain en attente.
duniter.user.event.NODE_BMA_DOWN=Noeud Duniter [%1$s\:%2$s] non joignable, depuis le noeud ES API [%3$s]. Dernière connexion à %4$s. Indexation de blockchain en attente.
duniter.user.event.NODE_BMA_UP=Noeud Duniter [%1$s\:%2$s] à nouveau accessible.
duniter.user.event.NODE_STARTED=Noeud ES API [%1$s] est démarré.
duniter.user.event.TX_RECEIVED=Vous avez recu un paiement de %2$s.
......
......@@ -48,7 +48,7 @@
<!-- release config -->
<autoVersionSubmodules>true</autoVersionSubmodules>
<goals>deploy</goals>
<arguments />
<arguments>-DperformFullRelease</arguments>
<preparationGoals>verify</preparationGoals>
<projectInfoReportsPluginVersion>2.7</projectInfoReportsPluginVersion>
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment