Skip to content
Snippets Groups Projects
Commit 26982b49 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

- Replace service beans creation

 - Code refactoring on ES service
 - Add ES start detection, then run init commands
parent 5c27bfc9
No related branches found
No related tags found
No related merge requests found
Showing
with 1159 additions and 357 deletions
......@@ -13,3 +13,4 @@ target
duniter4j-elasticsearch/src/main/misc/.index.sh.kate-swp
duniter4j-elasticsearch/src/main/misc/fr-cities.ods
duniter4j-elasticsearch/src/main/misc/geoflar-communes-2015.geojson
duniter4j.iml
\ No newline at end of file
......@@ -11,13 +11,24 @@ duniter4j has four main components :
- core-client: a Client API to access to a Duniter network.
- elasticsearch: a tools to index all blockchain and more.
- elasticsearch: a ES plugin, to store blockchain, registry, market and more.
- web: an web/mobile client, for data navigation, payment and more !
## Install
- Install ElasticSearch 2.3.3
- Install plugins :
/bin/plugin install elastic/elasticsearch
/bin/plugin install duniter/duniter4j
## Test it
The elasticsearch component is ready to use !
- Install Java JRE 8 or more.
......
......@@ -95,6 +95,10 @@ public class ServiceLocator implements Closeable {
return instance;
}
protected void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
public BlockchainRemoteService getBlockchainRemoteService() {
return getBean(BlockchainRemoteService.class);
}
......
......@@ -79,6 +79,7 @@ public class BeanFactory implements Closeable{
log.trace(String.format("Asking bean on type [%s]...", clazz.getName()));
}
synchronized (beansLoader) {
for (Bean bean : beansLoader) {
if (clazz.isInstance(bean)) {
......@@ -88,7 +89,9 @@ public class BeanFactory implements Closeable{
return (S) bean;
}
}
}
synchronized (beansClassMap) {
for (Map.Entry<Class<? extends Bean>, Class<? extends Bean>> beanDef : beansClassMap.entrySet()) {
if (log.isTraceEnabled()) {
log.trace(String.format(" Check against type [%s]", beanDef.getKey().getName()));
......@@ -109,6 +112,7 @@ public class BeanFactory implements Closeable{
}
}
}
}
throw new TechnicalException(String.format("Unable to create bean with type [%s]: not configured for the service loader [%s]", clazz.getName(), Bean.class.getCanonicalName()));
}
......
......@@ -17,6 +17,8 @@ log4j.logger.org.duniter.elasticsearch=DEBUG
# Other frameworks levels
log4j.logger.org.nuiton.util=WARN
log4j.logger.org.nuiton.config=WARN
log4j.logger.org.nuiton.converter=WARN
log4j.logger.org.nuiton.i18n=ERROR
log4j.logger.org.elasticsearch=WARN
#log4j.logger.org.elasticsearch=INFO
......
......@@ -24,7 +24,8 @@ package org.duniter.elasticsearch;
import com.google.common.collect.Lists;
import org.duniter.elasticsearch.action.RestModule;
import org.duniter.elasticsearch.job.BlockIndexer;
import org.duniter.elasticsearch.node.DuniterNode;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.ServiceModule;
import org.elasticsearch.common.component.LifecycleComponent;
......@@ -76,18 +77,9 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
if (disable) {
return components;
}
components.add(BlockIndexer.class);
//components.add(PluginSettings.class);
// Market
//components.add(CategoryMarketService.class);
//components.add(RecordMarketService.class);
// Registry
//components.add(CurrencyRegistryService.class);
//components.add(CategoryRegistryService.class);
//components.add(CitiesRegistryService.class);
//components.add(RecordRegistryService.class);
// BC
//components.add(BlockBlockchainService.class);
components.add(PluginSettings.class);
components.add(ThreadPool.class);
components.add(DuniterNode.class);
return components;
}
......
......@@ -23,16 +23,25 @@ package org.duniter.elasticsearch;
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.io.FileUtils;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.config.ConfigurationOption;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.duniter.core.client.config.ConfigurationProvider;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.service.ServiceLocator;
import org.elasticsearch.common.component.*;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.nuiton.config.ApplicationConfig;
import org.nuiton.config.ApplicationConfigHelper;
import org.nuiton.config.ApplicationConfigProvider;
import org.nuiton.config.ArgumentsParserException;
import org.nuiton.i18n.I18n;
import org.nuiton.i18n.init.DefaultI18nInitializer;
import org.nuiton.i18n.init.UserI18nInitializer;
......@@ -40,39 +49,100 @@ import org.nuiton.i18n.init.UserI18nInitializer;
import java.io.File;
import java.io.IOException;
import java.util.Locale;
import java.util.Set;
import static org.nuiton.i18n.I18n.t;
/**
* Access to configuration options
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
* @since 1.0
*/
public class PluginSettings {
/** Logger. */
private ESLogger log = ESLoggerFactory.getLogger(PluginSettings.class.getName());
public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
private org.elasticsearch.common.settings.Settings settings;
private Settings settings;
/**
* Delegate application config.
*/
protected final ApplicationConfig applicationConfig;
protected final org.duniter.core.client.config.Configuration clientConfig;
@Inject
public PluginSettings(org.elasticsearch.common.settings.Settings settings) {
super(settings);
this.settings = settings;
this.applicationConfig = new ApplicationConfig();
// Cascade the application config to the client module
org.duniter.core.client.config.Configuration clientConfig = new org.duniter.core.client.config.Configuration(applicationConfig);
org.duniter.core.client.config.Configuration.setInstance(clientConfig);
clientConfig = new org.duniter.core.client.config.Configuration(applicationConfig);
Configuration.setInstance(clientConfig);
}
@Override
protected void doStart() {
// get all config providers
Set<ApplicationConfigProvider> providers =
ImmutableSet.of(new ConfigurationProvider());
// load all default options
ApplicationConfigHelper.loadAllDefaultOption(applicationConfig,
providers);
// Ovverides defaults
String baseDir = settings.get("path.home");
applicationConfig.setDefaultOption(ConfigurationOption.BASEDIR.getKey(), baseDir);
applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost());
applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort()));
applicationConfig.setDefaultOption(ConfigurationOption.NODE_PROTOCOL.getKey(), getNodeBmaPort() == 443 ? "https" : "http");
try {
applicationConfig.parse(new String[]{});
} catch (ArgumentsParserException e) {
throw new TechnicalException(t("duniter4j.config.parse.error"), e);
}
File appBasedir = applicationConfig.getOptionAsFile(
ConfigurationOption.BASEDIR.getKey());
if (appBasedir == null) {
appBasedir = new File("");
}
if (!appBasedir.isAbsolute()) {
appBasedir = new File(appBasedir.getAbsolutePath());
}
if (appBasedir.getName().equals("..")) {
appBasedir = appBasedir.getParentFile().getParentFile();
}
if (appBasedir.getName().equals(".")) {
appBasedir = appBasedir.getParentFile();
}
applicationConfig.setOption(
ConfigurationOption.BASEDIR.getKey(),
appBasedir.getAbsolutePath());
// Init i18n
try {
initI18n();
}
catch(IOException e) {
logger.error(String.format("Could not init i18n: %s", e.getMessage()), e);
}
}
@Override
protected void doStop() {
}
String baseDir = settings.get("es.path.home");
applicationConfig.setOption(ConfigurationOption.BASEDIR.getKey(), baseDir);
applicationConfig.setOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost());
applicationConfig.setOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort()));
applicationConfig.setOption(ConfigurationOption.NODE_PROTOCOL.getKey(), getNodeBmaPort() == 443 ? "https" : "http");
@Override
protected void doClose() {
//initI18n();
}
public String getNodeBmaHost() {
......@@ -91,10 +161,14 @@ public class PluginSettings {
return settings.getAsInt("duniter.bulk.size", 1000);
}
public String getIndexStringAnalyzer() {
public String getDefaultStringAnalyzer() {
return settings.get("duniter.string.analyzer", "english");
}
public boolean reloadIndices() {
return settings.getAsBoolean("duniter.indices.reload", false);
}
public File getTempDirectory() {
return Configuration.instance().getTempDirectory();
}
......@@ -103,16 +177,31 @@ public class PluginSettings {
return settings.getAsBoolean("duniter.dev.enable", false);
}
/* */
public Peer checkAndGetPeer() {
if (StringUtils.isBlank(getNodeBmaHost())) {
logger.error("ERROR: node host is required");
System.exit(-1);
return null;
}
if (getNodeBmaPort() <= 0) {
logger.error("ERROR: node port is required");
System.exit(-1);
return null;
}
Peer peer = new Peer(getNodeBmaHost(), getNodeBmaPort());
return peer;
}
/* protected methods */
protected void initI18n() throws IOException {
Configuration config = Configuration.instance();
// --------------------------------------------------------------------//
// init i18n
// --------------------------------------------------------------------//
File i18nDirectory = new File(Configuration.instance().getDataDirectory(), "i18n");
File i18nDirectory = new File(clientConfig.getDataDirectory(), "i18n");
if (i18nDirectory.exists()) {
// clean i18n cache
FileUtils.cleanDirectory(i18nDirectory);
......@@ -120,14 +209,14 @@ public class PluginSettings {
FileUtils.forceMkdir(i18nDirectory);
if (log.isDebugEnabled()) {
log.debug("I18N directory: " + i18nDirectory);
if (logger.isDebugEnabled()) {
logger.debug("I18N directory: " + i18nDirectory);
}
Locale i18nLocale = config.getI18nLocale();
Locale i18nLocale = clientConfig.getI18nLocale();
if (log.isInfoEnabled()) {
log.info(String.format("Starts i18n with locale [%s] at [%s]",
if (logger.isInfoEnabled()) {
logger.info(String.format("Starts i18n with locale [%s] at [%s]",
i18nLocale, i18nDirectory));
}
I18n.init(new UserI18nInitializer(
......
......@@ -23,7 +23,7 @@ package org.duniter.elasticsearch.action.market;
*/
import org.duniter.core.exception.BusinessException;
import org.duniter.elasticsearch.service.market.RecordMarketService;
import org.duniter.elasticsearch.service.MarketService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
......@@ -39,10 +39,10 @@ public class RestMarketRecordIndexAction extends BaseRestHandler {
private static final ESLogger log = ESLoggerFactory.getLogger(RestMarketRecordIndexAction.class.getName());
private RecordMarketService service;
private MarketService service;
@Inject
public RestMarketRecordIndexAction(Settings settings, RestController controller, Client client, RecordMarketService service) {
public RestMarketRecordIndexAction(Settings settings, RestController controller, Client client, MarketService service) {
super(settings, controller, client);
controller.registerHandler(POST, "/market/record", this);
this.service = service;
......
......@@ -23,7 +23,7 @@ package org.duniter.elasticsearch.action.registry;
*/
import org.duniter.core.exception.BusinessException;
import org.duniter.elasticsearch.service.registry.RecordRegistryService;
import org.duniter.elasticsearch.service.RegistryService;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
......@@ -39,10 +39,10 @@ public class RestRegistryRecordIndexAction extends BaseRestHandler {
private static final ESLogger log = ESLoggerFactory.getLogger(RestRegistryRecordIndexAction.class.getName());
private RecordRegistryService service;
private RegistryService service;
@Inject
public RestRegistryRecordIndexAction(Settings settings, RestController controller, Client client, RecordRegistryService service) {
public RestRegistryRecordIndexAction(Settings settings, RestController controller, Client client, RegistryService service) {
super(settings, controller, client);
controller.registerHandler(POST, "/registry/record", this);
this.service = service;
......
......@@ -41,7 +41,7 @@ public class IndexerCliAction {
public void run() {
PluginSettings config = PluginSettings.instance();
final Peer peer = checkConfigAndGetPeer(config);
final BlockBlockchainService blockIndexerService = ServiceLocator.instance().getBlockIndexerService();
final BlockchainService blockIndexerService = ServiceLocator.instance().getBlockIndexerService();
// Will create the blockchain if not exist
blockIndexerService.indexLastBlocks(peer);
......@@ -84,7 +84,7 @@ public class IndexerCliAction {
public void resetDataBlocks() {
BlockchainRemoteService blockchainService = ServiceLocator.instance().getBlockchainRemoteService();
BlockBlockchainService indexerService = ServiceLocator.instance().getBlockIndexerService();
BlockchainService indexerService = ServiceLocator.instance().getBlockIndexerService();
PluginSettings config = PluginSettings.instance();
Peer peer = checkConfigAndGetPeer(config);
......
package org.duniter.elasticsearch.service.exception;
package org.duniter.elasticsearch.exception;
/*
* #%L
......
package org.duniter.elasticsearch.service.exception;
package org.duniter.elasticsearch.exception;
/*
* #%L
......@@ -26,6 +26,7 @@ package org.duniter.elasticsearch.service.exception;
import org.duniter.core.exception.BusinessException;
/**
*
* Created by Benoit on 03/04/2015.
*/
public class DuplicateIndexIdException extends BusinessException{
......
package org.duniter.elasticsearch.service.exception;
package org.duniter.elasticsearch.exception;
/*
* #%L
......
package org.duniter.elasticsearch.service.exception;
package org.duniter.elasticsearch.exception;
/*
* #%L
......
package org.duniter.elasticsearch.node;
import org.duniter.core.client.model.local.Peer;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.MarketService;
import org.duniter.elasticsearch.service.RegistryService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.settings.Settings;
/**
* Created by blavenie on 17/06/16.
*/
public class DuniterNode extends AbstractLifecycleComponent<DuniterNode> {
private final PluginSettings pluginSettings;
@Inject
public DuniterNode(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
super(settings);
this.pluginSettings = pluginSettings;
threadPool.scheduleOnStarted(() -> {
createIndices(injector);
});
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
protected void createIndices(Injector injector) {
if (logger.isInfoEnabled()) {
logger.info("Creating Duniter indices...");
}
boolean reloadIndices = pluginSettings.reloadIndices();
Peer peer = pluginSettings.checkAndGetPeer();
if (reloadIndices) {
injector.getInstance(RegistryService.class)
.deleteIndex()
.createIndexIfNotExists()
.fillRecordCategories()
.indexCurrencyFromPeer(peer);
injector.getInstance(MarketService.class)
.deleteIndex()
.createIndexIfNotExists()
.fillRecordCategories();
injector.getInstance(BlockchainService.class)
.indexLastBlocks(peer);
}
else {
injector.getInstance(RegistryService.class).createIndexIfNotExists();
injector.getInstance(MarketService.class).createIndexIfNotExists();
}
if (logger.isInfoEnabled()) {
logger.info("Duniter indices created.");
}
}
}
......@@ -25,6 +25,7 @@ package org.duniter.elasticsearch.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.duniter.core.beans.Bean;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.PluginSettings;
......@@ -37,12 +38,15 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
......@@ -56,97 +60,26 @@ import java.net.UnknownHostException;
/**
* Created by Benoit on 08/04/2015.
*/
public abstract class AbstractService<T> implements LifecycleComponent<T>{
public abstract class AbstractService implements Bean {
private static final ESLogger log = ESLoggerFactory.getLogger(AbstractService.class.getName());
private Lifecycle.State state;
private Client client;
private AdminClient admin;
private PluginSettings pluginSettings;
private ObjectMapper objectMapper;
protected final ESLogger logger;
protected final Client client;
protected final PluginSettings pluginSettings;
protected final ObjectMapper objectMapper;
@Inject
public AbstractService(Client client, PluginSettings pluginSettings) {
this.logger = Loggers.getLogger(getClass());
this.client = client;
this.pluginSettings = pluginSettings;
this.objectMapper = new ObjectMapper();
this.state = Lifecycle.State.INITIALIZED;
}
@Override
public Lifecycle.State lifecycleState() {
return state;
}
@Override
public void addLifecycleListener(LifecycleListener var1){
// TODO
}
@Override
public void removeLifecycleListener(LifecycleListener var1){
// TODO
}
@Override
public T start(){
state = Lifecycle.State.STARTED;
return (T)this;
}
@Override
public T stop(){
state = Lifecycle.State.STOPPED;
return (T)this;
}
@Override
public void close() {
state = Lifecycle.State.STOPPED;
}
/* -- protected methods -- */
protected void setState(Lifecycle.State state) {
this.state = state;
}
protected Client getClient() {
return client;
}
protected PluginSettings getPluginSettings() {
return pluginSettings;
}
protected ObjectMapper getObjectMapper() {
return objectMapper;
}
protected boolean existsIndex(String indexes) {
//if (admin == null) {
Settings.Builder settings = Settings.settingsBuilder()
.put("cluster.name", "duniter4j-elasticsearch")
.put("client.transport.sniff", true);
Client client = null;
try {
client = TransportClient.builder().settings(settings)
.build()
.addTransportAddress(new InetSocketTransportAddress(
InetAddress.getByName("localhost"), 9300));
} catch (UnknownHostException e) {
throw new TechnicalException(e);
}
// admin = client.admin();
//}
IndicesExistsRequestBuilder requestBuilder = client.admin().indices().prepareExists(indexes);
IndicesExistsResponse response = requestBuilder.execute().actionGet();
return response.isExists();
}
......@@ -154,7 +87,9 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{
if (!existsIndex(indexName)) {
return;
}
log.info(String.format("Deleting index [%s]", indexName));
if (logger.isInfoEnabled()) {
logger.info(String.format("Deleting index [%s]", indexName));
}
DeleteIndexRequestBuilder deleteIndexRequestBuilder = client.admin().indices().prepareDelete(indexName);
deleteIndexRequestBuilder.execute().actionGet();
......@@ -238,8 +173,8 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{
StringBuilder builder = new StringBuilder();
while(line != null) {
if (StringUtils.isNotBlank(line)) {
if (log.isTraceEnabled()) {
log.trace(String.format("[%s] Add to bulk: %s", indexName, line));
if (logger.isTraceEnabled()) {
logger.trace(String.format("[%s] Add to bulk: %s", indexName, line));
}
builder.append(line).append('\n');
}
......@@ -264,7 +199,7 @@ public abstract class AbstractService<T> implements LifecycleComponent<T>{
}
try {
getClient().bulk(bulkRequest).actionGet();
client.bulk(bulkRequest).actionGet();
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e);
}
......
package org.duniter.elasticsearch.service.market;
package org.duniter.elasticsearch.service;
/*
* #%L
......@@ -32,18 +32,14 @@ import org.duniter.core.client.service.bma.WotRemoteService;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.AbstractService;
import org.duniter.elasticsearch.service.ServiceLocator;
import org.duniter.elasticsearch.service.exception.InvalidFormatException;
import org.duniter.elasticsearch.service.exception.InvalidSignatureException;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import org.duniter.elasticsearch.exception.InvalidSignatureException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
......@@ -53,108 +49,197 @@ import java.util.Set;
/**
* Created by Benoit on 30/03/2015.
*/
@Singleton
public class RecordMarketService extends AbstractService<RecordMarketService> {
public class MarketService extends AbstractService {
private static final ESLogger log = ESLoggerFactory.getLogger(RecordMarketService.class.getName());
public static final String INDEX = "market";
public static final String RECORD_CATEGORY_TYPE = "category";
public static final String RECORD_TYPE = "record";
private static final String CATEGORIES_BULK_CLASSPATH_FILE = "market-categories-bulk-insert.json";
private static final String JSON_STRING_PROPERTY_REGEX = "[,]?[\"\\s\\n\\r]*%s[\"]?[\\s\\n\\r]*:[\\s\\n\\r]*\"[^\"]+\"";
public static final String INDEX_NAME = "market";
public static final String INDEX_TYPE = "record";
private WotRemoteService wotRemoteService;
private CryptoService cryptoService;
private ServiceLocator serviceLocator;
private WotRemoteService wotRemoteService;
@Inject
public RecordMarketService(Client client, PluginSettings config, ServiceLocator serviceLocator) {
super(client, config);
this.serviceLocator = serviceLocator;
}
@Override
public RecordMarketService start() {
wotRemoteService = serviceLocator.getWotRemoteService();
cryptoService = serviceLocator.getCryptoService();
return super.start();
}
@Override
public void close() {
wotRemoteService = null;
cryptoService = null;
super.close();
public MarketService(Client client, PluginSettings settings, CryptoService cryptoService, WotRemoteService wotRemoteService) {
super(client, settings);
this.cryptoService = cryptoService;
this.wotRemoteService = wotRemoteService;
}
/**
* Delete blockchain index, and all data
* @throws JsonProcessingException
*/
public void deleteIndex() throws JsonProcessingException {
deleteIndexIfExists(INDEX_NAME);
public MarketService deleteIndex() {
deleteIndexIfExists(INDEX);
return this;
}
public boolean existsIndex() {
return super.existsIndex(INDEX_NAME);
return super.existsIndex(INDEX);
}
/**
* Create index need for blockchain registry, if need
*/
public void createIndexIfNotExists() {
public MarketService createIndexIfNotExists() {
try {
if (!existsIndex(INDEX_NAME)) {
if (!existsIndex(INDEX)) {
createIndex();
}
}
catch(JsonProcessingException e) {
throw new TechnicalException(String.format("Error while creating index [%s]", INDEX_NAME));
throw new TechnicalException(String.format("Error while creating index [%s]", INDEX));
}
return this;
}
/**
* Create index need for record registry
* Create index need for category registry
* @throws JsonProcessingException
*/
public void createIndex() throws JsonProcessingException {
CreateIndexRequestBuilder createIndexRequestBuilder = getClient().admin().indices().prepareCreate(INDEX_NAME);
org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder()
.put("number_of_shards", 3)
.put("number_of_replicas", 2)
public MarketService createIndex() throws JsonProcessingException {
logger.info(String.format("Creating index [%s/%s]", INDEX, RECORD_CATEGORY_TYPE));
CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX);
Settings indexSettings = Settings.settingsBuilder()
.put("number_of_shards", 2)
.put("number_of_replicas", 1)
//.put("analyzer", createDefaultAnalyzer())
.build();
// Create record index type
log.info(String.format("Creating index [%s/%s]", INDEX_NAME, INDEX_TYPE));
createIndexRequestBuilder.setSettings(indexSettings);
createIndexRequestBuilder.addMapping(INDEX_NAME, createIndexMapping(INDEX_TYPE));
createIndexRequestBuilder.addMapping(RECORD_CATEGORY_TYPE, createRecordCategoryType());
createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType());
createIndexRequestBuilder.execute().actionGet();
return this;
}
/**
* Index a new record
* @param recordJson
* @return the record id
*
* @param jsonCategory
* @return the product id
*/
public String indexCategoryFromJson(String jsonCategory) {
if (logger.isDebugEnabled()) {
logger.debug("Indexing a category");
}
// Preparing indexBlocksFromNode
IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_CATEGORY_TYPE)
.setSource(jsonCategory);
// Execute indexBlocksFromNode
IndexResponse response = indexRequest
.setRefresh(false)
.execute().actionGet();
return response.getId();
}
public String indexRecordFromJson(String recordJson) {
return indexRecordFromJson(recordJson, INDEX_TYPE);
try {
JsonNode actualObj = objectMapper.readTree(recordJson);
Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames());
if (!fieldNames.contains(Record.PROPERTY_ISSUER)
|| !fieldNames.contains(Record.PROPERTY_SIGNATURE)) {
throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]");
}
String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText();
String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText();
String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "")
.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), "");
if (!cryptoService.verify(recordNoSign, signature, issuer)) {
throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign);
}
// TODO : check if issuer is a valid member
//wotRemoteService.getRequirments();
if (logger.isDebugEnabled()) {
logger.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8)));
}
}
catch(IOException | JsonSyntaxException e) {
throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e);
}
// Preparing indexBlocksFromNode
IndexRequestBuilder indexRequest = client.prepareIndex(INDEX, RECORD_TYPE)
.setSource(recordJson);
// Execute indexBlocksFromNode
IndexResponse response = indexRequest
.setRefresh(false)
.execute().actionGet();
return response.getId();
}
public void fillRecordCategories() {
if (logger.isDebugEnabled()) {
logger.debug(String.format("[%s/%s] fill data", INDEX, RECORD_CATEGORY_TYPE));
}
// Insert categories
bulkFromClasspathFile(CATEGORIES_BULK_CLASSPATH_FILE, INDEX, RECORD_CATEGORY_TYPE);
}
/* -- Internal methods -- */
public XContentBuilder createIndexMapping(String indexType) {
String stringAnalyzer = getPluginSettings().getIndexStringAnalyzer();
public XContentBuilder createRecordCategoryType() {
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_CATEGORY_TYPE)
.startObject("properties")
// name
.startObject("name")
.field("type", "string")
.endObject()
// description
/*.startObject("description")
.field("type", "string")
.endObject()*/
// parent
.startObject("parent")
.field("type", "string")
.endObject()
// tags
/*.startObject("tags")
.field("type", "completion")
.field("search_analyzer", "simple")
.field("analyzer", "simple")
.field("preserve_separators", "false")
.endObject()*/
.endObject()
.endObject().endObject();
return mapping;
}
catch(IOException ioe) {
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_CATEGORY_TYPE, ioe.getMessage()), ioe);
}
}
public XContentBuilder createRecordType() {
String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer();
try {
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(indexType)
XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE)
.startObject("properties")
// title
......@@ -205,11 +290,13 @@ public class RecordMarketService extends AbstractService<RecordMarketService> {
.field("type", "nested")
.startObject("properties")
.startObject("src") // src
.field("type", "attachment")
// FISME : add attachment plugin
//.field("type", "attachment")
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
.startObject("title") // title
.field("title", "string")
.field("type", "string")
.field("analyzer", stringAnalyzer)
.startObject("norms") // disabled norms on title
.field("enabled", "false")
......@@ -247,52 +334,8 @@ public class RecordMarketService extends AbstractService<RecordMarketService> {
return mapping;
}
catch(IOException ioe) {
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, indexType, ioe.getMessage()), ioe);
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe);
}
}
public String indexRecordFromJson(String recordJson, String indexType) {
try {
JsonNode actualObj = getObjectMapper().readTree(recordJson);
Set<String> fieldNames = Sets.newHashSet(actualObj.fieldNames());
if (!fieldNames.contains(Record.PROPERTY_ISSUER)
|| !fieldNames.contains(Record.PROPERTY_SIGNATURE)) {
throw new InvalidFormatException("Invalid record JSON format. Required fields [issuer,signature]");
}
String issuer = actualObj.get(Record.PROPERTY_ISSUER).asText();
String signature = actualObj.get(Record.PROPERTY_SIGNATURE).asText();
String recordNoSign = recordJson.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_SIGNATURE), "")
.replaceAll(String.format(JSON_STRING_PROPERTY_REGEX, Record.PROPERTY_HASH), "");
if (!cryptoService.verify(recordNoSign, signature, issuer)) {
throw new InvalidSignatureException("Invalid signature for JSON string: " + recordNoSign);
}
// TODO : check if issuer is a valid member
//wotRemoteService.getRequirments();
if (log.isDebugEnabled()) {
log.debug(String.format("Indexing a record from issuer [%s]", issuer.substring(0, 8)));
}
}
catch(IOException | JsonSyntaxException e) {
throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e);
}
// Preparing indexBlocksFromNode
IndexRequestBuilder indexRequest = getClient().prepareIndex(INDEX_NAME, indexType)
.setSource(recordJson);
// Execute indexBlocksFromNode
IndexResponse response = indexRequest
.setRefresh(false)
.execute().actionGet();
return response.getId();
}
}
......@@ -23,6 +23,7 @@ package org.duniter.elasticsearch.service;
*/
import org.duniter.core.beans.Bean;
import org.duniter.core.beans.BeanFactory;
import org.duniter.core.client.dao.CurrencyDao;
import org.duniter.core.client.dao.PeerDao;
......@@ -41,23 +42,33 @@ import org.duniter.core.service.CryptoService;
import org.duniter.core.service.Ed25519CryptoServiceImpl;
import org.duniter.elasticsearch.PluginSettings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.ServiceLoader;
@Singleton
public class ServiceLocator
extends org.duniter.core.client.service.ServiceLocator
{
private static final ESLogger log = ESLoggerFactory.getLogger(ServiceLocator.class.getName());
private static final ESLogger logger = ESLoggerFactory.getLogger(ServiceLocator.class.getName());
@Inject
public ServiceLocator(Injector injector) {
super();
if (logger.isDebugEnabled()) {
logger.debug("Starting Duniter4j client ServiceLocator...");
}
//setBeanFactory(new BeanFactory(injector));
setBeanFactory(createBeanFactory());
public ServiceLocator() {
super(createBeanFactory());
log.info("Starting ServiceLocator (guice)");
org.duniter.core.client.service.ServiceLocator.setInstance(this);
log.info("Starting ServiceLocator [OK]");
}
@Override
......@@ -73,8 +84,20 @@ public class ServiceLocator
/* -- Internal methods -- */
protected static BeanFactory createBeanFactory() {
BeanFactory beanFactory = new BeanFactory()
class BeanFactory extends org.duniter.core.beans.BeanFactory{
private final Injector injector;
public BeanFactory(Injector injector) {
super();
this.injector = injector;
}
public <S extends Bean> S newBean(Class<S> clazz) {
return injector.getInstance(clazz);
}
}
protected static org.duniter.core.beans.BeanFactory createBeanFactory() {
org.duniter.core.beans.BeanFactory beanFactory = new org.duniter.core.beans.BeanFactory()
.bind(BlockchainRemoteService.class, BlockchainRemoteServiceImpl.class)
.bind(NetworkRemoteService.class, NetworkRemoteServiceImpl.class)
.bind(WotRemoteService.class, WotRemoteServiceImpl.class)
......@@ -88,4 +111,17 @@ public class ServiceLocator
.add(DataContext.class);
return beanFactory;
}
public static class Provider<T extends Bean> implements org.elasticsearch.common.inject.Provider<T> {
private final Class<T> clazz;
public Provider(Class<T> clazz) {
this.clazz = clazz;
}
public T get() {
return ServiceLocator.instance().getBean(clazz);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment