Skip to content
Snippets Groups Projects
Commit 8f0789d4 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

ES: fill registry/city index type, with world cities

parent 5799c9a2
No related branches found
No related tags found
No related merge requests found
...@@ -27,6 +27,9 @@ package io.ucoin.ucoinj.core.util; ...@@ -27,6 +27,9 @@ package io.ucoin.ucoinj.core.util;
*/ */
public class StringUtils { public class StringUtils {
// FEFF because this is the Unicode char represented by the UTF-8 byte order mark (EF BB BF).
public static final String UTF8_BOM = "\uFEFF";
public static boolean isNotBlank(String value) { public static boolean isNotBlank(String value) {
return value != null && value.trim().length() > 0; return value != null && value.trim().length() > 0;
} }
...@@ -64,4 +67,11 @@ public class StringUtils { ...@@ -64,4 +67,11 @@ public class StringUtils {
public static boolean equals(String cs1, String cs2) { public static boolean equals(String cs1, String cs2) {
return cs1 == null?cs2 == null:cs1.equals(cs2); return cs1 == null?cs2 == null:cs1.equals(cs2);
} }
public static String removeUTF8BOM(String s) {
if (s.startsWith(UTF8_BOM)) {
s = s.substring(1);
}
return s;
}
} }
...@@ -84,7 +84,7 @@ public class IndexerAction { ...@@ -84,7 +84,7 @@ public class IndexerAction {
public void resetAllData() { public void resetAllData() {
resetDataBlocks(); resetDataBlocks();
resetMarketRecords(); resetMarketRecords();
resetRegistryRecords(); resetRegistry();
} }
public void resetDataBlocks() { public void resetDataBlocks() {
...@@ -140,9 +140,10 @@ public class IndexerAction { ...@@ -140,9 +140,10 @@ public class IndexerAction {
} }
} }
public void resetRegistryRecords() { public void resetRegistry() {
RegistryRecordIndexerService recordIndexerService = ServiceLocator.instance().getRegistryRecordIndexerService(); RegistryRecordIndexerService recordIndexerService = ServiceLocator.instance().getRegistryRecordIndexerService();
RegistryCategoryIndexerService categoryIndexerService = ServiceLocator.instance().getRegistryCategoryIndexerService(); RegistryCategoryIndexerService categoryIndexerService = ServiceLocator.instance().getRegistryCategoryIndexerService();
RegistryCitiesIndexerService citiesIndexerService = ServiceLocator.instance().getRegistryCitiesIndexerService();
try { try {
// Delete then create index on records // Delete then create index on records
...@@ -153,31 +154,15 @@ public class IndexerAction { ...@@ -153,31 +154,15 @@ public class IndexerAction {
log.info(String.format("Successfully reset registry records")); log.info(String.format("Successfully reset registry records"));
categoryIndexerService.createIndex(); categoryIndexerService.createIndex();
categoryIndexerService.initCategories(); categoryIndexerService.initCategories();
log.info(String.format("Successfully re-initialized registry categories")); log.info(String.format("Successfully re-initialized registry categories"));
} catch(Exception e) { citiesIndexerService.initCities();
log.error("Error during reset registry records: " + e.getMessage(), e);
}
}
public void resetCities() {
RegistryCitiesIndexerService service = ServiceLocator.instance().getRegistryCitiesIndexerService();
try {
// Delete then create index on records
boolean indexExists = service.existsIndex();
if (indexExists) {
service.deleteIndex();
}
log.info(String.format("Successfully reset registry cities"));
service.createIndex();
service.initCities();
log.info(String.format("Successfully re-initialized registry cities")); log.info(String.format("Successfully re-initialized registry cities"));
} catch(Exception e) { } catch(Exception e) {
log.error("Error during reset registry cities: " + e.getMessage(), e); log.error("Error during reset registry records: " + e.getMessage(), e);
} }
} }
......
...@@ -210,6 +210,12 @@ public class Configuration { ...@@ -210,6 +210,12 @@ public class Configuration {
return result; return result;
} }
/** @return {@link ConfigurationOption#TEMP_DIRECTORY} value */
public File getTempDirectory() {
File result = applicationConfig.getOptionAsFile(ConfigurationOption.TEMP_DIRECTORY.getKey());
return result;
}
/** @return {@link ConfigurationOption#PLUGINS_DIRECTORY} value */ /** @return {@link ConfigurationOption#PLUGINS_DIRECTORY} value */
public File getPluginsDirectory() { public File getPluginsDirectory() {
File result = applicationConfig.getOptionAsFile(ConfigurationOption.PLUGINS_DIRECTORY.getKey()); File result = applicationConfig.getOptionAsFile(ConfigurationOption.PLUGINS_DIRECTORY.getKey());
......
...@@ -43,9 +43,7 @@ public enum ConfigurationAction implements ConfigActionDef { ...@@ -43,9 +43,7 @@ public enum ConfigurationAction implements ConfigActionDef {
RESET_MARKET(IndexerAction.class.getName() + "#resetMarketRecords", "reset-market"), RESET_MARKET(IndexerAction.class.getName() + "#resetMarketRecords", "reset-market"),
RESET_REGISTRY(IndexerAction.class.getName() + "#resetRegistryRecords", "reset-registry"), RESET_REGISTRY(IndexerAction.class.getName() + "#resetRegistry", "reset-registry");
RESET_CITIES(IndexerAction.class.getName() + "#resetCities", "reset-cities");
public String action; public String action;
public String[] aliases; public String[] aliases;
......
...@@ -50,7 +50,7 @@ public enum ConfigurationOption implements ConfigOptionDef { ...@@ -50,7 +50,7 @@ public enum ConfigurationOption implements ConfigOptionDef {
BASEDIR( BASEDIR(
"ucoinj.basedir", "ucoinj.basedir",
n("ucoinj.config.option.basedir.description"), n("ucoinj.config.option.basedir.description"),
"${user.home}/.ucoinj-elasticsearch", "${user.home}/.config/duniter-es",
File.class), File.class),
DATA_DIRECTORY( DATA_DIRECTORY(
...@@ -59,6 +59,12 @@ public enum ConfigurationOption implements ConfigOptionDef { ...@@ -59,6 +59,12 @@ public enum ConfigurationOption implements ConfigOptionDef {
"${ucoinj.basedir}/data", "${ucoinj.basedir}/data",
File.class), File.class),
TEMP_DIRECTORY(
"ucoinj.temp.directory",
n("ucoinj.config.option.temp.directory.description"),
"${ucoinj.basedir}/temp",
File.class),
PLUGINS_DIRECTORY( PLUGINS_DIRECTORY(
"ucoinj.plugins.directory", "ucoinj.plugins.directory",
n("ucoinj.config.option.plugins.directory.description"), n("ucoinj.config.option.plugins.directory.description"),
......
...@@ -24,6 +24,7 @@ package io.ucoin.ucoinj.elasticsearch.service; ...@@ -24,6 +24,7 @@ package io.ucoin.ucoinj.elasticsearch.service;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import io.ucoin.ucoinj.core.beans.Bean; import io.ucoin.ucoinj.core.beans.Bean;
import io.ucoin.ucoinj.core.beans.InitializingBean; import io.ucoin.ucoinj.core.beans.InitializingBean;
import io.ucoin.ucoinj.core.exception.TechnicalException; import io.ucoin.ucoinj.core.exception.TechnicalException;
...@@ -110,18 +111,62 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos ...@@ -110,18 +111,62 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos
} }
protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) { protected void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) {
BulkRequest bulkRequest = Requests.bulkRequest(); InputStream is = null;
InputStream ris = null;
try { try {
ris = getClass().getClassLoader().getResourceAsStream(classpathFile); is = getClass().getClassLoader().getResourceAsStream(classpathFile);
if (ris == null) { if (is == null) {
throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s]: ", classpathFile, indexName)); throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s]: ", classpathFile, indexName));
} }
bulkFromStream(is, indexName, indexType);
}
finally {
if (is != null) {
try {
is.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
}
protected void bulkFromFile(File file, String indexName, String indexType) {
Preconditions.checkNotNull(file);
Preconditions.checkArgument(file.exists());
InputStream is = null;
try {
is = new BufferedInputStream(new FileInputStream(file));
bulkFromStream(is, indexName, indexType);
}
catch(FileNotFoundException e) {
throw new TechnicalException(String.format("[%s] Could not find file %s", indexName, file.getPath()), e);
}
finally {
if (is != null) {
try {
is.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
}
protected void bulkFromStream(InputStream is, String indexName, String indexType) {
Preconditions.checkNotNull(is);
BulkRequest bulkRequest = Requests.bulkRequest();
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(is));
String line = br.readLine();
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
BufferedReader bf = new BufferedReader(new InputStreamReader(ris));
String line = bf.readLine();
while(line != null) { while(line != null) {
if (StringUtils.isNotBlank(line)) { if (StringUtils.isNotBlank(line)) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
...@@ -129,7 +174,7 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos ...@@ -129,7 +174,7 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos
} }
builder.append(line).append('\n'); builder.append(line).append('\n');
} }
line = bf.readLine(); line = br.readLine();
} }
byte[] data = builder.toString().getBytes(); byte[] data = builder.toString().getBytes();
...@@ -139,9 +184,9 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos ...@@ -139,9 +184,9 @@ public abstract class BaseIndexerService implements Bean, InitializingBean, Clos
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e); throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e);
} }
finally { finally {
if (ris != null) { if (br != null) {
try { try {
ris.close(); br.close();
} }
catch(IOException e) { catch(IOException e) {
// Silent is gold // Silent is gold
......
...@@ -31,23 +31,16 @@ import io.ucoin.ucoinj.core.exception.TechnicalException; ...@@ -31,23 +31,16 @@ import io.ucoin.ucoinj.core.exception.TechnicalException;
import io.ucoin.ucoinj.core.util.StringUtils; import io.ucoin.ucoinj.core.util.StringUtils;
import io.ucoin.ucoinj.elasticsearch.config.Configuration; import io.ucoin.ucoinj.elasticsearch.config.Configuration;
import io.ucoin.ucoinj.elasticsearch.service.BaseIndexerService; import io.ucoin.ucoinj.elasticsearch.service.BaseIndexerService;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.BufferedReader; import java.io.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
/** /**
...@@ -57,7 +50,9 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { ...@@ -57,7 +50,9 @@ public class RegistryCitiesIndexerService extends BaseIndexerService {
private static final Logger log = LoggerFactory.getLogger(RegistryCitiesIndexerService.class); private static final Logger log = LoggerFactory.getLogger(RegistryCitiesIndexerService.class);
private static final String CITIES_CLASSPATH_FILE = "cities/countriesToCities.json"; private static final String CITIES_BULK_FILENAME = "registry-cities-bulk-insert.json";
private static final String CITIES_SOURCE_CLASSPATH_FILE = "cities/countriesToCities.json";
public static final String INDEX_NAME = "registry"; public static final String INDEX_NAME = "registry";
public static final String INDEX_TYPE = "city"; public static final String INDEX_TYPE = "city";
...@@ -133,66 +128,10 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { ...@@ -133,66 +128,10 @@ public class RegistryCitiesIndexerService extends BaseIndexerService {
log.debug("Initializing all registry cities"); log.debug("Initializing all registry cities");
} }
// Insert cities File bulkFile = createCitiesBulkFile();
BulkRequest bulkRequest = Requests.bulkRequest();
InputStream ris = null;
try {
ris = getClass().getClassLoader().getResourceAsStream(CITIES_CLASSPATH_FILE);
if (ris == null) {
throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s/%s]: ", CITIES_CLASSPATH_FILE, INDEX_NAME, INDEX_TYPE));
}
StringBuilder builder = new StringBuilder();
BufferedReader bf = new BufferedReader(new InputStreamReader(ris), 2048);
char[] buf = new char[2048];
int len;
boolean ignoreBOM = true;
while((len = bf.read(buf)) != -1) {
if (ignoreBOM) {
builder.append(buf, 2, len-2);
ignoreBOM = false;
}
else {
builder.append(buf, 0, len);
}
}
java.lang.reflect.Type typeOfHashMap = new TypeToken<Map<String, String[]>>() { }.getType(); // Insert cities
bulkFromFile(bulkFile, INDEX_NAME, INDEX_TYPE);
Map<String, String[]> cities = new HashMap<>();
cities.put("chine", new String[]{"ABC", "def"});
String json = gson.toJson(cities, typeOfHashMap);
log.debug(String.format("test json: ", json));
log.debug(String.format("test json: ", builder.substring(0,19)));
Map<String, String[]> citiesByCountry = gson.fromJson(builder.toString(), typeOfHashMap);
if (log.isDebugEnabled()) {
log.debug(String.format("Register %s countries", citiesByCountry.size()));
}
//bulkRequest.add(new BytesArray(data), indexName, indexType, false);
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows", INDEX_TYPE), e);
}
finally {
if (ris != null) {
try {
ris.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
try {
getClient().bulk(bulkRequest).actionGet();
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", INDEX_NAME, INDEX_TYPE), e);
}
} }
/* -- Internal methods -- */ /* -- Internal methods -- */
...@@ -204,7 +143,7 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { ...@@ -204,7 +143,7 @@ public class RegistryCitiesIndexerService extends BaseIndexerService {
.startObject("properties") .startObject("properties")
// city // city
.startObject("city") .startObject("name")
.field("type", "string") .field("type", "string")
.endObject() .endObject()
...@@ -222,4 +161,104 @@ public class RegistryCitiesIndexerService extends BaseIndexerService { ...@@ -222,4 +161,104 @@ public class RegistryCitiesIndexerService extends BaseIndexerService {
throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe); throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX_NAME, INDEX_TYPE, ioe.getMessage()), ioe);
} }
} }
public File createCitiesBulkFile() {
File result = new File(config.getTempDirectory(), CITIES_BULK_FILENAME);
InputStream ris = null;
BufferedReader bf = null;
FileWriter fw = null;
try {
if (result.exists()) {
FileUtils.forceDelete(result);
}
else if (!result.getParentFile().exists()) {
FileUtils.forceMkdir(result.getParentFile());
}
ris = getClass().getClassLoader().getResourceAsStream(CITIES_SOURCE_CLASSPATH_FILE);
if (ris == null) {
throw new TechnicalException(String.format("Could not retrieve file [%s] from test classpath. Make sure git submodules has been initialized before building.", CITIES_SOURCE_CLASSPATH_FILE));
}
boolean firstLine = true;
java.lang.reflect.Type typeOfHashMap = new TypeToken<Map<String, String[]>>() { }.getType();
Gson gson = GsonUtils.newBuilder().create();
StringBuilder builder = new StringBuilder();
bf = new BufferedReader(
new InputStreamReader(
ris, "UTF-16LE"), 2048);
fw = new FileWriter(result);
char[] buf = new char[2048];
int len;
while((len = bf.read(buf)) != -1) {
String bufStr = new String(buf, 0, len);
if (firstLine) {
// Remove UTF-16 BOM char
int objectStartIndex = bufStr.indexOf('\uFEFF');
if (objectStartIndex != -1) {
bufStr = bufStr.substring(objectStartIndex);
}
firstLine=false;
}
int arrayEndIndex = bufStr.indexOf("],\"");
if (arrayEndIndex == -1) {
arrayEndIndex = bufStr.indexOf("]}");
}
if (arrayEndIndex == -1) {
builder.append(bufStr);
}
else {
builder.append(bufStr.substring(0, arrayEndIndex+1));
builder.append("}");
if (log.isTraceEnabled()) {
log.trace(builder.toString());
}
Map<String, String[]> citiesByCountry = gson.fromJson(builder.toString(), typeOfHashMap);
builder.setLength(0);
for (String country: citiesByCountry.keySet()) {
if (StringUtils.isNotBlank(country)) {
for (String city : citiesByCountry.get(country)) {
if (StringUtils.isNotBlank(city)) {
fw.write(String.format("{\"index\":{\"_id\" : \"%s-%s\"}}\n", country, city));
fw.write(String.format("{\"country\":\"%s\", \"name\":\"%s\"}\n", country, city));
}
}
}
}
fw.flush();
// reset and prepare buffer for next country
builder.setLength(0);
builder.append("{");
if (arrayEndIndex+2 < bufStr.length()) {
builder.append(bufStr.substring(arrayEndIndex+2));
}
}
}
fw.close();
bf.close();
} catch(Exception e) {
throw new TechnicalException(String.format("Error while creating cities file [%s]", result.getName()), e);
}
finally {
IOUtils.closeQuietly(bf);
IOUtils.closeQuietly(ris);
IOUtils.closeQuietly(fw);
}
return result;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment