diff --git a/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml b/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml index 2b0f6ed953767dcb6a5b9a94756a33a09d92c3c3..9fddcbd1c589de8554a478124dd4199ec5cce731 100644 --- a/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml +++ b/cesium-plus-pod-assembly/src/main/assembly/config/elasticsearch.yml @@ -127,26 +127,32 @@ duniter.string.analyzer: french # Enabling blockchain synchronization (default: false) # duniter.blockchain.enable: true -#duniter.blockchain.event.user.enable: true -#duniter.blockchain.event.admin.enable: true # -# Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' +# Enabling blockchain user event indexation ? (default: ${duniter.blockchain.enable}) # -# duniter.blockchain.reload: true -# duniter.blockchain.reload.from: 18900 -# duniter.blockchain.reload.to: 19000 +#duniter.blockchain.event.user.enable: false +#duniter.blockchain.event.admin.enable: false # # Enabling blockchain peers indexation ? (default: ${duniter.blockchain.enable}) -# This will listen new peer events on the Duniter node (using BMA /ws/peer) # # duniter.blockchain.peer.enable: false # +# Force blockchain full synchronization - /!\ WARNING: all user events will be reset to 'unread' +# +# duniter.blockchain.reload: true +# duniter.blockchain.reload.from: 18900 +# duniter.blockchain.reload.to: 19000 +# # Duniter node address # duniter.host: g1.duniter.org duniter.port: 10901 # duniter.useSsl: true # +# Network timeout, in millisecond (default: 20000 = 20s) +# +# duniter.network.timeout: 5000 +# # Compute statistics on indices (each hour) ? (default: true) # # duniter.stats.enable: false @@ -273,9 +279,9 @@ duniter.mail.enable: false # ---------------------------------- Cesium+ Pod > Websocket server ---------------------- # -# Websocket port (default: 9400-9410) +# Websocket port (default: ${http.port}) # -duniter.ws.port: 9400-9410 +# duniter.ws.port: 9400-9410 # # ---------------------------------- Cesium+ Pod > Subscription module ------------------- # diff --git a/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml b/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml index cbdcb4f932f719914b69dbc8dafdaa29727dfa52..406d2a9eb269e48374d3cfb420eab32979f15ff6 100644 --- a/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml +++ b/cesium-plus-pod-assembly/src/test/es-home/config/elasticsearch.yml @@ -16,6 +16,9 @@ # # cluster.name: my-application cluster.name: cesium-plus-cluster-DEV +# +# Host + port, to join your cluster, from external network +# cluster.remote.host: localhost cluster.remote.port: 9200 # diff --git a/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml b/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml index a281deaa87cc39b47ef1e77294607fe452d7fad4..7a8fb32cfd0db2f18846459eacf07be6152a8219 100644 --- a/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml +++ b/cesium-plus-pod-assembly/src/test/es-home/config/logging.yml @@ -17,6 +17,8 @@ logger: duniter: DEBUG #duniter.core: DEBUG duniter.security: WARN + duniter.dao.member: WARN + duniter.threadpool: INFO #duniter.user.event: DEBUG #duniter.network.p2p: DEBUG #duniter.network.peer: DEBUG diff --git a/cesium-plus-pod-assembly/src/test/misc/peer_UP.sh b/cesium-plus-pod-assembly/src/test/misc/peer_UP.sh index ec912a0864b9e590f395027e6f2e910e9b66c400..99d25cab2da368b21ff543f0a1fa89b22e4e9fc2 100755 --- a/cesium-plus-pod-assembly/src/test/misc/peer_UP.sh +++ b/cesium-plus-pod-assembly/src/test/misc/peer_UP.sh @@ -1,7 +1,7 @@ #!/bin/sh -curl -XPOST 'http://localhost:9205/g1/peer/_search?pretty' -d ' +curl -XPOST 'http://localhost:9200/g1/peer/_search?pretty' -d ' { "size" : 1000, "query" : { @@ -16,22 +16,19 @@ curl -XPOST 'http://localhost:9205/g1/peer/_search?pretty' -d ' }, { "nested" : { + "path" : "stats", "query" : { "bool" : { - "filter" : { - "term" : { - "stats.status" : "DOWN" - } - } + "filter" : [ + {"term": {"stats.status" : "UP" }} + ] } - }, - "path" : "stats" + } } } ] } } } - }, - _source: ["dns", "ipv4", "ipv6", "port", "path"] + } }' diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java index 71beba0f60f260977c20c7c9da2701f74ac7789c..03ad139576fa98dc6de1b49b2762868daba011bc 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/Plugin.java @@ -27,6 +27,7 @@ import org.duniter.elasticsearch.dao.DaoModule; import org.duniter.elasticsearch.rest.RestModule; import org.duniter.elasticsearch.script.BlockchainTxCountScriptFactory; import org.duniter.elasticsearch.security.SecurityModule; +import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.ServiceModule; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.websocket.WebSocketModule; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java index a3d9b4fe2b4d7c73c8a2dd4402aa34d99a81396c..d668969b75fb5de2a7e973e68e0d85bf0294e519 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginInit.java @@ -24,6 +24,7 @@ package org.duniter.elasticsearch; import org.duniter.core.client.model.elasticsearch.Currency; import org.duniter.core.client.model.local.Peer; +import org.duniter.core.util.Preconditions; import org.duniter.elasticsearch.dao.*; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.service.*; @@ -139,7 +140,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Synchronize blockchain if (pluginSettings.enableBlockchainIndexation()) { - Peer peer = pluginSettings.checkAndGetPeer(); + Peer peer = pluginSettings.checkAndGetDuniterPeer(); Currency currency; try { @@ -151,53 +152,69 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { throw e; } - final String currencyName = currency.getCurrencyName(); - peer.setCurrency(currencyName); + final String currencyId = currency.getId(); + peer.setCurrency(currencyId); // Define the main peer for this currency (will fill a cache in PeerService) - injector.getInstance(PeerService.class).setCurrencyMainPeer(currencyName, peer); + injector.getInstance(PeerService.class).setCurrencyMainPeer(currencyId, peer); // Add access security rules, for the currency indices injector.getInstance(RestSecurityController.class) + // Add access to currencies/record index + .allowIndexType(RestRequest.Method.GET, + CurrencyExtendDao.INDEX, + CurrencyExtendDao.RECORD_TYPE) + .allowPostSearchIndexType( + CurrencyExtendDao.INDEX, + CurrencyExtendDao.RECORD_TYPE) + // Add access to <currency>/block index .allowIndexType(RestRequest.Method.GET, - currencyName, + currencyId, BlockDao.TYPE) .allowPostSearchIndexType( - currencyName, + currencyId, BlockDao.TYPE) // Add access to <currency>/blockStat index .allowIndexType(RestRequest.Method.GET, - currencyName, + currencyId, BlockStatDao.TYPE) .allowPostSearchIndexType( - currencyName, + currencyId, BlockStatDao.TYPE) // Add access to <currency>/peer index .allowIndexType(RestRequest.Method.GET, - currencyName, + currencyId, PeerDao.TYPE) .allowPostSearchIndexType( - currencyName, + currencyId, PeerDao.TYPE) // Add access to <currency>/movement index .allowIndexType(RestRequest.Method.GET, - currencyName, + currencyId, MovementDao.TYPE) .allowPostSearchIndexType( - currencyName, + currencyId, MovementDao.TYPE) + // Add access to <currency>/member index + .allowIndexType(RestRequest.Method.GET, + currencyId, + MemberDao.TYPE) + .allowPostSearchIndexType( + currencyId, + MemberDao.TYPE) + // Add access to <currency>/synchro index .allowIndexType(RestRequest.Method.GET, - currencyName, + currencyId, SynchroExecutionDao.TYPE) .allowPostSearchIndexType( - currencyName, + currencyId, SynchroExecutionDao.TYPE); /* TODO à décommenter quand les pending seront sauvegardés @@ -217,7 +234,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } injector.getInstance(BlockchainService.class) - .deleteRange(currencyName, + .deleteRange(currencyId, pluginSettings.reloadBlockchainIndicesFrom(), pluginSettings.reloadBlockchainIndicesTo()); } @@ -227,12 +244,12 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { } injector.getInstance(BlockchainService.class) - .deleteFrom(currencyName, pluginSettings.reloadBlockchainIndicesFrom()); + .deleteFrom(currencyId, pluginSettings.reloadBlockchainIndicesFrom()); } } else { if (logger.isInfoEnabled()) { - logger.info(String.format("[%s] Indexing blockchain...", currencyName)); + logger.info(String.format("[%s] Indexing blockchain...", currencyId)); } } @@ -250,29 +267,14 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { pluginSettings.reloadBlockchainIndicesTo()); } - try { - // Index blocks (and listen if new block appear) - injector.getInstance(BlockchainService.class) - .indexLastBlocks(peer) - .listenAndIndexNewBlock(peer); - - if (logger.isInfoEnabled()) { - logger.info(String.format("[%s] Indexing blockchain [OK]", currencyName)); - } - + // Index blocks (and listen if new block appear) + startIndexBlocks(peer); - - } catch(Throwable e){ - logger.error(String.format("[%s] Indexing blockchain error: %s", currencyName, e.getMessage()), e); - throw e; - } + // Index WoT members + startIndexMembers(peer); // Index peers (and listen if new peer appear) - if (pluginSettings.enableBlockchainPeerIndexation()) { - logger.info(String.format("[%s] Indexing blockchain peers...", currencyName)); - injector.getInstance(PeerService.class) - .listenAndIndexPeers(peer); - } + startIndexPeers(peer); // Start synchro and peering startSynchroAndPeering(); @@ -302,6 +304,58 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { .allow(RestRequest.Method.POST, "^/_search/scroll$"); } + protected void startIndexBlocks(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getCurrency()); + + if (pluginSettings.enableBlockchainIndexation()) { + + // Index blocks (and listen if new block appear) + try { + injector.getInstance(BlockchainService.class) + .indexLastBlocks(peer) + .listenAndIndexNewBlock(peer); + + if (logger.isInfoEnabled()) logger.info(String.format("[%s] Indexing blockchain [OK]", peer.getCurrency())); + + } catch (Throwable e) { + logger.error(String.format("[%s] Indexing blockchain error: %s", peer.getCurrency(), e.getMessage()), e); + throw e; + } + } + } + + protected void startIndexMembers(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getCurrency()); + + if (pluginSettings.enableBlockchainIndexation()) { + + // Index Wot members + try { + injector.getInstance(WotService.class) + .indexMembers(peer.getCurrency()) + .listenAndIndexMembers(peer.getCurrency()); + + } catch (Throwable e) { + logger.error(String.format("[%s] Indexing WoT members error: %s", peer.getCurrency(), e.getMessage()), e); + throw e; + } + } + } + + protected void startIndexPeers(Peer peer) { + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(peer.getCurrency()); + + // Index peers (and listen if new peer appear) + if (pluginSettings.enableBlockchainPeerIndexation()) { + logger.info(String.format("[%s] Indexing blockchain peers...", peer.getCurrency())); + injector.getInstance(PeerService.class) + .listenAndIndexPeers(peer); + } + } + protected void startSynchroAndPeering() { // Start synchro, if enable in config @@ -333,7 +387,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> { // Add index [currency/record] to stats final DocStatService docStatService = injector .getInstance(DocStatService.class) - .registerIndex(CurrencyService.INDEX, CurrencyService.RECORD_TYPE); + .registerIndex(CurrencyExtendDao.INDEX, CurrencyExtendDao.RECORD_TYPE); // Wait end of currency index creation, then index blocks threadPool.scheduleOnClusterReady(docStatService::startScheduling); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index f7148d4d514307fbc63757096caf6af4e3f0aa95..9637787e8d4906b15f2456726e4d6c1932dbcad9 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -67,11 +67,13 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { private static String nodePubkey; private static List<String> i18nBundleNames = new CopyOnWriteArrayList<>(); // Default private static boolean isI18nStarted = false; + private static Peer duniterPeer; protected final Settings settings; private String clusterRemoteUrl; private final CryptoService cryptoService; + /** * Delegate application config. */ @@ -111,8 +113,8 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { String baseDir = settings.get("path.home"); applicationConfig.setConfigFileName("duniter4j.config"); applicationConfig.setDefaultOption(ConfigurationOption.BASEDIR.getKey(), baseDir); - applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost()); - applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort())); + applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getDuniterNodeHost()); + applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getDuniterNodePort())); applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_TIMEOUT.getKey(), String.valueOf(getNetworkTimeout())); applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS.getKey(), String.valueOf(getNetworkMaxConnections())); applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS_PER_ROUTE.getKey(), String.valueOf(getNetworkMaxConnectionsPerRoute())); @@ -213,16 +215,16 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { /* -- Settings on Duniter node (with BMA API) -- */ - public String getNodeBmaHost() { + public String getDuniterNodeHost() { return settings.get("duniter.host", "g1.duniter.org"); } - public int getNodeBmaPort() { + public int getDuniterNodePort() { return settings.getAsInt("duniter.port", 10901); } - public boolean getNodeBmaUseSsl() { - return settings.getAsBoolean("duniter.useSsl", getNodeBmaPort() == 443); + public boolean getDuniterNodeUseSsl() { + return settings.getAsBoolean("duniter.useSsl", getDuniterNodePort() == 443); } /* -- Other settings -- */ @@ -271,7 +273,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public int getNetworkTimeout() { - return settings.getAsInt("duniter.network.timeout", 30000 /*30s*/); + return settings.getAsInt("duniter.network.timeout", 20000 /*20s*/); } public int getNetworkMaxConnections() { @@ -391,20 +393,22 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return settings.get("duniter.share.base.url"); } - public Peer checkAndGetPeer() { - if (StringUtils.isBlank(getNodeBmaHost())) { + public Peer checkAndGetDuniterPeer() { + if (duniterPeer != null) return duniterPeer; + + if (StringUtils.isBlank(getDuniterNodeHost())) { logger.error("ERROR: node host is required"); System.exit(-1); return null; } - if (getNodeBmaPort() <= 0) { + if (getDuniterNodePort() <= 0) { logger.error("ERROR: node port is required"); System.exit(-1); return null; } - Peer peer = Peer.newBuilder().setHost(getNodeBmaHost()).setPort(getNodeBmaPort()).setUseSsl(getNodeBmaUseSsl()).build(); - return peer; + this.duniterPeer = Peer.newBuilder().setHost(getDuniterNodeHost()).setPort(getDuniterNodePort()).setUseSsl(getDuniterNodeUseSsl()).build(); + return duniterPeer; } public String getKeyringSalt() { diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java index 7bc9af4cc448d6dc131b5a1e8a055524f3e48a85..7312760acdcbc00e76e62fee32d9fb832ba8a03e 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/beans/ESBeanFactory.java @@ -25,6 +25,7 @@ package org.duniter.elasticsearch.beans; import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanCreationException; import org.duniter.core.beans.BeanFactory; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Injector; @@ -34,15 +35,21 @@ import org.elasticsearch.common.inject.Injector; public class ESBeanFactory extends BeanFactory { private Injector injector = null; + private ThreadPool threadPool = null; @Inject public void setInjector(Injector injector) { this.injector = injector; } + @Inject + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + @Override protected <S extends Bean> void initBean(S bean) { - super.initBean(bean); + threadPool.scheduleOnClusterReady(() -> super.initBean(bean)); if (injector != null) { injector.injectMembers(bean); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java index 21f248515a7f601c4ef3cb35010ae54e1623fbfd..7d1518f17488982573cdf4d1eed7da0e50cbe37f 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/AbstractDao.java @@ -25,6 +25,8 @@ package org.duniter.elasticsearch.dao; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.collect.Streams; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.local.LocalEntity; @@ -32,6 +34,7 @@ import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -39,7 +42,13 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.search.SearchHit; import java.io.IOException; +import java.util.Arrays; import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Created by Benoit on 08/04/2015. @@ -81,33 +90,60 @@ public abstract class AbstractDao implements Bean { return JacksonUtils.getThreadObjectMapper(); } - protected <C extends LocalEntity<String>> List<C> toList(SearchResponse response, Class<? extends C> clazz) { - ObjectMapper objectMapper = getObjectMapper(); + protected <C> List<C> toList(SearchResponse response, final Function<SearchHit, C> mapper) { if (response.getHits() == null || response.getHits().getTotalHits() == 0) return null; - List<C> result = Lists.newArrayList(); - for (SearchHit hit: response.getHits().getHits()) { + return Arrays.stream(response.getHits().getHits()) + .map(hit -> mapper.apply(hit)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + + protected Stream<SearchHit> toStream(SearchResponse response) { + + if (response.getHits() == null || response.getHits().getTotalHits() == 0) return Stream.empty(); + + return Arrays.stream(response.getHits().getHits()); + } + + protected <C extends LocalEntity<String>> List<C> toList(SearchResponse response, Class<? extends C> clazz) { + ObjectMapper objectMapper = getObjectMapper(); + return toList(response, hit -> { try { - C value = objectMapper.readValue(hit.getSourceRef().streamInput(), clazz); - value.setId(hit.getId()); - result.add(value); + return objectMapper.readValue(hit.getSourceRef().streamInput(), clazz); + } catch(IOException e) { logger.warn(String.format("Unable to deserialize source [%s/%s/%s] into [%s]: %s", hit.getIndex(), hit.getType(), hit.getId(), clazz.getName(), e.getMessage())); + return null; } - } - return result; + }); } - protected List<String> toListIds(SearchResponse response) { - if (response.getHits() == null || response.getHits().getTotalHits() == 0) return null; + protected Set<String> executeAndGetIds(SearchResponse response) { + return toStream(response).map(SearchHit::getId).collect(Collectors.toSet()); + } + + protected Set<String> executeAndGetIds(SearchRequestBuilder request) { + + Set<String> result = Sets.newHashSet(); + int size = this.pluginSettings.getIndexBulkSize(); + request.setSize(size); + + long total = -1; + int from = 0; + do { + request.setFrom(from); + SearchResponse response = request.execute().actionGet(); + toStream(response).forEach(hit -> result.add(hit.getId())); + + if (total == -1) total = response.getHits().getTotalHits(); + from += size; + } while(from<total); - List<String> result = Lists.newArrayList(); - for (SearchHit hit: response.getHits().getHits()) { - result.add(hit.getId()); - } return result; } + } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java index 9770ad5da39e4c42c85660d5431fbd449080034b..f790fa3d096154e1a4ea5ea84ad37026f03b308f 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/BlockDao.java @@ -25,11 +25,13 @@ package org.duniter.elasticsearch.dao; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; +import org.duniter.core.client.model.local.Member; import org.elasticsearch.common.bytes.BytesReference; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; /** * Created by blavenie on 03/04/17. @@ -79,5 +81,7 @@ public interface BlockDao extends Bean, TypeDao<BlockDao> { void deleteById(final String currencyName, String id); - Map<String, String> getMembers(BlockchainParameters parameters); + List<Member> getMembers(BlockchainParameters parameters); + + Set<String> getUniqueIssuersBetween(String currencyName, int startNumber, int endNumber); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java index 08482695830c1992b1f41e0a35894576fefd9d15..d6bcf699bb85655394b97e90d64ac220effb5873 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/CurrencyExtendDao.java @@ -32,5 +32,5 @@ public interface CurrencyExtendDao extends CurrencyDao, IndexTypeDao<CurrencyExt String RECORD_TYPE = "record"; - String getDefaultCurrencyName(); + String getDefaultId(); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java index acb7f105a2e853240b44d47af79dd7b95b89ef8b..f2f11618942a805352ba17738634272b471169ba 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/DaoModule.java @@ -54,7 +54,7 @@ public class DaoModule extends AbstractModule implements Module { bindWithLocator(BlockDao.class); bindWithLocator(PeerDao.class); bindWithLocator(CurrencyDao.class); - + bindWithLocator(MemberDao.class); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/MemberDao.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/MemberDao.java new file mode 100644 index 0000000000000000000000000000000000000000..c06c442691779a3303d5be07788f6d9128ab0837 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/MemberDao.java @@ -0,0 +1,53 @@ +package org.duniter.elasticsearch.dao; + +/*- + * #%L + * Duniter4j :: ElasticSearch Core plugin + * %% + * Copyright (C) 2014 - 2017 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.beans.Bean; +import org.duniter.core.client.model.local.Identity; +import org.duniter.core.client.model.local.Member; + +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * Created by blavenie on 30/01/19. + */ +public interface MemberDao extends Bean, TypeDao<MemberDao>{ + + String TYPE = "member"; + + List<Member> getMembers(String currencyId); + + boolean isExists(String currencyId, String pubkey); + + Identity create(Identity identity); + + Identity update(Identity identity); + + Set<String> getMemberPubkeys(String currency); + + void save(String currencyId, List<Member> members); + + void updateAsWasMember(String currency, Collection<String> wasMemberPubkeys); +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java index 3cd69c296d6ec1a47554b99e915df58685019bbc..650a8e48d97268787def011ca2c742993e5c03b8 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockDaoImpl.java @@ -26,10 +26,9 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Streams; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; -import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.model.local.Member; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; @@ -37,12 +36,9 @@ import org.duniter.core.util.StringUtils; import org.duniter.core.util.json.JsonSyntaxException; import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.BlockDao; -import org.duniter.elasticsearch.model.SynchroExecution; import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; @@ -62,9 +58,7 @@ import org.elasticsearch.search.sort.SortOrder; import java.io.IOException; import java.util.*; -import java.util.stream.Collector; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Created by Benoit on 30/03/2015. @@ -136,15 +130,11 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { Preconditions.checkNotNull(block.getHash()); Preconditions.checkNotNull(block.getNumber()); - // Serialize into JSON - // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = getObjectMapper().writeValueAsString(block); - // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) .setRefresh(true) - .setDoc(json); + .setDoc(getObjectMapper().writeValueAsBytes(block)); // Execute client.safeExecuteRequest(request, wait); @@ -285,78 +275,78 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { } @Override - public Map<String, String> getMembers(BlockchainParameters parameters) { + public List<Member> getMembers(BlockchainParameters parameters) { Preconditions.checkNotNull(parameters); - Number medianTime = client.getMandatoryTypedFieldById(parameters.getCurrency(), TYPE, "current", BlockchainBlock.PROPERTY_MEDIAN_TIME); - long startMedianTime = medianTime.longValue() - parameters.getMsValidity() - (parameters.getAvgGenTime() / 2); + long now = System.currentTimeMillis(); + Number currentMedianTime = client.getMandatoryTypedFieldById(parameters.getCurrency(), TYPE, "current", BlockchainBlock.PROPERTY_MEDIAN_TIME); + long startMedianTime = currentMedianTime.longValue() - parameters.getMsValidity() - (parameters.getAvgGenTime() / 2); + + int size = pluginSettings.getIndexBulkSize(); QueryBuilder withEvents = QueryBuilders.boolQuery() .minimumNumberShouldMatch(1) .should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_JOINERS)) + .should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_EXCLUDED)) .should(QueryBuilders.existsQuery(BlockchainBlock.PROPERTY_ACTIVES)); QueryBuilder timeQuery = QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_MEDIAN_TIME) .gte(startMedianTime); + SearchRequestBuilder req = client.prepareSearch(parameters.getCurrency()) + .setTypes(BlockDao.TYPE) + .setSize(size) + .addFields(BlockchainBlock.PROPERTY_JOINERS, + BlockchainBlock.PROPERTY_ACTIVES, + BlockchainBlock.PROPERTY_EXCLUDED, + BlockchainBlock.PROPERTY_LEAVERS, + BlockchainBlock.PROPERTY_REVOKED) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(withEvents).must(timeQuery))) + .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC) + .setFetchSource(false); + long total = -1; int from = 0; - int size = pluginSettings.getIndexBulkSize(); Map<String, String> results = Maps.newHashMap(); do { - SearchRequestBuilder req = client.prepareSearch(parameters.getCurrency()) - .setTypes(BlockDao.TYPE) - .setFrom(from) - .setSize(size) - .addFields(BlockchainBlock.PROPERTY_JOINERS, - BlockchainBlock.PROPERTY_ACTIVES, - BlockchainBlock.PROPERTY_EXCLUDED, - BlockchainBlock.PROPERTY_LEAVERS, - BlockchainBlock.PROPERTY_REVOKED) - .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().must(withEvents).must(timeQuery))) - .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC) - .setFetchSource(false); SearchResponse response = req.execute().actionGet(); - if (total == -1) total = response.getHits().getTotalHits(); - - if (total > 0) { - for (SearchHit hit: response.getHits().getHits()) { - Map<String, SearchHitField> fields = hit.getFields(); - // membership IN - updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_JOINERS), true); - updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_ACTIVES), true); - // membership OUT - updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_EXCLUDED), false); - updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_LEAVERS), false); - updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_REVOKED), false); - } - } + toStream(response).forEach(hit -> { + Map<String, SearchHitField> fields = hit.getFields(); + // membership IN + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_JOINERS), true); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_ACTIVES), true); + // membership OUT + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_EXCLUDED), false); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_LEAVERS), false); + updateMembershipsMap(results, fields.get(BlockchainBlock.PROPERTY_REVOKED), false); + }); from += size; + req.setFrom(from); + if (total == -1) total = response.getHits().getTotalHits(); } while(from<total); - if (logger.isDebugEnabled()) logger.debug("Wot members found: " + results); - return results; - } - - private void updateMembershipsMap(Map<String, String> result, SearchHitField field, boolean membershipIn) { - List<Object> values = field != null ? field.values() : null; - if (CollectionUtils.isEmpty(values)) return; - for (Object value: values) { - String[] parts = value.toString().split(":"); - String pubkey = parts[0]; - if (membershipIn) { - String uid = parts[parts.length -1 ]; - result.put(pubkey, uid); - } - else { - result.remove(pubkey); + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] WoT has {%s} members, computed from {%s} blocks reading in %s ms.", + parameters.getCurrency(), + results.size(), + total, + System.currentTimeMillis() - now)); + if (logger.isTraceEnabled()) { + logger.trace(String.format("[%s] Wot members are: %s", parameters.getCurrency(), results)); } } - + return results.entrySet().stream().map(e -> { + Member member = new Member(); + member.setPubkey(e.getKey()); + member.setUid(e.getValue()); + member.setMember(true); + return member; + }).collect(Collectors.toList()); } + /** * Delete blocks from a start number (using bulk) * @param currencyName @@ -389,7 +379,39 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { client.prepareDelete(currencyName, TYPE, number).execute().actionGet(); } + @Override + public Set<String> getUniqueIssuersBetween(String currencyName, int start, int end) { + + int firstBlock = Math.max(0, start); + int lastBlock = Math.max(0, end); + + Preconditions.checkArgument(firstBlock<=lastBlock); + int length = lastBlock-firstBlock + 1; + Preconditions.checkArgument(length <= 1000, "Maximum size of range [start,end] is 1000, but got " + length); + + List<String> numbers = Lists.newArrayListWithCapacity(lastBlock-firstBlock + 1); + for (int i=start; i<=end; i++) numbers.add(String.valueOf(i)); + + QueryBuilder numbersQuery = QueryBuilders.idsQuery(TYPE).ids(numbers); + BoolQueryBuilder query = QueryBuilders.boolQuery().must(QueryBuilders.boolQuery() + .must(numbersQuery) + ); + + SearchRequestBuilder request = client.prepareSearch(currencyName) + .setTypes(TYPE) + .setSize(1000) + .setFetchSource(BlockchainBlock.PROPERTY_ISSUER, null) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery() + .must(numbersQuery) + )) + .setFetchSource(false); + + return toStream(request.execute().actionGet()) + .map(hit -> (String)hit.getSource().get(BlockchainBlock.PROPERTY_ISSUER)) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } @Override public XContentBuilder createTypeMapping() { @@ -483,6 +505,24 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { /* -- Internal methods -- */ + + private void updateMembershipsMap(Map<String, String> result, SearchHitField field, boolean membershipIn) { + List<Object> values = field != null ? field.values() : null; + if (CollectionUtils.isEmpty(values)) return; + for (Object value: values) { + String[] parts = value.toString().split(":"); + String pubkey = parts[0]; + if (membershipIn) { + String uid = parts[parts.length -1 ]; + result.put(pubkey, uid); + } + else { + result.remove(pubkey); + } + } + + } + protected List<BlockchainBlock> toBlocks(SearchResponse response, boolean withHighlight) { // Read query result List<BlockchainBlock> result = Lists.newArrayList(); @@ -530,16 +570,15 @@ public class BlockDaoImpl extends AbstractDao implements BlockDao { .setTypes(TYPE) .setFrom(offset) .setSize(size) - .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC) .setQuery(query) .setFetchSource(false); SearchResponse response = request.execute().actionGet(); - ids.addAll(toListIds(response)); + ids.addAll(executeAndGetIds(response)); if (total == -1) total = response.getHits().getTotalHits(); offset += size; } while (offset < total); - return ids.stream().mapToLong(Long::parseLong).toArray(); + return ids.stream().mapToLong(Long::parseLong).sorted().toArray(); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java index d0ac965980494abb05e04a6384065c4b237d4318..218e364ebaa88cf544e5d503e9f1d4fc1120f3af 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/BlockStatDaoImpl.java @@ -110,14 +110,11 @@ public class BlockStatDaoImpl extends AbstractDao implements BlockStatDao { Preconditions.checkNotNull(block.getNumber()); // Serialize into JSON - // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) try { - String json = getObjectMapper().writeValueAsString(block); - // Preparing UpdateRequestBuilder request = client.prepareUpdate(block.getCurrency(), TYPE, block.getNumber().toString()) .setRefresh(true) - .setDoc(json); + .setDoc(getObjectMapper().writeValueAsBytes(block)); // Execute client.safeExecuteRequest(request, wait); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java index 61f73b7b2d12688f6f3bdf0d9c0dcfd3917ef3fb..00446a9b58815a3e88a41c293c2435cb84dbe495 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/CurrencyDaoImpl.java @@ -25,7 +25,6 @@ package org.duniter.elasticsearch.dao.impl; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Lists; import org.duniter.core.client.model.local.Currency; -import org.duniter.core.client.service.ServiceLocator; import org.duniter.core.client.util.KnownCurrencies; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.CollectionUtils; @@ -43,6 +42,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; /** * Created by blavenie on 29/12/15. @@ -50,7 +50,8 @@ import java.util.Map; public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> implements CurrencyExtendDao { protected static final String REGEX_WORD_SEPARATOR = "[-\\t@# _]+"; - private String defaultCurrency; + + private static String defaultId; public CurrencyDaoImpl(){ super(INDEX, RECORD_TYPE); @@ -65,18 +66,14 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp fillTags((org.duniter.core.client.model.elasticsearch.Currency)currency); } - // Serialize into JSON - byte[] json = getObjectMapper().writeValueAsBytes(currency); - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) + IndexRequestBuilder request = client.prepareIndex(INDEX, RECORD_TYPE) .setId(currency.getId()) - .setSource(json); + .setRefresh(true) + .setSource(getObjectMapper().writeValueAsBytes(currency)); // Execute indexBlocksFromNode - indexRequest - .setRefresh(true) - .execute().actionGet(); + client.safeExecuteRequest(request, true); } catch(JsonProcessingException e) { throw new TechnicalException(e); @@ -127,18 +124,27 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } @Override - public List<Currency> getCurrencies(long accountId) { + public List<Currency> getAllByAccount(long accountId) { throw new TechnicalException("Not implemented yet"); } @Override - public List<String> getCurrencyIds() { + public List<Currency> getAll() { + SearchRequestBuilder request = client.prepareSearch(INDEX) + .setTypes(RECORD_TYPE) + .setSize(pluginSettings.getIndexBulkSize()) + .setFetchSource(true); + return toList(request.execute().actionGet(), Currency.class); + } + + @Override + public Set<String> getAllIds() { SearchRequestBuilder request = client.prepareSearch(INDEX) .setTypes(RECORD_TYPE) .setSize(pluginSettings.getIndexBulkSize()) .setFetchSource(false); - return toListIds(request.execute().actionGet()); + return executeAndGetIds(request.execute().actionGet()); } @Override @@ -220,16 +226,16 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp * Return the default currency * @return */ - public String getDefaultCurrencyName() { + public String getDefaultId() { - if (defaultCurrency != null) return defaultCurrency; + if (defaultId != null) return defaultId; boolean enableBlockchainIndexation = pluginSettings.enableBlockchainIndexation() && existsIndex(); try { - List<String> currencyIds = enableBlockchainIndexation ? getCurrencyIds() : null; - if (CollectionUtils.isNotEmpty(currencyIds)) { - defaultCurrency = currencyIds.get(0); - return defaultCurrency; + Set<String> ids = enableBlockchainIndexation ? getAllIds() : null; + if (CollectionUtils.isNotEmpty(ids)) { + defaultId = ids.iterator().next(); + return defaultId; } } catch(Throwable t) { // Continue (index not read yet?) @@ -255,7 +261,7 @@ public class CurrencyDaoImpl extends AbstractIndexTypeDao<CurrencyExtendDao> imp } protected void fillTags(org.duniter.core.client.model.elasticsearch.Currency currency) { - String currencyName = currency.getCurrencyName(); + String currencyName = currency.getId(); String[] tags = currencyName.split(REGEX_WORD_SEPARATOR); List<String> tagsList = Lists.newArrayList(tags); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/MemberDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/MemberDaoImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..37645c49223d5d8314045502a32c90167e10d066 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/MemberDaoImpl.java @@ -0,0 +1,296 @@ +package org.duniter.elasticsearch.dao.impl; + +/* + * #%L + * UCoin Java :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Lists; +import org.duniter.core.client.model.local.Identity; +import org.duniter.core.client.model.local.Member; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.Preconditions; +import org.duniter.elasticsearch.dao.AbstractDao; +import org.duniter.elasticsearch.dao.MemberDao; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Set; + +/** + * Created by blavenie on 29/12/15. + */ +public class MemberDaoImpl extends AbstractDao implements MemberDao { + + public MemberDaoImpl(){ + super("duniter.dao.member"); + } + + @Override + public String getType() { + return TYPE; + } + + @Override + public List<Member> getMembers(String currencyId) { + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] Getting WoT members...", currencyId)); + } + + int bulkSize = pluginSettings.getIndexBulkSize(); + List<Member> result = Lists.newArrayList(); + + // Query = filter on isMember + BoolQueryBuilder query = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Identity.PROPERTY_IS_MEMBER, true)); + + SearchRequestBuilder searchRequest = client.prepareSearch(currencyId) + .setFetchSource(true) + .setTypes(TYPE) + .setSize(bulkSize) + .setQuery(QueryBuilders.constantScoreQuery(query)); + + // Execute query, while there is some data + try { + + int counter = 0; + long now = System.currentTimeMillis(); + boolean loop = true; + SearchResponse response = searchRequest.execute().actionGet(); + + // Execute query, while there is some data + do { + + // Read response + List<Member> hits = toList(response, Member.class); + + // Add to result + if (CollectionUtils.size(hits) > 0) { + counter += hits.size(); + result.addAll(hits); + } + + // Prepare next iteration + if (counter == 0 || counter >= response.getHits().getTotalHits()) { + loop = false; + } + // Prepare next iteration + else { + searchRequest.setFrom(counter); + response = searchRequest.execute().actionGet(); + } + } while(loop); + + if (counter > 0 && logger.isDebugEnabled()) { + logger.debug(String.format("[%s] Get %s WoT members in %s ms", currencyId, counter, (System.currentTimeMillis() - now))); + } + + } catch (SearchPhaseExecutionException e) { + // Failed or no item on index + logger.error(String.format("Error while getting WoT members: %s.", e.getMessage()), e); + } + return result; + } + + @Override + public boolean isExists(String currencyId, String pubkey) { + return client.isDocumentExists(currencyId, TYPE, pubkey); + } + + @Override + public Identity create(Identity identity) { + Preconditions.checkNotNull(identity); + Preconditions.checkNotNull(identity.getPubkey()); + Preconditions.checkNotNull(identity.getCurrency()); + + // Serialize into JSON + try { + // Preparing indexBlocksFromNode + IndexRequestBuilder request = client.prepareIndex(identity.getCurrency(), TYPE) + .setId(identity.getPubkey()) + .setRefresh(true) + .setSource(getObjectMapper().writeValueAsBytes(identity)); + + // Execute indexBlocksFromNode + client.safeExecuteRequest(request, false); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + return identity; + } + + @Override + public Identity update(Identity identity) { + Preconditions.checkNotNull(identity); + Preconditions.checkNotNull(identity.getPubkey()); + Preconditions.checkNotNull(identity.getCurrency()); + + // Serialize into JSON + try { + + // Preparing indexBlocksFromNode + UpdateRequestBuilder request = client.prepareUpdate(identity.getCurrency(), TYPE, identity.getPubkey()) + .setRefresh(true) + .setDoc(getObjectMapper().writeValueAsBytes(identity)); + + // Execute + client.safeExecuteRequest(request, false); + } + catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + return identity; + } + + @Override + public Set<String> getMemberPubkeys(String currency) { + + QueryBuilder query = QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Member.PROPERTY_IS_MEMBER, true)); + + return executeAndGetIds(client.prepareSearch(currency) + .setTypes(TYPE) + .setQuery(QueryBuilders.constantScoreQuery(query)) + .setFetchSource(false)); + } + + @Override + public void save(String currency, List<Member> members) { + int bulkSize = pluginSettings.getIndexBulkSize(); + BulkRequestBuilder bulkRequest = client.prepareBulk(); + ObjectMapper objectMapper = getObjectMapper(); + + int counter = 0; + for (Member m: members) { + try { + boolean exists = isExists(currency, m.getPubkey()); + + // Add insert to bulk + if (!exists) { + bulkRequest.add( + client.prepareIndex(currency, TYPE) + .setId(m.getPubkey()) + .setSource(objectMapper.writeValueAsBytes(m)) + ); + } + // Add update to bulk + else { + bulkRequest.add( + client.prepareUpdate(currency, TYPE, m.getPubkey()) + .setDoc(objectMapper.writeValueAsBytes(m)) + ); + } + counter++; + } catch (Exception e) { + throw new TechnicalException(e); + } + + // Flush the bulk if not empty + if ((counter % bulkSize) == 0) { + client.flushBulk(bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // Flush the bulk if not empty + if (counter > 0 && (counter % bulkSize) != 0) { + client.flushBulk(bulkRequest); + } + } + + @Override + public void updateAsWasMember(String currency, Collection<String> wasMemberPubkeys) { + int bulkSize = pluginSettings.getIndexBulkSize(); + BulkRequestBuilder bulkRequest = client.prepareBulk().setRefresh(true); + + int counter = 0; + + // Update old members (set wasMember to true) + for (String pubkey: wasMemberPubkeys) { + bulkRequest.add( + client.prepareUpdate(currency, TYPE, pubkey) + .setDoc(String.format("{\"%s\": false, \"%s\": true}", Member.PROPERTY_IS_MEMBER, Member.PROPERTY_WAS_MEMBER)) + ); + counter++; + // Flush the bulk if not empty + if ((counter % bulkSize) == 0) { + client.flushBulk(bulkRequest); + bulkRequest = client.prepareBulk(); + } + } + + // Flush the bulk if not empty + if (counter > 0 && (counter % bulkSize) != 0) { + client.flushBulk(bulkRequest); + } + } + + @Override + public XContentBuilder createTypeMapping() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder() + .startObject() + .startObject(TYPE) + .startObject("properties") + + // uid + .startObject(Identity.PROPERTY_UID) + .field("type", "string") + .endObject() + + // pubkey + .startObject(Identity.PROPERTY_PUBKEY) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // isMember + .startObject(Identity.PROPERTY_PUBKEY) + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + .endObject() + .endObject() + .endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException("Error while getting mapping for peer index: " + ioe.getMessage(), ioe); + } + } +} diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java index 0ad6d3c17f132fc1f5e16dc0765acf7ef399cfd7..c67bcf17a220cb997a15ee8f3e53c2de8312e3bf 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/dao/impl/PeerDaoImpl.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.bma.NetworkPeers; +import org.duniter.core.client.model.bma.NetworkWs2pHeads; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peers; import org.duniter.core.exception.TechnicalException; @@ -45,7 +46,6 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.NestedQueryBuilder; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.AggregationBuilders; @@ -57,6 +57,7 @@ import java.util.Collection; import java.util.Date; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; /** * Created by blavenie on 29/12/15. @@ -147,8 +148,9 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { @Override public List<Peer> getPeersByCurrencyId(String currencyId) { - logger.warn("Calling method PeerSevice.getPeersByCurrencyId() may be unsafe, as it load all peers in memory. Applying workaround: return peer define in config."); - return ImmutableList.of(pluginSettings.checkAndGetPeer()); + // Loading all peers in memory may be unsafe ! + // Applying workaround: return only the Duniter peer defined in config. + return ImmutableList.of(pluginSettings.checkAndGetDuniterPeer()); } @Override @@ -211,6 +213,46 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { return Peers.toBmaPeers(toList(response, Peer.class)); } + @Override + public List<NetworkWs2pHeads.Head> getWs2pPeersByCurrencyId(String currencyId, String[] pubkeys) { + Preconditions.checkNotNull(currencyId); + + SearchRequestBuilder request = client.prepareSearch(currencyId) + .setTypes(TYPE) + .setSize(1000); + + BoolQueryBuilder query = QueryBuilders.boolQuery(); + + // Query = filter on UP status + NestedQueryBuilder statusQuery = QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, + QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); + query.must(statusQuery); + + // Filter on pubkeys + if (CollectionUtils.isNotEmpty(pubkeys)) { + BoolQueryBuilder pubkeysQuery = QueryBuilders.boolQuery(); + pubkeysQuery.filter(QueryBuilders.termsQuery(Peer.PROPERTY_PUBKEY, pubkeys)); + query.must(pubkeysQuery); + } + + // Filter on WS2P api + if (CollectionUtils.isNotEmpty(pubkeys)) { + BoolQueryBuilder apiQuery = QueryBuilders.boolQuery(); + apiQuery.filter(QueryBuilders.termsQuery(Peer.PROPERTY_API, EndpointApi.WS2P.name())); + query.must(apiQuery); + } + + request.setQuery(QueryBuilders.constantScoreQuery(query)); + + SearchResponse response = request.execute().actionGet(); + return toList(response, Peer.class).stream() + .map(Peers::toWs2pHead) + // Skip if no message + .filter(head -> head.getMessage() != null) + .collect(Collectors.toList()); + } + @Override public boolean isExists(String currencyId, String peerId) { return client.isDocumentExists(currencyId, TYPE, peerId); @@ -256,7 +298,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { public void updatePeersAsDown(String currencyName, long upTimeLimitInSec, Collection<String> endpointApis) { if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] %s Setting peers as DOWN, if older than [%s]...", currencyName, endpointApis, new Date(upTimeLimitInSec*1000))); + logger.debug(String.format("[%s] %s Mark peers as DOWN when {last up time <= %s}...", currencyName, endpointApis, new Date(upTimeLimitInSec*1000))); } SearchRequestBuilder searchRequest = client.prepareSearch(currencyName) @@ -274,9 +316,9 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { NestedQueryBuilder statsQuery = QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.boolQuery() // lastUpTime < upTimeLimit - .filter(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimitInSec)) + .must(QueryBuilders.rangeQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_LAST_UP_TIME).lte(upTimeLimitInSec)) // status = UP - .filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); + .must(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name()))); query.must(statsQuery); searchRequest.setQuery(QueryBuilders.constantScoreQuery(query)); @@ -330,7 +372,7 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { } if (counter > 0) { - logger.info(String.format("[%s] Updated %s peers status has DOWN", currencyName, counter)); + logger.info(String.format("[%s] %s peers DOWN", currencyName, counter)); } } catch (SearchPhaseExecutionException e) { diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java index 93bb75190287483da04ed0e5dc6f2756a412ed3e..1697a7da138aed72750d0c0b047ab24504ff0b32 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/http/tyrus/TyrusWebSocketServer.java @@ -45,6 +45,7 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.http.HttpServer; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestRequest; @@ -67,9 +68,10 @@ public class TyrusWebSocketServer { private List<Class<?>> endPoints = new ArrayList<>(); @Inject - public TyrusWebSocketServer(final PluginSettings pluginSettings, + public TyrusWebSocketServer(final Settings settings, + final PluginSettings pluginSettings, ThreadPool threadPool) { - logger = Loggers.getLogger("duniter.ws", pluginSettings.getSettings(), new String[0]); + logger = Loggers.getLogger("duniter.ws", settings, new String[0]); // If WS enable if (pluginSettings.getWebSocketEnable() && pluginSettings.getWebSocketPort() != null) { diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java index c8f497bd05c47bdaa3a958ba3aa844f897a06a2b..061839a157acfed0d9b23fbfc108b8a9230ee7ce 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/RestModule.java @@ -27,6 +27,7 @@ import org.duniter.elasticsearch.rest.blockchain.*; import org.duniter.elasticsearch.rest.network.RestNetworkPeeringGetAction; import org.duniter.elasticsearch.rest.network.RestNetworkPeeringPeersPostAction; import org.duniter.elasticsearch.rest.network.RestNetworkPeersGetAction; +import org.duniter.elasticsearch.rest.network.RestNetworkWs2pHeadsGetAction; import org.duniter.elasticsearch.rest.node.RestNodeSummaryGetAction; import org.duniter.elasticsearch.rest.security.RestSecurityAuthAction; import org.duniter.elasticsearch.rest.security.RestSecurityController; @@ -54,12 +55,13 @@ public class RestModule extends AbstractModule implements Module { bind(RestImageAttachmentAction.class).asEagerSingleton(); // Currency - //bind(RestCurrencyIndexAction.class).asEagerSingleton(); + //bind(RestCurrencyPostAction.class).asEagerSingleton(); // Network bind(RestNetworkPeeringGetAction.class).asEagerSingleton(); bind(RestNetworkPeeringPeersPostAction.class).asEagerSingleton(); bind(RestNetworkPeersGetAction.class).asEagerSingleton(); + bind(RestNetworkWs2pHeadsGetAction.class).asEagerSingleton(); // Blockchain bind(RestBlockchainParametersGetAction.class).asEagerSingleton(); @@ -67,6 +69,7 @@ public class RestModule extends AbstractModule implements Module { bind(RestBlockchainWithUdAction.class).asEagerSingleton(); bind(RestBlockchainWithNewcomersAction.class).asEagerSingleton(); bind(RestBlockchainBlocksGetAction.class).asEagerSingleton(); + bind(RestBlockchainDifficultiesGetAction.class).asEagerSingleton(); // Wot bind(RestWotLookupGetAction.class).asEagerSingleton(); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java index e37fbe281cea613784616d31b4c0b4a8da2d4f0d..4a64336b6cfc1e7619105e0284a223108a8f2c52 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlockGetAction.java @@ -32,10 +32,13 @@ import org.duniter.elasticsearch.service.CurrencyService; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.*; +import org.elasticsearch.search.SearchHit; import java.io.IOException; @@ -70,12 +73,17 @@ public class RestBlockchainBlockGetAction extends BaseRestHandler { String currency = currencyService.safeGetCurrency(request.param("index")); String number = request.param("number"); boolean isCurrent = StringUtils.isBlank(number); + String[] includes = request.paramAsStringArray("_source", null); + String[] excludes = request.paramAsStringArray("_source_exclude", null); try { GetResponse response = client.prepareGet(currency, BlockDao.TYPE, isCurrent ? "current" : number) + .setFetchSource(includes, excludes) .execute().actionGet(); - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).rawValue(response.getSourceAsBytesRef()); - channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + + BytesStreamOutput bso = new BytesStreamOutput(); + response.getSourceAsBytesRef().writeTo(bso); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, XContentType.JSON.restContentType(), bso.bytes())); } catch(IOException ioe) { if (isCurrent) diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java index ec3019ef99b198bd8290a814f8303a3558a9f9cd..fca9294be9913eb4f08a32836da064b2cfaa3cf5 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainBlocksGetAction.java @@ -24,25 +24,28 @@ package org.duniter.elasticsearch.rest.blockchain; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.exception.TechnicalException; -import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.rest.RestXContentBuilder; import org.duniter.elasticsearch.rest.XContentRestResponse; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.service.CurrencyService; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.*; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; +import java.io.ByteArrayOutputStream; import java.io.IOException; /** @@ -51,7 +54,6 @@ import java.io.IOException; */ public class RestBlockchainBlocksGetAction extends BaseRestHandler { - private Client client; private CurrencyService currencyService; @Inject @@ -64,7 +66,6 @@ public class RestBlockchainBlocksGetAction extends BaseRestHandler { controller.registerHandler(RestRequest.Method.GET, "/blockchain/blocks/{count}/{from}", this); controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/blocks/{count}/{from}", this); - this.client = client; this.currencyService = currencyService; } @@ -73,25 +74,33 @@ public class RestBlockchainBlocksGetAction extends BaseRestHandler { String currency = currencyService.safeGetCurrency(request.param("index")); int count = request.paramAsInt("count", 100); int from = request.paramAsInt("from", 0); + String[] includes = request.paramAsStringArray("_source", null); + String[] excludes = request.paramAsStringArray("_source_exclude", null); try { SearchRequestBuilder req = client.prepareSearch(currency) .setTypes(BlockDao.TYPE) .setFrom(0) .setSize(count) - .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_NUMBER).lte(from)))) - .setFetchSource(true) + .setQuery(QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery().filter(QueryBuilders.rangeQuery(BlockchainBlock.PROPERTY_NUMBER).gte(from)))) + .setFetchSource(includes, excludes) .addSort(BlockchainBlock.PROPERTY_NUMBER, SortOrder.ASC); + SearchResponse resp = req.execute().actionGet(); - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startArray(); + BytesStreamOutput bso = new BytesStreamOutput(); - SearchResponse resp = req.execute().actionGet(); + boolean first = true; + bso.write('['); for (SearchHit hit: resp.getHits().getHits()) { - builder.rawValue(hit.getSourceRef()); + BytesReference bytes = hit.getSourceRef(); + if (bytes != null) { + if (!first) bso.write(','); + hit.getSourceRef().writeTo(bso); + first = false; + } } - builder.endArray(); - - channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + bso.write(']'); + channel.sendResponse(new BytesRestResponse(RestStatus.OK, XContentType.JSON.restContentType(), bso.bytes())); } catch(IOException ioe) { throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/blocks/<count>/<from>]: %s", ioe.getMessage()), ioe); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainDifficultiesGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainDifficultiesGetAction.java new file mode 100644 index 0000000000000000000000000000000000000000..601e9d956a4374999f1876f7714681a4ff371e62 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/blockchain/RestBlockchainDifficultiesGetAction.java @@ -0,0 +1,108 @@ +package org.duniter.elasticsearch.rest.blockchain; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import org.apache.commons.collections4.MapUtils; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.rest.RestXContentBuilder; +import org.duniter.elasticsearch.rest.XContentRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.CurrencyService; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.rest.*; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestBlockchainDifficultiesGetAction extends BaseRestHandler { + + private Client client; + private CurrencyService currencyService; + private BlockchainService blockchainService; + + @Inject + public RestBlockchainDifficultiesGetAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, + CurrencyService currencyService, + BlockchainService blockchainService) { + super(settings, controller, client); + + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/blockchain/difficulties"); + + controller.registerHandler(RestRequest.Method.GET, "/blockchain/difficulties", this); + controller.registerHandler(RestRequest.Method.GET, "/{index}/blockchain/difficulties", this); + + this.client = client; + this.currencyService = currencyService; + this.blockchainService = blockchainService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) { + String currency = currencyService.safeGetCurrency(request.param("index")); + + + try { + int currentBlockNumber = blockchainService.getMaxBlockNumber(currency); + + final XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() + .field("block", currentBlockNumber + 1) + .startArray("levels"); + Map<String, Integer> difficulties = blockchainService.getDifficulties(currency); + + if (MapUtils.isNotEmpty(difficulties)) { + // Sort by level + List<Map.Entry<String, Integer>> difficultyEntries = Lists.newArrayList(difficulties.entrySet()); + difficultyEntries.sort(Map.Entry.comparingByValue()); + + // Add as json object + for (Map.Entry<String, Integer> level : difficultyEntries) { + builder.startObject() + .field("uid", level.getKey()) + .field("level", level.getValue()) + .endObject(); + } + } + builder.endArray().endObject(); + channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); + } catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/blockchain/difficulties]: %s", ioe.getMessage()), ioe); + } + } +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyIndexAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyPostAction.java similarity index 85% rename from cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyIndexAction.java rename to cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyPostAction.java index 674b7e720af4139b95600cb7f05b832b30ed3e3e..f11ccb6003b9c38688ea22b0dacf95b09386e0d6 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyIndexAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/currency/RestCurrencyPostAction.java @@ -35,11 +35,11 @@ import org.elasticsearch.rest.RestController; * A rest to post a request to process a new currency/peer. * */ -public class RestCurrencyIndexAction extends AbstractRestPostIndexAction { +public class RestCurrencyPostAction extends AbstractRestPostIndexAction { @Inject - public RestCurrencyIndexAction(Settings settings, RestController controller, Client client, - RestSecurityController securityController, CurrencyService currencyService) { + public RestCurrencyPostAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, CurrencyService currencyService) { super(settings, controller, client, securityController, CurrencyService.INDEX, CurrencyService.RECORD_TYPE, (json) -> { diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkWs2pHeadsGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkWs2pHeadsGetAction.java new file mode 100644 index 0000000000000000000000000000000000000000..df8a4404995b2e633a95592f1960d29b18589cc2 --- /dev/null +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/network/RestNetworkWs2pHeadsGetAction.java @@ -0,0 +1,91 @@ +package org.duniter.elasticsearch.rest.network; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.client.model.bma.NetworkPeers; +import org.duniter.core.client.model.bma.NetworkWs2pHeads; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.rest.JacksonJsonRestResponse; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.service.NetworkService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.*; + +import java.io.IOException; +import java.util.List; + +/** + * A rest to post a request to process a new currency/peer. + * + */ +public class RestNetworkWs2pHeadsGetAction extends BaseRestHandler { + + + private NetworkService networkService; + + @Inject + public RestNetworkWs2pHeadsGetAction(Settings settings, PluginSettings pluginSettings, RestController controller, Client client, RestSecurityController securityController, + NetworkService networkService) { + super(settings, controller, client); + + if (StringUtils.isBlank(pluginSettings.getClusterRemoteHost())) { + logger.warn(String.format("The cluster address can not be published on the network. /\\!\\\\ Fill in the options [cluster.remote.xxx] in the configuration (recommended).")); + } + else { + securityController.allow(RestRequest.Method.GET, "(/[^/]+)?/network/ws2p/heads"); + + controller.registerHandler(RestRequest.Method.GET, "/network/ws2p/heads", this); + controller.registerHandler(RestRequest.Method.GET, "/{currency}/network/ws2p/heads", this); + } + + this.networkService = networkService; + } + + @Override + protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { + String currency = request.param("currency"); + + + try { + NetworkWs2pHeads result = new NetworkWs2pHeads(); + List<NetworkWs2pHeads.Head> heads = networkService.getWs2pHeads(currency); + if (CollectionUtils.isNotEmpty(heads)) { + result.heads = heads.toArray(new NetworkWs2pHeads.Head[heads.size()]); + } + else { + result.heads = new NetworkWs2pHeads.Head[0]; + } + + channel.sendResponse(new JacksonJsonRestResponse(request, RestStatus.OK, result)); + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while generating JSON for [/network/peers]: %s", ioe.getMessage()), ioe); + } + } + +} \ No newline at end of file diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java index 0a120b4c173cb0df7543256cede7c6eaf963a3ac..a59fb8af6cad9128c881f0a957d67fbb9187714f 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/rest/wot/RestWotMembersGetAction.java @@ -24,12 +24,11 @@ package org.duniter.elasticsearch.rest.wot; import com.fasterxml.jackson.databind.ObjectMapper; import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.local.Member; import org.duniter.core.exception.TechnicalException; -import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.rest.RestXContentBuilder; import org.duniter.elasticsearch.rest.XContentRestResponse; import org.duniter.elasticsearch.rest.security.RestSecurityController; -import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.service.WotService; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; @@ -38,7 +37,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.*; import java.io.IOException; -import java.util.Map; +import java.util.List; /** * A GET request similar as /wot/members in Duniter BMA API @@ -46,7 +45,6 @@ import java.util.Map; */ public class RestWotMembersGetAction extends BaseRestHandler { - private Client client; private WotService wotService; @Inject @@ -59,7 +57,6 @@ public class RestWotMembersGetAction extends BaseRestHandler { controller.registerHandler(RestRequest.Method.GET, "/wot/members", this); controller.registerHandler(RestRequest.Method.GET, "/{index}/members", this); - this.client = client; this.wotService = wotService; } @@ -67,22 +64,21 @@ public class RestWotMembersGetAction extends BaseRestHandler { protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception { String currency = request.param("index"); - try { - Map<String, String> members = wotService.getMembers(currency); + List<Member> members = wotService.getMembers(currency); XContentBuilder builder = RestXContentBuilder.restContentBuilder(request).startObject() .startArray("results"); - for (Map.Entry<String, String> entry: members.entrySet()) { + for (Member member: members) { builder.startObject() - .field("pubkey", entry.getKey()) - .field("uid", entry.getValue()) + .field("pubkey", member.getPubkey()) + .field("uid", member.getUid()) .endObject(); } builder.endArray().endObject(); channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); } catch(IOException ioe) { - throw new TechnicalException(String.format("Error while generating JSON for [/wot/lookup]: %s", ioe.getMessage()), ioe); + throw new TechnicalException(String.format("Error while generating JSON for [/wot/members]: %s", ioe.getMessage()), ioe); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java index b3c74160afa3e5631b9d8a584bd4a1ca5e1a480d..19c1f7616d741f41d3b44065976f12758e753359 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainListenerService.java @@ -108,6 +108,9 @@ public class BlockchainListenerService extends AbstractBlockchainListenerService } }); } + + // TODO: get members IN + } protected void processBlockDelete(ChangeEvent change) { diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 94f8050021332dcad9fab299cacad13b6fa33aad..efb032fc0ea6e6ecdfabb1920cc7b68024b99459 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import org.duniter.core.client.dao.CurrencyDao; import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainDifficulties; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; @@ -62,6 +63,8 @@ import org.nuiton.i18n.I18n; import java.io.IOException; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; /** * Created by Benoit on 30/03/2015. @@ -86,6 +89,7 @@ public class BlockchainService extends AbstractService { private final JsonAttributeParser<String> blockPreviousHashParser = new JsonAttributeParser<>("previousHash", String.class); private SimpleCache<String, BlockchainParameters> blockchainParametersCurrencyIdCache; + private static Map<String, BlockchainParameters> blockchainParametersCurrencyId = new ConcurrentHashMap<>(); private BlockDao blockDao; private CurrencyExtendDao currencyDao; @@ -121,8 +125,14 @@ public class BlockchainService extends AbstractService { blockchainParametersCurrencyIdCache = new SimpleCache<String, BlockchainParameters>(/*eternal*/) { @Override public BlockchainParameters load(String currencyId) { - if (!isReady()) throw new IllegalStateException("Could not load blockchain parameters (service is not started)"); - return blockchainRemoteService.getParameters(currencyId); + if (blockchainParametersCurrencyId.containsKey(currencyId)) { + return blockchainParametersCurrencyId.get(currencyId); + } + checkReady(); + + BlockchainParameters parameters = blockchainRemoteService.getParameters(currencyId); + blockchainParametersCurrencyId.put(currencyId, parameters); + return parameters; } }; } @@ -534,6 +544,22 @@ public class BlockchainService extends AbstractService { } + public int getMaxBlockNumber(String currencyName) { + return blockDao.getMaxBlockNumber(currencyName); + } + + public Map<String, Integer> getDifficulties(String currencyName) { + checkReady(); + + // TODO: recompute difficulties, using BlockDao, instead of asking to remote peer + + Peer peer = pluginSettings.checkAndGetDuniterPeer(); + BlockchainDifficulties diff = blockchainRemoteService.getDifficulties(peer); + if (diff == null || diff.getLevels() == null) return null; + return Arrays.stream(diff.getLevels()) + .collect(Collectors.toMap(BlockchainDifficulties.DifficultyLevel::getUid, BlockchainDifficulties.DifficultyLevel::getLevel)); + } + /* -- Internal methods -- */ private Collection<String> indexBlocksNoBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel, boolean isLastCurrent) { @@ -585,7 +611,10 @@ public class BlockchainService extends AbstractService { // Or fill default using duniter node if (result == null && pluginSettings.enableBlockchainIndexation()) { - Peer peer = pluginSettings.checkAndGetPeer(); + + if (!isReady()) throw new IllegalStateException("Could not load blockchain parameters (service is not started)"); + + Peer peer = pluginSettings.checkAndGetDuniterPeer(); result = blockchainRemoteService.getParameters(peer); blockchainParametersCurrencyIdCache.put(result.getCurrency(), result); blockchainParametersCurrencyIdCache.put("DEFAULT", result); @@ -905,6 +934,10 @@ public class BlockchainService extends AbstractService { if (StringUtils.isNotBlank(currency)) return currency; - return currencyDao.getDefaultCurrencyName(); + return currencyDao.getDefaultId(); + } + + protected void checkReady() throws IllegalStateException{ + if (!isReady()) throw new IllegalStateException("Service not started"); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 11df7407efe39c6c08f6bbc1dea3a24f8ab56c9d..51ad9d9b9b9330a64954fbe817a769208c93b8a4 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -32,6 +32,7 @@ import org.duniter.core.client.model.elasticsearch.Currency; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.bma.BlockchainRemoteService; import org.duniter.core.client.service.exception.HttpConnectException; +import org.duniter.core.client.service.exception.HttpTimeoutException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.Preconditions; @@ -102,7 +103,7 @@ public class CurrencyService extends AbstractService { public String safeGetCurrency(String currency) { if (StringUtils.isNotBlank(currency)) return currency; - return currencyDao.getDefaultCurrencyName(); + return currencyDao.getDefaultId(); } /** @@ -120,7 +121,7 @@ public class CurrencyService extends AbstractService { while(true) { try { return indexCurrencyFromPeer(peer); - } catch (HttpConnectException e) { + } catch (HttpConnectException | HttpTimeoutException e) { // log then retry logger.warn(String.format("[%s] Unable to connect. Retrying in 10s...", peer.toString())); } @@ -148,16 +149,15 @@ public class CurrencyService extends AbstractService { BlockchainBlock currentBlock = blockchainRemoteService.getCurrentBlock(peer); Long lastUD = blockchainRemoteService.getLastUD(peer); - Currency result = new Currency(); - result.setCurrencyName(parameters.getCurrency()); + result.setId(parameters.getCurrency()); result.setFirstBlockSignature(firstBlock.getSignature()); result.setMembersCount(currentBlock.getMembersCount()); result.setLastUD(lastUD); result.setParameters(parameters); // Save it - saveCurrency(result); + save(result); return result; } @@ -168,7 +168,7 @@ public class CurrencyService extends AbstractService { * @throws DuplicateIndexIdException * @throws AccessDeniedException if exists and user if not the original blockchain sender */ - public void saveCurrency(Currency currency) throws DuplicateIndexIdException { + public void save(Currency currency) throws DuplicateIndexIdException { Preconditions.checkNotNull(currency, "currency could not be null") ; Preconditions.checkNotNull(currency.getId(), "currency attribute 'currency' could not be null"); @@ -229,6 +229,10 @@ public class CurrencyService extends AbstractService { MovementDao movementDao = ServiceLocator.instance().getBean(MovementDao.class); createIndexRequestBuilder.addMapping(movementDao.getType(), movementDao.createTypeMapping()); + // Add wot type + MemberDao memberDao = ServiceLocator.instance().getBean(MemberDao.class); + createIndexRequestBuilder.addMapping(memberDao.getType(), memberDao.createTypeMapping()); + // Add blockStat type BlockStatDao blockStatDao = injector.getInstance(BlockStatDao.class); createIndexRequestBuilder.addMapping(blockStatDao.getType(), blockStatDao.createTypeMapping()); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java index d4f65e755e4b5661f22a68107a82742c85c59e19..bbb136faef991814fbb0e4db179c77538cc53e6c 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/NetworkService.java @@ -163,7 +163,7 @@ public class NetworkService extends AbstractService { public boolean hasSomePeers(Set<EndpointApi> peerApiFilters) { - List<String> currencyIds = currencyDao.getCurrencyIds(); + Set<String> currencyIds = currencyDao.getAllIds(); if (CollectionUtils.isEmpty(currencyIds)) return false; for (String currencyId: currencyIds) { @@ -266,6 +266,7 @@ public class NetworkService extends AbstractService { else { List<Peer> configIncludedPeers = getConfigIncludesPeers(currency); + // TODO: review this code ! if (CollectionUtils.isNotEmpty(configIncludedPeers)) { final long now = Math.round(System.currentTimeMillis() / 1000); configIncludedPeers.stream().forEach(peer -> { @@ -299,6 +300,58 @@ public class NetworkService extends AbstractService { } + public List<NetworkWs2pHeads.Head> getWs2pHeads(String currency) { + + // Retrieve the currency to use + currency = blockchainService.safeGetCurrency(currency); + + List<NetworkWs2pHeads.Head> result = null; + try { + + // Discovery enable: use index '/<currency>/peer' + if (pluginSettings.enableSynchroDiscovery()) { + result = peerDao.getWs2pPeersByCurrencyId(currency, null); + } + + // Discovery disable, so get it from Duniter node + else { + List<Peer> configIncludedPeers = getConfigIncludesPeers(currency); + + // TODO: review this code ! + if (CollectionUtils.isNotEmpty(configIncludedPeers)) { + final long now = Math.round(System.currentTimeMillis() / 1000); + configIncludedPeers.stream().forEach(peer -> { + try { + NetworkPeering peering = networkRemoteService.getPeering(peer); + peer.setPubkey(peering.getPubkey()); + peer.setCurrency(peering.getCurrency()); + Peer.Stats stats = peer.getStats(); + stats.setStatus(Peer.PeerStatus.UP); + stats.setLastUpTime(now); + String blockstamp = peering.getBlock(); + if (StringUtils.isNotBlank(blockstamp)) { + String[] blockParts = blockstamp.split("-"); + stats.setBlockNumber(Integer.parseInt(blockParts[0])); + stats.setBlockHash(blockParts[1]); + } + } catch(Exception e) { + logger.error(String.format("[%s] Error while getting peering document: %s", peer, e.getMessage()), e); + } + }); + + result = configIncludedPeers.stream().map(Peers::toWs2pHead).collect(Collectors.toList()); + } + } + return result; + } + catch (Exception e) { + logger.error("Could not get peers (BMA format)", e); + return result; + } + + } + + public boolean isEsNodeAliveAndValid(Peer peer) { Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer.getCurrency()); @@ -542,9 +595,9 @@ public class NetworkService extends AbstractService { } protected void publishPeerDocumentToNetwork() { - List<String> currencyIds; + Set<String> currencyIds; try { - currencyIds = currencyDao.getCurrencyIds(); + currencyIds = currencyDao.getAllIds(); } catch (Exception e) { logger.error("Could not retrieve indexed currencies", e); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java index 1dcec71da8813bef98d2bfa4695e350a18363d1b..a6aa0d48925871cb2c083ef7cced57260e1023c8 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/PeerService.java @@ -26,6 +26,7 @@ package org.duniter.elasticsearch.service; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import org.duniter.core.client.dao.PeerDao; +import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; import org.duniter.core.client.model.bma.EndpointApi; import org.duniter.core.client.model.local.Peer; @@ -35,6 +36,7 @@ import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.Preconditions; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; +import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.nuiton.i18n.I18n; @@ -122,9 +124,15 @@ public class PeerService extends AbstractService { try { logger.info(I18n.t("duniter4j.es.networkService.indexPeers.task", currency, firstPeer)); + // Default filter NetworkService.Filter filterDef = getDefaultFilter(currency); + Number currentNumber = client.getTypedFieldById(currency, BlockDao.TYPE, "current", BlockchainBlock.PROPERTY_NUMBER); + if (currentNumber != null) { + filterDef.minBlockNumber = currentNumber.intValue() - 100; + } + // Default sort org.duniter.core.client.service.local.NetworkService.Sort sortDef = new org.duniter.core.client.service.local.NetworkService.Sort(); sortDef.sortType = null; @@ -165,7 +173,7 @@ public class PeerService extends AbstractService { } if (refreshPeers) { - final Peer mainPeer = pluginSettings.checkAndGetPeer(); + final Peer mainPeer = pluginSettings.checkAndGetDuniterPeer(); // Async refresh networkService.refreshPeersAsync(mainPeer, peers, threadPool.scheduler()) @@ -190,21 +198,21 @@ public class PeerService extends AbstractService { logger.error(I18n.t("duniter4j.es.networkService.indexPeers.remoteParametersError", mainPeer)); return; } - String currencyName = parameter.getCurrency(); + String currency = parameter.getCurrency(); // Default filter NetworkService.Filter filterDef = new NetworkService.Filter(); filterDef.filterType = null; filterDef.filterStatus = Peer.PeerStatus.UP; filterDef.filterEndpoints = ImmutableList.copyOf(indexedEndpointApis); - filterDef.currency = currencyName; + filterDef.currency = currency; // Default sort NetworkService.Sort sortDef = new NetworkService.Sort(); sortDef.sortType = null; networkService.addPeersChangeListener(mainPeer, - peers -> logger.debug(String.format("[%s] Update peers: %s found", currencyName, CollectionUtils.size(peers))), + peers -> logger.info(String.format("[%s] %s peers UP", currency, CollectionUtils.size(peers))), filterDef, sortDef, true /*autoreconnect*/, threadPool.scheduler()); } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java index a59de6884a96f78f2377d57319c5f5ffc17cbf12..bc2897ff9345048f853d3f5b12a5e5430ad06ab1 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java @@ -40,9 +40,12 @@ import org.duniter.core.service.MailService; import org.duniter.core.service.MailServiceImpl; import org.duniter.elasticsearch.beans.ESBeanFactory; import org.duniter.elasticsearch.dao.BlockDao; +import org.duniter.elasticsearch.dao.MemberDao; import org.duniter.elasticsearch.dao.impl.BlockDaoImpl; import org.duniter.elasticsearch.dao.impl.CurrencyDaoImpl; import org.duniter.elasticsearch.dao.impl.PeerDaoImpl; +import org.duniter.elasticsearch.dao.impl.MemberDaoImpl; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.logging.ESLogger; @@ -59,13 +62,14 @@ public class ServiceLocator private static ESBeanFactory beanFactory = null; @Inject - public ServiceLocator() { + public ServiceLocator(ThreadPool threadPool) { super(getOrCreateBeanFactory()); if (logger.isDebugEnabled()) { - logger.debug("Starting Duniter4j ServiceLocator..."); + logger.debug("Starting ES ServiceLocator..."); } org.duniter.core.client.service.ServiceLocator.setInstance(this); + } @Override @@ -105,6 +109,7 @@ public class ServiceLocator .bind(CurrencyDao.class, CurrencyDaoImpl.class) .bind(PeerDao.class, PeerDaoImpl.class) .bind(BlockDao.class, BlockDaoImpl.class) + .bind(MemberDao.class, MemberDaoImpl.class) .add(DataContext.class); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java index 5f2babb341e0b5d8f8bc4398538790d232327065..d03a7c34a34d9b3d5a7157e0c03996aab2754213 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/WotService.java @@ -23,89 +23,208 @@ package org.duniter.elasticsearch.service; */ -import com.google.common.base.Objects; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; +import com.google.common.collect.ImmutableList; import org.duniter.core.client.dao.CurrencyDao; -import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.BlockchainParameters; -import org.duniter.core.client.model.bma.EndpointApi; -import org.duniter.core.client.model.local.Peer; -import org.duniter.core.client.service.bma.BlockchainRemoteService; -import org.duniter.core.client.service.bma.NetworkRemoteService; +import org.duniter.core.client.model.local.Member; import org.duniter.core.client.service.bma.WotRemoteService; -import org.duniter.core.client.service.exception.BlockNotFoundException; -import org.duniter.core.client.util.KnownBlocks; -import org.duniter.core.client.util.KnownCurrencies; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.model.NullProgressionModel; -import org.duniter.core.model.ProgressionModel; -import org.duniter.core.model.ProgressionModelImpl; import org.duniter.core.util.CollectionUtils; -import org.duniter.core.util.ObjectUtils; +import org.duniter.core.util.LockManager; import org.duniter.core.util.Preconditions; import org.duniter.core.util.StringUtils; -import org.duniter.core.util.cache.SimpleCache; -import org.duniter.core.util.json.JsonAttributeParser; -import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.client.Duniter4jClient; import org.duniter.elasticsearch.dao.BlockDao; import org.duniter.elasticsearch.dao.CurrencyExtendDao; -import org.duniter.elasticsearch.exception.DuplicateIndexIdException; +import org.duniter.elasticsearch.dao.MemberDao; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.common.inject.Inject; -import org.nuiton.i18n.I18n; -import java.io.IOException; import java.util.*; +import java.util.concurrent.TimeUnit; /** * Created by Benoit on 30/03/2015. */ public class WotService extends AbstractService { + private static final String LOCK_NAME_COMPUTE_MEMBERS = "Index WoT members"; + + private BlockDao blockDao; + private MemberDao memberDao; private CurrencyExtendDao currencyDao; private WotRemoteService wotRemoteService; private BlockchainService blockchainService; + private ThreadPool threadPool; + private final LockManager lockManager = new LockManager(4, 10); @Inject public WotService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool, BlockDao blockDao, + MemberDao memberDao, CurrencyDao currencyDao, BlockchainService blockchainService, final ServiceLocator serviceLocator){ super("duniter.wot", client, settings); this.client = client; this.blockDao = blockDao; + this.memberDao = memberDao; this.currencyDao = (CurrencyExtendDao) currencyDao; this.blockchainService = blockchainService; - threadPool.scheduleOnStarted(() -> { + this.threadPool = threadPool; + this.threadPool.scheduleOnStarted(() -> { wotRemoteService = serviceLocator.getWotRemoteService(); setIsReady(true); }); } - public Map<String, String> getMembers(String currency) { + public List<Member> getMembers(String currency) { - currency = safeGetCurrency(currency); + final String currencyId = safeGetCurrency(currency); + // Index is enable: use dao if (pluginSettings.enableBlockchainIndexation()) { - BlockchainParameters p = blockchainService.getParameters(currency); - return blockDao.getMembers(p); + + List<Member> members = memberDao.getMembers(currencyId); + + // No members, or not indexed yet ? + if (CollectionUtils.isEmpty(members)) { + logger.warn("No member found. Trying to index members..."); + return indexAndGetMembers(currencyId); + } + + return members; + } + else { + return wotRemoteService.getMembers(currencyId); + } + + } + + public void save(String currencyId, final List<Member> members) { + // skip if nothing to save + if (CollectionUtils.isEmpty(members)) return; + + memberDao.save(currencyId, members); + } + + + + public Member save(final Member member) { + Preconditions.checkNotNull(member); + Preconditions.checkNotNull(member.getCurrency()); + Preconditions.checkNotNull(member.getPubkey()); + Preconditions.checkNotNull(member.getUid()); + + boolean exists = memberDao.isExists(member.getCurrency(), member.getPubkey()); + + // Create + if (!exists) { + memberDao.create(member); } + + // or update else { - // TODO: check if it works ! - return wotRemoteService.getMembersUids(currency); + memberDao.update(member); } + return member; + } + + public WotService indexMembers(final String currency) { + indexAndGetMembers(currency); + return this; + } + + public WotService listenAndIndexMembers(final String currency) { + + // Listen changes on block + ChangeService.registerListener(new ChangeService.ChangeListener() { + @Override + public String getId() { + return "duniter.wot"; + } + @Override + public Collection<ChangeSource> getChangeSources() { + return ImmutableList.of(new ChangeSource(currency, BlockDao.TYPE, "current")); + } + @Override + public void onChange(ChangeEvent change) { + // If current block indexed + switch (change.getOperation()) { + case CREATE: + case INDEX: + logger.debug(String.format("[%s] Scheduling indexation of WoT members", currency)); + threadPool.schedule(() -> { + try { + // Acquire lock (once members indexation at a time) + if (lockManager.tryLock(LOCK_NAME_COMPUTE_MEMBERS, 10, TimeUnit.SECONDS)) { + try { + indexMembers(currency); + } + catch (Exception e) { + logger.error("Error while indexing WoT members: " + e.getMessage(), e); + } + finally { + // Release the lock + lockManager.unlock(LOCK_NAME_COMPUTE_MEMBERS); + } + } + else { + logger.debug("Could not acquire lock for indexing members. Skipping."); + } + } catch (InterruptedException e) { + logger.warn("Stopping indexation of WoT members: " + e.getMessage()); + } + }, 30, TimeUnit.SECONDS); + break; + default: + // Skip deletion + break; + } + + } + }); + + return this; + } + + /* -- protected methods -- */ + + protected List<Member> indexAndGetMembers(final String currency) { + + logger.info(String.format("[%s] Indexing WoT members...", currency)); + + Set<String> wasMemberPubkeys = memberDao.getMemberPubkeys(currency); + + BlockchainParameters p = blockchainService.getParameters(currency); + List<Member> members = blockDao.getMembers(p); + + // Save members into index + if (CollectionUtils.isNotEmpty(members)) { + // Set currency + members.forEach(m -> { + wasMemberPubkeys.remove(m.getPubkey()); + m.setCurrency(currency); + }); + + // Save members + memberDao.save(currency, members); + } + + // Update old members as "was member" + if (CollectionUtils.isNotEmpty(wasMemberPubkeys)) { + memberDao.updateAsWasMember(currency, wasMemberPubkeys); + } + + logger.info(String.format("[%s] Indexing WoT members [OK]", currency)); + return members; } /** @@ -115,6 +234,6 @@ public class WotService extends AbstractService { */ protected String safeGetCurrency(String currency) { if (StringUtils.isNotBlank(currency)) return currency; - return currencyDao.getDefaultCurrencyName(); + return currencyDao.getDefaultId(); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java index 70ae0b7f30f69773e5fc758b5de3d4731c00ba83..d86cc7b01e612679d0ca49eb8edbb37f1b7efc5f 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java @@ -120,11 +120,11 @@ public class ChangeEvent { if (sourceText != null) return sourceText; if (source == null) return null; try { - XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); - builder.rawValue(source); - sourceText = builder.string(); + //XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput()); + //builder.rawValue(source); + sourceText = new String(source.toBytesArray().toBytes(), "UTF8"); return sourceText; - } catch (IOException e) { + } catch (Exception e) { throw new TechnicalException("Error while generating JSON from source", e); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java index ae966abf5a961c1be84c8976e60bad2341f7f873..dd968e00e2d8023d679e75af709c38445ff9237a 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/synchro/SynchroService.java @@ -159,9 +159,9 @@ public class SynchroService extends AbstractService { closeWsClientEndpoints(); } - List<String> currencyIds; + Set<String> currencyIds; try { - currencyIds = currencyDao.getCurrencyIds(); + currencyIds = currencyDao.getAllIds(); } catch (Exception e) { logger.error("Could not retrieve indexed currencies", e); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java index 51ec6cd42134f9c756f388482a0e348a61d2ce00..eb9020274487cb9a4417d8b90b33b0a7d7b1ac77 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java @@ -52,7 +52,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { private ScheduledThreadPoolExecutor scheduler = null; private final Injector injector; - private final ESLogger logger = Loggers.getLogger("duniter.threadpool"); + private final ESLogger logger; private final org.elasticsearch.threadpool.ThreadPool delegate; @@ -64,6 +64,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { org.elasticsearch.threadpool.ThreadPool esThreadPool ) { super(settings); + this.logger = Loggers.getLogger("duniter.threadpool", settings, new String[0]); this.injector = injector; this.afterStartedCommands = Lists.newArrayList(); @@ -71,7 +72,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); this.scheduler = new LoggingScheduledThreadPoolExecutor(logger, availableProcessors, - EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"), + EsExecutors.daemonThreadFactory(settings, "cesium_plus_scheduler"), new RetryPolicy(1, TimeUnit.SECONDS)); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java index f79e03d8304d455b8a72331d565a6158e92eb1bd..345fcb3263d8414eff1ba5205cfc0d13c72e5e21 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketBlockHandler.java @@ -28,6 +28,7 @@ import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.StringUtils; import org.duniter.core.util.json.JsonAttributeParser; +import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.http.netty.NettyWebSocketServer; import org.duniter.elasticsearch.http.netty.websocket.NettyBaseWebSocketEndpoint; import org.duniter.elasticsearch.http.netty.websocket.NettyWebSocketSession; @@ -42,6 +43,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -65,11 +67,12 @@ public class NettyWebSocketBlockHandler extends NettyBaseWebSocketEndpoint imple public static class Init { @Inject - public Init( NettyWebSocketServer webSocketServer, - CurrencyService currencyService, - BlockchainService blockchainService, - ThreadPool threadPool) { - logger = Loggers.getLogger("duniter.ws.block"); + public Init(PluginSettings pluginSettings, + NettyWebSocketServer webSocketServer, + CurrencyService currencyService, + BlockchainService blockchainService, + ThreadPool threadPool) { + logger = Loggers.getLogger("duniter.ws.block", pluginSettings.getSettings(), new String[0]); NettyWebSocketBlockHandler.currencyService = currencyService; NettyWebSocketBlockHandler.blockchainService = blockchainService; @@ -132,11 +135,10 @@ public class NettyWebSocketBlockHandler extends NettyBaseWebSocketEndpoint imple public void onChange(ChangeEvent event) { switch (event.getOperation()) { case CREATE: - //case INDEX: sendSourceIfNotNull(event); break; default: - // Ignoring (if delete) + // Ignoring (if delete or update) } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java index e28c0628b18acf3a164675b383eba7c5aad20527..d28a67a0c4cd18ef2f517afc44c2908b37ef46d3 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketChangesHandler.java @@ -73,7 +73,7 @@ public class NettyWebSocketChangesHandler extends NettyBaseWebSocketEndpoint imp @Inject public Init(NettyWebSocketServer webSocketServer, PluginSettings pluginSettings) { - logger = Loggers.getLogger("duniter.ws.changes"); + logger = Loggers.getLogger("duniter.ws.changes", pluginSettings.getSettings(), new String[0]); // Init default sources final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource(); diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java index aaea137b9397c8f1f126fd800390d86e9e4c9498..78f3a8a10e742a36f3b2df1bf58ec8852e7b3571 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/netty/NettyWebSocketPeerHandler.java @@ -37,6 +37,7 @@ import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import javax.websocket.CloseReason; import java.io.IOException; @@ -53,10 +54,11 @@ public class NettyWebSocketPeerHandler extends NettyBaseWebSocketEndpoint implem public static class Init { @Inject - public Init( NettyWebSocketServer webSocketServer, - CurrencyService currencyService, - ThreadPool threadPool) { - logger = Loggers.getLogger("duniter.ws.peer"); + public Init(Settings settings, + NettyWebSocketServer webSocketServer, + CurrencyService currencyService, + ThreadPool threadPool) { + logger = Loggers.getLogger("duniter.ws.peer", settings, new String[0]); NettyWebSocketPeerHandler.currencyService = currencyService; diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java index 2912e3986a2dfeefd53fd1dc990cc8281ccddd52..4b4eef7c257d5ce0429d72c50b4fd1ae12ebae0f 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketBlockEndPoint.java @@ -37,6 +37,7 @@ import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -59,19 +60,18 @@ public class WebSocketBlockEndPoint implements ChangeService.ChangeListener{ @Inject - public Init(TyrusWebSocketServer webSocketServer, + public Init(Settings settings, + TyrusWebSocketServer webSocketServer, CurrencyService currencyService, BlockchainService blockchainService, ThreadPool threadPool) { + logger = Loggers.getLogger("duniter.ws.block", settings, new String[0]); + webSocketServer.addEndPoint(WebSocketBlockEndPoint.class); WebSocketBlockEndPoint.currencyService = currencyService; WebSocketBlockEndPoint.blockchainService = blockchainService; - logger = Loggers.getLogger("duniter.ws.block"); - //server.addLifecycleListener(); - threadPool.scheduleOnClusterReady(() -> { - isReady = true; - }); + threadPool.scheduleOnClusterReady(() -> isReady = true); } } diff --git a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java index 0f5ff58c0e1983dda7afe09f26f1f2d3a3911ee8..08f287d6b3ac92b94af18dc84b76174baccfa75b 100644 --- a/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java +++ b/cesium-plus-pod-core/src/main/java/org/duniter/elasticsearch/websocket/tyrus/WebSocketChangesEndPoint.java @@ -49,6 +49,7 @@ import org.duniter.elasticsearch.service.changes.ChangeSource; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; @@ -67,7 +68,7 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ @Inject public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings) { - logger = Loggers.getLogger("duniter.ws.changes"); + logger = Loggers.getLogger("duniter.ws.changes", pluginSettings.getSettings(), new String[0]); webSocketServer.addEndPoint(WebSocketChangesEndPoint.class); final String[] sourcesStr = pluginSettings.getWebSocketChangesListenSource(); List<ChangeSource> sources = new ArrayList<>(); diff --git a/cesium-plus-pod-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java b/cesium-plus-pod-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java index d78efbe8567704bb03dc41001c4e47aaff7dbc31..9d7cb09ef37eb08fd8c11b587c643cf7fdfeccee 100644 --- a/cesium-plus-pod-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java +++ b/cesium-plus-pod-core/src/test/java/org/duniter/elasticsearch/service/PeerServiceTest.java @@ -94,7 +94,7 @@ public class PeerServiceTest { peer2.getStats().setLastUpTime(peer1.getStats().getLastUpTime() - 150); // Set UP just before the peer 1 // Save peers - localService.save(peer1.getCurrency(), ImmutableList.of(peer1, peer2), false); + localService.save(peer1.getCurrency(), ImmutableList.of(peer1, peer2)); // Wait propagation Thread.sleep(2000); diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java index fdc05f1b3a43a7c579ea4a815212c76dc68d6030..25f64e082c6abd6ee0ab7980b679cf83e9376893 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/PluginSettings.java @@ -66,6 +66,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } + public Settings getSettings() { + return delegate.getSettings(); + } + public org.duniter.elasticsearch.PluginSettings getDelegate() { return delegate; } @@ -174,11 +178,11 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { } public String getNodeBmaHost() { - return delegate.getNodeBmaHost(); + return delegate.getDuniterNodeHost(); } public int getNodeBmaPort() { - return delegate.getNodeBmaPort(); + return delegate.getDuniterNodePort(); } public int getIndexBulkSize() { diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index d80732cb115d6babb026721df5379d85bf156c6b..846e3789541ff96222335a6036dcbadd722d5f8d 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -170,8 +170,8 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic errorNotified = false; adminService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.NODE_BMA_UP.name()) .setMessage(I18n.n("duniter.user.event.NODE_BMA_UP"), - pluginSettings.getNodeBmaHost(), - String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getDuniterNodeHost(), + String.valueOf(pluginSettings.getDuniterNodePort()), pluginSettings.getClusterName()) .build()); } @@ -188,8 +188,8 @@ public class BlockchainUserEventService extends AbstractBlockchainListenerServic errorNotified = true; adminService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.ERROR, UserEventCodes.NODE_BMA_DOWN.name()) .setMessage(I18n.n("duniter.user.event.NODE_BMA_DOWN"), - pluginSettings.getNodeBmaHost(), - String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getDuniterNodeHost(), + String.valueOf(pluginSettings.getDuniterNodePort()), pluginSettings.getClusterName(), String.valueOf(lastTimeUp)) .build()); diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/netty/NettyWebSocketUserEventHandler.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/netty/NettyWebSocketUserEventHandler.java index 64274a9412c16afbc8b749a410f848a4a217b6d6..f273918cfd134e53bf1a585c61de5c1e09520c95 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/netty/NettyWebSocketUserEventHandler.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/netty/NettyWebSocketUserEventHandler.java @@ -73,7 +73,7 @@ public class NettyWebSocketUserEventHandler extends NettyBaseWebSocketEndpoint i @Inject public Init(NettyWebSocketServer webSocketServer, PluginSettings pluginSettings) { - logger = Loggers.getLogger("duniter.ws.event"); + logger = Loggers.getLogger("duniter.ws.event", pluginSettings.getSettings(), new String[0]); // Default locale defaultLocale = pluginSettings.getI18nLocale(); diff --git a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/tyrus/WebsocketUserEventEndPoint.java b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/tyrus/WebsocketUserEventEndPoint.java index 3deaa212c3dfdf6f1bcb7f285471232f88dc68b0..b0e5aa63844ed768e58a5defd23d1c8ac1c5e4d3 100644 --- a/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/tyrus/WebsocketUserEventEndPoint.java +++ b/cesium-plus-pod-user/src/main/java/org/duniter/elasticsearch/user/websocket/tyrus/WebsocketUserEventEndPoint.java @@ -49,6 +49,7 @@ import org.duniter.elasticsearch.user.service.UserEventService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; import org.nuiton.i18n.I18n; import javax.websocket.*; @@ -67,8 +68,8 @@ public class WebsocketUserEventEndPoint implements UserEventService.UserEventLis public static class Init { @Inject - public Init(TyrusWebSocketServer webSocketServer, PluginSettings pluginSettings) { - logger = Loggers.getLogger("duniter.ws.user.event"); + public Init(TyrusWebSocketServer webSocketServer, Settings settings, PluginSettings pluginSettings) { + logger = Loggers.getLogger("duniter.ws.user.event", pluginSettings.getSettings(), new String[0]); defaultLocale = pluginSettings.getI18nLocale(); if (defaultLocale == null) defaultLocale = new Locale("en", "GB"); diff --git a/pom.xml b/pom.xml index 2c3b93b14cc74761171f9cf025ffcd49addab047..8caf7f15be8e069ec1bb9efe934461d0b4d74a26 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ <signatureVersion>1.0</signatureVersion> <!-- Commons versions --> - <duniter4j.version>1.2.5</duniter4j.version> + <duniter4j.version>1.2.8</duniter4j.version> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.6</slf4j.version> <guava.version>22.0</guava.version>