diff --git a/cesium-plus-pod-assembly/src/main/assembly/config/favicon.ico b/cesium-plus-pod-assembly/src/main/assembly/config/favicon.ico new file mode 100644 index 0000000000000000000000000000000000000000..641664a33fb26fdd0f60b9f41ae1d9ccd110c77a Binary files /dev/null and b/cesium-plus-pod-assembly/src/main/assembly/config/favicon.ico differ diff --git a/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml b/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml index 124763c14cc966bd636c5760fa01c9264d202ddb..b0c24a34550867f096eba92d8205939b350c3f9c 100644 --- a/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml +++ b/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml @@ -17,7 +17,7 @@ # cluster.name: my-application cluster.name: cesium-plus-cluster-DEV cluster.remote.host: localhost -cluster.remote.port: 9201 +cluster.remote.port: 9200 # # ------------------------------------ Node ------------------------------------ # @@ -125,6 +125,7 @@ duniter.string.analyzer: french # Enabling blockchain synchronization (default: false) # duniter.blockchain.enable: true +duniter.blockchain.peer.enable: false duniter.blockchain.event.user.enable: false duniter.blockchain.event.admin.enable: false # @@ -139,6 +140,8 @@ duniter.blockchain.event.admin.enable: false duniter.host: g1.duniter.fr duniter.port: 443 duniter.useSsl: true + +duniter.network.timeout: 5000 # # Compute statistics on indices (each hour) ? (default: true) # diff --git a/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml b/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml index 033d8a1510bddfe11a21074467be9dcf374ea202..a281deaa87cc39b47ef1e77294607fe452d7fad4 100644 --- a/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml +++ b/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml @@ -35,7 +35,7 @@ logger: #org.duniter.elasticsearch: DEBUG #org.duniter.elasticsearch.service: DEBUG #org.duniter.elasticsearch.user.service: DEBUG - #org.duniter.elasticsearch.subscription.service: DEBUG + #org.duniter.elasticsearch.subscription.service: INFO org.nuiton.i18n: ERROR org.nuiton.config: ERROR diff --git a/cesium-plus-pod-assembly/src/test/misc/block_members.sh b/cesium-plus-pod-assembly/src/test/misc/block_members.sh new file mode 100755 index 0000000000000000000000000000000000000000..2e12d2083c9ded36f2966d97da2311fade529255 --- /dev/null +++ b/cesium-plus-pod-assembly/src/test/misc/block_members.sh @@ -0,0 +1,33 @@ +#!/bin/sh + +curl -XPOST 'https://g1.data.le-sou.org/g1/block/_search?pretty' -d ' + { + "size": 1000, + "query": { + filtered: { + filter: { + + bool: { + must: [ + { + exists: { + field: "joiners" + } + }, + { + range: { + medianTime: { + gt: 1506837759 + } + } + } + ] + } + } + } + }, + _source: ["joiners", "number"], + sort: { + "number" : "asc" + } + }' diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java index bd86c8296a6ec6e815374586e124a6ad61085999..71beba0f60f260977c20c7c9da2701f74ac7789c 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java @@ -30,12 +30,16 @@ import org.duniter.elasticsearch.security.SecurityModule; import org.duniter.elasticsearch.service.ServiceModule; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.websocket.WebSocketModule; +import org.duniter.elasticsearch.http.netty.NettyHttpServerTransport; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.duniter.elasticsearch.http.WebSocketServerModule; +import org.elasticsearch.http.HttpServerModule; +import org.elasticsearch.script.ScriptModule; import java.util.Collection; @@ -44,10 +48,13 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { private ESLogger logger; private boolean enable; + private boolean enableWs; @Inject public Plugin(Settings settings) { - this.enable = settings.getAsBoolean("duniter.enable", true); this.logger = Loggers.getLogger("duniter.core", settings, new String[0]); + + this.enable = settings.getAsBoolean("duniter.enable", true); + this.enableWs = settings.getAsBoolean("duniter.ws.enable", this.enable); } @Override @@ -60,38 +67,44 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { return "Duniter Core Plugin"; } - @Inject - public void onModule(org.elasticsearch.script.ScriptModule scriptModule) { + public void onModule(ScriptModule scriptModule) { // TODO: in ES v5+, see example here : // https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class); + + } + + public void onModule(HttpServerModule httpServerModule) { + if (this.enableWs) httpServerModule.setHttpServerTransport(NettyHttpServerTransport.class, "cesium-plus-core"); } @Override public Collection<Module> nodeModules() { - Collection<Module> modules = Lists.newArrayList(); if (!enable) { logger.warn(description() + " has been disabled."); - return modules; + return Lists.newArrayList(); } - modules.add(new SecurityModule()); - modules.add(new WebSocketModule()); + Collection<Module> modules = Lists.newArrayList(); + modules.add(new SecurityModule()); modules.add(new RestModule()); + // Websocket + if (this.enableWs) { + modules.add(new WebSocketServerModule()); + modules.add(new WebSocketModule()); + } + modules.add(new DaoModule()); modules.add(new ServiceModule()); - //modules.add(new ScriptModule()); return modules; } @Override public Collection<Class<? extends LifecycleComponent>> nodeServices() { + if (!enable) return Lists.newArrayList(); Collection<Class<? extends LifecycleComponent>> components = Lists.newArrayList(); - if (!enable) { - return components; - } components.add(PluginSettings.class); components.add(ThreadPool.class); components.add(PluginInit.class); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index c03d2d20915be83d8dac5840a0f057e10081ed95..a7239bde667147b84f2836608567e9d18ec72d8b 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -23,6 +23,7 @@ package org.duniter.elasticsearch; */ +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.apache.commons.io.FileUtils; import org.duniter.core.client.config.Configuration; @@ -151,6 +152,9 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } initVersion(applicationConfig); + + // Init Http client logging + System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.Log4JLogger"); } @Override @@ -298,7 +302,17 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { String[] includeApis = settings.getAsArray("duniter.p2p.peer.indexedApis"); // By default: getPeeringPublishedApis + getPeeringTargetedApis if (CollectionUtils.isEmpty(includeApis)) { - return CollectionUtils.union(getPeeringTargetedApis(), getPeeringPublishedApis()); + return CollectionUtils.union( + ImmutableList.of( + EndpointApi.BASIC_MERKLED_API, + EndpointApi.BMAS, + EndpointApi.WS2P + ), + CollectionUtils.union( + getPeeringTargetedApis(), + getPeeringPublishedApis() + ) + ); } return Arrays.stream(includeApis).map(EndpointApi::valueOf).collect(Collectors.toList()); @@ -316,7 +330,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } /** - * Targeted API where to send the peer document. + * Targeted API where to sendBlock the peer document. * This API should accept a POST request to '/network/peering' (like Duniter node, but can also be a pod) * @return */ diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java index 9e4dca4cbb926421d691e15f2e76c030b49d3189..9770ad5da39e4c42c85660d5431fbd449080034b 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java @@ -24,10 +24,12 @@ package org.duniter.elasticsearch.dao; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainParameters; +import org.elasticsearch.common.bytes.BytesReference; import java.util.Collection; import java.util.List; -import java.util.Set; +import java.util.Map; /** * Created by blavenie on 03/04/17. @@ -65,9 +67,17 @@ public interface BlockDao extends Bean, TypeDao<BlockDao> { BlockchainBlock getBlockById(String currencyName, String id); + BytesReference getBlockByIdAsBytes(String currencyName, String id); + + long[] getBlockNumberWithUd(String currencyName); + + long[] getBlockNumberWithNewcomers(String currencyName); + void deleteRange(final String currencyName, final int fromNumber, final int toNumber); List<BlockchainBlock> getBlocksByIds(String currencyName, Collection<String> ids); void deleteById(final String currencyName, String id); + + Map<String, String> getMembers(BlockchainParameters parameters); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java index de4aae0ac0f93977c6b9baed4b051e4c9dcaf7cf..08482695830c1992b1f41e0a35894576fefd9d15 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java @@ -32,4 +32,5 @@ public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExt String RECORD_TYPE = "record"; + String getDefaultCurrencyName(); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index c3bf183539b63b69cd3eb4979755982d735e0ba2..3cd69c296d6ec1a47554b99e915df58685019bbc 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -25,22 +25,35 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Streams; import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainParameters; +import org.duniter.core.client.model.local.Peer; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.core.util.json.JsonSyntaxException; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.model.SynchroExecution; import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.metrics.max.Max; @@ -48,10 +61,10 @@ import org.elasticsearch.search.highlight.HighlightField; import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Created by Benoit on 30/03/2015. @@ -249,6 +262,101 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { return client.getSourceById(currencyName, TYPE, id, BlockchainBlock.class); } + public BytesReference getBlockByIdAsBytes(String currencyName, String id) { + GetResponse response = client.prepareGet(currencyName, TYPE, id).setFetchSource(true).execute().actionGet(); + if (response.isExists()) { + return client.prepareGet(currencyName, TYPE, id).setFetchSource(true).execute().actionGet().getSourceAsBytesRef(); + } + return null; + } + + + public long[] getBlockNumberWithUd(String currencyName) { + return getBlockNumbersFromQuery(currencyName, + QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_DIVIDEND))); + } + + @Override + public long[] getBlockNumberWithNewcomers(String currencyName) { + return getBlockNumbersFromQuery(currencyName, + QueryBuilders.boolQuery() + .filter(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_IDENTITIES))); + } + + @Override + public Map<String, String> getMembers(BlockchainParameters parameters) { + Preconditions.checkNotNull(parameters); + + Number medianTime = client.getMandatoryTypedFieldById(parameters.getCurrency(), TYPE, "current", BlockchainBlock.PROPERTY_MEDIAN_TIME); + long startMedianTime = medianTime.longValue() - parameters.getMsValidity() - (parameters.getAvgGenTime() / 2); + + QueryBuilder withEvents = QueryBuilders.boolQuery() + .minimumNumberShouldMatch(1) + .should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_JOINERS)) + .should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_ACTIVES)); + + QueryBuilder timeQuery = QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_MEDIAN_TIME) + .gte(startMedianTime); + + long total = -1; + int from = 0; + int size = pluginSettings.getIndexBulkSize(); + Map<String, String> results = Maps.newHashMap(); + do { + SearchRequestBuilder req = client.prepareSearch(parameters.getCurrency()) + .setTypes(BlockDao.TYPE) + .setFrom(from) + .setSize(size) + .addFields(BlockchainBlock.PROPERTY_JOINERS, + BlockchainBlock.PROPERTY_ACTIVES, + BlockchainBlock.PROPERTY_EXCLUDED, + BlockchainBlock.PROPERTY_LEAVERS, + BlockchainBlock.PROPERTY_REVOKED) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(withEvents).must(timeQuery))) + .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC) + .setFetchSource(false); + + SearchResponse response = req.execute().actionGet(); + if (total == -1) total = response.getHits().getTotalHits(); + + if (total > 0) { + for (SearchHit hit: response.getHits().getHits()) { + Map<String, SearchHitField> fields = hit.getFields(); + // membership IN + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_JOINERS), true); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_ACTIVES), true); + // membership OUT + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_EXCLUDED), false); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_LEAVERS), false); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_REVOKED), false); + } + } + + from += size; + } while(from<total); + + if (logger.isDebugEnabled()) logger.debug("Wot members found: " + results); + return results; + } + + private void updateMembershipsMap(Map<String, String> result, SearchHitField field, boolean membershipIn) { + List<Object> values = field != null ? field.values() : null; + if (CollectionUtils.isEmpty(values)) return; + for (Object value: values) { + String[] parts = value.toString().split(":"); + String pubkey = parts[0]; + if (membershipIn) { + String uid = parts[parts.length -1 ]; + result.put(pubkey, uid); + } + else { + result.remove(pubkey); + } + } + + } + /** * Delete blocks from a start number (using bulk) * @param currencyName @@ -411,4 +519,27 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { return result; } + + protected long[] getBlockNumbersFromQuery(String currencyName, QueryBuilder query) { + int size = pluginSettings.getIndexBulkSize(); + int offset = 0; + long total = -1; + List<String> ids = Lists.newArrayList(); + do { + SearchRequestBuilder request = client.prepareSearch(currencyName) + .setTypes(TYPE) + .setFrom(offset) + .setSize(size) + .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC) + .setQuery(query) + .setFetchSource(false); + SearchResponse response = request.execute().actionGet(); + ids.addAll(toListIds(response)); + + if (total == -1) total = response.getHits().getTotalHits(); + offset += size; + } while (offset < total); + + return ids.stream().mapToLong(Long::parseLong).toArray(); + } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index be4033f9783db1cbc170ebbb394a4d5c1feabe86..61f73b7b2d12688f6f3bdf0d9c0dcfd3917ef3fb 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -25,7 +25,10 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import org.duniter.core.client.model.local.Currency; +import org.duniter.core.client.service.ServiceLocator; +import org.duniter.core.client.util.KnownCurrencies; import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.AbstractIndexTypeDao; @@ -33,7 +36,6 @@ import org.duniter.elasticsearch.dao.CurrencyExtendDao; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -48,6 +50,7 @@ import java.util.Map; public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> implements CurrencyExtendDao { protected static final String REGEX_WORD_SEPARATOR = "[-\\t@# _]+"; + private String defaultCurrency; public CurrencyDaoImpl(){ super(INDEX, RECORD_TYPE); @@ -213,6 +216,27 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } } + /** + * Return the default currency + * @return + */ + public String getDefaultCurrencyName() { + + if (defaultCurrency != null) return defaultCurrency; + + boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && existsIndex(); + try { + List<String> currencyIds = enableBlockchainIndexation ? getCurrencyIds() : null; + if (CollectionUtils.isNotEmpty(currencyIds)) { + defaultCurrency = currencyIds.get(0); + return defaultCurrency; + } + } catch(Throwable t) { + // Continue (index not read yet?) + } + return KnownCurrencies.G1; + } + /* -- internal methods -- */ @Override diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/WebSocketServerModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/WebSocketServerModule.java new file mode 100644 index 0000000000000000000000000000000000000000..614c55f3ac8a37abb32f80794260e0933d3a58c6 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/WebSocketServerModule.java @@ -0,0 +1,42 @@ +package org.duniter.elasticsearch.http; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import org.duniter.elasticsearch.http.netty.NettyWebSocketServer; +import org.duniter.elasticsearch.http.tyrus.TyrusWebSocketServer; +import org.elasticsearch.common.inject.AbstractModule; + +public class WebSocketServerModule extends AbstractModule { + @Override + protected void configure() { + + // Netty transport: add websocket support + bind(NettyWebSocketServer.class).asEagerSingleton(); + + // Tyrus Web socket Server + bind(TyrusWebSocketServer.class).asEagerSingleton(); + + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/HttpRequestHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/HttpRequestHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..31c12d526ec9f4cc8e30bf4685edfe6ddae09be1 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/HttpRequestHandler.java @@ -0,0 +1,46 @@ +package org.duniter.elasticsearch.http.netty; + +import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint; +import org.elasticsearch.http.netty.NettyHttpChannel; +import org.elasticsearch.http.netty.NettyHttpRequest; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpRequest; + +@ChannelHandler.Sharable +public class HttpRequestHandler extends org.elasticsearch.http.netty.HttpRequestHandler { + + private final NettyHttpServerTransport serverTransport; + private final boolean detailedErrorsEnabled; + + public HttpRequestHandler(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) { + super(transport, detailedErrorsEnabled); + this.serverTransport = transport; + this.detailedErrorsEnabled = detailedErrorsEnabled; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + if (e.getMessage() instanceof HttpRequest) { + HttpRequest httpRequest = (HttpRequest) e.getMessage(); + HttpHeaders headers = httpRequest.headers(); + + // If web socket path, and connection request + if (httpRequest.getUri().startsWith(WebSocketEndpoint.WEBSOCKET_PATH + "/") && + HttpHeaders.Names.UPGRADE.equalsIgnoreCase(headers.get(org.apache.http.HttpHeaders.CONNECTION)) && + HttpHeaders.Values.WEBSOCKET.equalsIgnoreCase(headers.get(org.apache.http.HttpHeaders.UPGRADE))) { + + // Convert request and channel + NettyHttpRequest request = new NettyHttpRequest(httpRequest, ctx.getChannel()); + NettyHttpChannel channel = new NettyHttpChannel(this.serverTransport, request, null, this.detailedErrorsEnabled); + + serverTransport.dispathWebsocketRequest(request, channel); + ctx.sendUpstream(e); + return; + } + } + super.messageReceived(ctx, e); + } + + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyHttpServerTransport.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyHttpServerTransport.java new file mode 100644 index 0000000000000000000000000000000000000000..3c3eddd1a0d43606734d340c884bc16d9d56676a --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -0,0 +1,104 @@ +package org.duniter.elasticsearch.http.netty; + + +import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; +import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.path.PathTrie; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.http.netty.NettyHttpChannel; +import org.elasticsearch.http.netty.NettyHttpRequest; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.support.RestUtils; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; +import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; + +public class NettyHttpServerTransport extends org.elasticsearch.http.netty.NettyHttpServerTransport { + + private final PathTrie<Class<? extends WebSocketEndpoint>> websocketEndpoints; + + @Inject + public NettyHttpServerTransport(Settings settings, + NetworkService networkService, + BigArrays bigArrays) { + super(settings, networkService, bigArrays); + this.websocketEndpoints = new PathTrie(RestUtils.REST_DECODER); + } + + @Override + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new HttpChannelPipelineFactory(this, this.detailedErrorsEnabled); + } + + protected static class HttpChannelPipelineFactory extends org.elasticsearch.http.netty.NettyHttpServerTransport.HttpChannelPipelineFactory { + + protected final HttpRequestHandler handler; + + public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) { + super(transport, detailedErrorsEnabled); + this.handler = new HttpRequestHandler(transport, detailedErrorsEnabled); + } + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = super.getPipeline(); + + // Replace default HttpRequestHandler by a custom handler with WebSocket support + pipeline.replace("handler", "handler", handler); + + return pipeline; + } + } + + + public <T extends WebSocketEndpoint> void addEndpoint(String path, Class<T> handler) { + websocketEndpoints.insert(path, handler); + } + + @Override + protected void dispatchRequest(RestRequest request, RestChannel channel) { + super.dispatchRequest(request, channel); + } + + public void dispathWebsocketRequest(NettyHttpRequest request, NettyHttpChannel channel) { + + WebSocketEndpoint wsEndpoint = createWebsocketEndpoint(request); + if (wsEndpoint != null) { + + WebSocketRequestHandler channelHandler = new WebSocketRequestHandler(wsEndpoint); + + // Replacing the new handler to the existing pipeline to handle + request.getChannel().getPipeline().replace("handler", "websocketHandler", channelHandler); + + // Execute the handshake + channelHandler.handleHandshake(request); + + } else if (request.method() == RestRequest.Method.OPTIONS) { + channel.sendResponse(new BytesRestResponse(RestStatus.OK)); + } else { + channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Websocket to URI [" + request.uri() + "] not authorized")); + } + } + + + /* -- protected method -- */ + + protected <T extends WebSocketEndpoint> T createWebsocketEndpoint(RestRequest request) { + String path = request.rawPath(); + Class<? extends WebSocketEndpoint> clazz = websocketEndpoints != null ? websocketEndpoints.retrieve(path, request.params()) : null; + try { + return (T)clazz.newInstance(); + } + catch(Exception e) { + logger.error(String.format("Could not create websocket endpoint instance, from class %s: %s", clazz.getName(), e.getMessage()), e); + return null; + } + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyWebSocketServer.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyWebSocketServer.java new file mode 100644 index 0000000000000000000000000000000000000000..11a6144c83cb3f0f0dc54e4678060b752df32873 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/NettyWebSocketServer.java @@ -0,0 +1,30 @@ +package org.duniter.elasticsearch.http.netty; + +import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.http.HttpServerTransport; + +public class NettyWebSocketServer { + + private final ESLogger logger; + private HttpServerTransport serverTransport; + + @Inject + public NettyWebSocketServer(HttpServerTransport serverTransport) { + logger = Loggers.getLogger("duniter.ws"); + this.serverTransport = serverTransport; + } + + public <T extends WebSocketEndpoint> void addEndpoint(String path, Class<T> handler) { + if (serverTransport instanceof NettyHttpServerTransport) { + NettyHttpServerTransport transport = (NettyHttpServerTransport)serverTransport; + transport.addEndpoint(path, handler); + } + else { + logger.warn("Ignoring websocket endpoint {" + handler.getName()+ "}: server transport is not compatible"); + } + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/WebSocketRequestHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/WebSocketRequestHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..31cec32c2336edadda8d020e016ade76148cbe14 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/WebSocketRequestHandler.java @@ -0,0 +1,106 @@ +package org.duniter.elasticsearch.http.netty; + +import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; +import org.duniter.elasticsearch.http.netty.websocket.WebSocketEndpoint; +import org.elasticsearch.common.bytes.ChannelBufferBytesReference; +import org.elasticsearch.http.netty.NettyHttpRequest; +import org.jboss.netty.channel.*; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.websocketx.*; + +import javax.websocket.CloseReason; + +@ChannelHandler.Sharable +public class WebSocketRequestHandler extends SimpleChannelHandler { + + private final WebSocketEndpoint endpoint; + private NettyWebSocketSession session; + + public WebSocketRequestHandler(WebSocketEndpoint endpoint) { + super(); + this.endpoint = endpoint; + } + + /* Do the handshaking for WebSocket request */ + public ChannelFuture handleHandshake(final NettyHttpRequest request) { + WebSocketServerHandshakerFactory wsFactory = + new WebSocketServerHandshakerFactory(getWebSocketURL(request), null, true); + WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(request.request()); + if (handshaker == null) { + return wsFactory.sendUnsupportedWebSocketVersionResponse(request.getChannel()); + } + + ChannelFuture future = handshaker.handshake(request.getChannel(), request.request()); + future.addListener(new ChannelFutureListener() { + public void operationComplete(ChannelFuture future) { + // Session is open + session = new NettyWebSocketSession(future.getChannel(), request.params()); + endpoint.onOpen(session); + } + }); + return future; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + if (endpoint == null) return; // not open + + Object msg = e.getMessage(); + if (msg instanceof NettyWebSocketSession) { + endpoint.onOpen((NettyWebSocketSession)msg); + } + + else if (msg instanceof WebSocketFrame) { + + // Received binary + if (msg instanceof BinaryWebSocketFrame) { + BinaryWebSocketFrame frame = (BinaryWebSocketFrame)msg; + endpoint.onMessage(new ChannelBufferBytesReference(frame.getBinaryData())); + } + + // Received text + else if (msg instanceof TextWebSocketFrame) { + TextWebSocketFrame frame = (TextWebSocketFrame) msg; + endpoint.onMessage(frame.getText()); + } + + // Ping event + else if (msg instanceof PingWebSocketFrame) { + // TODO + } + + // Pong event + else if (msg instanceof PongWebSocketFrame) { + // TODO + } + + // Close + else if (msg instanceof CloseWebSocketFrame) { + ctx.getChannel().close(); + CloseWebSocketFrame frame = (CloseWebSocketFrame)msg; + endpoint.onClose(new CloseReason(getCloseCode(frame), frame.getReasonText())); + } + + // Unknown event + else { + System.out.println("Unsupported WebSocketFrame"); + } + } + } + + protected String getWebSocketURL(NettyHttpRequest req) { + return "ws://" + req.request().headers().get(HttpHeaders.Names.HOST) + req.rawPath() ; + } + + protected CloseReason.CloseCode getCloseCode(CloseWebSocketFrame frame) { + + int statusCode = frame.getStatusCode(); + if (statusCode == -1) return CloseReason.CloseCodes.NO_STATUS_CODE; + try { + return CloseReason.CloseCodes.getCloseCode(statusCode); + } + catch(IllegalArgumentException e) { + return CloseReason.CloseCodes.NO_STATUS_CODE; + } + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyBaseWebSocketEndpoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyBaseWebSocketEndpoint.java new file mode 100644 index 0000000000000000000000000000000000000000..5c07932382f771d76b69f7a8af868f560c66a8c3 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyBaseWebSocketEndpoint.java @@ -0,0 +1,27 @@ +package org.duniter.elasticsearch.http.netty.websocket; + +import org.elasticsearch.common.bytes.BytesReference; + +import javax.websocket.CloseReason; + +public class NettyBaseWebSocketEndpoint implements WebSocketEndpoint { + + @Override + public void onOpen(NettyWebSocketSession session) { + + } + + @Override + public void onMessage(String message) { + + } + + @Override + public void onMessage(BytesReference bytes) { + + } + + public void onClose(CloseReason reason) { + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyWebSocketSession.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyWebSocketSession.java new file mode 100644 index 0000000000000000000000000000000000000000..914d97227d8584d66b33c09be357c049f735663d --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/NettyWebSocketSession.java @@ -0,0 +1,60 @@ +package org.duniter.elasticsearch.http.netty.websocket; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.http.netty.NettyHttpRequest; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; +import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +import javax.websocket.CloseReason; +import java.util.Map; + +public class NettyWebSocketSession { + + private Channel channel; + private Map<String, String> pathParameters; + + public NettyWebSocketSession(Channel channel, Map<String, String> pathParameters) { + this.channel = channel; + this.pathParameters = pathParameters; + } + + public void close(CloseReason closeReason) { + + CloseWebSocketFrame frame = new CloseWebSocketFrame(closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase()); + ChannelFuture future = channel.write(frame); + + future.addListener(ChannelFutureListener.CLOSE); + } + + public void sendText(String text) { + channel.write(new TextWebSocketFrame(text)); + } + + public void sendBinary(ChannelBuffer buffer) { + BinaryWebSocketFrame frame = new BinaryWebSocketFrame(); + frame.setBinaryData(buffer); + channel.write(frame); + } + + public void sendBinary(BytesReference bytes) { + sendBinary(bytes.toChannelBuffer()); + } + + public Map<String, String> getPathParameters() { + return pathParameters; + } + + public void setPathParameters( Map<String, String> pathParameters) { + this.pathParameters = pathParameters; + } + + public String getId() { + return String.valueOf(this.hashCode()); + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/WebSocketEndpoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/WebSocketEndpoint.java new file mode 100644 index 0000000000000000000000000000000000000000..9c1b8c2d3910201502c85aa92510f522ce4a578b --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/netty/websocket/WebSocketEndpoint.java @@ -0,0 +1,18 @@ +package org.duniter.elasticsearch.http.netty.websocket; + +import org.elasticsearch.common.bytes.BytesReference; + +import javax.websocket.CloseReason; + +public interface WebSocketEndpoint { + + String WEBSOCKET_PATH = "/ws"; + + void onOpen(NettyWebSocketSession session); + + void onMessage(String message); + + void onMessage(BytesReference bytes); + + void onClose(CloseReason reason); +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java similarity index 92% rename from cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java rename to cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java index 4abb50fd042aacede7ef78a80de4ab688ee1eac5..778cdebde8f846c7379e6e7fc5a6ad3093ba77b1 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketServer.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.websocket; +package org.duniter.elasticsearch.http.tyrus; /* * #%L @@ -45,6 +45,9 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.http.HttpServer; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; import org.glassfish.tyrus.server.Server; import java.net.BindException; @@ -54,7 +57,7 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.List; -public class WebSocketServer { +public class TyrusWebSocketServer { public static final String WS_PATH = "/ws"; @@ -64,10 +67,13 @@ public class WebSocketServer { private List<Class<?>> endPoints = new ArrayList<>(); @Inject - public WebSocketServer(final PluginSettings pluginSettings, ThreadPool threadPool) { + public TyrusWebSocketServer(final PluginSettings pluginSettings, + ThreadPool threadPool) { logger = Loggers.getLogger("duniter.ws", pluginSettings.getSettings(), new String[0]); + // If WS enable if (pluginSettings.getWebSocketEnable()) { + // When node started threadPool.scheduleOnStarted(() -> { // startScheduling WS server @@ -76,6 +82,8 @@ public class WebSocketServer { getEndPoints()); }); } + + } public void addEndPoint(Class<?> endPoint) { @@ -104,15 +112,15 @@ public class WebSocketServer { final Server server = new Server(host, port, WS_PATH, null, endPoints) ; try { - AccessController.doPrivileged(new PrivilegedExceptionAction<Server>() { + AccessController.doPrivileged(new PrivilegedExceptionAction<Void>() { @Override - public Server run() throws Exception { + public Void run() throws Exception { // Tyrus tries to load the server code using reflection. In Elasticsearch 2.x Java // security manager is used which breaks the reflection code as it can't find the class. // This is a workaround for that Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); server.start(); - return server; + return null; } }); started = true; @@ -130,6 +138,8 @@ public class WebSocketServer { } + + if (started) { logger.info(String.format("Websocket server started {%s:%s%s}", host, port, WS_PATH)); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java index 193fe0b318edb05fe6b07ccfab08cdb539e76b3a..19e935e181be3e42b297a6ced05c97ef13b76131 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java @@ -23,15 +23,16 @@ package org.duniter.elasticsearch.rest; */ import org.duniter.elasticsearch.rest.attachment.RestImageAttachmentAction; -import org.duniter.elasticsearch.rest.blockchain.RestBlockchainBlockGetAction; -import org.duniter.elasticsearch.rest.blockchain.RestBlockchainParametersGetAction; +import org.duniter.elasticsearch.rest.blockchain.*; import org.duniter.elasticsearch.rest.network.RestNetworkPeeringGetAction; -import org.duniter.elasticsearch.rest.network.RestNetworkPeeringPostAction; +import org.duniter.elasticsearch.rest.network.RestNetworkPeeringPeersPostAction; import org.duniter.elasticsearch.rest.node.RestNodeSummaryGetAction; import org.duniter.elasticsearch.rest.security.RestSecurityAuthAction; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.rest.security.RestSecurityFilter; import org.duniter.elasticsearch.rest.security.RestSecurityGetChallengeAction; +import org.duniter.elasticsearch.rest.wot.RestWotLookupGetAction; +import org.duniter.elasticsearch.rest.wot.RestWotMembersGetAction; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -56,11 +57,18 @@ public class RestModule extends AbstractModule implements Module { // Network bind(RestNetworkPeeringGetAction.class).asEagerSingleton(); - bind(RestNetworkPeeringPostAction.class).asEagerSingleton(); + bind(RestNetworkPeeringPeersPostAction.class).asEagerSingleton(); // Blockchain bind(RestBlockchainParametersGetAction.class).asEagerSingleton(); bind(RestBlockchainBlockGetAction.class).asEagerSingleton(); + bind(RestBlockchainWithUdAction.class).asEagerSingleton(); + bind(RestBlockchainWithNewcomersAction.class).asEagerSingleton(); + bind(RestBlockchainBlocksGetAction.class).asEagerSingleton(); + + // Wot + bind(RestWotLookupGetAction.class).asEagerSingleton(); + bind(RestWotMembersGetAction.class).asEagerSingleton(); } } \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java index e8c4c434f2eef30b9334d0baf41ea2446c87a3ab..e37fbe281cea613784616d31b4c0b4a8da2d4f0d 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java @@ -22,19 +22,14 @@ package org.duniter.elasticsearch.rest.blockchain; * #L% */ -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.http.entity.ContentType; -import org.duniter.core.client.config.Configuration; -import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.StringUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.rest.AbstractRestPostIndexAction; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.rest.RestXContentBuilder; import org.duniter.elasticsearch.rest.XContentRestResponse; import org.duniter.elasticsearch.rest.security.RestSecurityController; -import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.CurrencyService; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -50,11 +45,12 @@ import java.io.IOException; */ public class RestBlockchainBlockGetAction extends BaseRestHandler { - private BlockchainService blockchainService; + private Client client; + private CurrencyService currencyService; @Inject public RestBlockchainBlockGetAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, - BlockchainService blockchainService) { + CurrencyService currencyService) { super(settings, controller, client); securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/blockchain/block/[0-9]+"); @@ -65,29 +61,21 @@ public class RestBlockchainBlockGetAction extends BaseRestHandler { controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/block/{number}", this); controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/current", this); - this.blockchainService = blockchainService; + this.client = client; + this.currencyService = currencyService; } @Override - protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { - String currency = request.param("index"); - int number = request.paramAsInt("number", -1); - boolean isCurrent = (number == -1); - - BlockchainBlock block; - if (isCurrent) { - block = blockchainService.getCurrentBlock(currency); - } - else { - block = blockchainService.getBlockById(currency, number); - } + protected void handleRequest(RestRequest request, RestChannel channel, Client client) { + String currency = currencyService.safeGetCurrency(request.param("index")); + String number = request.param("number"); + boolean isCurrent = StringUtils.isBlank(number); try { - channel.sendResponse(new BytesRestResponse(RestStatus.OK, - ContentType.APPLICATION_JSON.toString(), - getObjectMapper() - .writerWithDefaultPrettyPrinter() - .writeValueAsString(block))); + GetResponse response = client.prepareGet(currency, BlockDao.TYPE, isCurrent ? "current" : number) + .execute().actionGet(); + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).rawValue(response.getSourceAsBytesRef()); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); } catch(IOException ioe) { if (isCurrent) @@ -96,8 +84,4 @@ public class RestBlockchainBlockGetAction extends BaseRestHandler { throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/block/%s]: %s", number, ioe.getMessage()), ioe); } } - - protected ObjectMapper getObjectMapper() { - return JacksonUtils.getThreadObjectMapper(); - } } \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java new file mode 100644 index 0000000000000000000000000000000000000000..ec3019ef99b198bd8290a814f8303a3558a9f9cd --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java @@ -0,0 +1,100 @@ +package org.duniter.elasticsearch.rest.blockchain; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.CurrencyService; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.*; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.sort.SortOrder; + +import java.io.IOException; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestBlockchainBlocksGetAction extends BaseRestHandler { + + private Client client; + private CurrencyService currencyService; + + @Inject + public RestBlockchainBlocksGetAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + CurrencyService currencyService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/blockchain/blocks/[0-9]+/[0-9]+"); + + controller.registerHandler(RestRequest.Method.GET, "/blockchain/blocks/{count}/{from}", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/blocks/{count}/{from}", this); + + this.client = client; + this.currencyService = currencyService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) { + String currency = currencyService.safeGetCurrency(request.param("index")); + int count = request.paramAsInt("count", 100); + int from = request.paramAsInt("from", 0); + + try { + SearchRequestBuilder req = client.prepareSearch(currency) + .setTypes(BlockDao.TYPE) + .setFrom(0) + .setSize(count) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_NUMBER).lte(from)))) + .setFetchSource(true) + .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC); + + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startArray(); + + SearchResponse resp = req.execute().actionGet(); + for (SearchHit hit: resp.getHits().getHits()) { + builder.rawValue(hit.getSourceRef()); + } + builder.endArray(); + + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/blocks/<count>/<from>]: %s", ioe.getMessage()), ioe); + } + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithNewcomersAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithNewcomersAction.java new file mode 100644 index 0000000000000000000000000000000000000000..0b17b595eed863315527f2921db83081c378bce9 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithNewcomersAction.java @@ -0,0 +1,80 @@ +package org.duniter.elasticsearch.rest.blockchain; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.BlockchainService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; + +import java.io.IOException; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestBlockchainWithNewcomersAction extends BaseRestHandler { + + private BlockchainService blockchainService; + + @Inject + public RestBlockchainWithNewcomersAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + BlockchainService blockchainService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/blockchain/with/newcomers"); + + controller.registerHandler(RestRequest.Method.GET, "/blockchain/with/newcomers", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/with/newcomers", this); + + this.blockchainService = blockchainService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("index"); + + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() + .startObject("result") + .field("blocks", blockchainService.getBlockNumberWithNewcomers(currency)) + .endObject() + .endObject(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/with/newcomers]: %s", ioe.getMessage()), ioe); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithUdAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithUdAction.java new file mode 100644 index 0000000000000000000000000000000000000000..e6ebf152e55ff094e2889152a7d928eeae0a896b --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainWithUdAction.java @@ -0,0 +1,85 @@ +package org.duniter.elasticsearch.rest.blockchain; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.http.entity.ContentType; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.XContentThrowableRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.BlockchainService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; + +import java.io.IOException; + +import static org.elasticsearch.ExceptionsHelper.detailedMessage; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestBlockchainWithUdAction extends BaseRestHandler { + + private BlockchainService blockchainService; + + @Inject + public RestBlockchainWithUdAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + BlockchainService blockchainService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/blockchain/with/ud"); + + controller.registerHandler(RestRequest.Method.GET, "/blockchain/with/ud", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/with/ud", this); + + this.blockchainService = blockchainService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("index"); + + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() + .startObject("result") + .field("blocks", blockchainService.getBlockNumberWithUd(currency)) + .endObject() + .endObject(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/with/ud]: %s", ioe.getMessage()), ioe); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPeersPostAction.java similarity index 90% rename from cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java rename to cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPeersPostAction.java index 8d9d6d28a03df879feff4f20bf6c7bbbe9f57782..d6fd48029b50dd34edd03d913653a85a0b98af86 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPostAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkPeeringPeersPostAction.java @@ -58,23 +58,23 @@ import java.util.Properties; * A rest to post a request to process a new currency/peer. * */ -public class RestNetworkPeeringPostAction extends BaseRestHandler { +public class RestNetworkPeeringPeersPostAction extends BaseRestHandler { private NetworkService networkService; @Inject - public RestNetworkPeeringPostAction(Settings settings, PluginSettings pluginSettings, RestController controller, Client client, - RestSecurityController securityController, - NetworkService networkService) { + public RestNetworkPeeringPeersPostAction(Settings settings, PluginSettings pluginSettings, RestController controller, Client client, + RestSecurityController securityController, + NetworkService networkService) { super(settings, controller, client); if (StringUtils.isBlank(pluginSettings.getClusterRemoteHost())) { logger.warn(String.format("The cluster address can not be published on the network. /\\!\\\\ Fill in the options [cluster.remote.xxx] in the configuration (recommended).")); } else { - securityController.allow(RestRequest.Method.POST, "/network/peering"); - controller.registerHandler(RestRequest.Method.POST, "/network/peering", this); + securityController.allow(RestRequest.Method.POST, "/network/peering/peers"); + controller.registerHandler(RestRequest.Method.POST, "/network/peering/peers", this); } this.networkService = networkService; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java index c38ed56e8efe701e88a2a78a4690a6b3c24d1166..53f20a82dd0602ff0cbe6725f9c919b9e02a0785 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java @@ -42,7 +42,7 @@ public class RestSecurityFilter extends RestFilter { super(); logger = Loggers.getLogger("duniter.security", pluginSettings.getSettings(), new String[0]); if (pluginSettings.enableSecurity()) { - logger.info("Enable security on all duniter4j indices"); + logger.info("Enable security on all indices"); controller.registerFilter(this); } this.securityController = securityController; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotLookupGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotLookupGetAction.java new file mode 100644 index 0000000000000000000000000000000000000000..7d2c0863da5f6e1d38a1b7996ae900989e1c9b20 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotLookupGetAction.java @@ -0,0 +1,87 @@ +package org.duniter.elasticsearch.rest.wot; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.BlockchainService; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; + +import java.io.IOException; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestWotLookupGetAction extends BaseRestHandler { + + private Client client; + private BlockchainService blockchainService; + + @Inject + public RestWotLookupGetAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + BlockchainService blockchainService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/wot/lookup/[^/]+"); + + controller.registerHandler(RestRequest.Method.GET, "/wot/lookup/{uid}", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/wot/lookup/{uid}", this); + + this.client = client; + this.blockchainService = blockchainService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("index"); + String uid = request.param("uid"); + + try { + // TODO: implement + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() + .field("partials", false) + .field("results", new Object[0]) + .endObject(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/wot/lookup]: %s", ioe.getMessage()), ioe); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java new file mode 100644 index 0000000000000000000000000000000000000000..0a120b4c173cb0df7543256cede7c6eaf963a3ac --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java @@ -0,0 +1,92 @@ +package org.duniter.elasticsearch.rest.wot; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.exception.TechnicalException; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.WotService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.*; + +import java.io.IOException; +import java.util.Map; + +/** + * A GET request similar as /wot/members in Duniter BMA API + * + */ +public class RestWotMembersGetAction extends BaseRestHandler { + + private Client client; + private WotService wotService; + + @Inject + public RestWotMembersGetAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + WotService wotService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/wot/members"); + + controller.registerHandler(RestRequest.Method.GET, "/wot/members", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/members", this); + + this.client = client; + this.wotService = wotService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("index"); + + + try { + Map<String, String> members = wotService.getMembers(currency); + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() + .startArray("results"); + for (Map.Entry<String, String> entry: members.entrySet()) { + builder.startObject() + .field("pubkey", entry.getKey()) + .field("uid", entry.getValue()) + .endObject(); + } + builder.endArray().endObject(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/wot/lookup]: %s", ioe.getMessage()), ioe); + } + } + + protected ObjectMapper getObjectMapper() { + return JacksonUtils.getThreadObjectMapper(); + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index ec22a81156ef613951aa5f9077face5b7f4b586b..94f8050021332dcad9fab299cacad13b6fa33aad 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -44,7 +44,6 @@ import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; -import org.duniter.core.util.cache.Cache; import org.duniter.core.util.cache.SimpleCache; import org.duniter.core.util.json.JsonAttributeParser; import org.duniter.core.util.websocket.WebsocketClientEndpoint; @@ -57,6 +56,7 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.nuiton.i18n.I18n; @@ -479,37 +479,36 @@ public class BlockchainService extends AbstractService { } } - public BlockchainBlock getBlockById(String currency, final int number) { - // Retrieve the currency to use - boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && currencyDao.existsIndex(); - if (StringUtils.isBlank(currency)) { - List<String> currencyIds = enableBlockchainIndexation ? currencyDao.getCurrencyIds() : null; - if (CollectionUtils.isNotEmpty(currencyIds)) { - currency = currencyIds.get(0); - } else { - currency = DEFAULT_BLOCK.getCurrency(); - } - } + public long[] getBlockNumberWithUd(String currency) { + currency = safeGetCurrency(currency); + return blockDao.getBlockNumberWithUd(currency); + } + + public long[] getBlockNumberWithNewcomers(String currency) { + currency = safeGetCurrency(currency); + return blockDao.getBlockNumberWithNewcomers(currency); + } + public BlockchainBlock getBlockById(String currency, final int number) { + + currency = safeGetCurrency(currency); return blockDao.getBlockById(currency, String.valueOf(number)); } public BlockchainBlock getCurrentBlock(String currency) { - // Retrieve the currency to use - boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && currencyDao.existsIndex(); - if (StringUtils.isBlank(currency)) { - List<String> currencyIds = enableBlockchainIndexation ? currencyDao.getCurrencyIds() : null; - if (CollectionUtils.isNotEmpty(currencyIds)) { - currency = currencyIds.get(0); - } else { - currency = DEFAULT_BLOCK.getCurrency(); - } - } - + currency = safeGetCurrency(currency); return blockDao.getBlockById(currency, CURRENT_BLOCK_ID); } + public BytesReference getBlockByIdAsBytes(String currency, final int number) { + return blockDao.getBlockByIdAsBytes(safeGetCurrency(currency), String.valueOf(number)); + } + + public BytesReference getCurrentBlockAsBytes(String currency) { + return blockDao.getBlockByIdAsBytes(safeGetCurrency(currency), CURRENT_BLOCK_ID); + } + public void deleteFrom(final String currencyName, final int fromBlock) { int maxBlock = blockDao.getMaxBlockNumber(currencyName); @@ -624,7 +623,7 @@ public class BlockchainService extends AbstractService { } } - // Peer send no blocks + // Peer sendBlock no blocks if (CollectionUtils.isEmpty(blocksAsJson)) { // Add range to missing blocks @@ -895,4 +894,17 @@ public class BlockchainService extends AbstractService { private String getBlockId(int number) { return number == -1 ? CURRENT_BLOCK_ID : String.valueOf(number); } + + + /** + * Return the given currency, or the default currency + * @param currency + * @return + */ + protected String safeGetCurrency(String currency) { + + if (StringUtils.isNotBlank(currency)) return currency; + + return currencyDao.getDefaultCurrencyName(); + } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index a7c3b7506ba700a2ec113e2bb5f128ecfb9f73e1..11df7407efe39c6c08f6bbc1dea3a24f8ab56c9d 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -35,6 +35,7 @@ import org.duniter.core.client.service.exception.HttpConnectException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.*; @@ -93,6 +94,17 @@ public class CurrencyService extends AbstractService { return currencyDao.isExists(currencyName); } + /** + * Return the given currency, or the default currency + * @param currency + * @return + */ + public String safeGetCurrency(String currency) { + + if (StringUtils.isNotBlank(currency)) return currency; + return currencyDao.getDefaultCurrencyName(); + } + /** * Retrieve the blockchain data, from peer * diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java index 45c444a53907122e1e8063d17d7da8ca30672c22..b0a17f5d9931ebb146942dcd6491c1b2dd5f5a9d 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java @@ -64,7 +64,7 @@ public class NetworkService extends AbstractService { private BlockchainService blockchainService; private Map<String, NetworkPeering> peeringByCurrencyCache = Maps.newHashMap(); - // API where to send the peer document + // API where to sendBlock the peer document private final static Set<EndpointApi> targetPeersEndpointApis = Sets.newHashSet(); // API to include inside the peer document private final static Set<EndpointApi> publishedEndpointApis = Sets.newHashSet(); @@ -292,15 +292,7 @@ public class NetworkService extends AbstractService { waitReady(); // Retrieve the currency to use - boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && currencyDao.existsIndex(); - if (StringUtils.isBlank(currency)) { - List<String> currencyIds = enableBlockchainIndexation ? currencyDao.getCurrencyIds() : null; - if (CollectionUtils.isNotEmpty(currencyIds)) { - currency = currencyIds.get(0); - } else { - currency = DEFAULT_BLOCK.getCurrency(); - } - } + currency = blockchainService.safeGetCurrency(currency); // Get result from cache, is allow if (useCache) { @@ -312,7 +304,7 @@ public class NetworkService extends AbstractService { NetworkPeering result = new NetworkPeering(); // Get current block - BlockchainBlock currentBlock = enableBlockchainIndexation ? blockchainService.getCurrentBlock(currency) : null; + BlockchainBlock currentBlock = pluginSettings.enableBlockchainIndexation() ? blockchainService.getCurrentBlock(currency) : null; if (currentBlock == null) { currentBlock = DEFAULT_BLOCK; currency = currentBlock.getCurrency(); @@ -485,7 +477,7 @@ public class NetworkService extends AbstractService { currencyIds = null; } if (CollectionUtils.isEmpty(currencyIds)) { - logger.warn("Skipping the publication of peer document (no indexed currency)"); + logger.warn("Skipping publication of peer document (no indexed currency)"); return; } @@ -525,6 +517,8 @@ public class NetworkService extends AbstractService { Preconditions.checkNotNull(peerDocument); try { + if (logger.isDebugEnabled()) logger.debug(String.format("[%s] [%s] Sending peer document", currencyId, peer)); + networkRemoteService.postPeering(peer, peerDocument); return true; } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index 744cd2b59324842f3d9c41c7b4d940f2ef403419..54c7cfe099e48602bd0ea7af78cd49f04b7c0b05 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -58,6 +58,9 @@ public class ServiceModule extends AbstractModule implements Module { bind(BlockchainListenerService.class).asEagerSingleton(); bind(PeerService.class).asEagerSingleton(); + // Wot service + bind(WotService.class).asEagerSingleton(); + // Duniter Client API beans bindWithLocator(BlockchainRemoteService.class); bindWithLocator(NetworkRemoteService.class); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java new file mode 100644 index 0000000000000000000000000000000000000000..5f2babb341e0b5d8f8bc4398538790d232327065 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java @@ -0,0 +1,120 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * Duniter4j :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.base.Objects; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainParameters; +import org.duniter.core.client.model.bma.EndpointApi; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.bma.BlockchainRemoteService; +import org.duniter.core.client.service.bma.NetworkRemoteService; +import org.duniter.core.client.service.bma.WotRemoteService; +import org.duniter.core.client.service.exception.BlockNotFoundException; +import org.duniter.core.client.util.KnownBlocks; +import org.duniter.core.client.util.KnownCurrencies; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.model.NullProgressionModel; +import org.duniter.core.model.ProgressionModel; +import org.duniter.core.model.ProgressionModelImpl; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.ObjectUtils; +import org.duniter.core.util.Preconditions; +import org.duniter.core.util.StringUtils; +import org.duniter.core.util.cache.SimpleCache; +import org.duniter.core.util.json.JsonAttributeParser; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.dao.CurrencyExtendDao; +import org.duniter.elasticsearch.exception.DuplicateIndexIdException; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.common.inject.Inject; +import org.nuiton.i18n.I18n; + +import java.io.IOException; +import java.util.*; + +/** + * Created by Benoit on 30/03/2015. + */ +public class WotService extends AbstractService { + + private BlockDao blockDao; + private CurrencyExtendDao currencyDao; + private WotRemoteService wotRemoteService; + private BlockchainService blockchainService; + + @Inject + public WotService(Duniter4jClient client, + PluginSettings settings, + ThreadPool threadPool, + BlockDao blockDao, + CurrencyDao currencyDao, + BlockchainService blockchainService, + final ServiceLocator serviceLocator){ + super("duniter.wot", client, settings); + this.client = client; + this.blockDao = blockDao; + this.currencyDao = (CurrencyExtendDao) currencyDao; + this.blockchainService = blockchainService; + threadPool.scheduleOnStarted(() -> { + wotRemoteService = serviceLocator.getWotRemoteService(); + setIsReady(true); + }); + } + + public Map<String, String> getMembers(String currency) { + + currency = safeGetCurrency(currency); + + if (pluginSettings.enableBlockchainIndexation()) { + BlockchainParameters p = blockchainService.getParameters(currency); + return blockDao.getMembers(p); + } + else { + // TODO: check if it works ! + return wotRemoteService.getMembersUids(currency); + } + + } + + /** + * Return the given currency, or the default currency + * @param currency + * @return + */ + protected String safeGetCurrency(String currency) { + if (StringUtils.isNotBlank(currency)) return currency; + return currencyDao.getDefaultCurrencyName(); + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java index b01a30f13c29d2d6bf6f1e31be2a0bb6d9af1e5e..70ae0b7f30f69773e5fc758b5de3d4731c00ba83 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java @@ -39,9 +39,15 @@ package org.duniter.elasticsearch.service.changes; */ import com.fasterxml.jackson.annotation.JsonIgnore; +import org.duniter.core.exception.TechnicalException; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.joda.time.DateTime; +import java.io.IOException; + public class ChangeEvent { private final String id; private final String index; @@ -50,6 +56,7 @@ public class ChangeEvent { private final Operation operation; private final long version; private final BytesReference source; + private String sourceText; // cache public enum Operation { INDEX,CREATE,DELETE @@ -108,4 +115,18 @@ public class ChangeEvent { return source != null; } + @JsonIgnore + public String getSourceText(){ + if (sourceText != null) return sourceText; + if (source == null) return null; + try { + XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); + builder.rawValue(source); + sourceText = builder.string(); + return sourceText; + } catch (IOException e) { + throw new TechnicalException("Error while generating JSON from source", e); + } + } + } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java index d969c5bb2f479546e817c5399a85cc4ff0963975..14e201c7ce919c8661c7826248e33956f5c3b13a 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java @@ -177,7 +177,7 @@ public class ChangeService { try { listener.onChange(change); } catch (Exception e) { - log.error("Failed to send message", e); + log.error("Failed to sendBlock message", e); } } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java index f83a140fb5ecc56b52c4cff6eefd37dab74e2e6f..51ec6cd42134f9c756f388482a0e348a61d2ce00 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java @@ -92,7 +92,9 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { } public void doStop(){ - scheduler.shutdown(); + if (!scheduler.isShutdown()) { + scheduler.shutdown(); + } } public void doClose() {} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketModule.java index 7152c435f85b8197afe04e84a1e995eae9b2fbf2..acacbd53498609816d41160d3beb957f243c98a3 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketModule.java @@ -22,28 +22,28 @@ package org.duniter.elasticsearch.websocket; * #L% */ -/* - Copyright 2015 ForgeRock AS - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ +import org.duniter.elasticsearch.websocket.netty.NettyWebSocketBlockHandler; +import org.duniter.elasticsearch.websocket.netty.NettyWebSocketChangesHandler; +import org.duniter.elasticsearch.websocket.netty.NettyWebSocketPeerHandler; +import org.duniter.elasticsearch.websocket.tyrus.WebSocketBlockEndPoint; +import org.duniter.elasticsearch.websocket.tyrus.WebSocketChangesEndPoint; import org.elasticsearch.common.inject.AbstractModule; public class WebSocketModule extends AbstractModule { @Override protected void configure() { - bind(WebSocketServer.class).asEagerSingleton(); + + // Netty handler + bind(NettyWebSocketBlockHandler.Init.class).asEagerSingleton(); + bind(NettyWebSocketChangesHandler.Init.class).asEagerSingleton(); + bind(NettyWebSocketPeerHandler.Init.class).asEagerSingleton(); + + + // Tyrus Web socket Server bind(WebSocketChangesEndPoint.Init.class).asEagerSingleton(); + bind(WebSocketBlockEndPoint.Init.class).asEagerSingleton(); + } + } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..259f919f86448dbfbd02069b775444926128828e --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java @@ -0,0 +1,200 @@ +package org.duniter.elasticsearch.websocket.netty; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.collect.ImmutableList; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.core.util.json.JsonAttributeParser; +import org.duniter.elasticsearch.http.netty.NettyWebSocketServer; +import org.duniter.elasticsearch.http.netty.websocket.NettyBaseWebSocketEndpoint; +import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.CurrencyService; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import javax.websocket.CloseReason; +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +public class NettyWebSocketBlockHandler extends NettyBaseWebSocketEndpoint implements ChangeService.ChangeListener { + + private final static String PATH = WEBSOCKET_PATH + "/block"; + + private final static JsonAttributeParser<Long> numberAttributeParser = new JsonAttributeParser<>(BlockchainBlock.PROPERTY_NUMBER, Long.class); + private final static JsonAttributeParser<String> hashAttributeParser = new JsonAttributeParser<>(BlockchainBlock.PROPERTY_HASH, String.class); + + private static ESLogger logger = null; + private static BlockchainService blockchainService; + private static CurrencyService currencyService; + private static boolean isReady = false; + + + public static class Init { + @Inject + public Init( NettyWebSocketServer webSocketServer, + CurrencyService currencyService, + BlockchainService blockchainService, + ThreadPool threadPool) { + logger = Loggers.getLogger("duniter.ws.block"); + + NettyWebSocketBlockHandler.currencyService = currencyService; + NettyWebSocketBlockHandler.blockchainService = blockchainService; + + webSocketServer.addEndpoint(PATH, NettyWebSocketBlockHandler.class); + + threadPool.scheduleOnClusterReady(() -> isReady = true); + } + } + + private NettyWebSocketSession session; + private String currency; + private List<ChangeSource> sources; + private String lastBlockstampSent; + + @Override + public void onOpen(NettyWebSocketSession session){ + + this.session = session; + this.lastBlockstampSent = null; + + if (!isReady) { + session.close(new CloseReason(CloseReason.CloseCodes.SERVICE_RESTART, "Pod is not ready")); + return; + } + + // Use given currency or default currency + try { + currency = currencyService.safeGetCurrency(session.getPathParameters().get("currency")); + } catch (Exception e) { + logger.debug(String.format("Cannot open websocket session on {/ws/block}: %s", e.getMessage()), e); + } + + // Failed if no currency on this pod, or if pod is not ready yet + if (StringUtils.isBlank(currency)) { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Missing currency to listen")); + return; + } + + logger.debug(String.format("[%s] Opening websocket session on {/ws/block}, id {%s}", currency, session.getId())); + + // After opening, sent the current block + BytesReference currentBlock = blockchainService.getCurrentBlockAsBytes(currency); + if (currentBlock != null) sendJson(currentBlock); + + // Listening changes + this.sources = ImmutableList.of(new ChangeSource(currency, BlockchainService.BLOCK_TYPE)); + ChangeService.registerListener(this); + } + + @Override + public void onChange(ChangeEvent event) { + switch (event.getOperation()) { + case CREATE: + //case INDEX: + sendSourceIfNotNull(event); + break; + default: + // Ignoring (if delete) + } + } + + @Override + public String getId() { + return session == null ? null : session.getId(); + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return sources; + } + + @Override + public void onMessage(String message) { + // Ignoring + } + + @Override + public void onClose(CloseReason reason) { + logger.debug("Closing websocket: "+reason); + ChangeService.unregisterListener(this); + this.session = null; + this.lastBlockstampSent = null; + } + + public void onError(Throwable t) { + logger.error(String.format("[%s] Error on websocket endpoint {%s} session {%s}", currency, PATH, (session == null ? null : session.getId())), t); + } + + /* -- internal methods -- */ + + protected void sendSourceIfNotNull(ChangeEvent event) { + + if (!event.hasSource()) return; // Skip + + try { + String sourceText = event.getSourceText(); + + Long number = numberAttributeParser.getValue(sourceText); + String hash = hashAttributeParser.getValue(sourceText); + + // Check if not already sent + String blocktamp = String.format("%s-%s", number, hash); + if (!blocktamp.equals(this.lastBlockstampSent)) { + this.lastBlockstampSent = blocktamp; + session.sendText(sourceText); + } + + } catch(Exception e) { + logger.error(String.format("[%s] Cannot sent websocket response {%s} to session {%s}: %s", currency, PATH, session.getId(), e.getMessage()), e); + } + + } + + protected void sendJson(BytesReference bytes) { + try { + XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); + builder.rawValue(bytes); + session.sendText(builder.string()); + } catch (IOException e) { + throw new TechnicalException("Error while generating JSON from source", e); + } + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..e28c0628b18acf3a164675b383eba7c5aad20527 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java @@ -0,0 +1,154 @@ +package org.duniter.elasticsearch.websocket.netty; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +/* + Copyright 2015 ForgeRock AS + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +import com.google.common.collect.Maps; +import org.apache.commons.collections4.MapUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.http.netty.NettyWebSocketServer; +import org.duniter.elasticsearch.http.netty.websocket.NettyBaseWebSocketEndpoint; +import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeEvents; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; + +import javax.websocket.CloseReason; +import javax.websocket.OnError; +import javax.websocket.OnOpen; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint implements ChangeService.ChangeListener{ + + private final static String PATH = WEBSOCKET_PATH + "/_changes"; + public static Collection<ChangeSource> DEFAULT_SOURCES = null; + + private static ESLogger logger; + private NettyWebSocketSession session; + private Map<String, ChangeSource> sources; + + public static class Init { + + @Inject + public Init(NettyWebSocketServer webSocketServer, PluginSettings pluginSettings) { + logger = Loggers.getLogger("duniter.ws.changes"); + + // Init default sources + final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource(); + List<ChangeSource> sources = new ArrayList<>(); + for(String sourceStr : sourcesStr) { + sources.add(new ChangeSource(sourceStr)); + } + DEFAULT_SOURCES = sources; + + // Register endpoint + webSocketServer.addEndpoint(PATH, NettyWebSocketChangesHandler.class); + } + } + + + @OnOpen + public void onOpen(NettyWebSocketSession session){ + logger.debug("Connected ... " + session.getId()); + this.session = session; + this.sources = null; + ChangeService.registerListener(this); + } + + @Override + public void onChange(ChangeEvent changeEvent) { + session.sendText(ChangeEvents.toJson(changeEvent)); + } + + @Override + public String getId() { + return session == null ? null : session.getId(); + } + + @Override + public Collection<ChangeSource> getChangeSources() { + if (MapUtils.isEmpty(sources)) return DEFAULT_SOURCES; + return sources.values(); + } + + @Override + public void onMessage(String message) { + addSourceFilter(message); + } + + @Override + public void onClose(CloseReason reason) { + logger.debug("Closing websocket: "+reason); + ChangeService.unregisterListener(this); + this.session = null; + } + + @OnError + public void onError(Throwable t) { + logger.error("Error on websocket "+(session == null ? null : session.getId()), t); + } + + + /* -- internal methods -- */ + + private void addSourceFilter(String filter) { + + ChangeSource source = new ChangeSource(filter); + if (source.isEmpty()) { + logger.debug("Rejecting changes filter (seems to be empty): " + filter); + return; + } + + String sourceKey = source.toString(); + if (sources == null || !sources.containsKey(sourceKey)) { + logger.debug("Adding changes filter: " + filter); + if (sources == null) { + sources = Maps.newHashMap(); + } + sources.put(sourceKey, source); + ChangeService.refreshListener(this); + } + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..4e6c30655c75f9cd79f9426731b8adf44d56a248 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java @@ -0,0 +1,165 @@ +package org.duniter.elasticsearch.websocket.netty; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.google.common.collect.ImmutableList; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.core.util.json.JsonAttributeParser; +import org.duniter.elasticsearch.dao.PeerDao; +import org.duniter.elasticsearch.http.netty.NettyWebSocketServer; +import org.duniter.elasticsearch.http.netty.websocket.NettyBaseWebSocketEndpoint; +import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.CurrencyService; +import org.duniter.elasticsearch.service.PeerService; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import javax.websocket.CloseReason; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +public class NettyWebSocketPeerHandler extends NettyBaseWebSocketEndpoint implements ChangeService.ChangeListener { + + private final static String PATH = WEBSOCKET_PATH + "/peer"; + + private static ESLogger logger = null; + private static CurrencyService currencyService; + private static boolean isReady = false; + + public static class Init { + @Inject + public Init( NettyWebSocketServer webSocketServer, + CurrencyService currencyService, + ThreadPool threadPool) { + logger = Loggers.getLogger("duniter.ws.peer"); + + NettyWebSocketPeerHandler.currencyService = currencyService; + + webSocketServer.addEndpoint(PATH, NettyWebSocketPeerHandler.class); + + threadPool.scheduleOnClusterReady(() -> isReady = true); + } + } + + private NettyWebSocketSession session; + private String currency; + private List<ChangeSource> sources; + + @Override + public void onOpen(NettyWebSocketSession session){ + + this.session = session; + + if (!isReady) { + session.close(new CloseReason(CloseReason.CloseCodes.SERVICE_RESTART, "Pod is not ready")); + return; + } + + // Use given currency or default currency + try { + currency = currencyService.safeGetCurrency(session.getPathParameters().get("currency")); + } catch (Exception e) { + logger.debug(String.format("Cannot open websocket session on {%s}: %s", PATH, e.getMessage()), e); + } + + // Failed if no currency on this pod, or if pod is not ready yet + if (StringUtils.isBlank(currency)) { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Missing currency to listen")); + return; + } + + logger.debug(String.format("[%s] Opening websocket session on {%s}, id {%s}", currency, PATH, session.getId())); + + // Listening changes + this.sources = ImmutableList.of(new ChangeSource(currency, PeerDao.TYPE)); + ChangeService.registerListener(this); + } + + @Override + public void onChange(ChangeEvent event) { + switch (event.getOperation()) { + case CREATE: + //case INDEX: + sendSourceIfNotNull(event); + break; + default: + // Ignoring (if delete) + } + } + + @Override + public String getId() { + return session == null ? null : session.getId(); + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return sources; + } + + @Override + public void onMessage(String message) { + // Ignoring + } + + @Override + public void onClose(CloseReason reason) { + logger.debug("Closing websocket: "+reason); + ChangeService.unregisterListener(this); + this.session = null; + } + + public void onError(Throwable t) { + logger.error(String.format("[%s] Error on websocket endpoint {%s} session {%s}", currency, PATH, (session == null ? null : session.getId())), t); + } + + /* -- internal methods -- */ + + protected void sendSourceIfNotNull(ChangeEvent event) { + + if (!event.hasSource()) return; // Skip + + try { + session.sendText(event.getSourceText()); + + } catch(Exception e) { + logger.error(String.format("[%s] Cannot sent websocket response {%s} to session {%s}: %s", currency, PATH, session.getId(), e.getMessage()), e); + } + + } + +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java new file mode 100644 index 0000000000000000000000000000000000000000..2912e3986a2dfeefd53fd1dc990cc8281ccddd52 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java @@ -0,0 +1,168 @@ +package org.duniter.elasticsearch.websocket.tyrus; + +/* + * #%L + * Duniter4j :: ElasticSearch Plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.databind.util.ByteBufferBackedOutputStream; +import com.google.common.collect.ImmutableList; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.http.tyrus.TyrusWebSocketServer; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.CurrencyService; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.json.JsonXContent; + +import javax.websocket.*; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +@ServerEndpoint(value = "/block") +public class WebSocketBlockEndPoint implements ChangeService.ChangeListener{ + + private static BlockchainService blockchainService; + private static CurrencyService currencyService; + private static boolean isReady = false; + private static ESLogger logger = null; + + public static class Init { + + + @Inject + public Init(TyrusWebSocketServer webSocketServer, + CurrencyService currencyService, + BlockchainService blockchainService, + ThreadPool threadPool) { + webSocketServer.addEndPoint(WebSocketBlockEndPoint.class); + WebSocketBlockEndPoint.currencyService = currencyService; + WebSocketBlockEndPoint.blockchainService = blockchainService; + logger = Loggers.getLogger("duniter.ws.block"); + + //server.addLifecycleListener(); + threadPool.scheduleOnClusterReady(() -> { + isReady = true; + }); + } + } + + private Session session; + private String currency; + private List<ChangeSource> sources; + + @OnOpen + public void onOpen(Session session) throws IOException { + this.session = session; + + if (!isReady) { + session.close(new CloseReason(CloseReason.CloseCodes.SERVICE_RESTART, "Pod is not ready")); + return; + } + + // Use given currency or default currency + try { + currency = currencyService.safeGetCurrency(session.getPathParameters().get("currency")); + } catch (Exception e) { + logger.debug(String.format("Cannot open websocket session on {/ws/block}: %s", e.getMessage()), e); + } + + // Failed if no currency on this pod, or if pod is not ready yet + if (StringUtils.isBlank(currency)) { + session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "Missing currency to listen")); + return; + } + + logger.debug(String.format("[%s] Opening websocket session on {/ws/block}, id {%s}", currency, session.getId())); + + // After opening, sent the current block + sendBinary(blockchainService.getCurrentBlockAsBytes(currency)); + + // Listening changes + this.sources = ImmutableList.of(new ChangeSource(currency, BlockchainService.BLOCK_TYPE)); + ChangeService.registerListener(this); + } + + @Override + public void onChange(ChangeEvent changeEvent) { + switch (changeEvent.getOperation()) { + case CREATE: + case INDEX: + if (changeEvent.hasSource()) { + sendBinary(changeEvent.getSource()); + } + break; + default: + // Ignoring (if delete) + } + } + + @Override + public String getId() { + return session == null ? null : session.getId(); + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return sources; + } + + @OnMessage + public void onMessage(String message) { + // Ignoring + } + + @OnClose + public void onClose(CloseReason reason) { + logger.debug("Closing websocket: "+reason); + ChangeService.unregisterListener(this); + this.session = null; + } + + @OnError + public void onError(Throwable t) { + logger.error("Error on websocket endpoint /ws/block "+(session == null ? null : session.getId()), t); + } + + /* -- internal methods -- */ + + protected void sendBinary(BytesReference source) { + try { + ByteBuffer bf = ByteBuffer.allocate(1024*1000); + XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new ByteBufferBackedOutputStream(bf)); + builder.rawValue(source); + session.getAsyncRemote().sendBinary(bf); + } catch(IOException e) { + logger.error(String.format("[%s] Cannot sent response to session {%s} on {/ws/block}: %s", currency, session.getId(), e.getMessage()), e); + } + + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java similarity index 86% rename from cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java rename to cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java index 32b22b1185e9a9dbea71e62420f039444086c6e2..0f5ff58c0e1983dda7afe09f26f1f2d3a3911ee8 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.websocket; +package org.duniter.elasticsearch.websocket.tyrus; /* * #%L @@ -41,6 +41,7 @@ package org.duniter.elasticsearch.websocket; import com.google.common.collect.Maps; import org.apache.commons.collections4.MapUtils; import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.http.tyrus.TyrusWebSocketServer; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeEvents; import org.duniter.elasticsearch.service.changes.ChangeService; @@ -59,15 +60,14 @@ import java.util.Map; @ServerEndpoint(value = "/_changes") public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ - public static String PATH_PARAM_INDEX = "index"; - public static String PATH_PARAM_TYPE = "type"; - public static Collection<ChangeSource> DEFAULT_SOURCES = null; + private static ESLogger logger; public static class Init { @Inject - public Init(WebSocketServer webSocketServer, PluginSettings pluginSettings) { + public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings) { + logger = Loggers.getLogger("duniter.ws.changes"); webSocketServer.addEndPoint(WebSocketChangesEndPoint.class); final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource(); List<ChangeSource> sources = new ArrayList<>(); @@ -78,13 +78,12 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ } } - private final ESLogger log = Loggers.getLogger("duniter.ws.changes"); private Session session; private Map<String, ChangeSource> sources; @OnOpen public void onOpen(Session session) { - log.debug("Connected ... " + session.getId()); + logger.debug("Connected ... " + session.getId()); this.session = session; this.sources = null; ChangeService.registerListener(this); @@ -113,14 +112,14 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ @OnClose public void onClose(CloseReason reason) { - log.debug("Closing websocket: "+reason); + logger.debug("Closing websocket: "+reason); ChangeService.unregisterListener(this); this.session = null; } @OnError public void onError(Throwable t) { - log.error("Error on websocket "+(session == null ? null : session.getId()), t); + logger.error("Error on websocket "+(session == null ? null : session.getId()), t); } @@ -130,13 +129,13 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ ChangeSource source = new ChangeSource(filter); if (source.isEmpty()) { - log.debug("Rejecting changes filter (seems to be empty): " + filter); + logger.debug("Rejecting changes filter (seems to be empty): " + filter); return; } String sourceKey = source.toString(); if (sources == null || !sources.containsKey(sourceKey)) { - log.debug("Adding changes filter: " + filter); + logger.debug("Adding changes filter: " + filter); if (sources == null) { sources = Maps.newHashMap(); } diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java index e6ba1ee8190fb40ee8452eb0c7ce92e1f8fe957e..60b293f41175f15c1a505c5090d4ec19dd967219 100644 --- a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/PluginSettings.java @@ -99,8 +99,12 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return this.settings.get("duniter.subscription.email.link.url", getCesiumUrl()); } + public String getEmailLinkName() { + return this.settings.get("duniter.subscription.email.link.name", "Cesium+"); + } + /** - * Should email subscription be send at startup ? + * Should email subscription be sendBlock at startup ? * @return */ public boolean isEmailSubscriptionsExecuteAtStartup() { diff --git a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java index 55c730927f4e834e1b16e7ab45eff85e14209315..f05e88663c45720c4ce20f3c6abedfce2b8af7e3 100644 --- a/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java +++ b/cesium-plus-pod-subscription/src/main/java/org/duniter/elasticsearch/subscription/service/SubscriptionService.java @@ -73,6 +73,7 @@ public class SubscriptionService extends AbstractService { private UserEventService userEventService; private UserService userService; private String emailSubjectPrefix; + private String emailLinkName; private STGroup templates; private boolean debug; @@ -99,6 +100,7 @@ public class SubscriptionService extends AbstractService { if (StringUtils.isNotBlank(emailSubjectPrefix)) { emailSubjectPrefix += " "; // add one trailing space } + this.emailLinkName = pluginSettings.getEmailLinkName().trim(); this.debug = logger.isDebugEnabled(); // Configure springtemplate engine @@ -290,7 +292,7 @@ public class SubscriptionService extends AbstractService { if (lastExecution != null) { lastExecutionTime = lastExecution.getTime(); } - // If first email execution: only send event from the last 7 days. + // If first email execution: only sendBlock event from the last 7 days. else { Calendar defaultDateLimit = new GregorianCalendar(); defaultDateLimit.setTimeInMillis(System.currentTimeMillis()); @@ -323,7 +325,8 @@ public class SubscriptionService extends AbstractService { profileTitles, userEvents, userLocale, - pluginSettings.getEmailLinkUrl()) + pluginSettings.getEmailLinkUrl(), + emailLinkName) .render(userLocale); // Compute HTML content @@ -335,12 +338,13 @@ public class SubscriptionService extends AbstractService { profileTitles, userEvents, userLocale, - pluginSettings.getEmailLinkUrl()) + pluginSettings.getEmailLinkUrl(), + emailLinkName) .render(userLocale); final String object = emailSubjectPrefix + I18n.t("duniter4j.es.subscription.email.subject", userEvents.size()); if (pluginSettings.isEmailSubscriptionsDebug()) { - logger.info(String.format("---- Email to send (debug mode) ------\nTo:%s\nObject: %s\nText content:\n%s", + logger.info(String.format("---- Email to sendBlock (debug mode) ------\nTo:%s\nObject: %s\nText content:\n%s", subscription.getContent().getEmail(), object, text)); @@ -376,7 +380,8 @@ public class SubscriptionService extends AbstractService { Map<String, String> issuerProfilNames, List<UserEvent> userEvents, final Locale issuerLocale, - String cesiumSiteUrl) { + String linkUrl, + String linkName) { String issuerName = issuerProfilNames != null && issuerProfilNames.containsKey(subscription.getIssuer()) ? issuerProfilNames.get(subscription.getIssuer()) : ModelUtils.minifyPubkey(subscription.getIssuer()); @@ -387,7 +392,8 @@ public class SubscriptionService extends AbstractService { try { // Compute body - template.add("url", cesiumSiteUrl); + template.add("linkName", linkName); + template.add("url", linkUrl); template.add("issuerPubkey", subscription.getIssuer()); template.add("issuerName", issuerName); template.add("senderPubkey", senderPubkey); diff --git a/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_en_GB.properties b/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_en_GB.properties index 3277f18a7ba82c3dca3b0711603f4c4197cd0492..d8340fa118eeff8fdef7d865a1235db682376fd3 100644 --- a/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_en_GB.properties +++ b/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_en_GB.properties @@ -1,13 +1,13 @@ -duniter4j.es.subscription.email.footer.disableHelp=You can disable this email notification service in the page %2$% (%1$s). +duniter4j.es.subscription.email.footer.disableHelp=You can disable this email notification service in the page %2$% %3$% (%1$s). duniter4j.es.subscription.email.footer.sendBy=This email has sent you the Cesium+ node of %2$s (%1$s). duniter4j.es.subscription.email.hello=Hello %s\! duniter4j.es.subscription.email.html.footer.disableHelp=You can disable this email notification service in <a href\="%s">online services</a> page. -duniter4j.es.subscription.email.html.footer.sendBy=This email has sent you the Cesium+ node of <a href\="%s">%s</a>.. +duniter4j.es.subscription.email.html.footer.sendBy=This email has sent you the %3$% node of <a href\="%1$%">%2$%</a>.. duniter4j.es.subscription.email.html.hello=Hello <b>%s</b>\! duniter4j.es.subscription.email.html.pubkey=Public key\: <a href\="%s">%s</a> duniter4j.es.subscription.email.html.unreadCount=You received <b>%s new notifications</b>. duniter4j.es.subscription.email.notificationsDivider=Notifications list\: -duniter4j.es.subscription.email.openCesium=Open Cesium+ +duniter4j.es.subscription.email.open=Open duniter4j.es.subscription.email.pubkey=Public key\: %2$s (%1$s) duniter4j.es.subscription.email.start=Email subscriptions\: daily mailing [at %1$s\:00] and weekly [on %2$s at %1$s\:00] duniter4j.es.subscription.email.subject=You received %s new notifications diff --git a/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_fr_FR.properties b/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_fr_FR.properties index 28faba1c1aefaca12ba5fcd2c051d9d857535925..ece559c0322b533af3ee266c83a4832ef701b0fb 100644 --- a/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_fr_FR.properties +++ b/cesium-plus-pod-subscription/src/main/resources/i18n/cesium-plus-pod-subscription_fr_FR.properties @@ -1,13 +1,13 @@ -duniter4j.es.subscription.email.footer.disableHelp=Vous pouvez désactiver ce service de notification par email, dans la rubrique "Services en ligne" de Cesium+ (%s). -duniter4j.es.subscription.email.footer.sendBy=Cet email vous a été envoyé le noeud Cesium+ de %2$s (%1$s). +duniter4j.es.subscription.email.footer.disableHelp=Vous pouvez désactiver ce service de notification par email, dans la rubrique "Services en ligne" de %2$s (%1$s). +duniter4j.es.subscription.email.footer.sendBy=Cet email vous a été envoyé le noeud %3$s de %2$s (%1$s). duniter4j.es.subscription.email.hello=Bonjour %s \! -duniter4j.es.subscription.email.html.footer.disableHelp=Vous pouvez désactiver ce service de notification par email, dans <a href\="%s">la rubrique services en ligne</a> de Cesium+. -duniter4j.es.subscription.email.html.footer.sendBy=Cet email vous a été envoyé depuis le noeud Cesium+ de <a href\="%1$s">%2$s</a>. +duniter4j.es.subscription.email.html.footer.disableHelp=Vous pouvez désactiver ce service de notification par email, dans <a href\="%1$s">la rubrique services en ligne</a> de %2$s. +duniter4j.es.subscription.email.html.footer.sendBy=Cet email vous a été envoyé depuis le noeud %3$s de <a href\="%1$s">%2$s</a>. duniter4j.es.subscription.email.html.hello=Bonjour <b>%s</b> \! duniter4j.es.subscription.email.html.pubkey=Clé publique \: <a href\="%s">%s</a> duniter4j.es.subscription.email.html.unreadCount=Vous avez <b>%s notifications</b> non lues. duniter4j.es.subscription.email.notificationsDivider=Liste des notifications \: -duniter4j.es.subscription.email.openCesium=Ouvrir Cesium+ +duniter4j.es.subscription.email.open=Ouvrir duniter4j.es.subscription.email.pubkey=Clé publique \: %2$s (%1$s) duniter4j.es.subscription.email.start=Abonnement email\: envoi quotidien [à %1$s\:00] et hebdomadaire [le %2$s à %1$s\:00] duniter4j.es.subscription.email.subject=%s nouvelles notifications non lues diff --git a/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/html_email_content.st b/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/html_email_content.st index c6fa7ed4cb0dd2c8cb62918f4f474e4ec61fa800..af20871b0fff0a8101bd5691b3111a9c1054823b 100644 --- a/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/html_email_content.st +++ b/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/html_email_content.st @@ -1,4 +1,4 @@ -html_email_content(issuerPubkey, issuerName, senderPubkey, senderName, events, url) ::= << +html_email_content(issuerPubkey, issuerName, senderPubkey, senderName, events, url, linkName) ::= << <table cellspacing="0" cellpadding="0" width="100%" style="font-size:12px;font-family:Helvetica Neue,Helvetica,Lucida Grande,tahoma,verdana,arial,sans-serif;border-spacing:0px;border-collapse:collapse;max-width:600px!important;"> <tr> @@ -33,7 +33,7 @@ html_email_content(issuerPubkey, issuerName, senderPubkey, senderName, events, u <td> <p style="margin:0px;width:100%;text-align:right;min-height: 64px;padding: 16px 0px;"> <a style="overflow:hidden!important;background-color:#387ef5;border-color:transparent;border-radius:2px;border-shadow: 2px 2px rgba(50,50,50,0.32);box-sizing: border-box;color:white;display:inline-block;font-size:14px;font-weight: 500;height: 47px;letter-spacing: 0.5px;line-height:42px;margin:0;min-height:47px;min-width:52px;padding-bottom:0px;padding-left:24px;padding-right:24px;padding-top:0px;text-align:center;text-decoration:none;text-transform:uppercase;" - href="$url$">$i18n("duniter4j.es.subscription.email.openCesium")$ >></a> + href="$url$">$i18n("duniter4j.es.subscription.email.open")$ $linkName$ >></a> </p> </td> </tr> @@ -63,10 +63,10 @@ html_email_content(issuerPubkey, issuerName, senderPubkey, senderName, events, u <td> <div style="background-color: rgb(236, 240, 247) !important;border-color: rgb(221, 223, 226) !important;width:100%;text-align:center;border-radius:4px;"> <p style="margin:0px;padding:8px 0px;text-align:center;color:grey !important;text-decoration:none !important;"> - $i18n_args("duniter4j.es.subscription.email.html.footer.sendBy", [{$[url, "/#/app/wot/", senderPubkey, "/"]; separator=""$}, senderName])$ + $i18n_args("duniter4j.es.subscription.email.html.footer.sendBy", [{$[url, "/#/app/wot/", senderPubkey, "/"]; separator=""$}, senderName, linkName])$ <br/> <small> - $i18n_args("duniter4j.es.subscription.email.html.footer.disableHelp", {$[url, "/#/app/wallet/subscriptions"]; separator=""$})$ + $i18n_args("duniter4j.es.subscription.email.html.footer.disableHelp", [{$[url, "/#/app/wallet/subscriptions"]; separator=""$}, linkName])$ </small> </p> </div> diff --git a/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/text_email.st b/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/text_email.st index 004367d99c9518604cc2c57fa1d864fb0bc37690..870b88329920be57f1060a79c66f4178ecd0340b 100644 --- a/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/text_email.st +++ b/cesium-plus-pod-subscription/src/main/resources/org/duniter/elasticsearch/subscription/templates/text_email.st @@ -1,15 +1,15 @@ -text_email(issuerPubkey, issuerName, senderPubkey, senderName, events, url) ::= << +text_email(issuerPubkey, issuerName, senderPubkey, senderName, events, url, linkName) ::= << $i18n_args("duniter4j.es.subscription.email.hello", issuerName)$ $i18n_args("duniter4j.es.subscription.email.unreadCount", {$length(events)$} )$ $i18n("duniter4j.es.subscription.email.notificationsDivider")$ $events:{e|$text_event_item(e)$}$ -$i18n("duniter4j.es.subscription.email.openCesium")$ : $url$ -$if(issuerPubkey)$$i18n_args("duniter4j.es.subscription.email.pubkey", [{$[url, "/#/app/wot/", issuerPubkey, "/"]; separator=""$}, {$issuerPubkey; format="pubkey"$}])$$endif$ +$i18n("duniter4j.es.subscription.email.open")$ $linkName$ : $url$ +$if(issuerPubkey)$$i18n_args("duniter4j.es.subscription.email.pubkey", [{$[url, "/#/app/wot/", issuerPubkey, "/"]; separator=""$}, {$issuerPubkey; format="pubkey"$}, linkName])$$endif$ ----------------------------------------------- -$i18n_args("duniter4j.es.subscription.email.footer.sendBy", [{$[url, "/#/app/wot/", senderPubkey, "/"]; separator=""$}, senderName])$ -$i18n_args("duniter4j.es.subscription.email.footer.disableHelp", {$[url, "/#/app/wallet/subscriptions"]; separator=""$})$ +$i18n_args("duniter4j.es.subscription.email.footer.sendBy", [{$[url, "/#/app/wot/", senderPubkey, "/"]; separator=""$}, senderName, linkName])$ +$i18n_args("duniter4j.es.subscription.email.footer.disableHelp", [{$[url, "/#/app/wallet/subscriptions"]; separator=""$}, linkName])$ >> \ No newline at end of file diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/WebsocketUserEventEndPoint.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/WebsocketUserEventEndPoint.java index 6f2c898394748fc3dad5ee8d37f44c65591ef929..fd425384c4f96e4e72226ce8ae08e10883cab603 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/WebsocketUserEventEndPoint.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/WebsocketUserEventEndPoint.java @@ -45,7 +45,7 @@ import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.user.PluginSettings; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.service.UserEventService; -import org.duniter.elasticsearch.websocket.WebSocketServer; +import org.duniter.elasticsearch.http.tyrus.TyrusWebSocketServer; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; @@ -66,7 +66,7 @@ public class WebsocketUserEventEndPoint implements UserEventService.UserEventLis public static class Init { @Inject - public Init(WebSocketServer webSocketServer, PluginSettings pluginSettings) { + public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings) { webSocketServer.addEndPoint(WebsocketUserEventEndPoint.class); defaultLocale = pluginSettings.getI18nLocale(); if (defaultLocale == null) defaultLocale = new Locale("en", "GB"); diff --git a/pom.xml b/pom.xml index f66a1bdb9e369ef7516695feaac5296e35da3287..27a479681c092f84807556a5a01a7549a0c613b5 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ <signatureVersion>1.0</signatureVersion> <!-- Commons versions --> - <duniter4j.version>1.2.0</duniter4j.version> + <duniter4j.version>1.2.5-SNAPSHOT</duniter4j.version> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.6</slf4j.version> <guava.version>22.0</guava.version>