Skip to content
Snippets Groups Projects
Commit f72d450d authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

- HttpService: manage http connection in a pool

- HttpService: ignore cookie (avoid error cause by Yunohost SSO)
- Email subscription: fix scheduler delay
parent 94ffdd0b
No related branches found
No related tags found
No related merge requests found
Showing
with 254 additions and 57 deletions
package org.duniter.client.actions.validators;
/*
* #%L
* Duniter4j :: Client
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.beust.jcommander.IParameterValidator;
import com.beust.jcommander.ParameterException;
import org.duniter.core.client.model.bma.Constants;
......
......@@ -250,6 +250,14 @@ public class Configuration {
return applicationConfig.getOptionAsInt(ConfigurationOption.NETWORK_TIMEOUT.getKey());
}
public int getNetworkMaxTotalConnections() {
return applicationConfig.getOptionAsInt(ConfigurationOption.NETWORK_MAX_CONNECTIONS.getKey());
}
public int getNetworkMaxConnectionsPerRoute() {
return applicationConfig.getOptionAsInt(ConfigurationOption.NETWORK_MAX_CONNECTIONS_PER_ROUTE.getKey());
}
public int getNetworkCacheTimeInMillis() {
return Integer.parseInt(ConfigurationOption.NETWORK_CACHE_TIME_IN_MILLIS.getDefaultValue());
}
......
......@@ -164,13 +164,28 @@ public enum ConfigurationOption implements ConfigOptionDef {
NETWORK_TIMEOUT(
"duniter4j.network.timeout",
n("duniter4j.config.option.network.timeout.description"),
"20000", // = 2 s
"5000", // = 5 s
Integer.class,
false),
NETWORK_MAX_CONNECTIONS(
"duniter4j.network.maxConnections",
n("duniter4j.config.option.network.maxConnections.description"),
"100",
Integer.class,
false),
NETWORK_MAX_CONNECTIONS_PER_ROUTE(
"duniter4j.network.maxConnectionsPerHost",
n("duniter4j.config.option.network.maxConnectionsPerHost.description"),
"5",
Integer.class,
false),
NETWORK_CACHE_TIME_IN_MILLIS (
"ucoin.network.cacheTimeInMillis",
"ucoin.config.option.network.cacheTimeInMillis.description",
"duniter4j.network.cacheTimeInMillis",
"duniter4j.config.option.network.cacheTimeInMillis.description",
"10000", // = 10 s
Integer.class,
false),
......
......@@ -27,14 +27,17 @@ import com.google.common.base.Joiner;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.ConnectTimeoutException;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.duniter.core.beans.InitializingBean;
import org.duniter.core.client.config.Configuration;
......@@ -70,6 +73,8 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
public static final String URL_PEER_ALIVE = "/blockchain/parameters";
private PoolingHttpClientConnectionManager connectionManager;
protected ObjectMapper objectMapper;
protected Peer defaultPeer;
private boolean debug;
......@@ -99,16 +104,18 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
protected void initCaches() {
Configuration config = Configuration.instance();
int cacheTimeInMillis = config.getNetworkCacheTimeInMillis();
int defaultTimeout = config.getNetworkTimeout();
final int defaultTimeout = config.getNetworkTimeout();
requestConfigCache = new SimpleCache<Integer, RequestConfig>(cacheTimeInMillis) {
requestConfigCache = new SimpleCache<Integer, RequestConfig>(cacheTimeInMillis*100) {
@Override
public RequestConfig load(Integer timeout) {
// Use config default timeout, if 0
if (timeout <= 0) timeout = defaultTimeout;
return createRequestConfig(timeout);
}
};
httpClientCache = new SimpleCache<Integer, HttpClient>(cacheTimeInMillis) {
httpClientCache = new SimpleCache<Integer, HttpClient>(cacheTimeInMillis*100) {
@Override
public HttpClient load(Integer timeout) {
return createHttpClient(timeout);
......@@ -158,6 +165,8 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
}
wsEndPoints.clear();
}
connectionManager.close();
}
public <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass) {
......@@ -220,20 +229,40 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
}
}
protected PoolingHttpClientConnectionManager createConnectionManager(
int maxTotalConnections,
int maxConnectionsPerRoute,
int timeout) {
PoolingHttpClientConnectionManager connectionManager
= new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(maxTotalConnections);
connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute);
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoTimeout(timeout).build());
return connectionManager;
}
protected HttpClient createHttpClient(int timeout) {
CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfigCache.get(timeout))
// .setDefaultCredentialsProvider(getCredentialsProvider())
if (connectionManager == null) {
Configuration config = Configuration.instance();
connectionManager = createConnectionManager(
config.getNetworkMaxTotalConnections(),
config.getNetworkMaxConnectionsPerRoute(),
config.getNetworkTimeout());
}
return HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(requestConfigCache.get(timeout))
.build();
return httpClient;
}
protected RequestConfig createRequestConfig(int timeout) {
// build request config for timeout
if (timeout <= 0) {
// Use config default timeout
timeout = Configuration.instance().getNetworkTimeout();
}
return RequestConfig.custom().setSocketTimeout(timeout).setConnectTimeout(timeout).build();
return RequestConfig.custom()
.setSocketTimeout(timeout).setConnectTimeout(timeout)
.setMaxRedirects(1)
.setCookieSpec(CookieSpecs.IGNORE_COOKIES)
.build();
}
protected <T> T executeRequest(HttpClient httpClient, HttpUriRequest request, Class<? extends T> resultClass) {
......@@ -481,4 +510,5 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean
public <T> T readValue(InputStream json, Class<T> clazz) throws IOException {
return objectMapper.readValue(json, clazz);
}
}
......@@ -71,6 +71,8 @@ public interface NetworkService extends Service {
List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort);
List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort, ExecutorService pool);
CompletableFuture<List<CompletableFuture<Peer>>> asyncGetPeers(Peer mainPeer, ExecutorService pool) throws ExecutionException, InterruptedException;
List<Peer> fillPeerStatsConsensus(final List<Peer> peers);
......
......@@ -127,9 +127,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
@Override
public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort) {
return getPeers(mainPeer, filter, sort, null);
}
@Override
public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort, ExecutorService executor) {
try {
return asyncGetPeers(mainPeer, null)
return asyncGetPeers(mainPeer, executor)
.thenCompose(CompletableFutures::allOfToList)
.thenApply(this::fillPeerStatsConsensus)
.thenApply(peers -> peers.stream()
......@@ -290,31 +295,16 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
final Comparator<Peer> peerComparator = peerComparator(sort);
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
/*Runnable initCacheRunnable = () -> {
if (threadLock.isLocked()) return;
synchronized (threadLock) {
threadLock.lock();
}
try {
// TODO : load all peers from DAO, then fill list ?
}
catch(Exception e) {
log.error("Error while loading all peers: " + e.getMessage(), e);
}
finally {
synchronized (threadLock) {
threadLock.unlock();
}
}
};*/
Runnable getPeersRunnable = () -> {
if (threadLock.isLocked()) return;
if (threadLock.isLocked()) {
log.error("Rejected getPeersRunnable() call. Another refresh is already running...");
return;
}
synchronized (threadLock) {
threadLock.lock();
}
try {
List<Peer> updatedPeers = getPeers(mainPeer, filter, sort);
List<Peer> updatedPeers = getPeers(mainPeer, filter, sort, pool);
knownPeers.clear();
updatedPeers.stream().forEach(peer -> {
......@@ -340,7 +330,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
};
Consumer<NetworkPeers.Peer> refreshPeerConsumer = (bmaPeer) -> {
if (threadLock.isLocked()) return;
if (threadLock.isLocked()) {
log.error("Rejected refreshPeerConsumer() call. Another refresh is already running...");
return;
}
synchronized (threadLock) {
threadLock.lock();
}
......@@ -411,7 +404,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
log.error("Could not parse peer received by WS: " + e.getMessage(), e);
}
schedule(getPeersRunnable, pool, 5000);
schedule(getPeersRunnable, pool, 3000/*waiting block propagation*/);
}, autoreconnect);
// Manage new peer event
......
......@@ -12,6 +12,8 @@ duniter4j.config.option.data.directory.description=
duniter4j.config.option.i18n.directory.description=
duniter4j.config.option.i18n.locale.description=
duniter4j.config.option.inceptionYear.description=
duniter4j.config.option.network.maxConnections.description=
duniter4j.config.option.network.maxConnectionsPerHost.description=
duniter4j.config.option.network.timeout.description=
duniter4j.config.option.node.currency.description=
duniter4j.config.option.node.elasticsearch.host.description=
......
......@@ -12,6 +12,8 @@ duniter4j.config.option.data.directory.description=
duniter4j.config.option.i18n.directory.description=
duniter4j.config.option.i18n.locale.description=
duniter4j.config.option.inceptionYear.description=
duniter4j.config.option.network.maxConnections.description=
duniter4j.config.option.network.maxConnectionsPerHost.description=
duniter4j.config.option.network.timeout.description=
duniter4j.config.option.node.currency.description=
duniter4j.config.option.node.elasticsearch.host.description=
......
......@@ -8,6 +8,7 @@ public class SmtpConfig {
private String smtpUsername;
private String smtpPassword;
private String senderAddress;
private String senderName;
private boolean useSsl;
private boolean startTLS;
......@@ -51,6 +52,14 @@ public class SmtpConfig {
this.senderAddress = senderAddress;
}
public String getSenderName() {
return senderName;
}
public void setSenderName(String senderName) {
this.senderName = senderName;
}
public boolean isUseSsl() {
return useSsl;
}
......
......@@ -26,6 +26,7 @@ import com.google.common.base.Joiner;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.model.SmtpConfig;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import javax.activation.CommandMap;
......@@ -33,6 +34,8 @@ import javax.activation.MailcapCommandMap;
import javax.mail.*;
import javax.mail.internet.*;
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;
......@@ -158,7 +161,8 @@ public class MailServiceImpl implements MailService, Closeable {
// send email to recipients
try {
Message message = new MimeMessage(session);
message.setFrom(new InternetAddress(smtpConfig.getSenderAddress()));
message.setFrom(getSenderAddress(smtpConfig));
Address[] recipientsAddresses = Arrays.asList(recipients).stream().map(recipient -> {
try {
......@@ -190,6 +194,7 @@ public class MailServiceImpl implements MailService, Closeable {
return getSmtpServerAsString(smtpConfig.getSmtpHost(), smtpConfig.getSmtpPort(), smtpConfig.getSmtpUsername());
}
private String getSmtpServerAsString(String smtpHost, int smtpPort, String smtpUsername) {
StringBuilder buffer = new StringBuilder();
if (StringUtils.isNotBlank(smtpUsername)) {
......@@ -202,6 +207,19 @@ public class MailServiceImpl implements MailService, Closeable {
}
private InternetAddress getSenderAddress(SmtpConfig smtpConfig) {
Preconditions.checkNotNull(smtpConfig);
try {
if (StringUtils.isNotBlank(smtpConfig.getSenderAddress())) {
return new InternetAddress(smtpConfig.getSenderAddress(), smtpConfig.getSenderName());
}
return new InternetAddress(smtpConfig.getSenderAddress());
}
catch(UnsupportedEncodingException | AddressException e) {
throw new TechnicalException(e);
}
}
private void connect(String smtpHost, int smtpPort,
String smtpUsername,
String smtpPassword,
......@@ -248,7 +266,10 @@ public class MailServiceImpl implements MailService, Closeable {
props.put("mail.smtp.host", config.getSmtpHost());
props.put("mail.smtp.port", config.getSmtpPort());
if (StringUtils.isNotBlank(config.getSenderAddress())) {
props.put("mail.from", config.getSenderAddress());
props.put("mail.from.alias", config.getSenderAddress());
if (StringUtils.isNotBlank(config.getSenderName())) {
props.put("mail.from.alias", config.getSenderName());
}
}
if (config.isUseSsl()) {
props.put("mail.smtp.socketFactory.port", config.getSmtpPort());
......
package org.duniter.core.service;
import org.duniter.core.model.SmtpConfig;
import org.duniter.core.test.TestFixtures;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.UnsupportedEncodingException;
/**
* Created by blavenie on 20/04/17.
*/
public class MailServiceTest {
private MailService service;
@Before
public void setUp() throws UnsupportedEncodingException {
service = new MailServiceImpl();
SmtpConfig config = new SmtpConfig();
config.setSenderName("test");
config.setSenderAddress("no-reply@duniter.fr");
config.setSmtpHost("localhost");
config.setSmtpPort(25);
service.setSmtpConfig(config);
}
@Test
@Ignore
public void sendTextEmail() {
service.sendTextEmail("Test " + System.currentTimeMillis(),
"a test content",
"root@localhost");
}
}
......@@ -128,6 +128,7 @@ duniter.blockchain.sync.enable: true
duniter.host: gtest.duniter.org
duniter.port: 10900
#duniter.useSsl: true
#duniter4j.network.timeout
#
# ---------------------------------- Duniter4j security -------------------------
#
......
......@@ -14,7 +14,7 @@ logger:
com.amazonaws.jmx.SdkMBeanRegistrySupport: ERROR
com.amazonaws.metrics.AwsSdkMetrics: ERROR
org.apache.http: INFO
org.apache.http: WARN
org.apache.http.client: ERROR
org.duniter: INFO
......
#!/bin/sh
curl -XPOST 'http://localhost:9200/g1/block/_search?pretty' -d '
{
"size": 10000,
"query": {
"filtered": {
"filter": {
"bool": {
"must": [
{
"exists": {
"field": "dividend"
}
}
]
}
}
}
},
"_source": ["dividend", "monetaryMass", "membersCount"],
sort
}'
......@@ -106,6 +106,8 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost());
applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_TIMEOUT.getKey(), String.valueOf(getNetworkTimeout()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS.getKey(), String.valueOf(getNetworkMaxConnections()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS_PER_ROUTE.getKey(), String.valueOf(getNetworkMaxConnectionsPerRoute()));
try {
applicationConfig.parse(new String[]{});
......@@ -199,7 +201,15 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
public int getNetworkTimeout() {
return settings.getAsInt("duniter.network.timeout", 100000 /*10s*/);
return settings.getAsInt("duniter.network.timeout", 5000 /*5s*/);
}
public int getNetworkMaxConnections() {
return settings.getAsInt("duniter.network.maxConnections", 100);
}
public int getNetworkMaxConnectionsPerRoute() {
return settings.getAsInt("duniter.network.maxConnectionsPerRoute", 5);
}
public boolean isDevMode() {
......
......@@ -176,7 +176,9 @@ public class PeerService extends AbstractService {
networkService.addPeersChangeListener(mainPeer, peers -> {
if (CollectionUtils.isNotEmpty(peers)) {
logger.info(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size()));
if (logger.isDebugEnabled()) {
logger.debug(String.format("[%s] Updating peers endpoints (%s endpoints found)", currencyName, peers.size()));
}
peers.stream().forEach(peer -> savePeer(peer));
}
}, filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler());
......@@ -212,7 +214,9 @@ public class PeerService extends AbstractService {
// Update existing peer
else {
if (logger.isTraceEnabled()) {
logger.trace(String.format("Update peer [%s]", peer));
}
peerDao.update(peer);
}
return peer;
......
......@@ -57,6 +57,8 @@ import org.stringtemplate.v4.ST;
import org.stringtemplate.v4.STGroup;
import org.stringtemplate.v4.STGroupDir;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
......@@ -131,18 +133,28 @@ public class SubscriptionService extends AbstractService {
return this;
}
// Email subscriptions
{
if (logger.isInfoEnabled()) {
Calendar cal = new GregorianCalendar();
cal.setTimeInMillis(0);
cal.set(Calendar.DAY_OF_WEEK, pluginSettings.getEmailSubscriptionsExecuteDayOfWeek());
String dayOfWeek = new SimpleDateFormat("EEE").format(cal.getTime());
logger.warn(I18n.t("duniter4j.es.subscription.email.start", pluginSettings.getEmailSubscriptionsExecuteHour(), dayOfWeek));
}
// Daily execution
threadPool.scheduler().scheduleAtFixedRate(
() -> executeEmailSubscriptions(EmailSubscription.Frequency.daily),
DateUtils.delayBeforeHour(pluginSettings.getEmailSubscriptionsExecuteHour()),
1, TimeUnit.DAYS);
DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS);
// Weekly execution
threadPool.scheduler().scheduleAtFixedRate(
() -> executeEmailSubscriptions(EmailSubscription.Frequency.weekly),
DateUtils.delayBeforeDayAndHour(pluginSettings.getEmailSubscriptionsExecuteDayOfWeek(), pluginSettings.getEmailSubscriptionsExecuteHour()),
7, TimeUnit.DAYS);
7 * DateUtils.DAY_DURATION_IN_MILLIS, TimeUnit.MILLISECONDS);
}
return this;
}
......
......@@ -9,10 +9,13 @@ import java.util.GregorianCalendar;
*/
public class DateUtils {
public static final long DAY_DURATION_IN_MILLIS = 24 * 60 * 60 * 1000;
public static Date nextHour(int hour) {
Calendar cal = new GregorianCalendar();
cal.setTimeInMillis(System.currentTimeMillis());
if (cal.get(Calendar.HOUR_OF_DAY) > hour) {
if (cal.get(Calendar.HOUR_OF_DAY) >= hour) {
// Too late for today: add 1 day (will wait tomorrow)
cal.add(Calendar.DAY_OF_YEAR, 1);
}
......@@ -26,7 +29,7 @@ public class DateUtils {
public static Date nextDayAndHour(int dayOfTheWeek, int hour) {
Calendar cal = new GregorianCalendar();
cal.setTimeInMillis(System.currentTimeMillis());
if (cal.get(Calendar.DAY_OF_WEEK) > dayOfTheWeek || (cal.get(Calendar.DAY_OF_WEEK) == dayOfTheWeek && cal.get(Calendar.HOUR_OF_DAY) > hour)) {
if (cal.get(Calendar.DAY_OF_WEEK) > dayOfTheWeek || (cal.get(Calendar.DAY_OF_WEEK) == dayOfTheWeek && cal.get(Calendar.HOUR_OF_DAY) >= hour)) {
// Too late for this week: will wait for next week
cal.add(Calendar.WEEK_OF_YEAR, 1);
}
......@@ -45,5 +48,7 @@ public class DateUtils {
public static long delayBeforeDayAndHour(int dayOfTheWeek, int hour) {
return nextDayAndHour(dayOfTheWeek, hour).getTime() - System.currentTimeMillis();
}
}
......@@ -9,6 +9,7 @@ duniter4j.es.subscription.email.html.unreadCount=You received <b>%s new notifica
duniter4j.es.subscription.email.notificationsDivider=Notifications list\:
duniter4j.es.subscription.email.openCesium=Open Cesium+
duniter4j.es.subscription.email.pubkey=Public key\: %2$s (%1$s)
duniter4j.es.subscription.email.start=Email subscriptions\: daily mailing [at %1$s\:00] and weekly [on %2$s at %1$s\:00]
duniter4j.es.subscription.email.subject=You received %s new notifications
duniter4j.es.subscription.email.unreadCount=You received %s new notifications.
duniter4j.es.subscription.error.mailDisabling=Unable to process email subscriptions\: Email sending is disabled in the configuration
......@@ -9,6 +9,7 @@ duniter4j.es.subscription.email.html.unreadCount=Vous avez <b>%s notifications</
duniter4j.es.subscription.email.notificationsDivider=Liste des notifications \:
duniter4j.es.subscription.email.openCesium=Ouvrir Cesium+
duniter4j.es.subscription.email.pubkey=Clé publique \: %2$s (%1$s)
duniter4j.es.subscription.email.start=Abonnement email\: envoi quotidien [à %1$s\:00] et hebdomadaire [le %2$s à %1$s\:00]
duniter4j.es.subscription.email.subject=%s nouvelles notifications non lues
duniter4j.es.subscription.email.unreadCount=Vous avez %s notifications non lues.
duniter4j.es.subscription.error.mailDisabling=Impossible de traiter les abonnements email\: la fonction d'envoi d'email est désactivée dans la configuration
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment