Commit 119e9ec2 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Start migration to ES 5 (v5.6.16)

parent efe46773
Pipeline #11477 failed
#!/bin/sh
# TODO: wait migration into ES 5 + install plugin to create geo cluster
curl -XPOST 'https://g1.data.e-is.pro/user/profile/_search?pretty&_source=title,geoPoint' -d '
{
"size": 0
}'
......@@ -32,6 +32,10 @@
<groupId>org.duniter</groupId>
<artifactId>duniter4j-core-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- LOGGING DEPENDENCIES - SLF4J -->
<dependency>
......@@ -60,6 +64,13 @@
<artifactId>elasticsearch</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
......
......@@ -23,30 +23,25 @@ package org.duniter.elasticsearch;
*/
import com.google.common.collect.Lists;
import org.apache.logging.log4j.Logger;
import org.duniter.elasticsearch.dao.DaoModule;
import org.duniter.elasticsearch.http.WebSocketServerModule;
import org.duniter.elasticsearch.rest.RestModule;
import org.duniter.elasticsearch.script.BlockchainTxCountScriptFactory;
import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.ServiceModule;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.websocket.WebSocketModule;
import org.duniter.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.duniter.elasticsearch.http.WebSocketServerModule;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.script.ScriptModule;
import java.util.Collection;
public class Plugin extends org.elasticsearch.plugins.Plugin {
private ESLogger logger;
private final Logger logger;
private boolean enable;
private boolean enableWs;
......@@ -58,29 +53,29 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
this.enableWs = settings.getAsBoolean("duniter.ws.enable", this.enable);
}
@Override
/*@Override
public String name() {
return "cesium-plus-pod-core";
}
*/
@Override
public String description() {
return "Duniter Core Plugin";
return "Cesium+ Core Plugin";
}
public void onModule(ScriptModule scriptModule) {
/*public void onModule(ScriptModule scriptModule) {
// TODO: in ES v5+, see example here :
// https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java
scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class);
}
}*/
/*
public void onModule(HttpServerModule httpServerModule) {
if (this.enableWs) httpServerModule.setHttpServerTransport(NettyHttpServerTransport.class, "cesium-plus-core");
}
if (this.enableWs) httpServerModule.setHttpServerTransport(Netty3HttpServerTransport.class, "cesium-plus-core");
}*/
@Override
public Collection<Module> nodeModules() {
public Collection<Module> createGuiceModules() {
if (!enable) {
logger.warn(description() + " has been disabled.");
return Lists.newArrayList();
......@@ -103,7 +98,7 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
if (!enable) return Lists.newArrayList();
Collection<Class<? extends LifecycleComponent>> components = Lists.newArrayList();
components.add(PluginSettings.class);
......@@ -115,4 +110,5 @@ public class Plugin extends org.elasticsearch.plugins.Plugin {
/* -- protected methods -- */
}
\ No newline at end of file
......@@ -23,23 +23,24 @@ package org.duniter.elasticsearch;
*/
import com.google.common.base.Joiner;
import org.apache.lucene.search.join.ScoreMode;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.elasticsearch.Currency;
import org.duniter.core.client.model.local.Identity;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.*;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.service.*;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ScheduledActionFuture;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.util.StringUtils;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.ESLogger;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -55,14 +56,14 @@ import java.util.concurrent.TimeUnit;
/**
* Created by blavenie on 17/06/16.
*/
public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
public class PluginInit extends AbstractLifecycleComponent {
public static final String CURRENCY_NAME_REGEXP = "[a-zA-Z0-9_-]+";
private final PluginSettings pluginSettings;
private final ThreadPool threadPool;
private final Injector injector;
private final ESLogger logger;
private final Logger logger;
@Inject
public PluginInit(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
......@@ -434,7 +435,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Identity.PROPERTY_IS_MEMBER, true))
);
String queryName = Joiner.on('_').join(peer.getCurrency(), Strings.toUnderscoreCase(Identity.PROPERTY_IS_MEMBER));
String queryName = Joiner.on('_').join(peer.getCurrency(), StringUtils.changeCaseToUnderscore(Identity.PROPERTY_IS_MEMBER));
docStatService.registerIndex(peer.getCurrency(), MemberDao.TYPE, queryName, query, null);
}
}
......@@ -473,7 +474,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
// Peers UP
{
QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, statusQuery)));
.must(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, statusQuery, ScoreMode.None)));
String queryName = Joiner.on('_').join(peer.getCurrency(), PeerDao.TYPE, Peer.PeerStatus.UP.name()).toLowerCase();
docStatService.registerIndex(peer.getCurrency(), PeerDao.TYPE, queryName, query, null);
}
......@@ -485,7 +486,7 @@ public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
.filter(QueryBuilders.termQuery(Peer.PROPERTY_API, api.name()));
QueryBuilder query = QueryBuilders.constantScoreQuery(apiQuery
.must(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, statusQuery)));
.must(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, statusQuery, ScoreMode.None)));
String queryName = Joiner.on('_').join(peer.getCurrency(), PeerDao.TYPE, api.name()).toLowerCase();
docStatService.registerIndex(peer.getCurrency(), PeerDao.TYPE, queryName, query, null);
......
......@@ -62,7 +62,7 @@ import static org.nuiton.i18n.I18n.t;
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
* @since 1.0
*/
public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
public class PluginSettings extends AbstractLifecycleComponent {
private static KeyPair nodeKeyPair;
private static boolean isRandomNodeKeyPair;
......
......@@ -63,7 +63,7 @@ public class ESBeanFactory extends BeanFactory {
catch(BeanCreationException e) {
// try using injector, if exists
if (injector != null) {
return injector.getBinding(clazz).getProvider().get();
return injector.getProvider(clazz).get();
}
throw e;
}
......
......@@ -26,6 +26,7 @@ package org.duniter.elasticsearch.client;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import org.apache.commons.collections4.MapUtils;
import org.apache.logging.log4j.Logger;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.client.model.elasticsearch.Record;
import org.duniter.core.client.model.local.LocalEntity;
......@@ -46,18 +47,15 @@ import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.exists.ExistsRequest;
import org.elasticsearch.action.exists.ExistsRequestBuilder;
import org.elasticsearch.action.exists.ExistsResponse;
import org.elasticsearch.action.explain.ExplainRequest;
import org.elasticsearch.action.explain.ExplainRequestBuilder;
import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequestBuilder;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.elasticsearch.action.fieldstats.FieldStatsRequest;
import org.elasticsearch.action.fieldstats.FieldStatsRequestBuilder;
import org.elasticsearch.action.fieldstats.FieldStatsResponse;
......@@ -65,20 +63,8 @@ import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptResponse;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse;
import org.elasticsearch.action.percolate.*;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.suggest.SuggestRequest;
import org.elasticsearch.action.suggest.SuggestRequestBuilder;
import org.elasticsearch.action.suggest.SuggestResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.termvectors.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
......@@ -86,14 +72,13 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
......@@ -108,7 +93,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
*/
public class Duniter4jClientImpl implements Duniter4jClient {
private final ESLogger logger;
private final Logger logger;
private final Client client;
private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool;
......@@ -144,8 +129,8 @@ public class Duniter4jClientImpl implements Duniter4jClient {
@Override
public String indexDocumentFromJson(String index, String type, String json) {
IndexResponse response = client.prepareIndex(index, type)
.setSource(json)
.setRefresh(true)
.setSource(json, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute().actionGet();
return response.getId();
}
......@@ -154,22 +139,22 @@ public class Duniter4jClientImpl implements Duniter4jClient {
public void updateDocumentFromJson(String index, String type, String id, String json) {
// Execute indexBlocksFromNode
safeExecuteRequest(client.prepareUpdate(index, type, id)
.setRefresh(true)
.setDoc(json), true);
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.setDoc(json, XContentType.JSON), true);
}
@Override
public void checkSameDocumentField(String index, String type, String id, String fieldName, String expectedvalue) throws ElasticsearchException {
public void checkSameDocumentField(String index, String type, String id, String fieldName, String expectedValue) throws ElasticsearchException {
GetResponse response = client.prepareGet(index, type, id)
.setFields(fieldName)
.setFetchSource(fieldName, null)
.execute().actionGet();
boolean failed = !response.isExists();
if (failed) {
throw new NotFoundException(String.format("Document [%s/%s/%s] not exists.", index, type, id));
} else {
String docValue = (String)response.getFields().get(fieldName).getValue();
if (!Objects.equals(expectedvalue, docValue)) {
if (!Objects.equals(expectedValue, docValue)) {
throw new AccessDeniedException(String.format("Could not delete this document: not same [%s].", fieldName));
}
}
......@@ -230,8 +215,8 @@ public class Duniter4jClientImpl implements Duniter4jClient {
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery().ids(docId));
searchRequest.addFields(fieldNames);
searchRequest.setQuery(QueryBuilders.idsQuery().addIds(docId));
searchRequest.setFetchSource(fieldNames, null);
// Execute query
try {
......@@ -255,7 +240,7 @@ public class Duniter4jClientImpl implements Duniter4jClient {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve fields [%s] for id [%s]",
index, type,
Joiner.on(',').join(fieldNames).toString(),
Joiner.on(',').join(fieldNames),
docId), e);
}
}
......@@ -276,8 +261,8 @@ public class Duniter4jClientImpl implements Duniter4jClient {
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery().ids(ids));
searchRequest.addFields(fieldName);
searchRequest.setQuery(QueryBuilders.idsQuery().addIds(ids.toArray(new String[0])));
searchRequest.setFetchSource(fieldName, null);
// Execute query
try {
......@@ -298,13 +283,12 @@ public class Duniter4jClientImpl implements Duniter4jClient {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve field [%s] for ids [%s]",
index, type, fieldName,
Joiner.on(',').join(ids).toString()), e);
Joiner.on(',').join(ids)), e);
}
}
/**
* Retrieve a field from a document id
* @param docId
* @return
*/
@Override
......@@ -357,9 +341,9 @@ public class Duniter4jClientImpl implements Duniter4jClient {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setSearchType(SearchType.QUERY_AND_FETCH);
.setSearchType(SearchType.DEFAULT); // TODO was QUERY_AND_FETCH
searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docId));
searchRequest.setQuery(QueryBuilders.idsQuery(type).addIds(docId));
if (CollectionUtils.isNotEmpty(fieldNames)) {
searchRequest.setFetchSource(fieldNames, null);
}
......@@ -395,18 +379,17 @@ public class Duniter4jClientImpl implements Duniter4jClient {
/**
* Retrieve a document by id
* @param docId
* @return
*/
@Override
public <T extends Object> Map<String, T> getSourcesByIds(String index, String type, Set<String> docIds, Class<T> classOfT, String... fieldNames) {
public <T> Map<String, T> getSourcesByIds(String index, String type, Set<String> docIds, Class<T> classOfT, String... fieldNames) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setSearchType(SearchType.QUERY_AND_FETCH);
.setSearchType(SearchType.DEFAULT); // TODO was QUERY_AND_FETCH
searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docIds));
searchRequest.setQuery(QueryBuilders.idsQuery(type).addIds(docIds.toArray(new String[0])));
if (CollectionUtils.isNotEmpty(fieldNames)) {
searchRequest.setFetchSource(fieldNames, null);
}
......@@ -427,8 +410,8 @@ public class Duniter4jClientImpl implements Duniter4jClient {
ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper();
for (SearchHit searchHit : response.getHits().getHits()) {
if (searchHit.source() != null) {
result.put(searchHit.getId(), objectMapper.readValue(searchHit.source(), classOfT));
if (searchHit.getSourceRef() != null) {
result.put(searchHit.getId(), objectMapper.readValue(searchHit.getSourceRef().toBytesRef().bytes, classOfT));
}
}
return result;
......@@ -545,7 +528,7 @@ public class Duniter4jClientImpl implements Duniter4jClient {
}
byte[] data = builder.toString().getBytes();
bulkRequest.add(new BytesArray(data), indexName, indexType, false);
bulkRequest.add(new BytesArray(data), indexName, indexType, false, XContentType.JSON);
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e);
......@@ -777,66 +760,6 @@ public class Duniter4jClientImpl implements Duniter4jClient {
return client.prepareGet(index, type, id);
}
@Override
public PutIndexedScriptRequestBuilder preparePutIndexedScript() {
return client.preparePutIndexedScript();
}
@Override
public PutIndexedScriptRequestBuilder preparePutIndexedScript(@Nullable String scriptLang, String id, String source) {
return client.preparePutIndexedScript(scriptLang, id, source);
}
@Override
public void deleteIndexedScript(DeleteIndexedScriptRequest request, ActionListener<DeleteIndexedScriptResponse> listener) {
client.deleteIndexedScript(request, listener);
}
@Override
public ActionFuture<DeleteIndexedScriptResponse> deleteIndexedScript(DeleteIndexedScriptRequest request) {
return client.deleteIndexedScript(request);
}
@Override
public DeleteIndexedScriptRequestBuilder prepareDeleteIndexedScript() {
return client.prepareDeleteIndexedScript();
}
@Override
public DeleteIndexedScriptRequestBuilder prepareDeleteIndexedScript(@Nullable String scriptLang, String id) {
return client.prepareDeleteIndexedScript(scriptLang, id);
}
@Override
public void putIndexedScript(PutIndexedScriptRequest request, ActionListener<PutIndexedScriptResponse> listener) {
client.putIndexedScript(request, listener);
}
@Override
public ActionFuture<PutIndexedScriptResponse> putIndexedScript(PutIndexedScriptRequest request) {
return client.putIndexedScript(request);
}
@Override
public GetIndexedScriptRequestBuilder prepareGetIndexedScript() {
return client.prepareGetIndexedScript();
}
@Override
public GetIndexedScriptRequestBuilder prepareGetIndexedScript(@Nullable String scriptLang, String id) {
return client.prepareGetIndexedScript(scriptLang, id);
}
@Override
public void getIndexedScript(GetIndexedScriptRequest request, ActionListener<GetIndexedScriptResponse> listener) {
client.getIndexedScript(request, listener);
}
@Override
public ActionFuture<GetIndexedScriptResponse> getIndexedScript(GetIndexedScriptRequest request) {
return client.getIndexedScript(request);
}
@Override
public ActionFuture<MultiGetResponse> multiGet(MultiGetRequest request) {
return client.multiGet(request);
......@@ -852,57 +775,6 @@ public class Duniter4jClientImpl implements Duniter4jClient {
return client.prepareMultiGet();
}
@Override
@Deprecated
public ActionFuture<CountResponse> count(CountRequest request) {
return client.count(request);
}
@Override
@Deprecated
public void count(CountRequest request, ActionListener<CountResponse> listener) {
client.count(request, listener);
}
@Override
@Deprecated
public CountRequestBuilder prepareCount(String... indices) {
return client.prepareCount(indices);
}
@Override
@Deprecated
public ActionFuture<ExistsResponse> exists(ExistsRequest request) {
return client.exists(request);
}
@Override
@Deprecated
public void exists(ExistsRequest request, ActionListener<ExistsResponse> listener) {
client.exists(request, listener);
}
@Override
@Deprecated
public ExistsRequestBuilder prepareExists(String... indices) {
return client.prepareExists(indices);
}
@Override
public ActionFuture<SuggestResponse> suggest(SuggestRequest request) {
return client.suggest(request);
}
@Override
public void suggest(SuggestRequest request, ActionListener<SuggestResponse> listener) {
client.suggest(request, listener);
}
@Override
public SuggestRequestBuilder prepareSuggest(String... indices) {
return client.prepareSuggest(indices);
}
@Override
public ActionFuture<SearchResponse> search(SearchRequest request) {
return client.search(request);
......@@ -1007,36 +879,6 @@ public class Duniter4jClientImpl implements Duniter4jClient {
return client.prepareMultiTermVectors();
}
@Override
public ActionFuture<PercolateResponse> percolate(PercolateRequest request) {
return client.percolate(request);
}
@Override
public void percolate(PercolateRequest request, ActionListener<PercolateResponse> listener) {
client.percolate(request, listener);
}
@Override
public PercolateRequestBuilder preparePercolate() {
return client.preparePercolate();
}
@Override