diff --git a/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/StringUtils.java b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/StringUtils.java index ac3d50f590a68b006c8ca7636dae060a3226a3d6..b47620917b5fd32c7d30f7e3f2ff2df679b79f8a 100644 --- a/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/StringUtils.java +++ b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/StringUtils.java @@ -27,6 +27,9 @@ package io.ucoin.ucoinj.core.util; */ public class StringUtils { + // FEFF because this is the Unicode char represented by the UTF-8 byte order mark (EF BB BF). + public static final String UTF8_BOM = "\uFEFF"; + public static boolean isNotBlank(String value) { return value != null && value.trim().length() > 0; } @@ -64,4 +67,11 @@ public class StringUtils { public static boolean equals(String cs1, String cs2) { return cs1 == null?cs2 == null:cs1.equals(cs2); } + + public static String removeUTF8BOM(String s) { + if (s.startsWith(UTF8_BOM)) { + s = s.substring(1); + } + return s; + } } diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java index 9955fbd5d56ad8820d486d89cb5db7fb27a8ab26..85831caed19acce807cb96e9d7f440ef7ab182f5 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java @@ -84,7 +84,7 @@ public class IndexerAction { public void resetAllData() { resetDataBlocks(); resetMarketRecords(); - resetRegistryRecords(); + resetRegistry(); } public void resetDataBlocks() { @@ -140,9 +140,10 @@ public class IndexerAction { } } - public void resetRegistryRecords() { + public void resetRegistry() { RegistryRecordIndexerService recordIndexerService = ServiceLocator.instance().getRegistryRecordIndexerService(); RegistryCategoryIndexerService categoryIndexerService = ServiceLocator.instance().getRegistryCategoryIndexerService(); + RegistryCitiesIndexerService citiesIndexerService = ServiceLocator.instance().getRegistryCitiesIndexerService(); try { // Delete then create index on records @@ -153,31 +154,15 @@ public class IndexerAction { log.info(String.format("Successfully reset registry records")); categoryIndexerService.createIndex(); + categoryIndexerService.initCategories(); log.info(String.format("Successfully re-initialized registry categories")); - } catch(Exception e) { - log.error("Error during reset registry records: " + e.getMessage(), e); - } - } - - public void resetCities() { - RegistryCitiesIndexerService service = ServiceLocator.instance().getRegistryCitiesIndexerService(); - - try { - // Delete then create index on records - boolean indexExists = service.existsIndex(); - if (indexExists) { - service.deleteIndex(); - } - log.info(String.format("Successfully reset registry cities")); - - service.createIndex(); - service.initCities(); + citiesIndexerService.initCities(); log.info(String.format("Successfully re-initialized registry cities")); } catch(Exception e) { - log.error("Error during reset registry cities: " + e.getMessage(), e); + log.error("Error during reset registry records: " + e.getMessage(), e); } } diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java index 6dc7de12f7fdcc7d93eb338f243702e8785df756..875e258b9cd35db2198c7a9e3ecfd364245df4fb 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java @@ -210,6 +210,12 @@ public class Configuration { return result; } + /** @return {@link ConfigurationOption#TEMP_DIRECTORY} value */ + public File getTempDirectory() { + File result = applicationConfig.getOptionAsFile(ConfigurationOption.TEMP_DIRECTORY.getKey()); + return result; + } + /** @return {@link ConfigurationOption#PLUGINS_DIRECTORY} value */ public File getPluginsDirectory() { File result = applicationConfig.getOptionAsFile(ConfigurationOption.PLUGINS_DIRECTORY.getKey()); diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java index d53b353b0f49e917eb05f3af4f770c27cf2c0782..c092948c41af175f771b6723d186a0a8f2e30fea 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java @@ -43,9 +43,7 @@ public enum ConfigurationAction implements ConfigActionDef { RESET_MARKET(IndexerAction.class.getName() + "#resetMarketRecords", "reset-market"), - RESET_REGISTRY(IndexerAction.class.getName() + "#resetRegistryRecords", "reset-registry"), - - RESET_CITIES(IndexerAction.class.getName() + "#resetCities", "reset-cities"); + RESET_REGISTRY(IndexerAction.class.getName() + "#resetRegistry", "reset-registry"); public String action; public String[] aliases; diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java index 957492dec68fc4b5438ed4ed8275be70bd436bc1..7aef5d5a461291373c0b8680f909cf80f53ed630 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java @@ -50,7 +50,7 @@ public enum ConfigurationOption implements ConfigOptionDef { BASEDIR( "ucoinj.basedir", n("ucoinj.config.option.basedir.description"), - "${user.home}/.ucoinj-elasticsearch", + "${user.home}/.config/duniter-es", File.class), DATA_DIRECTORY( @@ -59,6 +59,12 @@ public enum ConfigurationOption implements ConfigOptionDef { "${ucoinj.basedir}/data", File.class), + TEMP_DIRECTORY( + "ucoinj.temp.directory", + n("ucoinj.config.option.temp.directory.description"), + "${ucoinj.basedir}/temp", + File.class), + PLUGINS_DIRECTORY( "ucoinj.plugins.directory", n("ucoinj.config.option.plugins.directory.description"), diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java index b0eb6b0b2631b86b6c50a33c5872b7fff01ef3cf..e0f9367c5694e1835a53b66811b22aa3127cd305 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java @@ -24,6 +24,7 @@ package io.ucoin.ucoinj.elasticsearch.service; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import io.ucoin.ucoinj.core.beans.Bean; import io.ucoin.ucoinj.core.beans.InitializingBean; import io.ucoin.ucoinj.core.exception.TechnicalException; @@ -110,18 +111,62 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos } protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) { - BulkRequest bulkRequest = Requests.bulkRequest(); - - InputStream ris = null; + InputStream is = null; try { - ris = getClass().getClassLoader().getResourceAsStream(classpathFile); - if (ris == null) { + is = getClass().getClassLoader().getResourceAsStream(classpathFile); + if (is == null) { throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s]: ", classpathFile, indexName)); } + bulkFromStream(is, indexName, indexType); + } + finally { + if (is != null) { + try { + is.close(); + } + catch(IOException e) { + // Silent is gold + } + } + } + } + + protected void bulkFromFile(File file, String indexName, String indexType) { + Preconditions.checkNotNull(file); + Preconditions.checkArgument(file.exists()); + + InputStream is = null; + try { + is = new BufferedInputStream(new FileInputStream(file)); + bulkFromStream(is, indexName, indexType); + } + catch(FileNotFoundException e) { + throw new TechnicalException(String.format("[%s] Could not find file %s", indexName, file.getPath()), e); + } + finally { + if (is != null) { + try { + is.close(); + } + catch(IOException e) { + // Silent is gold + } + } + } + } + + protected void bulkFromStream(InputStream is, String indexName, String indexType) { + Preconditions.checkNotNull(is); + BulkRequest bulkRequest = Requests.bulkRequest(); + + BufferedReader br = null; + + try { + br = new BufferedReader(new InputStreamReader(is)); + + String line = br.readLine(); StringBuilder builder = new StringBuilder(); - BufferedReader bf = new BufferedReader(new InputStreamReader(ris)); - String line = bf.readLine(); while(line != null) { if (StringUtils.isNotBlank(line)) { if (log.isTraceEnabled()) { @@ -129,7 +174,7 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos } builder.append(line).append('\n'); } - line = bf.readLine(); + line = br.readLine(); } byte[] data = builder.toString().getBytes(); @@ -139,9 +184,9 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e); } finally { - if (ris != null) { + if (br != null) { try { - ris.close(); + br.close(); } catch(IOException e) { // Silent is gold diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/registry/RegistryCitiesIndexerService.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/registry/RegistryCitiesIndexerService.java index 136496efce6e3e8a5aff1406d3d270dfaa63077e..c31699ee095bd777a06eb6bdcfb7244a540b6912 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/registry/RegistryCitiesIndexerService.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/registry/RegistryCitiesIndexerService.java @@ -31,23 +31,16 @@ import io.ucoin.ucoinj.core.exception.TechnicalException; import io.ucoin.ucoinj.core.util.StringUtils; import io.ucoin.ucoinj.elasticsearch.config.Configuration; import io.ucoin.ucoinj.elasticsearch.service.BaseIndexerService; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Requests; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.HashMap; +import java.io.*; import java.util.Map; /** @@ -57,7 +50,9 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { private static final Logger log = LoggerFactory.getLogger(RegistryCitiesIndexerService.class); - private static final String CITIES_CLASSPATH_FILE = "cities/countriesToCities.json"; + private static final String CITIES_BULK_FILENAME = "registry-cities-bulk-insert.json"; + + private static final String CITIES_SOURCE_CLASSPATH_FILE = "cities/countriesToCities.json"; public static final String INDEX_NAME = "registry"; public static final String INDEX_TYPE = "city"; @@ -133,66 +128,10 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { log.debug("Initializing all registry cities"); } - // Insert cities - BulkRequest bulkRequest = Requests.bulkRequest(); - - InputStream ris = null; - try { - ris = getClass().getClassLoader().getResourceAsStream(CITIES_CLASSPATH_FILE); - if (ris == null) { - throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s/%s]: ", CITIES_CLASSPATH_FILE, INDEX_NAME, INDEX_TYPE)); - } - - StringBuilder builder = new StringBuilder(); - BufferedReader bf = new BufferedReader(new InputStreamReader(ris), 2048); - char[] buf = new char[2048]; - int len; - boolean ignoreBOM = true; - while((len = bf.read(buf)) != -1) { - if (ignoreBOM) { - builder.append(buf, 2, len-2); - ignoreBOM = false; - } - else { - builder.append(buf, 0, len); - } - } - - - java.lang.reflect.Type typeOfHashMap = new TypeToken<Map<String, String[]>>() { }.getType(); - - Map<String, String[]> cities = new HashMap<>(); - cities.put("chine", new String[]{"ABC", "def"}); - String json = gson.toJson(cities, typeOfHashMap); - log.debug(String.format("test json: ", json)); - log.debug(String.format("test json: ", builder.substring(0,19))); - - Map<String, String[]> citiesByCountry = gson.fromJson(builder.toString(), typeOfHashMap); - if (log.isDebugEnabled()) { - log.debug(String.format("Register %s countries", citiesByCountry.size())); - } - - //bulkRequest.add(new BytesArray(data), indexName, indexType, false); - - } catch(Exception e) { - throw new TechnicalException(String.format("[%s] Error while inserting rows", INDEX_TYPE), e); - } - finally { - if (ris != null) { - try { - ris.close(); - } - catch(IOException e) { - // Silent is gold - } - } - } + File bulkFile = createCitiesBulkFile(); - try { - getClient().bulk(bulkRequest).actionGet(); - } catch(Exception e) { - throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", INDEX_NAME, INDEX_TYPE), e); - } + // Insert cities + bulkFromFile(bulkFile, INDEX_NAME, INDEX_TYPE); } /* -- Internal methods -- */ @@ -204,7 +143,7 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { .startObject("properties") // city - .startObject("city") + .startObject("name") .field("type", "string") .endObject() @@ -222,4 +161,104 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); } } + + public File createCitiesBulkFile() { + + File result = new File(config.getTempDirectory(), CITIES_BULK_FILENAME); + + InputStream ris = null; + BufferedReader bf = null; + FileWriter fw = null; + try { + if (result.exists()) { + FileUtils.forceDelete(result); + } + else if (!result.getParentFile().exists()) { + FileUtils.forceMkdir(result.getParentFile()); + } + + ris = getClass().getClassLoader().getResourceAsStream(CITIES_SOURCE_CLASSPATH_FILE); + if (ris == null) { + throw new TechnicalException(String.format("Could not retrieve file [%s] from test classpath. Make sure git submodules has been initialized before building.", CITIES_SOURCE_CLASSPATH_FILE)); + } + + boolean firstLine = true; + java.lang.reflect.Type typeOfHashMap = new TypeToken<Map<String, String[]>>() { }.getType(); + + Gson gson = GsonUtils.newBuilder().create(); + + StringBuilder builder = new StringBuilder(); + bf = new BufferedReader( + new InputStreamReader( + ris, "UTF-16LE"), 2048); + + fw = new FileWriter(result); + char[] buf = new char[2048]; + int len; + + while((len = bf.read(buf)) != -1) { + String bufStr = new String(buf, 0, len); + + if (firstLine) { + // Remove UTF-16 BOM char + int objectStartIndex = bufStr.indexOf('\uFEFF'); + if (objectStartIndex != -1) { + bufStr = bufStr.substring(objectStartIndex); + } + firstLine=false; + } + + int arrayEndIndex = bufStr.indexOf("],\""); + if (arrayEndIndex == -1) { + arrayEndIndex = bufStr.indexOf("]}"); + } + + if (arrayEndIndex == -1) { + builder.append(bufStr); + } + else { + builder.append(bufStr.substring(0, arrayEndIndex+1)); + builder.append("}"); + if (log.isTraceEnabled()) { + log.trace(builder.toString()); + } + Map<String, String[]> citiesByCountry = gson.fromJson(builder.toString(), typeOfHashMap); + + builder.setLength(0); + for (String country: citiesByCountry.keySet()) { + if (StringUtils.isNotBlank(country)) { + for (String city : citiesByCountry.get(country)) { + if (StringUtils.isNotBlank(city)) { + fw.write(String.format("{\"index\":{\"_id\" : \"%s-%s\"}}\n", country, city)); + fw.write(String.format("{\"country\":\"%s\", \"name\":\"%s\"}\n", country, city)); + } + } + } + } + + fw.flush(); + + // reset and prepare buffer for next country + builder.setLength(0); + builder.append("{"); + if (arrayEndIndex+2 < bufStr.length()) { + builder.append(bufStr.substring(arrayEndIndex+2)); + } + } + } + + fw.close(); + bf.close(); + + } catch(Exception e) { + throw new TechnicalException(String.format("Error while creating cities file [%s]", result.getName()), e); + } + finally { + IOUtils.closeQuietly(bf); + IOUtils.closeQuietly(ris); + IOUtils.closeQuietly(fw); + } + + return result; + } }