Skip to content
Snippets Groups Projects
Commit 41ae8b35 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Synchro: add config option 'duniter.p2p.ping.endpoints'

parent b63fd47f
Branches
Tags
No related merge requests found
Showing
with 326 additions and 147 deletions
package org.duniter.core.client.model.bma;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.http.InetAddressUtils;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by blavenie on 07/12/16.
*/
public class Endpoints {
public static final String EP_END_REGEXP = "(?:[ ]+([a-z0-9-_]+[.][a-z0-9-_.]*))?(?:[ ]+([0-9.]+))?(?:[ ]+([0-9a-f:]+))?(?:[ ]+([0-9]+))$";
public static final String BMA_API_REGEXP = "^BASIC_MERKLED_API" + EP_END_REGEXP;
public static final String BMAS_API_REGEXP = "^BMAS" + EP_END_REGEXP;
public static final String WS2P_API_REGEXP = "^WS2P[ ]+([a-z0-9]+)[ ]+" + EP_END_REGEXP;
public static final String OTHER_API_REGEXP = "^([A-Z_-]+)" + EP_END_REGEXP;
private static Pattern bmaPattern = Pattern.compile(BMA_API_REGEXP);
private static Pattern bmasPattern = Pattern.compile(BMAS_API_REGEXP);
private static Pattern ws2pPattern = Pattern.compile(WS2P_API_REGEXP);
private static Pattern otherApiPattern = Pattern.compile(OTHER_API_REGEXP);
private Endpoints() {
// helper class
}
public static NetworkPeering.Endpoint parse(String ept) throws IOException {
NetworkPeering.Endpoint endpoint = new NetworkPeering.Endpoint();
// BMA API
Matcher mather = bmaPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.BASIC_MERKLED_API;
parseDefaultFormatEndPoint(mather, endpoint, 1);
return endpoint;
}
// BMAS API
mather = bmasPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.BMAS;
parseDefaultFormatEndPoint(mather, endpoint, 1);
return endpoint;
}
// WS2P API
mather = ws2pPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.WS2P;
endpoint.id = mather.group(1);
parseDefaultFormatEndPoint(mather, endpoint, 2);
return endpoint;
}
// Other API
mather = otherApiPattern.matcher(ept);
if (mather.matches()) {
String api = mather.group(1);
try {
endpoint.api = EndpointApi.valueOf(api);
parseDefaultFormatEndPoint(mather, endpoint, 2);
return endpoint;
} catch(Exception e) {
// Unknown API
throw new IOException("Unable to deserialize endpoint: unknown api [" + api + "]", e); // link the exception
}
}
return null;
}
public static void parseDefaultFormatEndPoint(Matcher matcher, NetworkPeering.Endpoint endpoint, int startGroup) {
for(int i=startGroup; i<=matcher.groupCount(); i++) {
String word = matcher.group(i);
if (StringUtils.isNotBlank(word)) {
if (InetAddressUtils.isIPv4Address(word)) {
endpoint.ipv4 = word;
} else if (InetAddressUtils.isIPv6Address(word)) {
endpoint.ipv6 = word;
} else if (i == matcher.groupCount() && word.matches("\\d+")){
endpoint.port = Integer.parseInt(word);
} else {
endpoint.dns = word;
}
}
}
}
}
\ No newline at end of file
......@@ -25,16 +25,12 @@ package org.duniter.core.client.model.bma.jackson;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.bma.Endpoints;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.http.InetAddressUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by blavenie on 07/12/16.
......@@ -43,23 +39,9 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
private static final Logger log = LoggerFactory.getLogger(EndpointDeserializer.class);
public static final String EP_END_REGEXP = "(?:[ ]+([a-z0-9-_]+[.][a-z0-9-_.]*))?(?:[ ]+([0-9.]+))?(?:[ ]+([0-9a-f:]+))?(?:[ ]+([0-9]+))$";
public static final String BMA_API_REGEXP = "^BASIC_MERKLED_API" + EP_END_REGEXP;
public static final String BMAS_API_REGEXP = "^BMAS" + EP_END_REGEXP;
public static final String WS2P_API_REGEXP = "^WS2P[ ]+([a-z0-9]+)[ ]+" + EP_END_REGEXP;
public static final String OTHER_API_REGEXP = "^([A-Z_-]+)" + EP_END_REGEXP;
private Pattern bmaPattern;
private Pattern bmasPattern;
private Pattern ws2pPattern;
private Pattern otherApiPattern;
private boolean debug;
public EndpointDeserializer() {
this.bmaPattern = Pattern.compile(BMA_API_REGEXP);
this.bmasPattern = Pattern.compile(BMAS_API_REGEXP);
this.ws2pPattern = Pattern.compile(WS2P_API_REGEXP);
this.otherApiPattern = Pattern.compile(OTHER_API_REGEXP);
this.debug = log.isDebugEnabled();
}
......@@ -68,70 +50,17 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
String ept = jp.getText();
NetworkPeering.Endpoint endpoint = new NetworkPeering.Endpoint();
// BMA API
Matcher mather = bmaPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.BASIC_MERKLED_API;
parseDefaultFormatEndPoint(mather, endpoint, 1);
return endpoint;
}
// BMAS API
mather = bmasPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.BMAS;
parseDefaultFormatEndPoint(mather, endpoint, 1);
return endpoint;
}
// WS2P API
mather = ws2pPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.WS2P;
endpoint.id = mather.group(1);
parseDefaultFormatEndPoint(mather, endpoint, 2);
return endpoint;
}
// Other API
mather = otherApiPattern.matcher(ept);
if (mather.matches()) {
String api = mather.group(1);
try {
endpoint.api = EndpointApi.valueOf(api);
parseDefaultFormatEndPoint(mather, endpoint, 2);
return endpoint;
} catch(Exception e) {
// Log unknown API (and continue = will skip this endpoint)
return Endpoints.parse(ept);
} catch(IOException e) {
// Unable to parse endpoint: continue (will skip this endpoint)
if (debug) {
log.warn("Unable to deserialize endpoint: unknown api [" + api + "]", e); // link the exception
log.warn(e.getMessage(), e); // link the exception
}
else {
log.warn("Unable to deserialize endpoint: unknown api [" + api + "]");
log.warn(e.getMessage());
}
}
}
return null;
}
public static void parseDefaultFormatEndPoint(Matcher matcher, NetworkPeering.Endpoint endpoint, int startGroup) {
for(int i=startGroup; i<=matcher.groupCount(); i++) {
String word = matcher.group(i);
if (StringUtils.isNotBlank(word)) {
if (InetAddressUtils.isIPv4Address(word)) {
endpoint.ipv4 = word;
} else if (InetAddressUtils.isIPv6Address(word)) {
endpoint.ipv6 = word;
} else if (i == matcher.groupCount() && word.matches("\\d+")){
endpoint.port = Integer.parseInt(word);
} else {
endpoint.dns = word;
}
}
}
}
}
\ No newline at end of file
......@@ -91,4 +91,5 @@ public interface NetworkService extends Service {
final ExecutorService executor);
String getVersion(final Peer peer);
}
......@@ -186,8 +186,8 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
}
public CompletableFuture<Peer> asyncRefreshPeer(final Peer peer, final Map<String, String> memberUids, final ExecutorService pool) {
return CompletableFuture.supplyAsync(() -> getVersion(peer), pool)
.thenApply(p -> Peers.hasBmaEndpoint(p) ? getCurrentBlock(p) : p)
return CompletableFuture.supplyAsync(() -> fillVersion(peer), pool)
.thenApply(p -> Peers.hasBmaEndpoint(p) ? fillCurrentBlock(p) : p)
.exceptionally(throwable -> {
peer.getStats().setStatus(Peer.PeerStatus.DOWN);
if(!(throwable instanceof HttpConnectException)) {
......@@ -210,7 +210,7 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
// Hardship
if (StringUtils.isNotBlank(uid)) {
getHardship(p);
fillHardship(p);
}
}
return p;
......@@ -432,6 +432,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
}
public String getVersion(final Peer peer) {
JsonNode json = get(peer, BMA_URL_STATUS);
json = json.get("duniter");
if (json.isMissingNode()) throw new TechnicalException(String.format("Invalid format of [%s] response", BMA_URL_STATUS));
json = json.get("version");
if (json.isMissingNode()) throw new TechnicalException(String.format("No version attribute found in [%s] response", BMA_URL_STATUS));
return json.asText();
}
/* -- protected methods -- */
......@@ -549,19 +557,14 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return true;
}
protected Peer getVersion(final Peer peer) {
JsonNode json = executeRequest(peer, BMA_URL_STATUS, JsonNode.class);
json = json.get("duniter");
if (json.isMissingNode()) throw new TechnicalException(String.format("Invalid format of [%s] response", BMA_URL_STATUS));
json = json.get("version");
if (json.isMissingNode()) throw new TechnicalException(String.format("No version attribute found in [%s] response", BMA_URL_STATUS));
String version = json.asText();
protected Peer fillVersion(final Peer peer) {
String version = getVersion(peer);
peer.getStats().setVersion(version);
return peer;
}
protected Peer getCurrentBlock(final Peer peer) {
JsonNode json = executeRequest(peer, BMA_URL_BLOCKCHAIN_CURRENT , JsonNode.class);
protected Peer fillCurrentBlock(final Peer peer) {
JsonNode json = get(peer, BMA_URL_BLOCKCHAIN_CURRENT);
String currency = json.has("currency") ? json.get("currency").asText() : null;
peer.setCurrency(currency);
......@@ -582,10 +585,10 @@ public class NetworkServiceImpl extends BaseRemoteServiceImpl implements Network
return peer;
}
protected Peer getHardship(final Peer peer) {
protected Peer fillHardship(final Peer peer) {
if (StringUtils.isBlank(peer.getPubkey())) return peer;
JsonNode json = executeRequest(peer, BMA_URL_BLOCKCHAIN_HARDSHIP + peer.getPubkey(), JsonNode.class);
JsonNode json = get(peer, BMA_URL_BLOCKCHAIN_HARDSHIP + peer.getPubkey());
Integer level = json.has("level") ? json.get("level").asInt() : null;
peer.getStats().setHardshipLevel(level);
return peer;
......
......@@ -99,7 +99,7 @@ http.cors.enabled: true
#
# Require explicit names when deleting indices:
#
# rest.destructive_requires_name: true
# action.destructive_requires_name: true
#
# Security to isolate plugin classpath - /!\ WARNING: should be DISABLE for Duniter4j
#
......@@ -164,6 +164,10 @@ duniter.security.enable: true
#
# duniter.p2p.peerTimeOffset: 3600
#
# Pass an initial list of hosts to perform synchronization when new node is started:
#
duniter.p2p.ping.endpoints: ["g1:ES_USER_API g1.data.duniter.fr 443"]
#
# ---------------------------------- Duniter4j Mail module -----------------------
#
# Enable mail module ?
......
......@@ -130,8 +130,8 @@ duniter.blockchain.enable: true
#
# Duniter node address
#
duniter.host: g1.duniter.org
duniter.port: 10901
duniter.host: g1-test.duniter.org
duniter.port: 10900
# duniter.useSsl: true
#
# Compute statistics on indices (each hour) ? (default: true)
......@@ -171,6 +171,14 @@ duniter.security.enable: true
# Max peer time offset, in seconds (max peer's clock error) - use to request peer's data. (default: 3600 = 1hour)
#
# duniter.p2p.peerTimeOffset: 3600
#
# Pass an initial list of hosts to perform synchronization when new node is started:
#
duniter.p2p.ping.endpoints: [
"g1-test:ES_USER_API g1-test.data.duniter.fr 443",
"g1-test:ES_SUBSCRIPTION_API g1-test.data.duniter.fr 443"
]
#
# ---------------------------------- Duniter4j Mail module -----------------------
#
......
......@@ -34,7 +34,6 @@ import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
......@@ -48,7 +47,7 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
@Inject public Plugin(Settings settings) {
this.enable = settings.getAsBoolean("duniter.enabled", true);
this.logger = Loggers.getLogger(Plugin.class.getName(), settings, new String[0]);
this.logger = Loggers.getLogger("duniter.core", settings, new String[0]);
}
@Override
......
......@@ -189,7 +189,15 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
MovementDao.TYPE)
.allowPostSearchIndexType(
currencyName,
MovementDao.TYPE);
MovementDao.TYPE)
// Add access to <currency>/synchro index
.allowIndexType(RestRequest.Method.GET,
currencyName,
SynchroExecutionDao.TYPE)
.allowPostSearchIndexType(
currencyName,
SynchroExecutionDao.TYPE);
/* TODO à décommenter quand les pending seront sauvegardés
injector.getInstance(DocStatService.class)
......
......@@ -230,6 +230,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.getAsInt("duniter.p2p.peerTimeOffset", 60*60/*=1hour*/);
}
public String[] getSynchroPingEndpoints() {
return settings.getAsArray("duniter.p2p.ping.endpoints");
}
public boolean isDevMode() {
return settings.getAsBoolean("duniter.dev.enable", false);
......@@ -296,7 +300,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
public boolean enableDocStats() {
return settings.getAsBoolean("duniter.stats.enable", false);
return settings.getAsBoolean("duniter.stats.enable", true);
}
/* protected methods */
......
......@@ -111,14 +111,15 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/
public class Duniter4jClientImpl implements Duniter4jClient {
private final static ESLogger logger = Loggers.getLogger("duniter.client");
private final ESLogger logger;
private final Client client;
private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool;
@Inject
public Duniter4jClientImpl(Client client, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) {
public Duniter4jClientImpl(Client client, Settings settings, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) {
super();
this.logger = Loggers.getLogger("duniter.client", settings, new String[0]);
this.client = client;
this.threadPool = threadPool;
}
......
......@@ -23,11 +23,11 @@ package org.duniter.elasticsearch.rest;
*/
import org.duniter.core.exception.BusinessException;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.exception.DuniterElasticsearchException;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
......@@ -37,7 +37,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
public abstract class AbstractRestPostIndexAction extends BaseRestHandler {
private static ESLogger log = null;
private final ESLogger log;
private final JsonIndexer indexer;
......@@ -48,12 +48,12 @@ public abstract class AbstractRestPostIndexAction extends BaseRestHandler {
String typeName,
JsonIndexer indexer) {
super(settings, controller, client);
log = Loggers.getLogger("duniter.rest" + indexName, settings, String.format("[%s]", indexName));
controller.registerHandler(POST,
String.format("/%s/%s", indexName, typeName),
this);
securityController.allowIndexType(POST, indexName, typeName);
securityController.allowIndexType(GET, indexName, typeName);
log = ESLoggerFactory.getLogger(String.format("[%s]", indexName));
this.indexer = indexer;
}
......
......@@ -27,7 +27,7 @@ import org.duniter.elasticsearch.exception.DuniterElasticsearchException;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
......@@ -36,7 +36,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
public abstract class AbstractRestPostMarkAsReadAction extends BaseRestHandler {
private static ESLogger log = null;
private final ESLogger log;
private final JsonReadUpdater updater;
......@@ -47,11 +47,11 @@ public abstract class AbstractRestPostMarkAsReadAction extends BaseRestHandler {
String typeName,
JsonReadUpdater updater) {
super(settings, controller, client);
log = Loggers.getLogger("duniter.rest" + indexName, settings, String.format("[%s]", indexName));
controller.registerHandler(POST,
String.format("/%s/%s/{id}/_read", indexName, typeName),
this);
securityController.allow(POST, String.format("/%s/%s/[^/]+/_read", indexName, typeName));
log = ESLoggerFactory.getLogger(String.format("[%s]", indexName));
this.updater = updater;
}
......
......@@ -30,6 +30,7 @@ import org.duniter.elasticsearch.exception.DuniterElasticsearchException;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.*;
......@@ -38,7 +39,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
public abstract class AbstractRestPostUpdateAction extends BaseRestHandler {
private static ESLogger log = null;
private final ESLogger log;
private final JsonUpdater updater;
......@@ -49,11 +50,11 @@ public abstract class AbstractRestPostUpdateAction extends BaseRestHandler {
String typeName,
JsonUpdater updater) {
super(settings, controller, client);
log = Loggers.getLogger("duniter.rest" + indexName, settings, String.format("[%s]", indexName));
controller.registerHandler(POST,
String.format("/%s/%s/{id}/_update", indexName, typeName),
this);
securityController.allowIndexType(POST, indexName, typeName);
log = ESLoggerFactory.getLogger(String.format("[%s]", indexName));
this.updater = updater;
}
......
......@@ -27,6 +27,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.RestRequest;
......@@ -40,15 +41,18 @@ import java.util.TreeSet;
*/
public class RestSecurityController extends AbstractLifecycleComponent<RestSecurityController> {
private static final ESLogger log = ESLoggerFactory.getLogger("duniter.security");
private final ESLogger log;
private boolean enable;
private boolean trace;
private Map<RestRequest.Method, Set<String>> allowRulesByMethod;
@Inject
public RestSecurityController(Settings settings, PluginSettings pluginSettings) {
super(settings);
this.log = Loggers.getLogger("duniter.security", settings, new String[0]);
this.trace = log.isTraceEnabled();
this.enable = pluginSettings.enableSecurity();
this.allowRulesByMethod = new HashMap<>();
if (!enable) {
......@@ -72,11 +76,8 @@ public class RestSecurityController extends AbstractLifecycleComponent<RestSecur
}
public RestSecurityController allow(RestRequest.Method method, String regexPath) {
Set<String> allowRules = allowRulesByMethod.get(method);
if (allowRules == null) {
allowRules = new TreeSet<>();
allowRulesByMethod.put(method, allowRules);
}
Set<String> allowRules = allowRulesByMethod.computeIfAbsent(method, k -> new TreeSet<>());
if (!allowRules.contains(regexPath)) {
allowRules.add(regexPath);
}
......@@ -85,25 +86,42 @@ public class RestSecurityController extends AbstractLifecycleComponent<RestSecur
public boolean isAllow(RestRequest request) {
if (!this.enable) return true;
RestRequest.Method method = request.method();
if (log.isTraceEnabled()) {
log.trace(String.format("Checking rules for %s request [%s]...", method, request.path()));
}
String path = request.path();
Set<String> allowRules = allowRulesByMethod.get(request.method());
String path = request.path();
if (allowRules != null) {
// Trace mode
if (trace) {
log.trace(String.format("Checking rules for %s request [%s]...", method, path));
if (allowRules == null) {
log.trace(String.format("No matching rules for %s request [%s]: reject", method, path));
}
else {
boolean found = false;
for (String allowRule : allowRules) {
log.trace(String.format(" - Trying against rule [%s] for %s requests: not match", allowRule, method));
if (path.matches(allowRule)) {
if (log.isTraceEnabled()) {
log.trace(String.format("Find matching rule [%s] for %s request [%s]: allow", allowRule, method, path));
found = true;
break;
}
return true;
}
if (!found) {
log.trace(String.format("No matching rules for %s request [%s]: reject", method, path));
}
}
}
log.trace(String.format("No matching rules for %s request [%s]: reject", method, path));
// Check if allow
if (allowRules != null) {
for (String allowRule : allowRules) {
if (path.matches(allowRule)) {
return true;
}
}
}
return false;
}
......
......@@ -23,19 +23,16 @@ package org.duniter.elasticsearch.rest.security;
*/
import org.duniter.elasticsearch.PluginSettings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.*;
import java.util.Map;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
public class RestSecurityFilter extends RestFilter {
private static final ESLogger log = ESLoggerFactory.getLogger("duniter.security");
private final ESLogger logger;
private RestSecurityController securityController;
private final boolean debug;
......@@ -43,19 +40,20 @@ public class RestSecurityFilter extends RestFilter {
@Inject
public RestSecurityFilter(PluginSettings pluginSettings, RestController controller, RestSecurityController securityController) {
super();
logger = Loggers.getLogger("duniter.security", pluginSettings.getSettings(), new String[0]);
if (pluginSettings.enableSecurity()) {
log.info("Enable security on duniter4j index access");
logger.info("Enable security on all duniter4j indices");
controller.registerFilter(this);
}
this.securityController = securityController;
this.debug = log.isDebugEnabled();
this.debug = logger.isDebugEnabled();
}
@Override
public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception {
if (request.path().contains("message/record")) {
log.debug("---------------- Redirection ?!");
logger.debug("---------------- Redirection ?!");
filterChain.continueProcessing(new RedirectionRestRequest(request, "message/inbox"), channel);
return;
......@@ -63,14 +61,14 @@ public class RestSecurityFilter extends RestFilter {
if (securityController.isAllow(request)) {
if (debug) {
log.debug(String.format("Allow %s request [%s]", request.method().name(), request.path()));
logger.debug(String.format("Allow %s request [%s]", request.method().name(), request.path()));
}
filterChain.continueProcessing(request, channel);
}
else {
log.warn(String.format("Refused %s request to [%s]", request.method().name(), request.path()));
logger.warn(String.format("Refused %s request to [%s]", request.method().name(), request.path()));
channel.sendResponse(new BytesRestResponse(FORBIDDEN));
}
}
......
......@@ -22,16 +22,21 @@ package org.duniter.elasticsearch.synchro;
* #L%
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.duniter.core.client.dao.CurrencyDao;
import org.duniter.core.client.dao.PeerDao;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.bma.Endpoints;
import org.duniter.core.client.model.bma.NetworkPeering;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.service.HttpService;
import org.duniter.core.client.service.local.NetworkService;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.DateUtils;
......@@ -50,6 +55,7 @@ import org.duniter.elasticsearch.service.changes.ChangeSource;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
......@@ -63,12 +69,13 @@ public class SynchroService extends AbstractService {
private static final String WS_CHANGES_URL = "/ws/_changes";
protected HttpService httpService;
protected final Set<EndpointApi> peerApiFilters = Sets.newHashSet();
protected final ThreadPool threadPool;
protected final PeerDao peerDao;
protected final CurrencyDao currencyDao;
protected final SynchroExecutionDao synchroExecutionDao;
private HttpService httpService;
private NetworkService networkService;
private final Set<EndpointApi> peerApiFilters = Sets.newHashSet();
private final ThreadPool threadPool;
private final PeerDao peerDao;
private final CurrencyDao currencyDao;
private final SynchroExecutionDao synchroExecutionDao;
private List<WebsocketClientEndpoint> wsClientEndpoints = Lists.newArrayList();
private List<SynchroAction> actions = Lists.newArrayList();
......@@ -88,6 +95,7 @@ public class SynchroService extends AbstractService {
this.synchroExecutionDao = synchroExecutionDao;
threadPool.scheduleOnStarted(() -> {
httpService = serviceLocator.getHttpService();
networkService = serviceLocator.getNetworkService();
setIsReady(true);
});
}
......@@ -173,6 +181,14 @@ public class SynchroService extends AbstractService {
public SynchroResult synchronizePeer(final Peer peer, boolean enableSynchroWebsocket) {
long now = System.currentTimeMillis();
// Check if peer alive and valid
boolean isAliveAndValid = isAliveAndValid(peer);
if (!isAliveAndValid) {
logger.warn(String.format("[%s] [%s] Not reachable, or not running on this currency. Skipping.", peer.getCurrency(), peer));
return null;
}
SynchroResult result = new SynchroResult();
// Get the last execution time (or 0 is never synchronized)
......@@ -210,7 +226,39 @@ public class SynchroService extends AbstractService {
/* -- protected methods -- */
protected List<Peer> getConfigPingPeers(List<String> currencyIds, final EndpointApi api) {
String[] endpoints = pluginSettings.getSynchroPingEndpoints();
if (ArrayUtils.isEmpty(endpoints)) return null;
List<Peer> peers = Lists.newArrayList();
for (String endpoint: endpoints) {
try {
String[] endpointPart = endpoint.split(":");
if (endpointPart.length == 2) {
String currency = endpointPart[0];
NetworkPeering.Endpoint ep = Endpoints.parse(endpointPart[1]);
if (ep.api == api && currencyIds.contains(currency)) {
Peer peer = Peer.newBuilder()
.setEndpoint(ep)
.setCurrency(currency)
.build();
String peerId = cryptoService.hash(peer.computeKey());
peer.setId(peerId);
peers.add(peer);
}
}
else {
logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint));
}
} catch (IOException e) {
logger.warn(String.format("Unable to parse P2P endpoint [%s]: %s", endpoint, e.getMessage()));
}
}
return peers;
}
protected List<Peer> getPeersFromApi(final EndpointApi api) {
Preconditions.checkNotNull(api);
......@@ -219,11 +267,19 @@ public class SynchroService extends AbstractService {
List<String> currencyIds = currencyDao.getCurrencyIds();
if (CollectionUtils.isEmpty(currencyIds)) return null;
return currencyIds.stream()
// Get default peer, defined in config option
List<Peer> peers = getConfigPingPeers(currencyIds, api);
if (peers == null) {
peers = Lists.newArrayList();
}
peers.addAll(currencyIds.stream()
.map(currencyId -> peerDao.getPeersByCurrencyIdAndApi(currencyId, api.name()))
.filter(Objects::nonNull)
.flatMap(List::stream)
.collect(Collectors.toList());
.collect(Collectors.toList()));
return peers;
}
catch (Exception e) {
logger.error(String.format("Could not get peers for Api [%s]", api.name()), e);
......@@ -364,4 +420,26 @@ public class SynchroService extends AbstractService {
wsClientEndpoints.add(wsClientEndPoint);
}
protected boolean isAliveAndValid(Peer peer) {
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
try {
// TODO: check version is compatible
//String version = networkService.getVersion(peer);
Currency currency = currencyDao.getById(peer.getCurrency());
if (currency == null) return false;
BlockchainBlock block = httpService.executeRequest(peer, String.format("/%s/block/0/_source", peer.getCurrency()), BlockchainBlock.class);
return Objects.equals(block.getCurrency(), peer.getCurrency()) &&
Objects.equals(block.getSignature(), currency.getFirstBlockSignature());
}
catch(Exception e) {
logger.debug(String.format("[%s] [%s] Peer not alive or invalid: %s", peer.getCurrency(), peer, e.getMessage()));
return false;
}
}
}
package org.duniter.elasticsearch.subscription.synchro;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.synchro.SynchroService;
......@@ -21,7 +22,12 @@ public class SynchroSubscriptionExecutionIndexAction extends AbstractSynchroActi
super(SubscriptionIndexDao.INDEX, SubscriptionExecutionDao.TYPE, client, pluginSettings.getDelegate(),
cryptoService, threadPool);
synchroService.register(this);
}
@Override
public EndpointApi getEndPointApi() {
return EndpointApi.ES_SUBSCRIPTION_API;
}
}
package org.duniter.elasticsearch.subscription.synchro;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.duniter.elasticsearch.synchro.SynchroService;
......@@ -25,4 +26,9 @@ public class SynchroSubscriptionRecordAction extends AbstractSynchroAction {
synchroService.register(this);
}
@Override
public EndpointApi getEndPointApi() {
return EndpointApi.ES_SUBSCRIPTION_API;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment