Commit 645fd920 authored by Benoit Lavenier's avatar Benoit Lavenier

[enh] Add websocket implementation on /ws using Netty (= same port as default HTTP requests)

[enh] Add some BMA REST endpoints : /wot/members, blockchain/block/:number, blockchain/current, blockchain/patameters
[enh] Subscription: Allow to change the button label, in email content
parent 7b1153e2
Pipeline #4379 failed
......@@ -17,7 +17,7 @@
# cluster.name: my-application
cluster.name: cesium-plus-cluster-DEV
cluster.remote.host: localhost
cluster.remote.port: 9201
cluster.remote.port: 9200
#
# ------------------------------------ Node ------------------------------------
#
......@@ -125,6 +125,7 @@ duniter.string.analyzer: french
# Enabling blockchain synchronization (default: false)
#
duniter.blockchain.enable: true
duniter.blockchain.peer.enable: false
duniter.blockchain.event.user.enable: false
duniter.blockchain.event.admin.enable: false
#
......@@ -139,6 +140,8 @@ duniter.blockchain.event.admin.enable: false
duniter.host: g1.duniter.fr
duniter.port: 443
duniter.useSsl: true
duniter.network.timeout: 5000
#
# Compute statistics on indices (each hour) ? (default: true)
#
......
......@@ -35,7 +35,7 @@ logger:
#org.duniter.elasticsearch: DEBUG
#org.duniter.elasticsearch.service: DEBUG
#org.duniter.elasticsearch.user.service: DEBUG
#org.duniter.elasticsearch.subscription.service: DEBUG
#org.duniter.elasticsearch.subscription.service: INFO
org.nuiton.i18n: ERROR
org.nuiton.config: ERROR
......
#!/bin/sh
curl -XPOST 'https://g1.data.le-sou.org/g1/block/_search?pretty' -d '
{
"size": 1000,
"query": {
filtered: {
filter: {
bool: {
must: [
{
exists: {
field: "joiners"
}
},
{
range: {
medianTime: {
gt: 1506837759
}
}
}
]
}
}
}
},
_source: ["joiners", "number"],
sort: {
"number" : "asc"
}
}'
......@@ -30,12 +30,16 @@ import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.ServiceModule;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.websocket.WebSocketModule;
import org.duniter.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.duniter.elasticsearch.http.WebSocketServerModule;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.script.ScriptModule;
import java.util.Collection;
......@@ -44,10 +48,13 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
private ESLogger logger;
private boolean enable;
private boolean enableWs;
@Inject public Plugin(Settings settings) {
this.enable = settings.getAsBoolean("duniter.enable", true);
this.logger = Loggers.getLogger("duniter.core", settings, new String[0]);
this.enable = settings.getAsBoolean("duniter.enable", true);
this.enableWs = settings.getAsBoolean("duniter.ws.enable", this.enable);
}
@Override
......@@ -60,38 +67,44 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
return "Duniter Core Plugin";
}
@Inject
public void onModule(org.elasticsearch.script.ScriptModule scriptModule) {
public void onModule(ScriptModule scriptModule) {
// TODO: in ES v5+, see example here :
// https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java
scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class);
}
public void onModule(HttpServerModule httpServerModule) {
if (this.enableWs) httpServerModule.setHttpServerTransport(NettyHttpServerTransport.class, "cesium-plus-core");
}
@Override
public Collection<Module> nodeModules() {
Collection<Module> modules = Lists.newArrayList();
if (!enable) {
logger.warn(description() + " has been disabled.");
return modules;
return Lists.newArrayList();
}
modules.add(new SecurityModule());
modules.add(new WebSocketModule());
Collection<Module> modules = Lists.newArrayList();
modules.add(new SecurityModule());
modules.add(new RestModule());
// Websocket
if (this.enableWs) {
modules.add(new WebSocketServerModule());
modules.add(new WebSocketModule());
}
modules.add(new DaoModule());
modules.add(new ServiceModule());
//modules.add(new ScriptModule());
return modules;
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (!enable) return Lists.newArrayList();
Collection<Class<? extends LifecycleComponent>> components = Lists.newArrayList();
if (!enable) {
return components;
}
components.add(PluginSettings.class);
components.add(ThreadPool.class);
components.add(PluginInit.class);
......
......@@ -23,6 +23,7 @@ package org.duniter.elasticsearch;
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.io.FileUtils;
import org.duniter.core.client.config.Configuration;
......@@ -151,6 +152,9 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
initVersion(applicationConfig);
// Init Http client logging
System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.Log4JLogger");
}
@Override
......@@ -298,7 +302,17 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
String[] includeApis = settings.getAsArray("duniter.p2p.peer.indexedApis");
// By default: getPeeringPublishedApis + getPeeringTargetedApis
if (CollectionUtils.isEmpty(includeApis)) {
return CollectionUtils.union(getPeeringTargetedApis(), getPeeringPublishedApis());
return CollectionUtils.union(
ImmutableList.of(
EndpointApi.BASIC_MERKLED_API,
EndpointApi.BMAS,
EndpointApi.WS2P
),
CollectionUtils.union(
getPeeringTargetedApis(),
getPeeringPublishedApis()
)
);
}
return Arrays.stream(includeApis).map(EndpointApi::valueOf).collect(Collectors.toList());
......@@ -316,7 +330,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
}
/**
* Targeted API where to send the peer document.
* Targeted API where to sendBlock the peer document.
* This API should accept a POST request to '/network/peering' (like Duniter node, but can also be a pod)
* @return
*/
......
......@@ -24,10 +24,12 @@ package org.duniter.elasticsearch.dao;
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.BlockchainParameters;
import org.elasticsearch.common.bytes.BytesReference;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.Map;
/**
* Created by blavenie on 03/04/17.
......@@ -65,9 +67,17 @@ public interface BlockDao extends Bean, TypeDao<BlockDao> {
BlockchainBlock getBlockById(String currencyName, String id);
BytesReference getBlockByIdAsBytes(String currencyName, String id);
long[] getBlockNumberWithUd(String currencyName);
long[] getBlockNumberWithNewcomers(String currencyName);
void deleteRange(final String currencyName, final int fromNumber, final int toNumber);
List<BlockchainBlock> getBlocksByIds(String currencyName, Collection<String> ids);
void deleteById(final String currencyName, String id);
Map<String, String> getMembers(BlockchainParameters parameters);
}
......@@ -32,4 +32,5 @@ public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExt
String RECORD_TYPE = "record";
String getDefaultCurrencyName();
}
......@@ -25,22 +25,35 @@ package org.duniter.elasticsearch.dao.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.BlockchainParameters;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.json.JsonSyntaxException;
import org.duniter.elasticsearch.dao.AbstractDao;
import org.duniter.elasticsearch.dao.BlockDao;
import org.duniter.elasticsearch.model.SynchroExecution;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
......@@ -48,10 +61,10 @@ import org.elasticsearch.search.highlight.HighlightField;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created by Benoit on 30/03/2015.
......@@ -249,6 +262,101 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class);
}
public BytesReference getBlockByIdAsBytes(String currencyName, String id) {
GetResponse response = client.prepareGet(currencyName, TYPE, id).setFetchSource(true).execute().actionGet();
if (response.isExists()) {
return client.prepareGet(currencyName, TYPE, id).setFetchSource(true).execute().actionGet().getSourceAsBytesRef();
}
return null;
}
public long[] getBlockNumberWithUd(String currencyName) {
return getBlockNumbersFromQuery(currencyName,
QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_DIVIDEND)));
}
@Override
public long[] getBlockNumberWithNewcomers(String currencyName) {
return getBlockNumbersFromQuery(currencyName,
QueryBuilders.boolQuery()
.filter(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_IDENTITIES)));
}
@Override
public Map<String, String> getMembers(BlockchainParameters parameters) {
Preconditions.checkNotNull(parameters);
Number medianTime = client.getMandatoryTypedFieldById(parameters.getCurrency(), TYPE, "current", BlockchainBlock.PROPERTY_MEDIAN_TIME);
long startMedianTime = medianTime.longValue() - parameters.getMsValidity() - (parameters.getAvgGenTime() / 2);
QueryBuilder withEvents = QueryBuilders.boolQuery()
.minimumNumberShouldMatch(1)
.should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_JOINERS))
.should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_ACTIVES));
QueryBuilder timeQuery = QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_MEDIAN_TIME)
.gte(startMedianTime);
long total = -1;
int from = 0;
int size = pluginSettings.getIndexBulkSize();
Map<String, String> results = Maps.newHashMap();
do {
SearchRequestBuilder req = client.prepareSearch(parameters.getCurrency())
.setTypes(BlockDao.TYPE)
.setFrom(from)
.setSize(size)
.addFields(BlockchainBlock.PROPERTY_JOINERS,
BlockchainBlock.PROPERTY_ACTIVES,
BlockchainBlock.PROPERTY_EXCLUDED,
BlockchainBlock.PROPERTY_LEAVERS,
BlockchainBlock.PROPERTY_REVOKED)
.setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(withEvents).must(timeQuery)))
.addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC)
.setFetchSource(false);
SearchResponse response = req.execute().actionGet();
if (total == -1) total = response.getHits().getTotalHits();
if (total > 0) {
for (SearchHit hit: response.getHits().getHits()) {
Map<String, SearchHitField> fields = hit.getFields();
// membership IN
updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_JOINERS), true);
updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_ACTIVES), true);
// membership OUT
updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_EXCLUDED), false);
updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_LEAVERS), false);
updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_REVOKED), false);
}
}
from += size;
} while(from<total);
if (logger.isDebugEnabled()) logger.debug("Wot members found: " + results);
return results;
}
private void updateMembershipsMap(Map<String, String> result, SearchHitField field, boolean membershipIn) {
List<Object> values = field != null ? field.values() : null;
if (CollectionUtils.isEmpty(values)) return;
for (Object value: values) {
String[] parts = value.toString().split(":");
String pubkey = parts[0];
if (membershipIn) {
String uid = parts[parts.length -1 ];
result.put(pubkey, uid);
}
else {
result.remove(pubkey);
}
}
}
/**
* Delete blocks from a start number (using bulk)
* @param currencyName
......@@ -411,4 +519,27 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao {
return result;
}
protected long[] getBlockNumbersFromQuery(String currencyName, QueryBuilder query) {
int size = pluginSettings.getIndexBulkSize();
int offset = 0;
long total = -1;
List<String> ids = Lists.newArrayList();
do {
SearchRequestBuilder request = client.prepareSearch(currencyName)
.setTypes(TYPE)
.setFrom(offset)
.setSize(size)
.addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC)
.setQuery(query)
.setFetchSource(false);
SearchResponse response = request.execute().actionGet();
ids.addAll(toListIds(response));
if (total == -1) total = response.getHits().getTotalHits();
offset += size;
} while (offset < total);
return ids.stream().mapToLong(Long::parseLong).toArray();
}
}
......@@ -25,7 +25,10 @@ package org.duniter.elasticsearch.dao.impl;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Lists;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.client.util.KnownCurrencies;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.AbstractIndexTypeDao;
......@@ -33,7 +36,6 @@ import org.duniter.elasticsearch.dao.CurrencyExtendDao;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
......@@ -48,6 +50,7 @@ import java.util.Map;
public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> implements CurrencyExtendDao {
protected static final String REGEX_WORD_SEPARATOR = "[-\\t@# _]+";
private String defaultCurrency;
public CurrencyDaoImpl(){
super(INDEX, RECORD_TYPE);
......@@ -213,6 +216,27 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp
}
}
/**
* Return the default currency
* @return
*/
public String getDefaultCurrencyName() {
if (defaultCurrency != null) return defaultCurrency;
boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && existsIndex();
try {
List<String> currencyIds = enableBlockchainIndexation ? getCurrencyIds() : null;
if (CollectionUtils.isNotEmpty(currencyIds)) {
defaultCurrency = currencyIds.get(0);
return defaultCurrency;
}
} catch(Throwable t) {
// Continue (index not read yet?)
}
return KnownCurrencies.G1;
}
/* -- internal methods -- */
@Override
......
package org.duniter.elasticsearch.http;
/*
* #%L
* Duniter4j :: ElasticSearch Plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.elasticsearch.http.netty.NettyWebSocketServer;
import org.duniter.elasticsearch.http.tyrus.TyrusWebSocketServer;
import org.elasticsearch.common.inject.AbstractModule;
public class WebSocketServerModule extends AbstractModule {
@Override
protected void configure() {
// Netty transport: add websocket support
bind(NettyWebSocketServer.class).asEagerSingleton();
// Tyrus Web socket Server
bind(TyrusWebSocketServer.class).asEagerSingleton();
}
}
package org.duniter.elasticsearch.http.netty;
import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint;
import org.elasticsearch.http.netty.NettyHttpChannel;
import org.elasticsearch.http.netty.NettyHttpRequest;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpRequest;
@ChannelHandler.Sharable
public class HttpRequestHandler extends org.elasticsearch.http.netty.HttpRequestHandler {
private final NettyHttpServerTransport serverTransport;
private final boolean detailedErrorsEnabled;
public HttpRequestHandler(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) {
super(transport, detailedErrorsEnabled);
this.serverTransport = transport;
this.detailedErrorsEnabled = detailedErrorsEnabled;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) e.getMessage();
HttpHeaders headers = httpRequest.headers();
// If web socket path, and connection request
if (httpRequest.getUri().startsWith(WebSocketEndpoint.WEBSOCKET_PATH + "/") &&
HttpHeaders.Names.UPGRADE.equalsIgnoreCase(headers.get(org.apache.http.HttpHeaders.CONNECTION)) &&
HttpHeaders.Values.WEBSOCKET.equalsIgnoreCase(headers.get(org.apache.http.HttpHeaders.UPGRADE))) {
// Convert request and channel
NettyHttpRequest request = new NettyHttpRequest(httpRequest, ctx.getChannel());
NettyHttpChannel channel = new NettyHttpChannel(this.serverTransport, request, null, this.detailedErrorsEnabled);
serverTransport.dispathWebsocketRequest(request, channel);
ctx.sendUpstream(e);
return;
}
}
super.messageReceived(ctx, e);
}
}
package org.duniter.elasticsearch.http.netty;
import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession;
import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.http.netty.NettyHttpChannel;
import org.elasticsearch.http.netty.NettyHttpRequest;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
public class NettyHttpServerTransport extends org.elasticsearch.http.netty.NettyHttpServerTransport {
private final PathTrie<Class<? extends WebSocketEndpoint>> websocketEndpoints;
@Inject
public NettyHttpServerTransport(Settings settings,
NetworkService networkService,
BigArrays bigArrays) {
super(settings, networkService, bigArrays);
this.websocketEndpoints = new PathTrie(RestUtils.REST_DECODER);
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new HttpChannelPipelineFactory(this, this.detailedErrorsEnabled);
}
protected static class HttpChannelPipelineFactory extends org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory {
protected final HttpRequestHandler handler;
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) {
super(transport, detailedErrorsEnabled);
this.handler = new HttpRequestHandler(transport, detailedErrorsEnabled);
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
// Replace default HttpRequestHandler by a custom handler with WebSocket support
pipeline.replace("handler", "handler", handler);
return pipeline;
}
}
public <T extends WebSocketEndpoint> void addEndpoint(String path, Class<T> handler) {
websocketEndpoints.insert(path, handler);
}
@Override
protected void dispatchRequest(RestRequest request, RestChannel channel) {
super.dispatchRequest(request, channel);
}
public void dispathWebsocketRequest(NettyHttpRequest request, NettyHttpChannel channel) {
WebSocketEndpoint wsEndpoint = createWebsocketEndpoint(request);
if (wsEndpoint != null) {
WebSocketRequestHandler channelHandler = new WebSocketRequestHandler(wsEndpoint);
// Replacing the new handler to the existing pipeline to handle
request.getChannel().getPipeline().replace("handler", "websocketHandler", channelHandler);
// Execute the handshake
channelHandler.handleHandshake(request);
} else if (request.method() == RestRequest.Method.OPTIONS) {
channel.sendResponse(new BytesRestResponse(RestStatus.OK));