Skip to content
Snippets Groups Projects
Commit c9be06a7 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[fix] fix peer indexation (job was not started, and change field 'hash' has optional)

[fix] email subscription scheduled job not started
[fix] Allow access to index subscription/execution
[enh] synchronize index subscription/execution
parent 04b0f074
No related branches found
No related tags found
No related merge requests found
Showing
with 134 additions and 67 deletions
......@@ -182,7 +182,7 @@ public class MailServiceImpl implements MailService, Closeable {
transport.sendMessage(message, message.getAllRecipients());
} catch (MessagingException e) {
throw new TechnicalException(String.format("Error while sending email to [%s] using smtp server [%s]: %s",
throw new TechnicalException(String.format("Error while sending email to [%s] using smtp config [%s]: %s",
Joiner.on(',').join(recipients), getSmtpServerAsString(),
e.getMessage()
), e);
......@@ -198,7 +198,9 @@ public class MailServiceImpl implements MailService, Closeable {
private String getSmtpServerAsString(String smtpHost, int smtpPort, String smtpUsername) {
StringBuilder buffer = new StringBuilder();
if (StringUtils.isNotBlank(smtpUsername)) {
buffer.append(smtpUsername).append("@");
buffer.append("username=")
.append(smtpUsername)
.append(", server=");
}
return buffer.append(smtpHost)
.append(":")
......@@ -321,7 +323,8 @@ public class MailServiceImpl implements MailService, Closeable {
mc.addMailcap("text/html;; x-java-content-handler=com.sun.mail.handlers.text_html");
mc.addMailcap("text/xml;; x-java-content-handler=com.sun.mail.handlers.text_xml");
mc.addMailcap("text/plain;; x-java-content-handler=com.sun.mail.handlers.text_plain");
mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
//mc.addMailcap("multipart/*;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
mc.addMailcap("multipart/alternative;; x-java-content-handler=com.sun.mail.handlers.multipart_mixed");
mc.addMailcap("message/rfc822;; x-java-content- handler=com.sun.mail.handlers.message_rfc822");
}
}
\ No newline at end of file
......@@ -87,7 +87,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao {
Preconditions.checkNotNull(peer);
Preconditions.checkArgument(StringUtils.isNotBlank(peer.getId()));
Preconditions.checkArgument(StringUtils.isNotBlank(peer.getCurrency()));
Preconditions.checkNotNull(peer.getHash());
//Preconditions.checkNotNull(peer.getHash());
Preconditions.checkNotNull(peer.getHost());
Preconditions.checkNotNull(peer.getApi());
......
......@@ -97,7 +97,7 @@ public abstract class AbstractService implements Bean {
}
if (logger.isDebugEnabled()) {
logger.debug(I18n.t("duniter4j.removeServiceUtils.waitThenRetry", e.getMessage(), retry, retryCount));
logger.debug(I18n.t("duniter4j.service.waitThenRetry", e.getMessage(), retry, retryCount));
}
try {
......
......@@ -540,7 +540,7 @@ public class BlockchainService extends AbstractService {
}
if (StringUtils.isNotBlank(currentBlockJson)) {
indexCurrentBlockFromJson(currencyName, currentBlockJson, false);
indexCurrentBlockFromJson(currencyName, currentBlockJson, true);
}
return missingBlockNumbers;
......
......@@ -34,7 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Clase qui offre l'interface de ES, tout en étant compatible avec java.util.concurrent.CompletableFuture
* Classe qui offre l'interface de ES, tout en étant compatible avec java.util.concurrent.CompletableFuture
* (poar exemple pour faire des thenCompose()...
* @param <T>
*/
......
......@@ -42,10 +42,7 @@ import org.elasticsearch.transport.TransportService;
import org.nuiton.i18n.I18n;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* Manage thread pool, to execute tasks asynchronously.
......@@ -76,11 +73,6 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
this.scheduler = new LoggingScheduledThreadPoolExecutor(logger, availableProcessors,
EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"),
new RetryPolicy(1, TimeUnit.SECONDS));
/*this.scheduler = new ScheduledThreadPoolExecutor(availableProcessors,
EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"),
new RetryPolicy(1, TimeUnit.SECONDS)) {
};*/
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
this.scheduler.setRemoveOnCancelPolicy(true);
......@@ -161,6 +153,13 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
command));
}
public ScheduledActionFuture<?> schedule(Runnable command,
long delay, TimeUnit unit) {
return new ScheduledActionFuture<>(scheduler.schedule(command, delay, unit));
}
/**
* Schedules an rest that runs on the scheduler thread, after a delay.
*
......@@ -182,21 +181,11 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
* @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled
*/
public ScheduledActionFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit timeUnit) {
return new ScheduledActionFuture<>(scheduler.scheduleAtFixedRate(command, initialDelay, period, timeUnit));
}
/**
* Schedules a periodic rest that always runs on the scheduler thread.
*
* @param command the rest to take
* @param initialDelay the initial delay
* @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled
*/
public ScheduledActionFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit timeUnit) {
return new ScheduledActionFuture<>(scheduler.scheduleWithFixedDelay(command, initialDelay, delay, timeUnit));
long initialDelayMs = new TimeValue(initialDelay, timeUnit).millis();
long periodMs = new TimeValue(period, timeUnit).millis();
return new ScheduledActionFuture<>(scheduleAtFixedRateWorkaround(command, initialDelayMs, periodMs));
}
/* -- protected methods -- */
protected <T extends LifecycleComponent<T>> ScheduledActionFuture<?> scheduleAfterServiceState(Class<T> waitingServiceClass,
......@@ -254,9 +243,38 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> {
return canContinue;
}
/**
* This method use a workaround to execution schedule at fixed time, because standard call of scheduler.scheduleAtFixedRate
* does not worked !!
**/
protected ScheduledFuture<?> scheduleAtFixedRateWorkaround(final Runnable command, final long initialDelayMs, final long periodMs) {
final long expectedNextExecutionTime = System.currentTimeMillis() + initialDelayMs + periodMs;
return scheduler.schedule(
() -> {
try {
command.run();
} catch (Throwable t) {
logger.error("Error while processing subscriptions", t);
}
long nextDelayMs = expectedNextExecutionTime - System.currentTimeMillis();
// When an execution duration is too long, go to next execution time.
while (nextDelayMs < 0) {
nextDelayMs += periodMs;
}
// Schedule the next execution
scheduleAtFixedRateWorkaround(command, nextDelayMs, periodMs);
},
initialDelayMs,
TimeUnit.MILLISECONDS)
;
}
public ScheduledExecutorService scheduler() {
return delegate.scheduler();
//return scheduler;
return scheduler;
}
......
......@@ -43,7 +43,7 @@ duniter4j.executor.task.waitingExecution=
duniter4j.job.stopped=
duniter4j.job.stopping=
duniter4j.job.success=
duniter4j.removeServiceUtils.waitThenRetry=
duniter4j.service.waitThenRetry=Error [%s]... will retry [%s/%s]
duniter4j.task.issuer.system=System
duniter4j.task.starting=Starting task...
duniter4j.threadPool.clusterHealthStatus.changed=Cluster health status changed to [%s]
......@@ -43,7 +43,7 @@ duniter4j.executor.task.waitingExecution=
duniter4j.job.stopped=
duniter4j.job.stopping=
duniter4j.job.success=
duniter4j.removeServiceUtils.waitThenRetry=
duniter4j.service.waitThenRetry=Echec [%s]... tentative [%s/%s]
duniter4j.task.issuer.system=Système
duniter4j.task.starting=Démarrage du traitement...
duniter4j.threadPool.clusterHealthStatus.changed=Cluster health status changed to [%s]
......@@ -22,7 +22,12 @@ package org.duniter.elasticsearch.subscription;
* #L%
*/
import org.duniter.elasticsearch.dao.BlockDao;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.PeerService;
import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao;
import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao;
import org.duniter.elasticsearch.subscription.service.SubscriptionService;
import org.duniter.elasticsearch.subscription.service.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
......@@ -33,6 +38,7 @@ import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestRequest;
/**
* Created by blavenie on 17/06/16.
......@@ -58,7 +64,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
createIndices();
// Waiting cluster back to GREEN or YELLOW state, before synchronize
threadPool.scheduleOnClusterHealthStatus(this::synchronize,
threadPool.scheduleOnClusterHealthStatus(this::doAfterStart,
ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
}, ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
}
......@@ -101,6 +107,13 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
}
}
protected void doAfterStart() {
// Wait cluster state OK, then synchronize
threadPool.scheduleOnClusterHealthStatus(this::synchronize,
ClusterHealthStatus.YELLOW, ClusterHealthStatus.GREEN);
}
protected void synchronize() {
if (pluginSettings.enableDataSync()) {
......
......@@ -32,9 +32,15 @@ public class RestModule extends AbstractModule implements Module {
@Override protected void configure() {
// Mail
// Subscription category
bind(RestSubscriptionCategoryGetAction.class).asEagerSingleton();
// Subscription execution
bind(RestSubscriptionCategoryGetAction.class).asEagerSingleton();
// Subscription record
bind(RestSubscriptionRecordIndexAction.class).asEagerSingleton();
bind(RestSubscriptionRecordUpdateAction.class).asEagerSingleton();
bind(RestSubscriptionCategoryGetAction.class).asEagerSingleton();
}
}
\ No newline at end of file
package org.duniter.elasticsearch.subscription.rest.execution;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao;
import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao;
import org.elasticsearch.common.inject.Inject;
public class RestSubscriptionExecutionGetAction {
@Inject
public RestSubscriptionExecutionGetAction(RestSecurityController securityController) {
// Add security rule to enable access on /subscription/execution
securityController.allowPostSearchIndexType(SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE);
}
}
\ No newline at end of file
......@@ -34,6 +34,7 @@ import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.concurrent.CompletableFutures;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.subscription.PluginSettings;
......@@ -45,6 +46,8 @@ import org.duniter.elasticsearch.subscription.model.email.EmailSubscription;
import org.duniter.elasticsearch.subscription.util.DateUtils;
import org.duniter.elasticsearch.subscription.util.stringtemplate.DateRenderer;
import org.duniter.elasticsearch.subscription.util.stringtemplate.StringRenderer;
import org.duniter.elasticsearch.threadpool.CompletableActionFuture;
import org.duniter.elasticsearch.threadpool.ScheduledActionFuture;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.service.AdminService;
......@@ -60,6 +63,7 @@ import org.stringtemplate.v4.STGroupDir;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
......@@ -133,6 +137,15 @@ public class SubscriptionService extends AbstractService {
return this;
}
// for DEBUG only: execute a fake job every minute, to test scheduler
if (logger.isDebugEnabled()) {
threadPool.scheduleAtFixedRate(
() -> logger.debug("Scheduled fake task successfully executed - scheduled every [1 min]"),
20 * 1000 /* start in 20s */,
60 * 1000 /* every 1 min */,
TimeUnit.MILLISECONDS);
}
// Email subscriptions
{
if (logger.isInfoEnabled()) {
......@@ -143,25 +156,19 @@ public class SubscriptionService extends AbstractService {
logger.warn(I18n.t("duniter4j.es.subscription.email.start", pluginSettings.getEmailSubscriptionsExecuteHour(), dayOfWeek));
}
// TODO: remove this (DEV lon)
threadPool.scheduler().scheduleAtFixedRate(
() -> executeEmailSubscriptions(EmailSubscription.Frequency.daily),
1000 * 20, // start in 20s
10 * 60 * 1000, // every 10 min
TimeUnit.MILLISECONDS);
// Daily execution
threadPool.scheduleAtFixedRate(
() -> executeEmailSubscriptions(EmailSubscription.Frequency.daily),
DateUtils.delayBeforeHour(pluginSettings.getEmailSubscriptionsExecuteHour()),
DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS);
DateUtils.DAY_DURATION_IN_MILLIS,
TimeUnit.MILLISECONDS);
// Weekly execution
threadPool.scheduleAtFixedRate(
() -> executeEmailSubscriptions(EmailSubscription.Frequency.weekly),
DateUtils.delayBeforeDayAndHour(pluginSettings.getEmailSubscriptionsExecuteDayOfWeek(), pluginSettings.getEmailSubscriptionsExecuteHour()),
7 * DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS);
7 * DateUtils.DAY_DURATION_IN_MILLIS,
TimeUnit.MILLISECONDS);
}
return this;
}
......@@ -209,7 +216,6 @@ public class SubscriptionService extends AbstractService {
/* -- protected methods -- */
protected EmailSubscription decryptEmailSubscription(EmailSubscription subscription) {
Preconditions.checkNotNull(subscription);
Preconditions.checkNotNull(subscription.getId());
......@@ -309,8 +315,6 @@ public class SubscriptionService extends AbstractService {
pluginSettings.getCesiumUrl())
.render(issuerLocale);
// Schedule email sending
threadPool.schedule(() -> mailService.sendHtmlEmailWithText(
emailSubjectPrefix + I18n.t("duniter4j.es.subscription.email.subject", userEvents.size()),
......@@ -318,7 +322,6 @@ public class SubscriptionService extends AbstractService {
"<body>" + html + "</body>",
subscription.getContent().getEmail()));
// Compute last time (should be the first one, as events are sorted in DESC order)
Long lastEventTime = userEvents.get(0).getTime();
if (lastExecution == null) {
......
......@@ -29,8 +29,10 @@ import org.duniter.elasticsearch.model.SynchroResult;
import org.duniter.elasticsearch.service.AbstractSynchroService;
import org.duniter.elasticsearch.service.ServiceLocator;
import org.duniter.elasticsearch.subscription.dao.SubscriptionIndexDao;
import org.duniter.elasticsearch.subscription.dao.execution.SubscriptionExecutionDao;
import org.duniter.elasticsearch.subscription.dao.record.SubscriptionRecordDao;
import org.duniter.elasticsearch.subscription.model.Protocol;
import org.duniter.elasticsearch.subscription.model.SubscriptionExecution;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.user.PluginSettings;
import org.elasticsearch.common.inject.Inject;
......@@ -70,6 +72,7 @@ public class SynchroService extends AbstractSynchroService {
}
protected void importSubscriptionsChanges(SynchroResult result, Peer peer, long sinceTime) {
importChanges(result, peer, SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, sinceTime);
importChanges(result, peer, SubscriptionIndexDao.INDEX, SubscriptionRecordDao.TYPE, sinceTime);
}
}
......@@ -2,8 +2,3 @@
{ "name": "Notifications" , "parent": null}
{ "index": { "_id": "email"}}
{ "name": "Email", "parent": "_notification"}
{ "index": { "_id": "_service"}}
{ "name": "Services de paiement" , "parent": null}
{ "index": { "_id": "transfer"}}
{ "name": "Virement automatique", "parent": "_service"}
\ No newline at end of file
......@@ -26,21 +26,9 @@ package org.duniter.elasticsearch.user.service;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.model.SmtpConfig;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.user.PluginSettings;
import org.duniter.elasticsearch.user.model.UserEvent;
import org.duniter.elasticsearch.user.model.UserProfile;
import org.elasticsearch.common.inject.Inject;
import org.nuiton.i18n.I18n;
import javax.activation.CommandMap;
import javax.activation.MailcapCommandMap;
import java.util.Locale;
/**
* Created by Benoit on 30/03/2015.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment