diff --git a/.gitignore b/.gitignore index 5e9a2324a3bde2260dc46b7f147324b5c802c316..535e5df50329583e8df5cbc48e38e0304813433f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ target */target duniter4j-elasticsearch/src/main/misc/.index.sh.kate-swp duniter4j-elasticsearch/src/main/misc/fr-cities.ods -duniter4j-elasticsearch/src/main/misc/geoflar-communes-2015.geojson \ No newline at end of file +duniter4j-elasticsearch/src/main/misc/geoflar-communes-2015.geojson +duniter4j.iml \ No newline at end of file diff --git a/README.md b/README.md index cd971ee76f3795127863bbaacdcfdb0139502fea..45450e15235d53272aaa03cbd0ce9173a8f687b2 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,24 @@ duniter4j has four main components : - core-client: a Client API to access to a Duniter network. - - elasticsearch: a tools to index all blockchain and more. + - elasticsearch: a ES plugin, to store blockchain, registry, market and more. - - web: an web/mobile client, for data navigation, payment and more ! + +## Install + + - Install ElasticSearch 2.3.3 + + - Install plugins : + + /bin/plugin install elastic/elasticsearch + + /bin/plugin install duniter/duniter4j ## Test it + + The elasticsearch component is ready to use ! - Install Java JRE 8 or more. diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java index 668013b5a77b71341c985c70f2ff224a9f55ae3d..b399f51d3b652288c98204e1ca4adc0f31af46a7 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/ServiceLocator.java @@ -95,6 +95,10 @@ public class ServiceLocator implements Closeable { return instance; } + protected void setBeanFactory(BeanFactory beanFactory) { + this.beanFactory = beanFactory; + } + public BlockchainRemoteService getBlockchainRemoteService() { return getBean(BlockchainRemoteService.class); } diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/beans/BeanFactory.java b/duniter4j-core-shared/src/main/java/org/duniter/core/beans/BeanFactory.java index d1c767bb7bbf5de21d262999970c54083b350f3a..2b69210d3f752e0036901e409ac1fb574247c157 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/beans/BeanFactory.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/beans/BeanFactory.java @@ -79,33 +79,37 @@ public class BeanFactory implements Closeable{ log.trace(String.format("Asking bean on type [%s]...", clazz.getName())); } - for (Bean bean: beansLoader) { + synchronized (beansLoader) { + for (Bean bean : beansLoader) { - if (clazz.isInstance(bean)) { - if (log.isDebugEnabled()) { - log.debug(String.format(" Creating new bean of type [%s]", clazz.getName())); + if (clazz.isInstance(bean)) { + if (log.isDebugEnabled()) { + log.debug(String.format(" Creating new bean of type [%s]", clazz.getName())); + } + return (S) bean; } - return (S)bean; } } - for (Map.Entry<Class<? extends Bean>, Class<? extends Bean>> beanDef : beansClassMap.entrySet()) { - if (log.isTraceEnabled()) { - log.trace(String.format(" Check against type [%s]", beanDef.getKey().getName())); - } - if (clazz.equals(beanDef.getKey())) { - if (log.isDebugEnabled()) { - log.debug(String.format("Creating new bean of type [%s] with class [%s]", clazz.getName(), beanDef.getValue().getName())); + synchronized (beansClassMap) { + for (Map.Entry<Class<? extends Bean>, Class<? extends Bean>> beanDef : beansClassMap.entrySet()) { + if (log.isTraceEnabled()) { + log.trace(String.format(" Check against type [%s]", beanDef.getKey().getName())); } + if (clazz.equals(beanDef.getKey())) { + if (log.isDebugEnabled()) { + log.debug(String.format("Creating new bean of type [%s] with class [%s]", clazz.getName(), beanDef.getValue().getName())); + } - Class<? extends Bean> beanDefClass = beanDef.getValue(); - try { - Bean bean = beanDefClass.newInstance(); - if (clazz.isInstance(bean)) { - return (S) beanDefClass.newInstance(); + Class<? extends Bean> beanDefClass = beanDef.getValue(); + try { + Bean bean = beanDefClass.newInstance(); + if (clazz.isInstance(bean)) { + return (S) beanDefClass.newInstance(); + } + } catch (Exception e) { + // skip } - } catch(Exception e) { - // skip } } } diff --git a/duniter4j-elasticsearch/src/main/filtered-resources/log4j.properties b/duniter4j-elasticsearch/src/main/filtered-resources/log4j.properties index 553bc6d0c9cdbe276cfe7af687e42670ef4c9968..7b6667b1facc361ed8b1993869f728e2c01f1799 100644 --- a/duniter4j-elasticsearch/src/main/filtered-resources/log4j.properties +++ b/duniter4j-elasticsearch/src/main/filtered-resources/log4j.properties @@ -17,6 +17,8 @@ log4j.logger.org.duniter.elasticsearch=DEBUG # Other frameworks levels log4j.logger.org.nuiton.util=WARN log4j.logger.org.nuiton.config=WARN +log4j.logger.org.nuiton.converter=WARN +log4j.logger.org.nuiton.i18n=ERROR log4j.logger.org.elasticsearch=WARN #log4j.logger.org.elasticsearch=INFO diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java index 2411c189b080f43dd4c728151f37d9b3b19d8cb6..73c9f28828c61d45042be5776da28a9bbb1deff3 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/Plugin.java @@ -24,7 +24,8 @@ package org.duniter.elasticsearch; import com.google.common.collect.Lists; import org.duniter.elasticsearch.action.RestModule; -import org.duniter.elasticsearch.job.BlockIndexer; +import org.duniter.elasticsearch.node.DuniterNode; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.security.SecurityModule; import org.duniter.elasticsearch.service.ServiceModule; import org.elasticsearch.common.component.LifecycleComponent; @@ -76,18 +77,9 @@ public class Plugin extends org.elasticsearch.plugins.Plugin { if (disable) { return components; } - components.add(BlockIndexer.class); - //components.add(PluginSettings.class); - // Market - //components.add(CategoryMarketService.class); - //components.add(RecordMarketService.class); - // Registry - //components.add(CurrencyRegistryService.class); - //components.add(CategoryRegistryService.class); - //components.add(CitiesRegistryService.class); - //components.add(RecordRegistryService.class); - // BC - //components.add(BlockBlockchainService.class); + components.add(PluginSettings.class); + components.add(ThreadPool.class); + components.add(DuniterNode.class); return components; } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 304ee5167167e313b7e297f71c13764809aeadf5..07688f0e9ff78710dd8da02504fdd7962bb85378 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -23,16 +23,25 @@ package org.duniter.elasticsearch; */ +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import org.apache.commons.io.FileUtils; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.config.ConfigurationOption; -import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.component.LifecycleListener; +import org.duniter.core.client.config.ConfigurationProvider; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.service.ServiceLocator; +import org.elasticsearch.common.component.*; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; import org.nuiton.config.ApplicationConfig; +import org.nuiton.config.ApplicationConfigHelper; +import org.nuiton.config.ApplicationConfigProvider; +import org.nuiton.config.ArgumentsParserException; import org.nuiton.i18n.I18n; import org.nuiton.i18n.init.DefaultI18nInitializer; import org.nuiton.i18n.init.UserI18nInitializer; @@ -40,39 +49,100 @@ import org.nuiton.i18n.init.UserI18nInitializer; import java.io.File; import java.io.IOException; import java.util.Locale; +import java.util.Set; + +import static org.nuiton.i18n.I18n.t; /** * Access to configuration options * @author Benoit Lavenier <benoit.lavenier@e-is.pro> * @since 1.0 */ -public class PluginSettings { - /** Logger. */ - private ESLogger log = ESLoggerFactory.getLogger(PluginSettings.class.getName()); +public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { - private org.elasticsearch.common.settings.Settings settings; + private Settings settings; /** * Delegate application config. */ protected final ApplicationConfig applicationConfig; + protected final org.duniter.core.client.config.Configuration clientConfig; @Inject public PluginSettings(org.elasticsearch.common.settings.Settings settings) { + super(settings); + this.settings = settings; this.applicationConfig = new ApplicationConfig(); // Cascade the application config to the client module - org.duniter.core.client.config.Configuration clientConfig = new org.duniter.core.client.config.Configuration(applicationConfig); - org.duniter.core.client.config.Configuration.setInstance(clientConfig); + clientConfig = new org.duniter.core.client.config.Configuration(applicationConfig); + Configuration.setInstance(clientConfig); + + } + + @Override + protected void doStart() { + + + // get all config providers + Set<ApplicationConfigProvider> providers = + ImmutableSet.of(new ConfigurationProvider()); + + // load all default options + ApplicationConfigHelper.loadAllDefaultOption(applicationConfig, + providers); + + // Ovverides defaults + String baseDir = settings.get("path.home"); + applicationConfig.setDefaultOption(ConfigurationOption.BASEDIR.getKey(), baseDir); + applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost()); + applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort())); + applicationConfig.setDefaultOption(ConfigurationOption.NODE_PROTOCOL.getKey(), getNodeBmaPort() == 443 ? "https" : "http"); + + try { + applicationConfig.parse(new String[]{}); + + } catch (ArgumentsParserException e) { + throw new TechnicalException(t("duniter4j.config.parse.error"), e); + } + + File appBasedir = applicationConfig.getOptionAsFile( + ConfigurationOption.BASEDIR.getKey()); + + if (appBasedir == null) { + appBasedir = new File(""); + } + if (!appBasedir.isAbsolute()) { + appBasedir = new File(appBasedir.getAbsolutePath()); + } + if (appBasedir.getName().equals("..")) { + appBasedir = appBasedir.getParentFile().getParentFile(); + } + if (appBasedir.getName().equals(".")) { + appBasedir = appBasedir.getParentFile(); + } + applicationConfig.setOption( + ConfigurationOption.BASEDIR.getKey(), + appBasedir.getAbsolutePath()); + + // Init i18n + try { + initI18n(); + } + catch(IOException e) { + logger.error(String.format("Could not init i18n: %s", e.getMessage()), e); + } + } + + @Override + protected void doStop() { + + } - String baseDir = settings.get("es.path.home"); - applicationConfig.setOption(ConfigurationOption.BASEDIR.getKey(), baseDir); - applicationConfig.setOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost()); - applicationConfig.setOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort())); - applicationConfig.setOption(ConfigurationOption.NODE_PROTOCOL.getKey(), getNodeBmaPort() == 443 ? "https" : "http"); + @Override + protected void doClose() { - //initI18n(); } public String getNodeBmaHost() { @@ -91,10 +161,14 @@ public class PluginSettings { return settings.getAsInt("duniter.bulk.size", 1000); } - public String getIndexStringAnalyzer() { + public String getDefaultStringAnalyzer() { return settings.get("duniter.string.analyzer", "english"); } + public boolean reloadIndices() { + return settings.getAsBoolean("duniter.indices.reload", false); + } + public File getTempDirectory() { return Configuration.instance().getTempDirectory(); } @@ -103,16 +177,31 @@ public class PluginSettings { return settings.getAsBoolean("duniter.dev.enable", false); } - /* */ + public Peer checkAndGetPeer() { + if (StringUtils.isBlank(getNodeBmaHost())) { + logger.error("ERROR: node host is required"); + System.exit(-1); + return null; + } + if (getNodeBmaPort() <= 0) { + logger.error("ERROR: node port is required"); + System.exit(-1); + return null; + } + + Peer peer = new Peer(getNodeBmaHost(), getNodeBmaPort()); + return peer; + } + + /* protected methods */ protected void initI18n() throws IOException { - Configuration config = Configuration.instance(); // --------------------------------------------------------------------// // init i18n // --------------------------------------------------------------------// - File i18nDirectory = new File(Configuration.instance().getDataDirectory(), "i18n"); + File i18nDirectory = new File(clientConfig.getDataDirectory(), "i18n"); if (i18nDirectory.exists()) { // clean i18n cache FileUtils.cleanDirectory(i18nDirectory); @@ -120,14 +209,14 @@ public class PluginSettings { FileUtils.forceMkdir(i18nDirectory); - if (log.isDebugEnabled()) { - log.debug("I18N directory: " + i18nDirectory); + if (logger.isDebugEnabled()) { + logger.debug("I18N directory: " + i18nDirectory); } - Locale i18nLocale = config.getI18nLocale(); + Locale i18nLocale = clientConfig.getI18nLocale(); - if (log.isInfoEnabled()) { - log.info(String.format("Starts i18n with locale [%s] at [%s]", + if (logger.isInfoEnabled()) { + logger.info(String.format("Starts i18n with locale [%s] at [%s]", i18nLocale, i18nDirectory)); } I18n.init(new UserI18nInitializer( diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/market/RestMarketRecordIndexAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/market/RestMarketRecordIndexAction.java index 9aff904a6dc877821d32ef95f3cb593739b74909..67ba7708c23f6b3cda743a7e3e8da2b1a2209fa9 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/market/RestMarketRecordIndexAction.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/market/RestMarketRecordIndexAction.java @@ -23,7 +23,7 @@ package org.duniter.elasticsearch.action.market; */ import org.duniter.core.exception.BusinessException; -import org.duniter.elasticsearch.service.market.RecordMarketService; +import org.duniter.elasticsearch.service.MarketService; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -39,10 +39,10 @@ public class RestMarketRecordIndexAction extends BaseRestHandler { private static final ESLogger log = ESLoggerFactory.getLogger(RestMarketRecordIndexAction.class.getName()); - private RecordMarketService service; + private MarketService service; @Inject - public RestMarketRecordIndexAction(Settings settings, RestController controller, Client client, RecordMarketService service) { + public RestMarketRecordIndexAction(Settings settings, RestController controller, Client client, MarketService service) { super(settings, controller, client); controller.registerHandler(POST, "/market/record", this); this.service = service; diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/registry/RestRegistryRecordIndexAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/registry/RestRegistryRecordIndexAction.java index 287ee35f330a4e50d2a3ed07dc07c6d55aa3564c..858b4368df7e477e9866349da54ffdc449f4500c 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/registry/RestRegistryRecordIndexAction.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/action/registry/RestRegistryRecordIndexAction.java @@ -23,7 +23,7 @@ package org.duniter.elasticsearch.action.registry; */ import org.duniter.core.exception.BusinessException; -import org.duniter.elasticsearch.service.registry.RecordRegistryService; +import org.duniter.elasticsearch.service.RegistryService; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -39,10 +39,10 @@ public class RestRegistryRecordIndexAction extends BaseRestHandler { private static final ESLogger log = ESLoggerFactory.getLogger(RestRegistryRecordIndexAction.class.getName()); - private RecordRegistryService service; + private RegistryService service; @Inject - public RestRegistryRecordIndexAction(Settings settings, RestController controller, Client client, RecordRegistryService service) { + public RestRegistryRecordIndexAction(Settings settings, RestController controller, Client client, RegistryService service) { super(settings, controller, client); controller.registerHandler(POST, "/registry/record", this); this.service = service; diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/cli/action/IndexerCliAction.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/cli/action/IndexerCliAction.java index 892630851ddaa3736886c5563e81b5600e478b92..b79bda9f63376edc6011ef5ebef48e46142f1538 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/cli/action/IndexerCliAction.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/cli/action/IndexerCliAction.java @@ -41,7 +41,7 @@ public class IndexerCliAction { public void run() { PluginSettings config = PluginSettings.instance(); final Peer peer = checkConfigAndGetPeer(config); - final BlockBlockchainService blockIndexerService = ServiceLocator.instance().getBlockIndexerService(); + final BlockchainService blockIndexerService = ServiceLocator.instance().getBlockIndexerService(); // Will create the blockchain if not exist blockIndexerService.indexLastBlocks(peer); @@ -84,7 +84,7 @@ public class IndexerCliAction { public void resetDataBlocks() { BlockchainRemoteService blockchainService = ServiceLocator.instance().getBlockchainRemoteService(); - BlockBlockchainService indexerService = ServiceLocator.instance().getBlockIndexerService(); + BlockchainService indexerService = ServiceLocator.instance().getBlockIndexerService(); PluginSettings config = PluginSettings.instance(); Peer peer = checkConfigAndGetPeer(config); diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/AccessDeniedException.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/AccessDeniedException.java similarity index 95% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/AccessDeniedException.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/AccessDeniedException.java index ce06a30fdc332b3288b93e0881ecf850b417595a..c4603b3e446e10627c34ebf19afac4d3383fbb1b 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/AccessDeniedException.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/AccessDeniedException.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.exception; +package org.duniter.elasticsearch.exception; /* * #%L diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/DuplicateIndexIdException.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/DuplicateIndexIdException.java similarity index 96% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/DuplicateIndexIdException.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/DuplicateIndexIdException.java index c0972a95405eb872f439cbd418752c09331be8c8..c72d3888afc3e93674021e2a894c6ff6fbb9a852 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/DuplicateIndexIdException.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/DuplicateIndexIdException.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.exception; +package org.duniter.elasticsearch.exception; /* * #%L @@ -26,6 +26,7 @@ package org.duniter.elasticsearch.service.exception; import org.duniter.core.exception.BusinessException; /** + * * Created by Benoit on 03/04/2015. */ public class DuplicateIndexIdException extends BusinessException{ diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidFormatException.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidFormatException.java similarity index 95% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidFormatException.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidFormatException.java index 85c25649b0cba7bd4c2a2cedae4c78f629f2b621..63bba9c6bcb3b35537c542aecf38624e1d2ab92e 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidFormatException.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidFormatException.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.exception; +package org.duniter.elasticsearch.exception; /* * #%L diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidSignatureException.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidSignatureException.java similarity index 96% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidSignatureException.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidSignatureException.java index c6e0c4b68f2c2cf6f4a0d8ab9fec7dfc2545aafb..09a83041fa3cb432d18dbfe6d4282150e42d694d 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/exception/InvalidSignatureException.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/exception/InvalidSignatureException.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.exception; +package org.duniter.elasticsearch.exception; /* * #%L diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/job/BlockIndexer.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/job/BlockIndexer.java deleted file mode 100644 index 74a6b8126517b4042345c640108680c896a6e006..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/job/BlockIndexer.java +++ /dev/null @@ -1,212 +0,0 @@ -package org.duniter.elasticsearch.job; - -import org.duniter.core.client.model.bma.BlockchainParameters; -import org.duniter.core.client.model.local.Peer; -import org.duniter.core.client.service.bma.BlockchainRemoteService; -import org.duniter.core.util.StringUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.ServiceLocator; -import org.duniter.elasticsearch.service.blockchain.BlockBlockchainService; -import org.duniter.elasticsearch.service.market.CategoryMarketService; -import org.duniter.elasticsearch.service.market.RecordMarketService; -import org.duniter.elasticsearch.service.registry.CategoryRegistryService; -import org.duniter.elasticsearch.service.registry.CitiesRegistryService; -import org.duniter.elasticsearch.service.registry.CurrencyRegistryService; -import org.duniter.elasticsearch.service.registry.RecordRegistryService; -import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.component.LifecycleComponent; -import org.elasticsearch.common.component.LifecycleListener; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; - -/** - * Created by eis on 17/06/16. - */ -public class BlockIndexer implements LifecycleComponent<BlockIndexer> { - private static final ESLogger log = ESLoggerFactory.getLogger(BlockIndexer.class.getName()); - - private Lifecycle.State state; - - private ServiceLocator serviceLocator; - private PluginSettings pluginSettings; - private RecordMarketService recordMarketService; - private CurrencyRegistryService currencyRegistryService; - private BlockBlockchainService blockBlockchainService; - private CategoryMarketService categoryMarketService; - private RecordRegistryService recordRegistryService; - private CategoryRegistryService categoryRegistryService; - private CitiesRegistryService citiesRegistryService; - - @Inject - public BlockIndexer(ServiceLocator serviceLocator, - PluginSettings pluginSettings, - RecordMarketService recordMarketService, - CurrencyRegistryService currencyRegistryService, - BlockBlockchainService blockBlockchainService, - CategoryMarketService categoryMarketService, - RecordRegistryService recordRegistryService, - CategoryRegistryService categoryRegistryService, - CitiesRegistryService citiesRegistryService - ) { - this.serviceLocator = serviceLocator; - this.pluginSettings = pluginSettings; - this.recordMarketService = recordMarketService; - this.currencyRegistryService = currencyRegistryService; - this.blockBlockchainService = blockBlockchainService; - this.categoryMarketService = categoryMarketService; - this.recordRegistryService = recordRegistryService; - this.categoryRegistryService = categoryRegistryService; - this.citiesRegistryService = citiesRegistryService; - this.state = Lifecycle.State.INITIALIZED; - } - - @Override - public Lifecycle.State lifecycleState() { - return state; - } - - @Override - public void addLifecycleListener(LifecycleListener var1){ - // TODO - } - - @Override - public void removeLifecycleListener(LifecycleListener var1){ - // TODO - } - - @Override - public BlockIndexer start(){ - state = Lifecycle.State.STARTED; - if (log.isDebugEnabled()) { - log.debug(String.format("Starting indexing blocks from node [%s:%s]...", - pluginSettings.getNodeBmaHost(), pluginSettings.getNodeBmaPort())); - } - - //resetAllData(); - return this; - } - - @Override - public BlockIndexer stop(){ - state = Lifecycle.State.STOPPED; - return this; - } - - @Override - public void close() { - state = Lifecycle.State.STOPPED; - } - - /* -- protected methods -- */ - - protected void setState(Lifecycle.State state) { - this.state = state; - } - - public void resetAllData() { - resetAllCurrencies(); - //resetDataBlocks(); - //resetMarketRecords(); - //resetRegistry(); - } - - public void resetAllCurrencies() { - currencyRegistryService.deleteAllCurrencies(); - } - - public void resetDataBlocks() { - BlockchainRemoteService blockchainService = serviceLocator.getBlockchainRemoteService(); - Peer peer = checkConfigAndGetPeer(pluginSettings); - - try { - // Get the blockchain name from node - BlockchainParameters parameter = blockchainService.getParameters(peer); - if (parameter == null) { - log.error(String.format("Could not connect to node [%s:%s]", - pluginSettings.getNodeBmaHost(), pluginSettings.getNodeBmaPort())); - return; - } - String currencyName = parameter.getCurrency(); - - log.info(String.format("Reset data for index [%s]", currencyName)); - - // Delete then create index on blockchain - boolean indexExists = blockBlockchainService.existsIndex(currencyName); - if (indexExists) { - blockBlockchainService.deleteIndex(currencyName); - blockBlockchainService.createIndex(currencyName); - } - - - log.info(String.format("Successfully reset data for index [%s]", currencyName)); - } catch(Exception e) { - log.error("Error during reset data: " + e.getMessage(), e); - } - } - - public void resetMarketRecords() { - try { - // Delete then create index on records - boolean indexExists = recordMarketService.existsIndex(); - if (indexExists) { - recordMarketService.deleteIndex(); - } - log.info(String.format("Successfully reset market records")); - - categoryMarketService.createIndex(); - categoryMarketService.initCategories(); - log.info(String.format("Successfully re-initialized market categories data")); - - } catch(Exception e) { - log.error("Error during reset market records: " + e.getMessage(), e); - } - } - - public void resetRegistry() { - try { - // Delete then create index on records - if (recordRegistryService.existsIndex()) { - recordRegistryService.deleteIndex(); - } - recordRegistryService.createIndex(); - log.info(String.format("Successfully reset registry records")); - - - if (categoryRegistryService.existsIndex()) { - categoryRegistryService.deleteIndex(); - } - categoryRegistryService.createIndex(); - categoryRegistryService.initCategories(); - log.info(String.format("Successfully re-initialized registry categories")); - - if (citiesRegistryService.existsIndex()) { - citiesRegistryService.deleteIndex(); - } - citiesRegistryService.initCities(); - log.info(String.format("Successfully re-initialized registry cities")); - - } catch(Exception e) { - log.error("Error during reset registry records: " + e.getMessage(), e); - } - } - - /* -- internal methods -- */ - - protected Peer checkConfigAndGetPeer(PluginSettings pluginSettings) { - if (StringUtils.isBlank(pluginSettings.getNodeBmaHost())) { - log.error("ERROR: node host is required"); - System.exit(-1); - return null; - } - if (pluginSettings.getNodeBmaPort() <= 0) { - log.error("ERROR: node port is required"); - System.exit(-1); - return null; - } - - Peer peer = new Peer(pluginSettings.getNodeBmaHost(), pluginSettings.getNodeBmaPort()); - return peer; - } -} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java new file mode 100644 index 0000000000000000000000000000000000000000..e3cef1a09ab982eb3eb653b4521d708e519c019a --- /dev/null +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/node/DuniterNode.java @@ -0,0 +1,77 @@ +package org.duniter.elasticsearch.node; + +import org.duniter.core.client.model.local.Peer; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.MarketService; +import org.duniter.elasticsearch.service.RegistryService; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.settings.Settings; + +/** + * Created by blavenie on 17/06/16. + */ +public class DuniterNode extends AbstractLifecycleComponent<DuniterNode> { + + private final PluginSettings pluginSettings; + + @Inject + public DuniterNode(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) { + super(settings); + this.pluginSettings = pluginSettings; + + threadPool.scheduleOnStarted(() -> { + createIndices(injector); + }); + } + + @Override + protected void doStart() { + + } + + @Override + protected void doStop() { + + } + + @Override + protected void doClose() { + + } + + protected void createIndices(Injector injector) { + if (logger.isInfoEnabled()) { + logger.info("Creating Duniter indices..."); + } + + boolean reloadIndices = pluginSettings.reloadIndices(); + Peer peer = pluginSettings.checkAndGetPeer(); + if (reloadIndices) { + injector.getInstance(RegistryService.class) + .deleteIndex() + .createIndexIfNotExists() + .fillRecordCategories() + .indexCurrencyFromPeer(peer); + injector.getInstance(MarketService.class) + .deleteIndex() + .createIndexIfNotExists() + .fillRecordCategories(); + + injector.getInstance(BlockchainService.class) + .indexLastBlocks(peer); + } + else { + injector.getInstance(RegistryService.class).createIndexIfNotExists(); + + injector.getInstance(MarketService.class).createIndexIfNotExists(); + } + + if (logger.isInfoEnabled()) { + logger.info("Duniter indices created."); + } + } +} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 5a88d438a4496ad7917c09f4d0fe2f6e23a03ae2..0c6ba4dbf2744482d8d5085711d07e86bb703d0a 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -25,6 +25,7 @@ package org.duniter.elasticsearch.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import org.duniter.core.beans.Bean; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; @@ -37,12 +38,15 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -56,97 +60,26 @@ import java.net.UnknownHostException; /** * Created by Benoit on 08/04/2015. */ -public abstract class AbstractService<T> implements LifecycleComponent<T>{ +public abstract class AbstractService implements Bean { - private static final ESLogger log = ESLoggerFactory.getLogger(AbstractService.class.getName()); - - private Lifecycle.State state; - - private Client client; - private AdminClient admin; - - private PluginSettings pluginSettings; - - private ObjectMapper objectMapper; + protected final ESLogger logger; + protected final Client client; + protected final PluginSettings pluginSettings; + protected final ObjectMapper objectMapper; @Inject public AbstractService(Client client, PluginSettings pluginSettings) { + this.logger = Loggers.getLogger(getClass()); this.client = client; this.pluginSettings = pluginSettings; this.objectMapper = new ObjectMapper(); - this.state = Lifecycle.State.INITIALIZED; - } - - - @Override - public Lifecycle.State lifecycleState() { - return state; - } - - @Override - public void addLifecycleListener(LifecycleListener var1){ - // TODO - } - - @Override - public void removeLifecycleListener(LifecycleListener var1){ - // TODO - } - - @Override - public T start(){ - state = Lifecycle.State.STARTED; - return (T)this; - } - - @Override - public T stop(){ - state = Lifecycle.State.STOPPED; - return (T)this; - } - - @Override - public void close() { - state = Lifecycle.State.STOPPED; } /* -- protected methods -- */ - protected void setState(Lifecycle.State state) { - this.state = state; - } - - protected Client getClient() { - return client; - } - - protected PluginSettings getPluginSettings() { - return pluginSettings; - } - - protected ObjectMapper getObjectMapper() { - return objectMapper; - } - protected boolean existsIndex(String indexes) { - //if (admin == null) { - Settings.Builder settings = Settings.settingsBuilder() - .put("cluster.name", "duniter4j-elasticsearch") - .put("client.transport.sniff", true); - Client client = null; - try { - client = TransportClient.builder().settings(settings) - .build() - .addTransportAddress(new InetSocketTransportAddress( - InetAddress.getByName("localhost"), 9300)); - } catch (UnknownHostException e) { - throw new TechnicalException(e); - } - // admin = client.admin(); - //} IndicesExistsRequestBuilder requestBuilder = client.admin().indices().prepareExists(indexes); IndicesExistsResponse response = requestBuilder.execute().actionGet(); - return response.isExists(); } @@ -154,7 +87,9 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{ if (!existsIndex(indexName)) { return; } - log.info(String.format("Deleting index [%s]", indexName)); + if (logger.isInfoEnabled()) { + logger.info(String.format("Deleting index [%s]", indexName)); + } DeleteIndexRequestBuilder deleteIndexRequestBuilder = client.admin().indices().prepareDelete(indexName); deleteIndexRequestBuilder.execute().actionGet(); @@ -238,8 +173,8 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{ StringBuilder builder = new StringBuilder(); while(line != null) { if (StringUtils.isNotBlank(line)) { - if (log.isTraceEnabled()) { - log.trace(String.format("[%s] Add to bulk: %s", indexName, line)); + if (logger.isTraceEnabled()) { + logger.trace(String.format("[%s] Add to bulk: %s", indexName, line)); } builder.append(line).append('\n'); } @@ -264,7 +199,7 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{ } try { - getClient().bulk(bulkRequest).actionGet(); + client.bulk(bulkRequest).actionGet(); } catch(Exception e) { throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e); } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/blockchain/BlockBlockchainService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java similarity index 82% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/blockchain/BlockBlockchainService.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 29485f0d940a7c62076a781910d8efbe59bbaf74..1840ccf61c6b2a495f55aee9d168cfe2733ccd85 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/blockchain/BlockBlockchainService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.blockchain; +package org.duniter.elasticsearch.service; /* * #%L @@ -44,9 +44,8 @@ import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.ObjectUtils; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.ServiceLocator; -import org.duniter.elasticsearch.service.exception.DuplicateIndexIdException; +import org.duniter.elasticsearch.exception.*; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -59,8 +58,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -79,44 +76,33 @@ import java.util.*; /** * Created by Benoit on 30/03/2015. */ -public class BlockBlockchainService extends AbstractService<BlockBlockchainService> { +public class BlockchainService extends AbstractService { - private static final ESLogger log = ESLoggerFactory.getLogger(BlockBlockchainService.class.getName()); - - public static final String INDEX_TYPE_BLOCK = "block"; - - public static final String INDEX_BLOCK_CURRENT_ID = "current"; + public static final String BLOCK_TYPE = "block"; + public static final String CURRENT_BLOCK_ID = "current"; private static final int SYNC_MISSING_BLOCK_MAX_RETRY = 5; private BlockchainRemoteService blockchainRemoteService; + //private CurrencyRegistryService currencyRegistryService; private Gson gson; @Inject - public BlockBlockchainService(Client client, PluginSettings settings){ + public BlockchainService(Client client, PluginSettings settings, ThreadPool threadPool, final ServiceLocator serviceLocator){ super(client, settings); this.gson = GsonUtils.newBuilder().create(); + threadPool.scheduleOnStarted(() -> { + blockchainRemoteService = serviceLocator.getBlockchainRemoteService(); + }); } - @Inject - public void setBlockBlockchainService(ServiceLocator serviceLocator) { - blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService(); - } - - @Override - public void close() { - blockchainRemoteService = null; - gson = null; - super.close(); - } - - public void indexLastBlocks(Peer peer) { - indexLastBlocks(peer, new ProgressionModelImpl()); + public BlockchainService indexLastBlocks(Peer peer) { + return indexLastBlocks(peer, new ProgressionModelImpl()); } - public void indexLastBlocks(Peer peer, ProgressionModel progressionModel) { - boolean bulkIndex = getPluginSettings().isIndexBulkEnable(); + public BlockchainService indexLastBlocks(Peer peer, ProgressionModel progressionModel) { + boolean bulkIndex = pluginSettings.isIndexBulkEnable(); progressionModel.setStatus(ProgressionModel.Status.RUNNING); progressionModel.setTotal(100); @@ -127,15 +113,15 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi BlockchainParameters parameter = blockchainRemoteService.getParameters(peer); if (parameter == null) { progressionModel.setStatus(ProgressionModel.Status.FAILED); - log.error(String.format("Could not connect to node [%s]", + logger.error(String.format("Could not connect to node [%s]", peer.getUrl())); - return; + return this; } String currencyName = parameter.getCurrency(); progressionModel.setTask(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.task", currencyName, peer.getHost(), peer.getPort())); - log.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.task", - currencyName, getPluginSettings().getNodeBmaHost(), getPluginSettings().getNodeBmaPort())); + logger.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.task", + currencyName, pluginSettings.getNodeBmaHost(), pluginSettings.getNodeBmaPort())); // Create index blockchain if need // FIXME: avoid circular dependency @@ -156,7 +142,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi int maxBlockNumber = currentBlock.getNumber(); // DEV mode - if (getPluginSettings().isDevMode() && maxBlockNumber > 5000) { + if (pluginSettings.isDevMode() && maxBlockNumber > 5000) { maxBlockNumber = 5000; } @@ -193,56 +179,60 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi } if (CollectionUtils.isEmpty(missingBlocks)) { - log.info(String.format("All blocks indexed [%s ms]", (System.currentTimeMillis() - timeStart))); + logger.info(String.format("All blocks indexed [%s ms]", (System.currentTimeMillis() - timeStart))); progressionModel.setStatus(ProgressionModel.Status.SUCCESS); } else { - log.warn(String.format("Could not indexed all blocks. Missing %s blocks.", missingBlocks.size())); + logger.warn(String.format("Could not indexed all blocks. Missing %s blocks.", missingBlocks.size())); progressionModel.setStatus(ProgressionModel.Status.FAILED); } } else { - if (log.isDebugEnabled()) { - log.debug(String.format("Current block from peer [%s] is #%s. Index is up to date.", peer.getUrl(), maxBlockNumber)); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Current block from peer [%s] is #%s. Index is up to date.", peer.getUrl(), maxBlockNumber)); } progressionModel.setStatus(ProgressionModel.Status.SUCCESS); } } } catch(Exception e) { - log.error("Error during indexBlocksFromNode: " + e.getMessage(), e); + logger.error("Error during indexBlocksFromNode: " + e.getMessage(), e); progressionModel.setStatus(ProgressionModel.Status.FAILED); } + + return this; } - public void deleteIndex(String currencyName) { + public BlockchainService deleteIndex(String currencyName) { deleteIndexIfExists(currencyName); + return this; } public boolean existsIndex(String currencyName) { return super.existsIndex(currencyName); } - public void createIndexIfNotExists(String currencyName) { + public BlockchainService createIndexIfNotExists(String currencyName) { if (!existsIndex(currencyName)) { createIndex(currencyName); } + return this; } public void createIndex(String currencyName) { - log.info(String.format("Creating index [%s]", currencyName)); + logger.info(String.format("Creating index [%s]", currencyName)); - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(currencyName); + CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(currencyName); org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() .put("number_of_shards", 1) .put("number_of_replicas", 1) //.put("analyzer", createDefaultAnalyzer()) .build(); createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_TYPE_BLOCK, createIndexMapping()); + createIndexRequestBuilder.addMapping(BLOCK_TYPE, createBlockType()); createIndexRequestBuilder.execute().actionGet(); } - public void createBlock(BlockchainBlock block) throws DuplicateIndexIdException { + public void createBlock(BlockchainBlock block) throws org.duniter.elasticsearch.exception.DuplicateIndexIdException { ObjectUtils.checkNotNull(block, "block could not be null") ; ObjectUtils.checkNotNull(block.getCurrency(), "block attribute 'blockchain' could not be null"); ObjectUtils.checkNotNull(block.getNumber(), "block attribute 'number' could not be null"); @@ -272,8 +262,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi // Currency not exists, or has changed, so create it if (existingBlock == null) { - if (log.isTraceEnabled()) { - log.trace(String.format("Insert new block [%s]", block.getNumber())); + if (logger.isTraceEnabled()) { + logger.trace(String.format("Insert new block [%s]", block.getNumber())); } // Create new block @@ -285,18 +275,18 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi boolean doUpdate = false; if (updateWhenSameHash) { doUpdate = true; - if (log.isTraceEnabled() && doUpdate) { - log.trace(String.format("Update block [%s]", block.getNumber())); + if (logger.isTraceEnabled() && doUpdate) { + logger.trace(String.format("Update block [%s]", block.getNumber())); } } else { doUpdate = !StringUtils.equals(existingBlock.getHash(), block.getHash()); - if (log.isTraceEnabled()) { + if (logger.isTraceEnabled()) { if (doUpdate) { - log.trace(String.format("Update block [%s]: hash has been changed, old=[%s] new=[%s]", block.getNumber(), existingBlock.getHash(), block.getHash())); + logger.trace(String.format("Update block [%s]: hash has been changed, old=[%s] new=[%s]", block.getNumber(), existingBlock.getHash(), block.getHash())); } else { - log.trace(String.format("Skipping update block [%s]: hash is up to date.", block.getNumber())); + logger.trace(String.format("Skipping update block [%s]: hash is up to date.", block.getNumber())); } } } @@ -319,7 +309,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi String json = gson.toJson(block); // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(block.getCurrency(), INDEX_TYPE_BLOCK) + IndexRequestBuilder indexRequest = client.prepareIndex(block.getCurrency(), BLOCK_TYPE) .setId(block.getNumber().toString()) .setSource(json); @@ -344,7 +334,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi ObjectUtils.checkArgument(json.length > 0); // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) + IndexRequestBuilder indexRequest = client.prepareIndex(currencyName, BLOCK_TYPE) .setId(String.valueOf(number)) .setRefresh(refresh) .setSource(json); @@ -374,10 +364,10 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi String currencyName = blockCurrencyParser.getValueAsString(json); int number = blockNumberParser.getValueAsInt(json); - log.info(I18n.t("duniter4j.blockIndexerService.indexBlock", currencyName, peer, number)); + logger.info(I18n.t("duniter4j.blockIndexerService.indexBlock", currencyName, peer, number)); // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) + IndexRequestBuilder indexRequest = client.prepareIndex(currencyName, BLOCK_TYPE) .setId(String.valueOf(number)) .setRefresh(refresh) .setSource(json); @@ -419,8 +409,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi ObjectUtils.checkArgument(currentBlockJson.length() > 0); // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) - .setId(INDEX_BLOCK_CURRENT_ID) + IndexRequestBuilder indexRequest = client.prepareIndex(currencyName, BLOCK_TYPE) + .setId(CURRENT_BLOCK_ID) .setRefresh(true) .setSource(currentBlockJson); @@ -451,9 +441,9 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi String[] queryParts = query.split("[\\t ]+"); // Prepare request - SearchRequestBuilder searchRequest = getClient() + SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) - .setTypes(INDEX_TYPE_BLOCK) + .setTypes(BLOCK_TYPE) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // If only one term, search as prefix @@ -485,9 +475,9 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi public int getMaxBlockNumber(String currencyName) { // Prepare request - SearchRequestBuilder searchRequest = getClient() + SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) - .setTypes(INDEX_TYPE_BLOCK) + .setTypes(BLOCK_TYPE) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // Get max(number) @@ -512,17 +502,17 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi } public BlockchainBlock getCurrentBlock(String currencyName) { - return getBlockByIdStr(currencyName, INDEX_BLOCK_CURRENT_ID); + return getBlockByIdStr(currencyName, CURRENT_BLOCK_ID); } /* -- Internal methods -- */ - public XContentBuilder createIndexMapping() { + public XContentBuilder createBlockType() { try { XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() - .startObject(INDEX_TYPE_BLOCK) + .startObject(BLOCK_TYPE) .startObject("properties") // block number @@ -567,9 +557,9 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi public BlockchainBlock getBlockByIdStr(String currencyName, String blockId) { // Prepare request - SearchRequestBuilder searchRequest = getClient() + SearchRequestBuilder searchRequest = client .prepareSearch(currencyName) - .setTypes(INDEX_TYPE_BLOCK) + .setTypes(BLOCK_TYPE) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); // If more than a word, search on terms match @@ -603,8 +593,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi try { block = GsonUtils.newBuilder().create().fromJson(jsonString, BlockchainBlock.class); } catch(Exception e) { - if (log.isDebugEnabled()) { - log.debug("Error while parsing block from JSON:\n" + jsonString); + if (logger.isDebugEnabled()) { + logger.debug("Error while parsing block from JSON:\n" + jsonString); } throw new JsonSyntaxException("Error while read block from JSON: " + e.getMessage(), e); } @@ -639,8 +629,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi // Check is stopped if (progressionModel.isCancel()) { progressionModel.setStatus(ProgressionModel.Status.STOPPED); - if (log.isInfoEnabled()) { - log.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.stopped", peer)); + if (logger.isInfoEnabled()) { + logger.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.stopped", peer)); } return missingBlockNumbers; } @@ -660,7 +650,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi } } catch(Throwable t) { - log.debug(String.format("Error while getting block #%s: %s. Skipping this block.", curNumber, t.getMessage())); + logger.debug(String.format("Error while getting block #%s: %s. Skipping this block.", curNumber, t.getMessage())); missingBlockNumbers.add(String.valueOf(curNumber)); } } @@ -671,10 +661,9 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi public Collection<String> indexBlocksUsingBulk(Peer peer, String currencyName, int firstNumber, int lastNumber, ProgressionModel progressionModel) { Set<String> missingBlockNumbers = new LinkedHashSet<>(); - Client client = getClient(); - boolean debug = log.isDebugEnabled(); + boolean debug = logger.isDebugEnabled(); - int batchSize = getPluginSettings().getIndexBulkSize(); + int batchSize = pluginSettings.getIndexBulkSize(); String currentBlockJson = null; JsonAttributeParser blockNumberParser = new JsonAttributeParser("number"); @@ -682,8 +671,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi // Check if stop (e.g. ask by user) if (progressionModel.isCancel()) { progressionModel.setStatus(ProgressionModel.Status.STOPPED); - if (log.isInfoEnabled()) { - log.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.stopped", currencyName, peer.getUrl())); + if (logger.isInfoEnabled()) { + logger.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.stopped", currencyName, peer.getUrl())); } return missingBlockNumbers; } @@ -692,8 +681,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi try { blocksAsJson = blockchainRemoteService.getBlocksAsJson(peer, batchSize, batchFirstNumber); } catch(HttpBadRequestException e) { - if (log.isDebugEnabled()) { - log.debug(String.format("Error while getting blocks from #%s (count=%s): %s. Skipping blocks.", batchFirstNumber, batchSize, e.getMessage())); + if (logger.isDebugEnabled()) { + logger.debug(String.format("Error while getting blocks from #%s (count=%s): %s. Skipping blocks.", batchFirstNumber, batchSize, e.getMessage())); } } @@ -722,7 +711,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi if (!processedBlockNumbers.contains(itemNumber)) { // Add to bulk - bulkRequest.add(client.prepareIndex(currencyName, INDEX_TYPE_BLOCK, String.valueOf(itemNumber)) + bulkRequest.add(client.prepareIndex(currencyName, BLOCK_TYPE, String.valueOf(itemNumber)) .setRefresh(false) .setSource(blockAsJson) ); @@ -745,12 +734,12 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi // process failures by iterating through each bulk response item for (BulkItemResponse itemResponse : bulkResponse) { boolean skip = !itemResponse.isFailed() - || Objects.equal(INDEX_BLOCK_CURRENT_ID, itemResponse.getId()) + || Objects.equal(CURRENT_BLOCK_ID, itemResponse.getId()) || missingBlockNumbers.contains(Integer.parseInt(itemResponse.getId())); if (!skip) { int itemNumber = Integer.parseInt(itemResponse.getId()); if (debug) { - log.debug(String.format("Error while getting block #%s: %s. Skipping this block.", itemNumber, itemResponse.getFailureMessage())); + logger.debug(String.format("Error while getting block #%s: %s. Skipping this block.", itemNumber, itemResponse.getFailureMessage())); } missingBlockNumbers.add(itemResponse.getId()); } @@ -790,13 +779,13 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi NetworkRemoteService networkRemoteService = ServiceLocator.instance().getNetworkRemoteService(); BlockchainRemoteService blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService(); String currencyName = currentBlock.getCurrency(); - boolean debug = log.isDebugEnabled(); + boolean debug = logger.isDebugEnabled(); Set<String> newMissingBlocks = new LinkedHashSet<>(); newMissingBlocks.addAll(sortedMissingBlocks); if (debug) { - log.debug(String.format("Missing blocks are: %s", newMissingBlocks.toString())); + logger.debug(String.format("Missing blocks are: %s", newMissingBlocks.toString())); } // Select other peers, in filtering on the same blockchain version @@ -808,8 +797,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi null, null); for(Peer childPeer: otherPeers) { - if (log.isInfoEnabled()) { - log.info(String.format("[%s] Trying to get missing blocks from other peer [%s]...", currencyName, childPeer)); + if (logger.isInfoEnabled()) { + logger.info(String.format("[%s] Trying to get missing blocks from other peer [%s]...", currencyName, childPeer)); } try { for(String blockNumberStr: ImmutableSet.copyOf(sortedMissingBlocks)) { @@ -839,7 +828,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi String blockAsJson = blockchainRemoteService.getBlockAsJson(childPeer, blockNumber); if (StringUtils.isNotBlank(blockAsJson)) { if (debug) { - log.debug(String.format("Found missing block #%s on peer [%s].", blockNumber, childPeer)); + logger.debug(String.format("Found missing block #%s on peer [%s].", blockNumber, childPeer)); } // Index the missing block @@ -860,7 +849,7 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi } catch(TechnicalException e) { if (debug) { - log.debug(String.format("Error while getting blocks from peer [%s]: %s. Skipping this peer.", childPeer), e.getMessage()); + logger.debug(String.format("Error while getting blocks from peer [%s]: %s. Skipping this peer.", childPeer), e.getMessage()); } continue; // skip this peer @@ -875,12 +864,12 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi tryCounter++; if (tryCounter >= SYNC_MISSING_BLOCK_MAX_RETRY) { // Max retry : stop here - log.error("Some blocks are still missing, after %s try: %s", SYNC_MISSING_BLOCK_MAX_RETRY, newMissingBlocks.toArray(new String[0])); + logger.error("Some blocks are still missing, after %s try: %s", SYNC_MISSING_BLOCK_MAX_RETRY, newMissingBlocks.toArray(new String[0])); return newMissingBlocks; } if (debug) { - log.debug("Some blocks are still missing: %s. Will retry later (%s/%s)...", newMissingBlocks.toArray(new String[0]), tryCounter, SYNC_MISSING_BLOCK_MAX_RETRY); + logger.debug("Some blocks are still missing: %s. Will retry later (%s/%s)...", newMissingBlocks.toArray(new String[0]), tryCounter, SYNC_MISSING_BLOCK_MAX_RETRY); } try { Thread.sleep(60 *1000); // wait 1 min @@ -900,8 +889,8 @@ public class BlockBlockchainService extends AbstractService<BlockBlockchainServi progressionModel.setCurrent(pct); progressionModel.setMessage(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.progress", currencyName, peer, curNumber, lastNumber, pct)); - if (log.isInfoEnabled()) { - log.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.progress", currencyName, peer, curNumber, lastNumber, pct)); + if (logger.isInfoEnabled()) { + logger.info(I18n.t("duniter4j.blockIndexerService.indexLastBlocks.progress", currencyName, peer, curNumber, lastNumber, pct)); } } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/RecordMarketService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java similarity index 61% rename from duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/RecordMarketService.java rename to duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java index a2209c7646690f7251ad547179e1657c2c830674..393b06ebfd8d3e858501e96fd146b82860b85c1d 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/RecordMarketService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/MarketService.java @@ -1,4 +1,4 @@ -package org.duniter.elasticsearch.service.market; +package org.duniter.elasticsearch.service; /* * #%L @@ -32,18 +32,14 @@ import org.duniter.core.client.service.bma.WotRemoteService; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.ServiceLocator; -import org.duniter.elasticsearch.service.exception.InvalidFormatException; -import org.duniter.elasticsearch.service.exception.InvalidSignatureException; +import org.duniter.elasticsearch.exception.InvalidFormatException; +import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.inject.Singleton; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -53,108 +49,197 @@ import java.util.Set; /** * Created by Benoit on 30/03/2015. */ -@Singleton -public class RecordMarketService extends AbstractService<RecordMarketService> { +public class MarketService extends AbstractService { - private static final ESLogger log = ESLoggerFactory.getLogger(RecordMarketService.class.getName()); + public static final String INDEX = "market"; + public static final String RECORD_CATEGORY_TYPE = "category"; + public static final String RECORD_TYPE = "record"; + private static final String CATEGORIES_BULK_CLASSPATH_FILE = "market-categories-bulk-insert.json"; private static final String JSON_STRING_PROPERTY_REGEX = "[,]?[\"\\s\\n\\r]*%s[\"]?[\\s\\n\\r]*:[\\s\\n\\r]*\"[^\"]+\""; - public static final String INDEX_NAME = "market"; - - public static final String INDEX_TYPE = "record"; - - private WotRemoteService wotRemoteService; - private CryptoService cryptoService; - - private ServiceLocator serviceLocator; + private WotRemoteService wotRemoteService; @Inject - public RecordMarketService(Client client, PluginSettings config, ServiceLocator serviceLocator) { - super(client, config); - this.serviceLocator = serviceLocator; - } - - @Override - public RecordMarketService start() { - wotRemoteService = serviceLocator.getWotRemoteService(); - cryptoService = serviceLocator.getCryptoService(); - return super.start(); - } - - @Override - public void close() { - wotRemoteService = null; - cryptoService = null; - super.close(); + public MarketService(Client client, PluginSettings settings, CryptoService cryptoService, WotRemoteService wotRemoteService) { + super(client, settings); + this.cryptoService = cryptoService; + this.wotRemoteService = wotRemoteService; } /** * Delete blockchain index, and all data * @throws JsonProcessingException */ - public void deleteIndex() throws JsonProcessingException { - deleteIndexIfExists(INDEX_NAME); + public MarketService deleteIndex() { + deleteIndexIfExists(INDEX); + return this; } public boolean existsIndex() { - return super.existsIndex(INDEX_NAME); + return super.existsIndex(INDEX); } /** * Create index need for blockchain registry, if need */ - public void createIndexIfNotExists() { + public MarketService createIndexIfNotExists() { try { - if (!existsIndex(INDEX_NAME)) { + if (!existsIndex(INDEX)) { createIndex(); } } catch(JsonProcessingException e) { - throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME)); + throw new TechnicalException(String.format("Error while creating index [%s]", INDEX)); } + + return this; } /** - * Create index need for record registry + * Create index need for category registry * @throws JsonProcessingException */ - public void createIndex() throws JsonProcessingException { - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); - org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() - .put("number_of_shards", 3) - .put("number_of_replicas", 2) + public MarketService createIndex() throws JsonProcessingException { + logger.info(String.format("Creating index [%s/%s]", INDEX, RECORD_CATEGORY_TYPE)); + + CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX); + Settings indexSettings = Settings.settingsBuilder() + .put("number_of_shards", 2) + .put("number_of_replicas", 1) //.put("analyzer", createDefaultAnalyzer()) .build(); - - // Create record index type - log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE)); createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_NAME, createIndexMapping(INDEX_TYPE)); + createIndexRequestBuilder.addMapping(RECORD_CATEGORY_TYPE, createRecordCategoryType()); + createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType()); createIndexRequestBuilder.execute().actionGet(); + + return this; } /** - * Index a new record - * @param recordJson - * @return the record id + * + * @param jsonCategory + * @return the product id */ + public String indexCategoryFromJson(String jsonCategory) { + if (logger.isDebugEnabled()) { + logger.debug("Indexing a category"); + } + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_CATEGORY_TYPE) + .setSource(jsonCategory); + + // Execute indexBlocksFromNode + IndexResponse response = indexRequest + .setRefresh(false) + .execute().actionGet(); + + return response.getId(); + } + public String indexRecordFromJson(String recordJson) { - return indexRecordFromJson(recordJson, INDEX_TYPE); + try { + JsonNode actualObj = objectMapper.readTree(recordJson); + Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames()); + if (!fieldNames.contains(Record.PROPERTY_ISSUER) + || !fieldNames.contains(Record.PROPERTY_SIGNATURE)) { + throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]"); + } + String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText(); + + String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText(); + + String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "") + .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); + + if (!cryptoService.verify(recordNoSign, signature, issuer)) { + throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); + } + + // TODO : check if issuer is a valid member + //wotRemoteService.getRequirments(); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8))); + } + + } + catch(IOException | JsonSyntaxException e) { + throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); + } + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) + .setSource(recordJson); + + // Execute indexBlocksFromNode + IndexResponse response = indexRequest + .setRefresh(false) + .execute().actionGet(); + + return response.getId(); + } + + public void fillRecordCategories() { + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s/%s] fill data", INDEX, RECORD_CATEGORY_TYPE)); + } + + // Insert categories + bulkFromClasspathFile(CATEGORIES_BULK_CLASSPATH_FILE, INDEX, RECORD_CATEGORY_TYPE); } /* -- Internal methods -- */ - public XContentBuilder createIndexMapping(String indexType) { - String stringAnalyzer = getPluginSettings().getIndexStringAnalyzer(); + public XContentBuilder createRecordCategoryType() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_CATEGORY_TYPE) + .startObject("properties") + // name + .startObject("name") + .field("type", "string") + .endObject() + + // description + /*.startObject("description") + .field("type", "string") + .endObject()*/ + + // parent + .startObject("parent") + .field("type", "string") + .endObject() + + // tags + /*.startObject("tags") + .field("type", "completion") + .field("search_analyzer", "simple") + .field("analyzer", "simple") + .field("preserve_separators", "false") + .endObject()*/ + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_CATEGORY_TYPE, ioe.getMessage()), ioe); + } + } + + public XContentBuilder createRecordType() { + String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer(); try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(indexType) + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE) .startObject("properties") // title @@ -205,11 +290,13 @@ public class RecordMarketService extends AbstractService<RecordMarketService> { .field("type", "nested") .startObject("properties") .startObject("src") // src - .field("type", "attachment") + // FISME : add attachment plugin + //.field("type", "attachment") + .field("type", "string") .field("index", "not_analyzed") .endObject() .startObject("title") // title - .field("title", "string") + .field("type", "string") .field("analyzer", stringAnalyzer) .startObject("norms") // disabled norms on title .field("enabled", "false") @@ -247,52 +334,8 @@ public class RecordMarketService extends AbstractService<RecordMarketService> { return mapping; } catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, indexType, ioe.getMessage()), ioe); - } - } - - public String indexRecordFromJson(String recordJson, String indexType) { - - try { - JsonNode actualObj = getObjectMapper().readTree(recordJson); - Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames()); - if (!fieldNames.contains(Record.PROPERTY_ISSUER) - || !fieldNames.contains(Record.PROPERTY_SIGNATURE)) { - throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]"); - } - String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText(); - - String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText(); - - String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "") - .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); - - if (!cryptoService.verify(recordNoSign, signature, issuer)) { - throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); - } - - // TODO : check if issuer is a valid member - //wotRemoteService.getRequirments(); - - if (log.isDebugEnabled()) { - log.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8))); - } - - } - catch(IOException | JsonSyntaxException e) { - throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe); } - - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, indexType) - .setSource(recordJson); - - // Execute indexBlocksFromNode - IndexResponse response = indexRequest - .setRefresh(false) - .execute().actionGet(); - - return response.getId(); } } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java new file mode 100644 index 0000000000000000000000000000000000000000..009a031a047a58afd583da15b4b598c1a9c26f0f --- /dev/null +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/RegistryService.java @@ -0,0 +1,618 @@ +package org.duniter.elasticsearch.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.ArrayUtils; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.BlockchainParameters; +import org.duniter.core.client.model.bma.gson.GsonUtils; +import org.duniter.core.client.model.elasticsearch.Currency; +import org.duniter.core.client.model.elasticsearch.Record; +import org.duniter.core.client.model.local.Peer; +import org.duniter.core.client.service.bma.BlockchainRemoteService; +import org.duniter.core.client.service.bma.WotRemoteService; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.ObjectUtils; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.exception.AccessDeniedException; +import org.duniter.elasticsearch.exception.DuplicateIndexIdException; +import org.duniter.elasticsearch.exception.InvalidFormatException; +import org.duniter.elasticsearch.exception.InvalidSignatureException; +import org.duniter.elasticsearch.model.SearchResult; +import org.duniter.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.suggest.SuggestRequestBuilder; +import org.elasticsearch.action.suggest.SuggestResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHitField; +import org.elasticsearch.search.highlight.HighlightField; +import org.elasticsearch.search.sort.SortOrder; +import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; + +import java.io.File; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Created by Benoit on 30/03/2015. + */ +public class RegistryService extends AbstractService { + + public static final String INDEX = "registry"; + public static final String RECORD_TYPE = "record"; + public static final String RECORD_CATEGORY_TYPE = "category"; + public static final String CURRENCY_TYPE = "currency"; + private static final String JSON_STRING_PROPERTY_REGEX = "[,]?[\"\\s\\n\\r]*%s[\"]?[\\s\\n\\r]*:[\\s\\n\\r]*\"[^\"]+\""; + private static final String CATEGORIES_BULK_CLASSPATH_FILE = "registry-categories-bulk-insert.json"; + public static final String REGEX_WORD_SEPARATOR = "[-\\t@# ]+"; + public static final String REGEX_SPACE = "[\\t\\n\\r ]+"; + + private final Gson gson; + private WotRemoteService wotRemoteService; + private CryptoService cryptoService; + private BlockchainRemoteService blockchainRemoteService; + + @Inject + public RegistryService(Client client, + PluginSettings settings, + WotRemoteService wotRemoteService, + CryptoService cryptoService, + BlockchainRemoteService blockchainRemoteService) { + super(client, settings); + gson = GsonUtils.newBuilder().create(); + this.wotRemoteService = wotRemoteService; + this.cryptoService = cryptoService; + this.blockchainRemoteService = blockchainRemoteService; + } + + /** + * Create index need for blockchain registry, if need + */ + public RegistryService createIndexIfNotExists() { + try { + if (!existsIndex(INDEX)) { + createIndex(); + } + } + catch(JsonProcessingException e) { + throw new TechnicalException(String.format("Error while creating index [%s]", INDEX)); + } + return this; + } + + /** + * Create index for registry + * @throws JsonProcessingException + */ + public RegistryService createIndex() throws JsonProcessingException { + logger.info(String.format("Creating index [%s]", INDEX)); + + CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX); + org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() + .put("number_of_shards", 3) + .put("number_of_replicas", 1) + //.put("analyzer", createDefaultAnalyzer()) + .build(); + createIndexRequestBuilder.setSettings(indexSettings); + createIndexRequestBuilder.addMapping(CURRENCY_TYPE, createCurrencyType()); + createIndexRequestBuilder.addMapping(RECORD_CATEGORY_TYPE, createRecordCategoryType()); + createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType()); + createIndexRequestBuilder.execute().actionGet(); + + return this; + } + + public RegistryService deleteIndex() { + deleteIndexIfExists(INDEX); + return this; + } + + public boolean existsIndex() { + return super.existsIndex(INDEX); + } + + public RegistryService fillRecordCategories() { + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s/%s] Fill data", INDEX, RECORD_CATEGORY_TYPE)); + } + + // Insert categories + bulkFromClasspathFile(CATEGORIES_BULK_CLASSPATH_FILE, INDEX, RECORD_CATEGORY_TYPE); + + return this; + } + + /** + * + * @param recordJson + * @return the record id + */ + public String indexRecordFromJson(String recordJson) { + + try { + ObjectMapper mapper = new ObjectMapper(); + JsonNode actualObj = mapper.readTree(recordJson); + Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames()); + if (!fieldNames.contains(Record.PROPERTY_ISSUER) + || !fieldNames.contains(Record.PROPERTY_SIGNATURE)) { + throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]"); + } + String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText(); + String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText(); + + String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "") + .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); + + if (!cryptoService.verify(recordNoSign, signature, issuer)) { + throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); + } + + // TODO verify hash + //if (!cryptoService.verifyHash(recordNoSign, signature, issuer)) { + // throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); + //} + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8))); + } + + } + catch(IOException | JsonSyntaxException e) { + throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); + } + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE) + .setSource(recordJson); + + // Execute indexBlocksFromNode + IndexResponse response = indexRequest + .setRefresh(false) + .execute().actionGet(); + + return response.getId(); + } + + + public void insertRecordFromBulkFile(File bulkFile) { + + if (logger.isDebugEnabled()) { + logger.debug("Inserting records from file"); + } + + // Insert cities + bulkFromFile(bulkFile, INDEX, RECORD_TYPE); + } + + /** + * Retrieve the blockchain data, from peer + * + * @param peer + * @return the created blockchain + */ + public Currency indexCurrencyFromPeer(Peer peer) { + BlockchainParameters parameters = blockchainRemoteService.getParameters(peer); + BlockchainBlock firstBlock = blockchainRemoteService.getBlock(peer, 0); + BlockchainBlock currentBlock = blockchainRemoteService.getCurrentBlock(peer); + long lastUD = blockchainRemoteService.getLastUD(peer); + + Currency result = new Currency(); + result.setCurrencyName(parameters.getCurrency()); + result.setFirstBlockSignature(firstBlock.getSignature()); + result.setMembersCount(currentBlock.getMembersCount()); + result.setLastUD(lastUD); + result.setParameters(parameters); + result.setPeers(new Peer[]{peer}); + + indexCurrency(result); + + // Index the first block + // FIXME : attention au dependence circulaire : cela devrait plutot etre fait à l'exetrieure e registry + // par exemple dans l'action REST + //blockBlockchainService.createIndexIfNotExists(parameters.getCurrency()); + //blockBlockchainService.indexBlock(firstBlock, false); + //blockBlockchainService.indexCurrentBlock(firstBlock, true); + + return result; + } + + /** + * Index a blockchain + * @param currency + */ + public void indexCurrency(Currency currency) { + try { + ObjectUtils.checkNotNull(currency.getCurrencyName()); + + // Fill tags + if (ArrayUtils.isEmpty(currency.getTags())) { + String currencyName = currency.getCurrencyName(); + String[] tags = currencyName.split(REGEX_WORD_SEPARATOR); + List<String> tagsList = Lists.newArrayList(tags); + + // Convert as a sentence (replace seprator with a space) + String sentence = currencyName.replaceAll(REGEX_WORD_SEPARATOR, " "); + if (!tagsList.contains(sentence)) { + tagsList.add(sentence); + } + + currency.setTags(tagsList.toArray(new String[tagsList.size()])); + } + + // Serialize into JSON + byte[] json = objectMapper.writeValueAsBytes(currency); + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, CURRENCY_TYPE) + .setId(currency.getCurrencyName()) + .setSource(json); + + // Execute indexBlocksFromNode + indexRequest + .setRefresh(true) + .execute().actionGet(); + + } catch(JsonProcessingException e) { + throw new TechnicalException(e); + } + } + + /** + * Get suggestions from a string query. Useful for web autocomplete field (e.g. text full search) + * @param query + * @return + */ + /* public List<String> getSuggestions(String query) { + CompletionSuggestionBuilder suggestionBuilder = new CompletionSuggestionBuilder(INDEX_TYPE) + .text(query) + .size(10) // limit to 10 results + .field("tags"); + + // Prepare request + SuggestRequestBuilder suggestRequest = client + .prepareSuggest(INDEX_NAME) + .addSuggestion(suggestionBuilder); + + // Execute query + SuggestResponse response = suggestRequest.execute().actionGet(); + + // Read query result + return toSuggestions(response, RECORD_CATEGORY_TYPE, query); + }*/ + + /** + * Save a blockchain (update or create) into the blockchain index. + * @param currency + * @param senderPubkey + * @throws DuplicateIndexIdException + * @throws AccessDeniedException if exists and user if not the original blockchain sender + */ + public void saveCurrency(Currency currency, String senderPubkey) throws DuplicateIndexIdException { + ObjectUtils.checkNotNull(currency, "currency could not be null") ; + ObjectUtils.checkNotNull(currency.getCurrencyName(), "currency attribute 'currencyName' could not be null"); + + String previousSenderPubkey = getSenderPubkeyByCurrencyId(currency.getCurrencyName()); + + // Currency not exists, so create it + if (previousSenderPubkey == null) { + // make sure to fill the sender + currency.setSenderPubkey(senderPubkey); + + // Save it + indexCurrency(currency); + } + + // Exists, so check the owner signature + else { + if (!Objects.equals(senderPubkey, previousSenderPubkey)) { + throw new AccessDeniedException("Could not change the currency, because it has been registered by another public key."); + } + + // Make sure the sender is not changed + currency.setSenderPubkey(previousSenderPubkey); + + // Save changes + indexCurrency(currency); + } + } + + /** + * Registrer a new blockchain. + * @param pubkey the sender pubkey + * @param jsonCurrency the blockchain, as JSON + * @param signature the signature of sender. + * @throws InvalidSignatureException if signature not correspond to sender pubkey + */ + public void insertCurrency(String pubkey, String jsonCurrency, String signature) { + Preconditions.checkNotNull(pubkey); + Preconditions.checkNotNull(jsonCurrency); + Preconditions.checkNotNull(signature); + + if (!cryptoService.verify(jsonCurrency, signature, pubkey)) { + String currencyName = GsonUtils.getValueFromJSONAsString(jsonCurrency, "currencyName"); + logger.warn(String.format("Currency not added, because bad signature. blockchain [%s]", currencyName)); + throw new InvalidSignatureException("Bad signature"); + } + + Currency currency = null; + try { + currency = gson.fromJson(jsonCurrency, Currency.class); + Preconditions.checkNotNull(currency); + Preconditions.checkNotNull(currency.getCurrencyName()); + } catch(Throwable t) { + logger.error("Error while reading blockchain JSON: " + jsonCurrency); + throw new TechnicalException("Error while reading blockchain JSON: " + jsonCurrency, t); + } + + saveCurrency(currency, pubkey); + } + + /* -- Internal methods -- */ + + + public XContentBuilder createRecordType() { + String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer(); + + try { + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE) + .startObject("properties") + + // title + .startObject("title") + .field("type", "string") + .field("analyzer", stringAnalyzer) + .endObject() + + // description + .startObject("description") + .field("type", "string") + .field("analyzer", stringAnalyzer) + .endObject() + + // time + .startObject("time") + .field("type", "integer") + .endObject() + + // issuer + .startObject("issuer") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // location + .startObject("location") + .field("type", "string") + .endObject() + + // geoPoint + .startObject("geoPoint") + .field("type", "geo_point") + .endObject() + + // avatar + .startObject("avatar") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // categories + .startObject("categories") + .field("type", "nested") + .startObject("properties") + .startObject("cat1") // cat1 + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .startObject("cat2") // cat2 + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + + // tags + .startObject("tags") + .field("type", "completion") + .field("search_analyzer", "simple") + .field("analyzer", "simple") + .field("preserve_separators", "false") + .endObject() + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe); + } + } + + public XContentBuilder createRecordCategoryType() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_CATEGORY_TYPE) + .startObject("properties") + + // name + .startObject("name") + .field("type", "string") + .field("analyzer", pluginSettings.getDefaultStringAnalyzer()) + .endObject() + + // description + /*.startObject("description") + .field("type", "string") + .endObject()*/ + + // parent + .startObject("parent") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // tags + /*.startObject("tags") + .field("type", "completion") + .field("search_analyzer", "simple") + .field("analyzer", "simple") + .field("preserve_separators", "false") + .endObject()*/ + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_CATEGORY_TYPE, ioe.getMessage()), ioe); + } + } + + public XContentBuilder createCurrencyType() { + try { + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(CURRENCY_TYPE) + .startObject("properties") + + // blockchain name + .startObject("currencyName") + .field("type", "string") + .endObject() + + // member count + .startObject("membersCount") + .field("type", "long") + .endObject() + + // tags + .startObject("tags") + .field("type", "completion") + .field("search_analyzer", "simple") + .field("analyzer", "simple") + .field("preserve_separators", "false") + + .endObject() + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, CURRENCY_TYPE, ioe.getMessage()), ioe); + } + } + + /** + * + * @param jsonCategory + * @return the product id + */ + public String indexCategoryFromJson(String jsonCategory) { + if (logger.isDebugEnabled()) { + logger.debug("Indexing a category"); + } + + // Preparing indexBlocksFromNode + IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_CATEGORY_TYPE) + .setSource(jsonCategory); + + // Execute indexBlocksFromNode + IndexResponse response = indexRequest + .setRefresh(false) + .execute().actionGet(); + + return response.getId(); + } + + + + /** + * Retrieve a blockchain from its name + * @param currencyId + * @return + */ + protected String getSenderPubkeyByCurrencyId(String currencyId) { + + if (!existsIndex(currencyId)) { + return null; + } + + // Prepare request + SearchRequestBuilder searchRequest = client + .prepareSearch(INDEX) + .setTypes(CURRENCY_TYPE) + .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); + + // If more than a word, search on terms match + searchRequest.setQuery(QueryBuilders.matchQuery("_id", currencyId)); + + // Execute query + try { + SearchResponse response = searchRequest.execute().actionGet(); + + // Read query result + SearchHit[] searchHits = response.getHits().getHits(); + for (SearchHit searchHit : searchHits) { + if (searchHit.source() != null) { + Currency currency = gson.fromJson(new String(searchHit.source(), "UTF-8"), Currency.class); + return currency.getSenderPubkey(); + } + else { + SearchHitField field = searchHit.getFields().get("senderPubkey"); + return field.getValue().toString(); + } + } + } + catch(SearchPhaseExecutionException | JsonSyntaxException | UnsupportedEncodingException e) { + // Failed or no item on index + } + + return null; + } +} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java index cd2e54c21721d2ce51c177e343f677733002ad76..54f45a0da931a33316717bf11baa35166cad0220 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceLocator.java @@ -23,6 +23,7 @@ package org.duniter.elasticsearch.service; */ +import org.duniter.core.beans.Bean; import org.duniter.core.beans.BeanFactory; import org.duniter.core.client.dao.CurrencyDao; import org.duniter.core.client.dao.PeerDao; @@ -41,23 +42,33 @@ import org.duniter.core.service.CryptoService; import org.duniter.core.service.Ed25519CryptoServiceImpl; import org.duniter.elasticsearch.PluginSettings; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.inject.Provider; import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import java.io.IOException; +import java.util.HashMap; +import java.util.ServiceLoader; @Singleton public class ServiceLocator extends org.duniter.core.client.service.ServiceLocator { - private static final ESLogger log = ESLoggerFactory.getLogger(ServiceLocator.class.getName()); + private static final ESLogger logger = ESLoggerFactory.getLogger(ServiceLocator.class.getName()); + + + @Inject + public ServiceLocator(Injector injector) { + super(); + if (logger.isDebugEnabled()) { + logger.debug("Starting Duniter4j client ServiceLocator..."); + } + //setBeanFactory(new BeanFactory(injector)); + setBeanFactory(createBeanFactory()); - public ServiceLocator() { - super(createBeanFactory()); - log.info("Starting ServiceLocator (guice)"); org.duniter.core.client.service.ServiceLocator.setInstance(this); - log.info("Starting ServiceLocator [OK]"); } @Override @@ -73,8 +84,20 @@ public class ServiceLocator /* -- Internal methods -- */ - protected static BeanFactory createBeanFactory() { - BeanFactory beanFactory = new BeanFactory() + class BeanFactory extends org.duniter.core.beans.BeanFactory{ + private final Injector injector; + public BeanFactory(Injector injector) { + super(); + this.injector = injector; + } + + public <S extends Bean> S newBean(Class<S> clazz) { + return injector.getInstance(clazz); + } + } + + protected static org.duniter.core.beans.BeanFactory createBeanFactory() { + org.duniter.core.beans.BeanFactory beanFactory = new org.duniter.core.beans.BeanFactory() .bind(BlockchainRemoteService.class, BlockchainRemoteServiceImpl.class) .bind(NetworkRemoteService.class, NetworkRemoteServiceImpl.class) .bind(WotRemoteService.class, WotRemoteServiceImpl.class) @@ -88,4 +111,17 @@ public class ServiceLocator .add(DataContext.class); return beanFactory; } + + public static class Provider<T extends Bean> implements org.elasticsearch.common.inject.Provider<T> { + + private final Class<T> clazz; + + public Provider(Class<T> clazz) { + this.clazz = clazz; + } + + public T get() { + return ServiceLocator.instance().getBean(clazz); + } + } } diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index 6bf2993d380f07eabc90909a51727dbf489e450f..33ec0d97419612e2e2fea4d59fad5c954ec53494 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -22,14 +22,22 @@ package org.duniter.elasticsearch.service; * #L% */ +import org.duniter.core.beans.Bean; +import org.duniter.core.client.dao.CurrencyDao; +import org.duniter.core.client.dao.PeerDao; +import org.duniter.core.client.dao.mem.MemoryCurrencyDaoImpl; +import org.duniter.core.client.dao.mem.MemoryPeerDaoImpl; +import org.duniter.core.client.service.DataContext; +import org.duniter.core.client.service.HttpService; +import org.duniter.core.client.service.HttpServiceImpl; +import org.duniter.core.client.service.bma.*; +import org.duniter.core.client.service.local.CurrencyService; +import org.duniter.core.client.service.local.CurrencyServiceImpl; +import org.duniter.core.client.service.local.PeerService; +import org.duniter.core.client.service.local.PeerServiceImpl; +import org.duniter.core.service.CryptoService; +import org.duniter.core.service.Ed25519CryptoServiceImpl; import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.blockchain.BlockBlockchainService; -import org.duniter.elasticsearch.service.market.CategoryMarketService; -import org.duniter.elasticsearch.service.market.RecordMarketService; -import org.duniter.elasticsearch.service.registry.CategoryRegistryService; -import org.duniter.elasticsearch.service.registry.CitiesRegistryService; -import org.duniter.elasticsearch.service.registry.CurrencyRegistryService; -import org.duniter.elasticsearch.service.registry.RecordRegistryService; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -37,18 +45,43 @@ public class ServiceModule extends AbstractModule implements Module { @Override protected void configure() { bind(ServiceLocator.class).asEagerSingleton(); + + // ES service bind(PluginSettings.class).asEagerSingleton(); - // Market - bind(CategoryMarketService.class).asEagerSingleton(); - bind(RecordMarketService.class).asEagerSingleton(); - - // Registry - bind(CurrencyRegistryService.class); - bind(CategoryRegistryService.class); - bind(CitiesRegistryService.class); - bind(RecordRegistryService.class); - - // BC - bind(BlockBlockchainService.class); + bind(RegistryService.class); + bind(MarketService.class); + bind(BlockchainService.class); + + // Duniter Client API beans + bindWithLocator(BlockchainRemoteService.class); + bindWithLocator(NetworkRemoteService.class); + bindWithLocator(WotRemoteService.class); + bindWithLocator(TransactionRemoteService.class); + bindWithLocator(CryptoService.class); + bindWithLocator(PeerService.class); + bindWithLocator(CurrencyService.class); + bindWithLocator(HttpService.class); + bindWithLocator(CurrencyDao.class); + bindWithLocator(PeerDao.class); + bindWithLocator(DataContext.class); +/* + bindWithLocator(BlockchainRemoteServiceImpl.class); + bindWithLocator(NetworkRemoteServiceImpl.class); + bindWithLocator(WotRemoteServiceImpl.class); + bindWithLocator(TransactionRemoteServiceImpl.class); + bindWithLocator(Ed25519CryptoServiceImpl.class); + bindWithLocator(PeerServiceImpl.class); + bindWithLocator(CurrencyServiceImpl.class); + bindWithLocator(HttpServiceImpl.class); + bindWithLocator(MemoryCurrencyDaoImpl.class); + bindWithLocator(MemoryPeerDaoImpl.class); + bindWithLocator(DataContext.class);*/ } + + /* protected methods */ + + protected <T extends Bean> void bindWithLocator(Class<T> clazz) { + bind(clazz).toProvider(new ServiceLocator.Provider<>(clazz)); + } + } \ No newline at end of file diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/CategoryMarketService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/CategoryMarketService.java deleted file mode 100644 index 886072af03e7de5734bfa68f4cc5f62ad137477f..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/market/CategoryMarketService.java +++ /dev/null @@ -1,177 +0,0 @@ -package org.duniter.elasticsearch.service.market; - -/* - * #%L - * UCoin Java Client :: Core API - * %% - * Copyright (C) 2014 - 2015 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - - -import com.fasterxml.jackson.core.JsonProcessingException; -import org.duniter.core.exception.TechnicalException; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.AbstractService; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; - -import java.io.IOException; - -/** - * Created by Benoit on 30/03/2015. - */ -public class CategoryMarketService extends AbstractService<CategoryMarketService> { - - private static final ESLogger log = ESLoggerFactory.getLogger(CategoryMarketService.class.getName()); - private static final String CATEGORIES_BULK_CLASSPATH_FILE = "market-categories-bulk-insert.json"; - - public static final String INDEX_NAME = "market"; - public static final String INDEX_TYPE = "category"; - - @Inject - public CategoryMarketService(Client client, PluginSettings settings) { - super(client, settings); - } - - /** - * Delete blockchain index, and all data - * @throws JsonProcessingException - */ - public void deleteIndex() throws JsonProcessingException { - deleteIndexIfExists(INDEX_NAME); - } - - - public boolean existsIndex() { - return super.existsIndex(INDEX_NAME); - } - - /** - * Create index need for blockchain registry, if need - */ - public void createIndexIfNotExists() { - try { - if (!existsIndex(INDEX_NAME)) { - createIndex(); - } - } - catch(JsonProcessingException e) { - throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME)); - } - } - - /** - * Create index need for category registry - * @throws JsonProcessingException - */ - public void createIndex() throws JsonProcessingException { - log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE)); - - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); - Settings indexSettings = Settings.settingsBuilder() - .put("number_of_shards", 1) - .put("number_of_replicas", 1) - //.put("analyzer", createDefaultAnalyzer()) - .build(); - createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_TYPE, createIndexMapping()); - createIndexRequestBuilder.execute().actionGet(); - } - - /** - * - * @param jsonCategory - * @return the product id - */ - public String indexCategoryFromJson(String jsonCategory) { - if (log.isDebugEnabled()) { - log.debug("Indexing a category"); - } - - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, INDEX_TYPE) - .setSource(jsonCategory); - - // Execute indexBlocksFromNode - IndexResponse response = indexRequest - .setRefresh(false) - .execute().actionGet(); - - return response.getId(); - } - - - public void initCategories() { - if (log.isDebugEnabled()) { - log.debug("Initializing all market categories"); - } - - // Insert categories - bulkFromClasspathFile(CATEGORIES_BULK_CLASSPATH_FILE, INDEX_NAME, INDEX_TYPE); - } - - /* -- Internal methods -- */ - - - public XContentBuilder createIndexMapping() { - try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(INDEX_TYPE) - .startObject("properties") - - // name - .startObject("name") - .field("type", "string") - .endObject() - - // description - /*.startObject("description") - .field("type", "string") - .endObject()*/ - - // parent - .startObject("parent") - .field("type", "string") - .endObject() - - // tags - /*.startObject("tags") - .field("type", "completion") - .field("search_analyzer", "simple") - .field("analyzer", "simple") - .field("preserve_separators", "false") - .endObject()*/ - - .endObject() - .endObject().endObject(); - - return mapping; - } - catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); - } - } - -} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CategoryRegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CategoryRegistryService.java deleted file mode 100644 index 09460e67b5d57c4fb9cd90fe6c1104c372684ab8..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CategoryRegistryService.java +++ /dev/null @@ -1,184 +0,0 @@ -package org.duniter.elasticsearch.service.registry; - -/* - * #%L - * UCoin Java Client :: Core API - * %% - * Copyright (C) 2014 - 2015 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - - -import com.fasterxml.jackson.core.JsonProcessingException; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.util.StringUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.AbstractService; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; - -import java.io.IOException; - -/** - * Created by Benoit on 30/03/2015. - */ -public class CategoryRegistryService extends AbstractService<CategoryRegistryService> { - - private static final ESLogger log = ESLoggerFactory.getLogger(CategoryRegistryService.class.getName()); - - private static final String CATEGORIES_BULK_CLASSPATH_FILE = "registry-categories-bulk-insert.json"; - - public static final String INDEX_NAME = "registry"; - public static final String INDEX_TYPE = "category"; - - @Inject - public CategoryRegistryService(Client client, PluginSettings settings) { - super(client, settings); - } - - /** - * Delete blockchain index, and all data - * @throws JsonProcessingException - */ - public void deleteIndex() throws JsonProcessingException { - deleteIndexIfExists(INDEX_NAME); - } - - public boolean existsIndex() { - return super.existsIndex(INDEX_NAME); - } - - /** - * Create index need for blockchain registry, if need - */ - public void createIndexIfNotExists() { - try { - if (!existsIndex(INDEX_NAME)) { - createIndex(); - } - } - catch(JsonProcessingException e) { - throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME)); - } - } - - /** - * Create index need for category registry - * @throws JsonProcessingException - */ - public void createIndex() throws JsonProcessingException { - log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE)); - - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); - org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() - .put("number_of_shards", 1) - .put("number_of_replicas", 1) - //.put("analyzer", createDefaultAnalyzer()) - .build(); - createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_TYPE, createIndexMapping()); - createIndexRequestBuilder.execute().actionGet(); - } - - /** - * - * @param jsonCategory - * @return the product id - */ - public String indexCategoryFromJson(String jsonCategory) { - if (log.isDebugEnabled()) { - log.debug("Indexing a category"); - } - - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, INDEX_TYPE) - .setSource(jsonCategory); - - // Execute indexBlocksFromNode - IndexResponse response = indexRequest - .setRefresh(false) - .execute().actionGet(); - - return response.getId(); - } - - - public void initCategories() { - if (log.isDebugEnabled()) { - log.debug("Initializing all registry categories"); - } - - // Insert categories - bulkFromClasspathFile(CATEGORIES_BULK_CLASSPATH_FILE, INDEX_NAME, INDEX_TYPE); - } - - /* -- Internal methods -- */ - - - public XContentBuilder createIndexMapping() { - String stringAnalyzer = getPluginSettings().getIndexStringAnalyzer(); - if (StringUtils.isBlank(stringAnalyzer)) { - stringAnalyzer = "english"; - } - - try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(INDEX_TYPE) - .startObject("properties") - - // name - .startObject("name") - .field("type", "string") - .field("analyzer", stringAnalyzer) - .endObject() - - // description - /*.startObject("description") - .field("type", "string") - .endObject()*/ - - // parent - .startObject("parent") - .field("type", "string") - .field("index", "not_analyzed") - .endObject() - - // tags - /*.startObject("tags") - .field("type", "completion") - .field("search_analyzer", "simple") - .field("analyzer", "simple") - .field("preserve_separators", "false") - .endObject()*/ - - .endObject() - .endObject().endObject(); - - return mapping; - } - catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); - } - } - -} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CitiesRegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CitiesRegistryService.java index eac61b700e25c14e41b09b9f8e918492fea1ba50..891791a829f951384016238925e07c2d31f78cea 100644 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CitiesRegistryService.java +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CitiesRegistryService.java @@ -47,7 +47,7 @@ import java.util.Map; /** * Created by Benoit on 30/03/2015. */ -public class CitiesRegistryService extends AbstractService<CitiesRegistryService> { +public class CitiesRegistryService extends AbstractService { private static final ESLogger log = ESLoggerFactory.getLogger(CitiesRegistryService.class.getName()); @@ -60,7 +60,7 @@ public class CitiesRegistryService extends AbstractService<CitiesRegistryService public static final String INDEX_NAME = "registry"; public static final String INDEX_TYPE = "city"; - private Gson gson; + private final Gson gson; @Inject public CitiesRegistryService(Client client, PluginSettings settings) { @@ -68,12 +68,6 @@ public class CitiesRegistryService extends AbstractService<CitiesRegistryService gson = GsonUtils.newBuilder().create(); } - @Override - public void close() { - gson = null; - super.close(); - } - /** * Delete blockchain index, and all data * @throws JsonProcessingException @@ -108,7 +102,7 @@ public class CitiesRegistryService extends AbstractService<CitiesRegistryService public void createIndex() throws JsonProcessingException { log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE)); - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); + CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX_NAME); org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() .put("number_of_shards", 1) .put("number_of_replicas", 1) @@ -163,7 +157,7 @@ public class CitiesRegistryService extends AbstractService<CitiesRegistryService public File createCitiesBulkFile() { - File result = new File(getPluginSettings().getTempDirectory(), CITIES_BULK_FILENAME); + File result = new File(pluginSettings.getTempDirectory(), CITIES_BULK_FILENAME); InputStream ris = null; BufferedReader bf = null; @@ -263,7 +257,7 @@ public class CitiesRegistryService extends AbstractService<CitiesRegistryService public File createCitiesBulkFile2() { - File result = new File(getPluginSettings().getTempDirectory(), CITIES_BULK_FILENAME); + File result = new File(pluginSettings.getTempDirectory(), CITIES_BULK_FILENAME); File inputFile = new File(CITIES_SOURCE_FILE2); InputStream ris = null; diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CurrencyRegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CurrencyRegistryService.java deleted file mode 100644 index acbc74543b62caaa9563794177d39a546c728fed..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/CurrencyRegistryService.java +++ /dev/null @@ -1,631 +0,0 @@ -package org.duniter.elasticsearch.service.registry; - -/* - * #%L - * UCoin Java Client :: Core API - * %% - * Copyright (C) 2014 - 2015 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.gson.Gson; -import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.ArrayUtils; -import org.duniter.core.client.model.bma.BlockchainBlock; -import org.duniter.core.client.model.bma.BlockchainParameters; -import org.duniter.core.client.model.bma.gson.GsonUtils; -import org.duniter.core.client.model.elasticsearch.Currency; -import org.duniter.core.client.model.local.Peer; -import org.duniter.core.client.service.ServiceLocator; -import org.duniter.core.client.service.bma.BlockchainRemoteService; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.ObjectUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.model.SearchResult; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.exception.AccessDeniedException; -import org.duniter.elasticsearch.service.exception.DuplicateIndexIdException; -import org.duniter.elasticsearch.service.exception.InvalidSignatureException; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.suggest.SuggestRequestBuilder; -import org.elasticsearch.action.suggest.SuggestResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHitField; -import org.elasticsearch.search.highlight.HighlightField; -import org.elasticsearch.search.sort.SortOrder; -import org.elasticsearch.search.suggest.Suggest; -import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder; - -import java.io.IOException; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Objects; - -/** - * Created by Benoit on 30/03/2015. - */ -public class CurrencyRegistryService extends AbstractService<CurrencyRegistryService> { - - private static final ESLogger log = ESLoggerFactory.getLogger(CurrencyRegistryService.class.getName()); - - public static final String INDEX_NAME = "registry"; - - public static final String INDEX_TYPE = "blockchain"; - - public static final String REGEX_WORD_SEPARATOR = "[-\\t@# ]+"; - public static final String REGEX_SPACE = "[\\t\\n\\r ]+"; - - private CryptoService cryptoService; - private Gson gson; - - @Inject - public CurrencyRegistryService(Client client, PluginSettings settings) { - super(client, settings); - this.gson = GsonUtils.newBuilder().create(); - } - - @Override - public CurrencyRegistryService start() { - this.cryptoService = ServiceLocator.instance().getCryptoService(); - return super.start(); - } - - @Override - public void close() { - this.cryptoService = null; - this.gson = null; - super.close(); - } - - /** - * Delete blockchain index, and all data - * @throws JsonProcessingException - */ - public void deleteIndex() throws JsonProcessingException { - deleteIndexIfExists(INDEX_NAME); - } - - /** - * Create index need for blockchain registry, if need - */ - public void createIndexIfNotExists() { - try { - if (!existsIndex(INDEX_NAME)) { - createIndex(); - } - } - catch(JsonProcessingException e) { - throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME)); - } - } - - /** - * Create index need for blockchain registry - * @throws JsonProcessingException - */ - public void createIndex() throws JsonProcessingException { - log.info(String.format("Creating index [%s/]", INDEX_NAME, INDEX_TYPE)); - - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); - Settings indexSettings = Settings.settingsBuilder() - .put("number_of_shards", 1) - .put("number_of_replicas", 1) - //.put("analyzer", createDefaultAnalyzer()) - .build(); - createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_TYPE, createIndexMapping()); - createIndexRequestBuilder.execute().actionGet(); - } - - /** - * Retrieve the blockchain data, from peer - * - * @param peer - * @return the created blockchain - */ - public Currency indexCurrencyFromPeer(Peer peer) { - BlockchainRemoteService blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService(); - BlockchainParameters parameters = blockchainRemoteService.getParameters(peer); - BlockchainBlock firstBlock = blockchainRemoteService.getBlock(peer, 0); - BlockchainBlock currentBlock = blockchainRemoteService.getCurrentBlock(peer); - long lastUD = blockchainRemoteService.getLastUD(peer); - - Currency result = new Currency(); - result.setCurrencyName(parameters.getCurrency()); - result.setFirstBlockSignature(firstBlock.getSignature()); - result.setMembersCount(currentBlock.getMembersCount()); - result.setLastUD(lastUD); - result.setParameters(parameters); - result.setPeers(new Peer[]{peer}); - - indexCurrency(result); - - // Index the first block - // FIXME : attention au dependence circulaire : cela devrait plutot etre fait à l'exetrieure e registry - // par exemple dans l'action REST - //blockBlockchainService.createIndexIfNotExists(parameters.getCurrency()); - //blockBlockchainService.indexBlock(firstBlock, false); - //blockBlockchainService.indexCurrentBlock(firstBlock, true); - - return result; - } - - /** - * Index a blockchain - * @param currency - */ - public void indexCurrency(Currency currency) { - try { - ObjectUtils.checkNotNull(currency.getCurrencyName()); - - // Fill tags - if (ArrayUtils.isEmpty(currency.getTags())) { - String currencyName = currency.getCurrencyName(); - String[] tags = currencyName.split(REGEX_WORD_SEPARATOR); - List<String> tagsList = Lists.newArrayList(tags); - - // Convert as a sentence (replace seprator with a space) - String sentence = currencyName.replaceAll(REGEX_WORD_SEPARATOR, " "); - if (!tagsList.contains(sentence)) { - tagsList.add(sentence); - } - - currency.setTags(tagsList.toArray(new String[tagsList.size()])); - } - - // Serialize into JSON - byte[] json = getObjectMapper().writeValueAsBytes(currency); - - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, INDEX_TYPE) - .setId(currency.getCurrencyName()) - .setSource(json); - - // Execute indexBlocksFromNode - indexRequest - .setRefresh(true) - .execute().actionGet(); - - } catch(JsonProcessingException e) { - throw new TechnicalException(e); - } - } - - /** - * Get suggestions from a string query. Useful for web autocomplete field (e.g. text full search) - * @param query - * @return - */ - public List<String> getSuggestions(String query) { - CompletionSuggestionBuilder suggestionBuilder = new CompletionSuggestionBuilder(INDEX_TYPE) - .text(query) - .size(10) // limit to 10 results - .field("tags"); - - // Prepare request - SuggestRequestBuilder suggestRequest = getClient() - .prepareSuggest(INDEX_NAME) - .addSuggestion(suggestionBuilder); - - // Execute query - SuggestResponse response = suggestRequest.execute().actionGet(); - - // Read query result - return toSuggestions(response, INDEX_TYPE, query); - } - - /** - * Find blockchain that match the givent string query (Full text search) - * @param query - * @return - */ - public List<Currency> searchCurrencies(String query) { - String[] queryParts = query.split(REGEX_SPACE); - - // Prepare request - SearchRequestBuilder searchRequest = getClient() - .prepareSearch(INDEX_NAME) - .setTypes(INDEX_TYPE) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); - - // If only one term, search as prefix - if (queryParts.length == 1) { - searchRequest.setQuery(QueryBuilders.prefixQuery("currencyName", query)); - } - - // If more than a word, search on terms match - else { - searchRequest.setQuery(QueryBuilders.matchQuery("currencyName", query)); - } - - // Sort as score/membersCount - searchRequest.addSort("_score", SortOrder.DESC) - .addSort("membersCount", SortOrder.DESC); - - // Highlight matched words - searchRequest.setHighlighterTagsSchema("styled") - .addHighlightedField("currencyName") - .addFields("currencyName") - .addFields("currencyName", "_source"); - - // Execute query - SearchResponse searchResponse = searchRequest.execute().actionGet(); - - // Read query result - return toCurrencies(searchResponse, true); - } - - public void deleteAllCurrencies() { - if (!existsIndex(INDEX_NAME)) { - return; - } - - log.info(String.format("Deleting all blockchain indexes")); - - // Prepare request - SearchRequestBuilder request = getClient() - .prepareSearch(INDEX_NAME) - .setTypes(INDEX_TYPE); - - // Execute query - SearchResponse response = request.execute().actionGet(); - - // Delete every currencies - List<Currency> currencies = toCurrencies(response); - for (Currency currency: currencies){ - log.info(String.format("Deleting blockchain [%s]...", currency.getCurrencyName())); - deleteIndexIfExists(currency.getCurrencyName()); - } - - deleteIndexIfExists(INDEX_NAME); - - log.info("All blockchain successfully deleted"); - } - - /** - * find blockchain that match string query (full text search), and return a generic VO for search result. - * @param query - * @return a list of generic search result - */ - public List<SearchResult> searchCurrenciesAsVO(String query) { - String[] queryParts = query.split(REGEX_SPACE); - - // Prepare request - SearchRequestBuilder searchRequest = getClient() - .prepareSearch(INDEX_NAME) - .setTypes(INDEX_TYPE) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); - - // If only one term, search as prefix - if (queryParts.length == 1) { - searchRequest.setQuery(QueryBuilders.prefixQuery("currencyName", query)); - } - - // If more than a word, search on terms match - else { - searchRequest.setQuery(QueryBuilders.matchQuery("currencyName", query)); - } - - // Sort as score/membersCount - searchRequest.addSort("_score", SortOrder.DESC) - .addSort("membersCount", SortOrder.DESC); - - // Highlight matched words - searchRequest.setHighlighterTagsSchema("styled") - .addHighlightedField("currencyName") - .addFields("currencyName") - .addFields("membersCount"); - - // Execute query - SearchResponse searchResponse = searchRequest.execute().actionGet(); - - // Read query result - return toSearchResults(searchResponse, true); - } - - /** - * Retrieve a blockchain from its name - * @param currencyId - * @return - */ - public Currency getCurrencyById(String currencyId) { - - if (!existsIndex(currencyId)) { - return null; - } - - // Prepare request - SearchRequestBuilder searchRequest = getClient() - .prepareSearch(INDEX_NAME) - .setTypes(INDEX_TYPE) - .setSearchType(SearchType.DFS_QUERY_THEN_FETCH); - - // If more than a word, search on terms match - searchRequest.setQuery(QueryBuilders.matchQuery("_id", currencyId)); - - // Execute query - List<Currency> currencies = null; - try { - SearchResponse searchResponse = searchRequest.execute().actionGet(); - currencies = toCurrencies(searchResponse); - } - catch(SearchPhaseExecutionException e) { - // Failed or no item on index - } - - // No result : return null - if (CollectionUtils.isEmpty(currencies)) { - return null; - } - - // Return the unique result - return CollectionUtils.extractSingleton(currencies); - } - - /** - * Save a blockchain (update or create) into the blockchain index. - * @param currency - * @param senderPubkey - * @throws DuplicateIndexIdException - * @throws AccessDeniedException if exists and user if not the original blockchain sender - */ - public void saveCurrency(Currency currency, String senderPubkey) throws DuplicateIndexIdException { - ObjectUtils.checkNotNull(currency, "blockchain could not be null") ; - ObjectUtils.checkNotNull(currency.getCurrencyName(), "blockchain attribute 'currencyName' could not be null"); - - Currency existingCurrency = getCurrencyById(currency.getCurrencyName()); - - // Currency not exists, so create it - if (existingCurrency == null || currency.getSenderPubkey() == null) { - // make sure to fill the sender - currency.setSenderPubkey(senderPubkey); - - // Save it - indexCurrency(currency); - } - - // Exists, so check the owner signature - else { - if (!Objects.equals(currency.getSenderPubkey(), senderPubkey)) { - throw new AccessDeniedException("Could not change blockchain, because it has been registered by another public key."); - } - - // Make sure the sender is not changed - currency.setSenderPubkey(senderPubkey); - - // Save changes - indexCurrency(currency); - } - } - - /** - * Get the full list of currencies names, from blockchain index - * @return - */ - public List<String> getAllCurrencyNames() { - // Prepare request - SearchRequestBuilder searchRequest = getClient() - .prepareSearch(INDEX_NAME) - .setTypes(INDEX_TYPE); - - // Sort as score/membersCount - searchRequest.addSort("currencyName", SortOrder.ASC) - .addField("_id"); - - // Execute query - SearchResponse searchResponse = searchRequest.execute().actionGet(); - - // Read query result - return toCurrencyNames(searchResponse, true); - } - - /** - * Registrer a new blockchain. - * @param pubkey the sender pubkey - * @param jsonCurrency the blockchain, as JSON - * @param signature the signature of sender. - * @throws InvalidSignatureException if signature not correspond to sender pubkey - */ - public void registerCurrency(String pubkey, String jsonCurrency, String signature) { - Preconditions.checkNotNull(pubkey); - Preconditions.checkNotNull(jsonCurrency); - Preconditions.checkNotNull(signature); - - if (!cryptoService.verify(jsonCurrency, signature, pubkey)) { - String currencyName = GsonUtils.getValueFromJSONAsString(jsonCurrency, "currencyName"); - log.warn(String.format("Currency not added, because bad signature. blockchain [%s]", currencyName)); - throw new InvalidSignatureException("Bad signature"); - } - - Currency currency = null; - try { - currency = gson.fromJson(jsonCurrency, Currency.class); - Preconditions.checkNotNull(currency); - Preconditions.checkNotNull(currency.getCurrencyName()); - } catch(Throwable t) { - log.error("Error while reading blockchain JSON: " + jsonCurrency); - throw new TechnicalException("Error while reading blockchain JSON: " + jsonCurrency, t); - } - - saveCurrency(currency, pubkey); - } - - /* -- Internal methods -- */ - - public XContentBuilder createIndexMapping() { - try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(INDEX_TYPE) - .startObject("properties") - - // blockchain name - .startObject("currencyName") - .field("type", "string") - .endObject() - - // member count - .startObject("membersCount") - .field("type", "long") - .endObject() - - // tags - .startObject("tags") - .field("type", "completion") - .field("search_analyzer", "simple") - .field("analyzer", "simple") - .field("preserve_separators", "false") - - .endObject() - .endObject() - .endObject().endObject(); - - return mapping; - } - catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); - } - } - - protected void createCurrency(Currency currency) throws DuplicateIndexIdException, JsonProcessingException { - ObjectUtils.checkNotNull(currency, "blockchain could not be null") ; - ObjectUtils.checkNotNull(currency.getCurrencyName(), "blockchain attribute 'currencyName' could not be null"); - - Currency existingCurrency = getCurrencyById(currency.getCurrencyName()); - if (existingCurrency != null) { - throw new DuplicateIndexIdException(String.format("Currency with name [%s] already exists.", currency.getCurrencyName())); - } - - // register to blockchain - indexCurrency(currency); - - // Create sub indexes - // FIXME : circular reference - // may be use an EVentBus (guava) - //blockBlockchainService.createIndex(currency.getCurrencyName()); - } - - protected List<Currency> toCurrencies(SearchResponse response) { - return toCurrencies(response, false); - } - - protected List<Currency> toCurrencies(SearchResponse response, boolean withHighlight) { - try { - // Read query result - SearchHit[] searchHits = response.getHits().getHits(); - List<Currency> result = Lists.newArrayListWithCapacity(searchHits.length); - for (SearchHit searchHit : searchHits) { - Currency currency = null; - if (searchHit.source() != null) { - currency = gson.fromJson(new String(searchHit.source(), "UTF-8"), Currency.class); - } - else { - currency = new Currency(); - SearchHitField field = searchHit.getFields().get("currencyName"); - currency.setCurrencyName((String)field.getValue()); - } - result.add(currency); - - // If possible, use highlights - if (withHighlight) { - Map<String, HighlightField> fields = searchHit.getHighlightFields(); - for (HighlightField field : fields.values()) { - String currencyNameHighLight = field.getFragments()[0].string(); - currency.setCurrencyName(currencyNameHighLight); - } - } - } - - return result; - } catch(IOException e) { - throw new TechnicalException("Error while reading blockchain search result: " + e.getMessage(), e); - } - } - - protected List<SearchResult> toSearchResults(SearchResponse response, boolean withHighlight) { - // Read query result - SearchHit[] searchHits = response.getHits().getHits(); - List<SearchResult> result = Lists.newArrayListWithCapacity(searchHits.length); - for (SearchHit searchHit : searchHits) { - SearchResult value = new SearchResult(); - value.setId(searchHit.getId()); - value.setType(searchHit.getType()); - value.setValue(searchHit.getId()); - - result.add(value); - - // If possible, use highlights - if (withHighlight) { - Map<String, HighlightField> fields = searchHit.getHighlightFields(); - for (HighlightField field : fields.values()) { - String currencyNameHighLight = field.getFragments()[0].string(); - value.setValue(currencyNameHighLight); - } - } - } - - return result; - } - - protected List<String> toSuggestions(SuggestResponse response, String suggestionName, String query) { - if (response.getSuggest() == null - || response.getSuggest().getSuggestion(suggestionName) == null) { - return null; - } - - // Read query result - Iterator<? extends Suggest.Suggestion.Entry.Option> iterator = - response.getSuggest().getSuggestion(suggestionName).iterator().next().getOptions().iterator(); - - List<String> result = Lists.newArrayList(); - while (iterator.hasNext()) { - Suggest.Suggestion.Entry.Option next = iterator.next(); - String suggestion = next.getText().string(); - result.add(suggestion); - } - - return result; - } - - protected List<String> toCurrencyNames(SearchResponse response, boolean withHighlight) { - // Read query result - SearchHit[] searchHits = response.getHits().getHits(); - List<String> result = Lists.newArrayListWithCapacity(searchHits.length); - for (SearchHit searchHit : searchHits) { - result.add(searchHit.getId()); - } - - return result; - } -} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/RecordRegistryService.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/RecordRegistryService.java deleted file mode 100644 index 7164f14ebe6223f5bfd7d596b76dd641609e92af..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/service/registry/RecordRegistryService.java +++ /dev/null @@ -1,287 +0,0 @@ -package org.duniter.elasticsearch.service.registry; - -/* - * #%L - * UCoin Java Client :: Core API - * %% - * Copyright (C) 2014 - 2015 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ - - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.google.gson.JsonSyntaxException; -import org.duniter.core.client.model.bma.gson.GsonUtils; -import org.duniter.core.client.model.elasticsearch.Record; -import org.duniter.core.client.service.ServiceLocator; -import org.duniter.core.client.service.bma.WotRemoteService; -import org.duniter.core.exception.TechnicalException; -import org.duniter.core.service.CryptoService; -import org.duniter.core.util.StringUtils; -import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.exception.InvalidFormatException; -import org.duniter.elasticsearch.service.exception.InvalidSignatureException; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.logging.ESLoggerFactory; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; - -import java.io.File; -import java.io.IOException; -import java.util.Set; - -/** - * Created by Benoit on 30/03/2015. - */ -public class RecordRegistryService extends AbstractService<RecordRegistryService> { - - private static final ESLogger log = ESLoggerFactory.getLogger(RecordRegistryService.class.getName()); - - private static final String JSON_STRING_PROPERTY_REGEX = "[,]?[\"\\s\\n\\r]*%s[\"]?[\\s\\n\\r]*:[\\s\\n\\r]*\"[^\"]+\""; - - public static final String INDEX_NAME = "registry"; - public static final String INDEX_TYPE = "record"; - - private Gson gson; - - private WotRemoteService wotRemoteService; - - private CryptoService cryptoService; - - @Inject - public RecordRegistryService(Client client, PluginSettings settings) { - super(client, settings); - gson = GsonUtils.newBuilder().create(); - } - - @Override - public RecordRegistryService start() { - wotRemoteService = ServiceLocator.instance().getWotRemoteService(); - cryptoService = ServiceLocator.instance().getCryptoService(); - return super.start(); - } - - @Override - public void close(){ - wotRemoteService = null; - gson = null; - super.close(); - } - - /** - * Delete blockchain index, and all data - * @throws JsonProcessingException - */ - public void deleteIndex() throws JsonProcessingException { - deleteIndexIfExists(INDEX_NAME); - } - - - public boolean existsIndex() { - return super.existsIndex(INDEX_NAME); - } - - /** - * Create index need for blockchain registry, if need - */ - public void createIndexIfNotExists() { - try { - if (!existsIndex(INDEX_NAME)) { - createIndex(); - } - } - catch(JsonProcessingException e) { - throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME)); - } - } - - /** - * Create index need for record registry - * @throws JsonProcessingException - */ - public void createIndex() throws JsonProcessingException { - log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE)); - - CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME); - org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() - .put("number_of_shards", 1) - .put("number_of_replicas", 1) - //.put("analyzer", createDefaultAnalyzer()) - .build(); - createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(INDEX_TYPE, createIndexMapping()); - createIndexRequestBuilder.execute().actionGet(); - } - - /** - * - * @param recordJson - * @return the record id - */ - public String indexRecordFromJson(String recordJson) { - - try { - ObjectMapper mapper = new ObjectMapper(); - JsonNode actualObj = mapper.readTree(recordJson); - Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames()); - if (!fieldNames.contains(Record.PROPERTY_ISSUER) - || !fieldNames.contains(Record.PROPERTY_SIGNATURE)) { - throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]"); - } - String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText(); - String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText(); - - String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "") - .replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), ""); - - if (!cryptoService.verify(recordNoSign, signature, issuer)) { - throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); - } - - // TODO verify hash - //if (!cryptoService.verifyHash(recordNoSign, signature, issuer)) { - // throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign); - //} - - if (log.isDebugEnabled()) { - log.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8))); - } - - } - catch(IOException | JsonSyntaxException e) { - throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e); - } - - // Preparing indexBlocksFromNode - IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, INDEX_TYPE) - .setSource(recordJson); - - // Execute indexBlocksFromNode - IndexResponse response = indexRequest - .setRefresh(false) - .execute().actionGet(); - - return response.getId(); - } - - - public void insertRecordFromBulkFile(File bulkFile) { - - if (log.isDebugEnabled()) { - log.debug("Inserting records from file"); - } - - // Insert cities - bulkFromFile(bulkFile, INDEX_NAME, INDEX_TYPE); - } - - /* -- Internal methods -- */ - - - public XContentBuilder createIndexMapping() { - String stringAnalyzer = getPluginSettings().getIndexStringAnalyzer(); - if (StringUtils.isBlank(stringAnalyzer)) { - stringAnalyzer = "english"; - } - - try { - XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(INDEX_TYPE) - .startObject("properties") - - // title - .startObject("title") - .field("type", "string") - .field("analyzer", stringAnalyzer) - .endObject() - - // description - .startObject("description") - .field("type", "string") - .field("analyzer", stringAnalyzer) - .endObject() - - // time - .startObject("time") - .field("type", "integer") - .endObject() - - // issuer - .startObject("issuer") - .field("type", "string") - .field("index", "not_analyzed") - .endObject() - - // location - .startObject("location") - .field("type", "string") - .endObject() - - // geoPoint - .startObject("geoPoint") - .field("type", "geo_point") - .endObject() - - // avatar - .startObject("avatar") - .field("type", "string") - .field("index", "not_analyzed") - .endObject() - - // categories - .startObject("categories") - .field("type", "nested") - .startObject("properties") - .startObject("cat1") // cat1 - .field("type", "string") - .field("index", "not_analyzed") - .endObject() - .startObject("cat2") // cat2 - .field("type", "string") - .field("index", "not_analyzed") - .endObject() - .endObject() - .endObject() - - // tags - .startObject("tags") - .field("type", "completion") - .field("search_analyzer", "simple") - .field("analyzer", "simple") - .field("preserve_separators", "false") - .endObject() - - .endObject() - .endObject().endObject(); - - return mapping; - } - catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); - } - } - -} diff --git a/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java new file mode 100644 index 0000000000000000000000000000000000000000..69aacc4b3b736a480ea045c08a20d8aa8e48f2db --- /dev/null +++ b/duniter4j-elasticsearch/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java @@ -0,0 +1,251 @@ +package org.duniter.elasticsearch.threadpool; + +import com.google.common.collect.Lists; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.inject.Injector; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsAbortPolicy; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.transport.TransportService; + +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Manage thread pool, to execute tasks asynchronously. + * Created by eis on 17/06/16. + */ +public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { + + private ScheduledThreadPoolExecutor scheduler = null; + private Injector injector; + + private final List<Runnable> afterStartedCommands; + + @Inject + public ThreadPool(Settings settings, + Injector injector + ) { + super(settings); + this.injector = injector; + this.afterStartedCommands = Lists.newArrayList(); + + this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "duniter4j-scheduler"), new EsAbortPolicy()); + this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); + this.scheduler.setRemoveOnCancelPolicy(true); + } + + public void doStart(){ + if (logger.isDebugEnabled()) { + logger.debug("Starting Duniter4j ThreadPool..."); + } + + if (!afterStartedCommands.isEmpty()) { + scheduleOnStarted(() -> { + for (Runnable command: afterStartedCommands) { + command.run(); + } + this.afterStartedCommands.clear(); + }); + } + } + + public void doStop(){ + scheduler.shutdown(); + // TODO : cancel all aiting jobs + } + + public void doClose() {} + + public ScheduledExecutorService scheduler() { + return this.scheduler; + } + + /** + * Schedules an action when node is started (all services and modules ready) + * + * @param command the action to take + * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled + */ + public void scheduleOnStarted(Runnable command) { + /*if (lifecycle.state() == Lifecycle.State.INITIALIZED ) { + afterStartedCommands.add(command); + } + else {*/ + scheduleAfterServiceState(TransportService.class, Lifecycle.State.STARTED, command); + // } + } + + /** + * Schedules an action that runs on the scheduler thread, after a delay. + * + * @param command the action to take + * @param interval the delay interval + * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled + */ + public ScheduledFuture<?> schedule(Runnable command, TimeValue interval) { + return scheduler.schedule(new LoggingRunnable(command), interval.millis(), TimeUnit.MILLISECONDS); + } + + /** + * Schedules a periodic action that always runs on the scheduler thread. + * + * @param command the action to take + * @param interval the delay interval + * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled + */ + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) { + return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); + } + + + /* -- protected methods -- */ + + protected <T extends LifecycleComponent<T>> ScheduledFuture<?> scheduleAfterServiceState(Class<T> waitingServiceClass, final Lifecycle.State waitingState, final Runnable job) { + final T service = injector.getInstance(waitingServiceClass); + return schedule(() -> { + while(service.lifecycleState() != waitingState) { + try { + Thread.sleep(100); // wait 100 ms + } + catch(InterruptedException e) { + } + } + + // continue + job.run(); + }, TimeValue.timeValueSeconds(10)); + } + + + /*public void resetAllData() { + //resetAllCurrencies(); + //resetDataBlocks(); + //resetMarketRecords(); + //resetRegistry(); + } + + public void resetAllCurrencies() { + currencyRegistryService.deleteAllCurrencies(); + } + + public void resetDataBlocks() { + BlockchainRemoteService blockchainService = serviceLocator.getBlockchainRemoteService(); + Peer peer = checkConfigAndGetPeer(pluginSettings); + + try { + // Get the blockchain name from node + BlockchainParameters parameter = blockchainService.getParameters(peer); + if (parameter == null) { + logger.error(String.format("Could not connect to node [%s:%s]", + pluginSettings.getNodeBmaHost(), pluginSettings.getNodeBmaPort())); + return; + } + String currencyName = parameter.getCurrency(); + + logger.info(String.format("Reset data for index [%s]", currencyName)); + + // Delete then create index on blockchain + boolean indexExists = blockBlockchainService.existsIndex(currencyName); + if (indexExists) { + blockBlockchainService.deleteIndex(currencyName); + blockBlockchainService.createIndex(currencyName); + } + + + logger.info(String.format("Successfully reset data for index [%s]", currencyName)); + } catch(Exception e) { + logger.error("Error during reset data: " + e.getMessage(), e); + } + } + + public void resetMarketRecords() { + try { + // Delete then create index on records + boolean indexExists = recordMarketService.existsIndex(); + if (indexExists) { + recordMarketService.deleteIndex(); + } + logger.info(String.format("Successfully reset market records")); + + categoryMarketService.createIndex(); + categoryMarketService.initCategories(); + logger.info(String.format("Successfully re-initialized market categories data")); + + } catch(Exception e) { + logger.error("Error during reset market records: " + e.getMessage(), e); + } + } + + public void resetRegistry() { + try { + // Delete then create index on records + if (recordRegistryService.existsIndex()) { + recordRegistryService.deleteIndex(); + } + recordRegistryService.createIndex(); + logger.info(String.format("Successfully reset registry records")); + + + if (categoryRegistryService.existsIndex()) { + categoryRegistryService.deleteIndex(); + } + categoryRegistryService.createIndex(); + categoryRegistryService.initCategories(); + logger.info(String.format("Successfully re-initialized registry categories")); + + if (citiesRegistryService.existsIndex()) { + citiesRegistryService.deleteIndex(); + } + citiesRegistryService.initCities(); + logger.info(String.format("Successfully re-initialized registry cities")); + + } catch(Exception e) { + logger.error("Error during reset registry records: " + e.getMessage(), e); + } + }*/ + + /* -- internal methods -- */ + + class LoggingRunnable implements Runnable { + + private final Runnable runnable; + + LoggingRunnable(Runnable runnable) { + this.runnable = runnable; + } + + @Override + public void run() { + try { + runnable.run(); + } catch (Throwable t) { + logger.warn("failed to run {}", t, runnable.toString()); + throw t; + } + } + + @Override + public int hashCode() { + return runnable.hashCode(); + } + + @Override + public boolean equals(Object obj) { + return runnable.equals(obj); + } + + @Override + public String toString() { + return "[threaded] " + runnable.toString(); + } + } +} diff --git a/duniter4j-elasticsearch/src/main/resources/market-categories-bulk-insert.json b/duniter4j-elasticsearch/src/main/resources/market-categories-bulk-insert.json index 6cdbee7f31b5ad78a1b018cb6a155c6765bf65a7..c19ddf13f2d397fc4e0c5acc66115b04df3b44ea 100644 --- a/duniter4j-elasticsearch/src/main/resources/market-categories-bulk-insert.json +++ b/duniter4j-elasticsearch/src/main/resources/market-categories-bulk-insert.json @@ -1,24 +1,3 @@ -/* - * #%L - * Duniter4j :: ElasticSearch Plugin - * %% - * Copyright (C) 2014 - 2016 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ { "index": { "_id": "cat71"}} { "name": "EMPLOI" , "parent": null} { "index": { "_id": "cat33"}} diff --git a/duniter4j-elasticsearch/src/main/resources/registry-categories-bulk-insert.json b/duniter4j-elasticsearch/src/main/resources/registry-categories-bulk-insert.json index aa856e9c4a6bf398a5cabed519552caaa86d8e65..10d5d9e963fa3520e09f4689c6d1e49c40772e96 100644 --- a/duniter4j-elasticsearch/src/main/resources/registry-categories-bulk-insert.json +++ b/duniter4j-elasticsearch/src/main/resources/registry-categories-bulk-insert.json @@ -1,24 +1,3 @@ -/* - * #%L - * Duniter4j :: ElasticSearch Plugin - * %% - * Copyright (C) 2014 - 2016 EIS - * %% - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public - * License along with this program. If not, see - * <http://www.gnu.org/licenses/gpl-3.0.html>. - * #L% - */ {"index": {"_id": "0"}} {"name": "Autres", "parent": null} {"index": {"_id": "particulier"}} diff --git a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml index 1f9ff306c63e03a1f6381fa734529de1b63faabd..24bd0f68fbf768dc815a1d37660e381f76ec0bc2 100644 --- a/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-elasticsearch/src/test/es-home/config/elasticsearch.yml @@ -99,10 +99,12 @@ http.cors.enabled: true security.manager.enabled: false + #duniter.disable: true duniter.host: cgeek.fr duniter.port: 9330 duniter.string.analyzer: french +duniter.indices.reload: true -duniter.dev.enable: true \ No newline at end of file +#duniter.dev.enable: true \ No newline at end of file diff --git a/duniter4j-elasticsearch/src/test/es-home/config/shield/logging.yml b/duniter4j-elasticsearch/src/test/es-home/config/shield/logging.yml deleted file mode 100644 index 4adf913a371cb30f2db3cdd5d73a6c0334fe0392..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/test/es-home/config/shield/logging.yml +++ /dev/null @@ -1,26 +0,0 @@ -# default configuration for the audit trail logs -# -# Error Levels: -# -# ERROR authentication_failed, access_denied, tampered_request, connection_denied -# WARN authentication_failed, access_denied, tampered_request, connection_denied, anonymous_access -# INFO authentication_failed, access_denied, tampered_request, connection_denied, anonymous_access, access_granted -# DEBUG doesn't output additional entry types beyond INFO, but extends the information emmitted for each entry -# TRACE authentication_failed, access_denied, tampered_request, connection_denied, anonymous_access, access_granted, connection_granted, authentication_failed [<realm>]. In addition, internal system requests (self-management requests triggered by elasticsearch itself) will also be logged for "access_granted" entry type. -# - -logger: - shield.audit.logfile: INFO, access_log - -additivity: - shield.audit.logfile: false - -appender: - - access_log: - type: dailyRollingFile - file: ${path.logs}/${cluster.name}-access.log - datePattern: "'.'yyyy-MM-dd" - layout: - type: pattern - conversionPattern: "[%d{ISO8601}] %m%n" diff --git a/duniter4j-elasticsearch/src/test/es-home/config/shield/role_mapping.yml b/duniter4j-elasticsearch/src/test/es-home/config/shield/role_mapping.yml deleted file mode 100644 index 68c82f7e7c05b02e9aa539262ffa2df332cd078e..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/test/es-home/config/shield/role_mapping.yml +++ /dev/null @@ -1,14 +0,0 @@ -# Role mapping configuration file which has elasticsearch roles as keys -# that map to one or more user or group distinguished names - -#roleA: this is an elasticsearch role -# - groupA-DN this is a group distinguished name -# - groupB-DN -# - user1-DN this is the full user distinguished name - -#power_user: -# - "cn=admins,dc=example,dc=com" -#user: -# - "cn=users,dc=example,dc=com" -# - "cn=admins,dc=example,dc=com" -# - "cn=John Doe,cn=other users,dc=example,dc=com" diff --git a/duniter4j-elasticsearch/src/test/es-home/config/shield/roles.yml b/duniter4j-elasticsearch/src/test/es-home/config/shield/roles.yml deleted file mode 100644 index 633156f7611ee1257c930a1d86a10828cd8e2fb7..0000000000000000000000000000000000000000 --- a/duniter4j-elasticsearch/src/test/es-home/config/shield/roles.yml +++ /dev/null @@ -1,71 +0,0 @@ -# All cluster rights -# All operations on all indices -admin: - cluster: all - indices: - '*': - privileges: all - -# monitoring cluster privileges -# All operations on all indices -power_user: - cluster: monitor - indices: - '*': - privileges: all - -# Read-only operations on indices -user: - indices: - '*': - privileges: read - -# Defines the required permissions for transport clients -transport_client: - cluster: - - cluster:monitor/nodes/liveness - #uncomment the following for sniffing - #- cluster:monitor/state - -# The required permissions for kibana 4 users. -kibana4: - cluster: - - cluster:monitor/nodes/info - - cluster:monitor/health - indices: - '*': - privileges: indices:admin/mappings/fields/get, indices:admin/validate/query, indices:data/read/search, indices:data/read/msearch, indices:data/read/field_stats, indices:admin/get - '.kibana': - privileges: indices:admin/exists, indices:admin/mapping/put, indices:admin/mappings/fields/get, indices:admin/refresh, indices:admin/validate/query, indices:data/read/get, indices:data/read/mget, indices:data/read/search, indices:data/write/delete, indices:data/write/index, indices:data/write/update - -# The required permissions for the kibana 4 server -kibana4_server: - cluster: - - cluster:monitor/nodes/info - - cluster:monitor/health - indices: - '.kibana': - privileges: indices:admin/create, indices:admin/exists, indices:admin/mapping/put, indices:admin/mappings/fields/get, indices:admin/refresh, indices:admin/validate/query, indices:data/read/get, indices:data/read/mget, indices:data/read/search, indices:data/write/delete, indices:data/write/index, indices:data/write/update - -# The required role for logstash users -logstash: - cluster: indices:admin/template/get, indices:admin/template/put - indices: - 'logstash-*': - privileges: indices:data/write/bulk, indices:data/write/delete, indices:data/write/update, indices:data/read/search, indices:data/read/scroll, create_index - -# Marvel user role. Assign to marvel users. -marvel_user: - indices: - '.marvel-es-*': - privileges: read - '.kibana': - privileges: indices:admin/exists, indices:admin/mappings/fields/get, indices:admin/validate/query, indices:data/read/get, indices:data/read/mget, indices:data/read/search - -# Marvel remote agent role. Assign to the agent user on the remote marvel cluster -# to which the marvel agent will export all its data -remote_marvel_agent: - cluster: indices:admin/template/put, indices:admin/template/get - indices: - '.marvel-es-*': - privileges: all \ No newline at end of file diff --git a/duniter4j-elasticsearch/src/test/es-home/config/shield/users b/duniter4j-elasticsearch/src/test/es-home/config/shield/users deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/duniter4j-elasticsearch/src/test/es-home/config/shield/users_roles b/duniter4j-elasticsearch/src/test/es-home/config/shield/users_roles deleted file mode 100644 index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000 diff --git a/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/RegistryRecordIndexerServiceTest.java b/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/RegistryRecordIndexerServiceTest.java index 17e4ff0b46392e6a28460b605749d23907c194a9..c6f3c7a25d9bd4a8c211cf30e1abc16e52acf10d 100644 --- a/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/RegistryRecordIndexerServiceTest.java +++ b/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/RegistryRecordIndexerServiceTest.java @@ -23,15 +23,12 @@ package org.duniter.elasticsearch.service; */ import org.duniter.elasticsearch.TestResource; -import org.duniter.elasticsearch.service.registry.RecordRegistryService; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; - /** * Created by Benoit on 06/05/2015. */ @@ -41,16 +38,17 @@ public class RegistryRecordIndexerServiceTest { @ClassRule public static final TestResource resource = TestResource.create(); - private RecordRegistryService service; + private RegistryService service; @Before public void setUp() throws Exception { + // FIXME use google guice ? //service = ServiceLocator.instance().getRegistryRecordIndexerService(); } @Test public void insertTestData() { - service.insertRecordFromBulkFile(new File("src/test/resources/registry-test-records.json")); + //service.insertRecordFromBulkFile(new File("src/test/resources/registry-test-records.json")); } } diff --git a/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/blockchain/BlockIndexerServiceTest.java b/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/blockchain/BlockIndexerServiceTest.java index b9a03c41b63eb31080d6d3949e1da9ff7b2efea1..dde01c73ee4e58b4afd72dec1a2e063580a509d4 100644 --- a/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/blockchain/BlockIndexerServiceTest.java +++ b/duniter4j-elasticsearch/src/test/java/org/duniter/elasticsearch/service/blockchain/BlockIndexerServiceTest.java @@ -28,6 +28,7 @@ import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.bma.BlockchainRemoteService; import org.duniter.elasticsearch.TestResource; +import org.duniter.elasticsearch.service.BlockchainService; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,7 @@ public class BlockIndexerServiceTest { @ClassRule public static final TestResource resource = TestResource.create(); - private BlockBlockchainService service; + private BlockchainService service; private BlockchainRemoteService blockchainRemoteService; private Configuration config; private Peer peer; diff --git a/pom.xml b/pom.xml index c5ca982a3f17d52961fb3f57934def6b74d285a2..65dacf61448731d295bf0dd90df587c9277a8ace 100644 --- a/pom.xml +++ b/pom.xml @@ -10,6 +10,7 @@ <properties> <!-- source file encoding --> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <javaVersion>8</javaVersion> <!-- Commons versions --> <file.encoding>UTF-8</file.encoding> @@ -434,8 +435,8 @@ <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> - <source>1.7</source> - <target>1.7</target> + <source>${javaVersion}</source> + <target>${javaVersion}</target> <optimize>true</optimize> <debug>true</debug> <encoding>${project.build.sourceEncoding}</encoding> @@ -697,19 +698,13 @@ <configuration> <addSvnKeyWords>false</addSvnKeyWords> <excludes> - <exclude>**/site/**/*.*</exclude> - <exclude>**/misc/**/*.*</exclude> - <exclude>**/webapp/js/jquery-mobile/**/*.*</exclude> - <exclude>**/webapp/css/jquery-mobile/**/*.*</exclude> - <exclude>**/webapp/META-INF/**/*.*</exclude> - <exclude>**/webapp/WEB-INF/**/*.*</exclude> + <exclude>**/*.json</exclude> <exclude>**/*.properties</exclude> <exclude>**/*.xml</exclude> - <exclude>**/jquery.tokeninput.js</exclude> - <exclude>**/jquery.watermark.js</exclude> - <exclude>**/jquery.qtip.*</exclude> - <!-- since sh scripts must begins by the line #!/bin/sh, can not use the mojo for the mojo --> <exclude>**/*.sh</exclude> + <exclude>**/site/**/*.*</exclude> + <exclude>**/misc/**/*.*</exclude> + <exclude>**/es-home/**/*.*</exclude> </excludes> </configuration> <phase>process-resources</phase>