From ad5b021aae19d6a0a2c042115cd20405be93e8ca Mon Sep 17 00:00:00 2001 From: blavenie <benoit.lavenier@e-is.pro> Date: Fri, 30 Dec 2016 21:04:00 +0100 Subject: [PATCH] - Rename message/record into message/inbox - Add cache on HttpClient, by timeout - Fix maven conf to run ES node using -DskipAssembly - Add new index group/record - Synchro: log stats insert/update --- .../model/elasticsearch/RecordComment.java | 17 ++ .../client/model/elasticsearch/UserGroup.java | 71 +++++ .../core/client/service/HttpService.java | 4 +- .../core/client/service/HttpServiceImpl.java | 110 ++++--- .../service/bma/WotRemoteServiceImpl.java | 4 +- .../exception/HttpUnauthorizeException.java | 49 ++++ .../duniter/core/util/cache/SimpleCache.java | 39 ++- duniter4j-es-assembly/pom.xml | 16 +- .../src/test/es-home/config/logging.yml | 2 +- .../duniter/elasticsearch/PluginSettings.java | 5 + .../elasticsearch/model/SynchroResult.java | 69 +++++ .../rest/AbstractRestPostIndexAction.java | 6 +- .../AbstractRestPostMarkAsReadAction.java | 4 +- .../rest/AbstractRestPostUpdateAction.java | 9 +- .../rest/security/RedirectionRestRequest.java | 137 +++++++++ .../rest/security/RestSecurityFilter.java | 11 + .../service/AbstractService.java | 16 +- .../service/AbstractSynchroService.java | 126 ++++++-- .../service/CurrencyService.java | 2 +- .../service/changes/ChangeEvent.java | 1 - .../service/changes/ChangeService.java | 17 ++ .../service/changes/ChangeSource.java | 3 + .../websocket/WebSocketChangesEndPoint.java | 37 ++- .../market/RestMarketCommentUpdateAction.java | 2 +- .../market/RestMarketRecordUpdateAction.java | 2 +- .../RestRegistryRecordUpdateAction.java | 2 +- .../RestregistryCommentUpdateAction.java | 2 +- .../gchange/service/CommentService.java | 101 ++++++- .../service/CommentUserEventService.java | 241 ++++++++++++++++ .../gchange/service/MarketService.java | 22 +- .../gchange/service/RegistryService.java | 8 +- .../gchange/service/ServiceModule.java | 1 + .../gchange/service/SynchroService.java | 25 +- .../elasticsearch/user/PluginInit.java | 13 +- .../elasticsearch/user/rest/RestModule.java | 23 +- .../user/rest/group/RestGroupIndexAction.java | 44 +++ .../rest/group/RestGroupUpdateAction.java | 46 +++ .../message/RestMessageInboxIndexAction.java | 4 +- ... => RestMessageInboxMarkAsReadAction.java} | 14 +- .../message/RestMessageOutboxIndexAction.java | 4 +- .../compat/RestMessageRecordGetAction.java | 94 ++++++ .../compat/RestMessageRecordIndexAction.java | 49 ++++ .../RestMessageRecordMarkAsReadAction.java | 49 ++++ .../compat/RestMessageRecordSearchAction.java | 65 +++++ .../user/RestUserEventMarkAsReadAction.java | 4 +- .../user/RestUserProfileUpdateAction.java | 2 +- .../user/RestUserSettingsUpdateAction.java | 2 +- .../service/BlockchainUserEventService.java | 1 + .../user/service/GroupService.java | 271 ++++++++++++++++++ .../user/service/HistoryService.java | 29 ++ .../user/service/MessageService.java | 31 +- .../user/service/ServiceModule.java | 4 + .../user/service/SynchroService.java | 40 ++- .../user/service/UserEventService.java | 2 +- .../user/service/UserService.java | 15 +- 55 files changed, 1782 insertions(+), 185 deletions(-) create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/UserGroup.java create mode 100644 duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpUnauthorizeException.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java create mode 100644 duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RedirectionRestRequest.java create mode 100644 duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupUpdateAction.java rename duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/{RestMessageMarkAsReadAction.java => RestMessageInboxMarkAsReadAction.java} (71%) create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordGetAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordIndexAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordMarkAsReadAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordSearchAction.java create mode 100644 duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java index e818e035..6274a5b1 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/RecordComment.java @@ -22,6 +22,9 @@ package org.duniter.core.client.model.elasticsearch; * #L% */ +import com.fasterxml.jackson.annotation.JsonGetter; +import com.fasterxml.jackson.annotation.JsonSetter; + /** * Created by blavenie on 01/03/16. */ @@ -29,9 +32,13 @@ public class RecordComment extends Record { public static final String PROPERTY_MESSAGE="message"; public static final String PROPERTY_RECORD="record"; + public static final String PROPERTY_REPLY_TO="replyTo"; + + public static final String PROPERTY_REPLY_TO_JSON="reply_to"; private String message; private String record; + private String replyTo; public String getMessage() { return message; @@ -48,4 +55,14 @@ public class RecordComment extends Record { public void setRecord(String record) { this.record = record; } + + @JsonGetter(PROPERTY_REPLY_TO_JSON) + public String getReplyTo() { + return replyTo; + } + + @JsonSetter(PROPERTY_REPLY_TO_JSON) + public void setReplyTo(String replyTo) { + this.replyTo = replyTo; + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/UserGroup.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/UserGroup.java new file mode 100644 index 00000000..4144d10d --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/elasticsearch/UserGroup.java @@ -0,0 +1,71 @@ +package org.duniter.core.client.model.elasticsearch; + +/* + * #%L + * Duniter4j :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +/** + * Created by blavenie on 01/03/16. + */ +public class UserGroup extends Record { + + public static final String PROPERTY_NAME="name"; + public static final String PROPERTY_TITLE="title"; + public static final String PROPERTY_DESCRIPTION="description"; + public static final String PROPERTY_CREATION_TIME="creationTime"; + + private String name; + private String title; + private String description; + private Long creationTime; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public Long getCreationTime() { + return creationTime; + } + + public void setCreationTime(Long creationTime) { + this.creationTime = creationTime; + } +} diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java index b2390f40..a6fc7530 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpService.java @@ -47,9 +47,9 @@ public interface HttpService extends Service { <T> T executeRequest(Peer peer, String absolutePath, Class<? extends T> resultClass); - String getPath(Peer peer, String absolutePath); + String getPath(Peer peer, String... absolutePath); - String getPath(String absolutePath); + String getPath(String... absolutePath); URIBuilder getURIBuilder(URI baseUri, String... path); } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index 54aedaa3..f40c8aab 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -24,6 +24,7 @@ package org.duniter.core.client.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; +import org.apache.commons.io.IOUtils; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.HttpClient; @@ -39,12 +40,11 @@ import org.duniter.core.client.config.Configuration; import org.duniter.core.client.model.bma.Error; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.local.Peer; -import org.duniter.core.client.service.exception.HttpBadRequestException; -import org.duniter.core.client.service.exception.HttpConnectException; -import org.duniter.core.client.service.exception.HttpNotFoundException; -import org.duniter.core.client.service.exception.PeerConnectionException; +import org.duniter.core.client.service.exception.*; import org.duniter.core.exception.TechnicalException; import org.duniter.core.util.StringUtils; +import org.duniter.core.util.cache.Cache; +import org.duniter.core.util.cache.SimpleCache; import org.nuiton.i18n.I18n; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,11 +64,13 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean public static final String URL_PEER_ALIVE = "/blockchain/parameters"; - protected Integer baseTimeOut; protected ObjectMapper objectMapper; - protected HttpClient httpClient; protected Peer defaultPeer; private boolean debug; + protected Joiner pathJoiner = Joiner.on('/'); + protected SimpleCache<Integer, RequestConfig> requestConfigCache; + protected SimpleCache<Integer, HttpClient> httpClientCache; + public HttpServiceImpl() { super(); @@ -77,27 +79,51 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean @Override public void afterPropertiesSet() throws Exception { - Configuration config = Configuration.instance(); + + // Initialize caches + initCaches(); + this.objectMapper = JacksonUtils.newObjectMapper(); - this.baseTimeOut = config.getNetworkTimeout(); - this.httpClient = createHttpClient(); + } + + /** + * Initialize caches + */ + protected void initCaches() { + Configuration config = Configuration.instance(); + int cacheTimeInMillis = config.getNetworkCacheTimeInMillis(); + + requestConfigCache = new SimpleCache<Integer, RequestConfig>(cacheTimeInMillis) { + @Override + public RequestConfig load(Integer timeout) { + return createRequestConfig(timeout); + } + }; + + httpClientCache = new SimpleCache<Integer, HttpClient>(cacheTimeInMillis) { + @Override + public HttpClient load(Integer timeout) { + return createHttpClient(timeout); + } + }; + httpClientCache.registerRemoveListener(item -> { + log.debug("Closing HttpClient..."); + closeQuietly(item); + }); } public void connect(Peer peer) throws PeerConnectionException { if (peer == null) { throw new IllegalArgumentException("argument 'peer' must not be null"); } - if (httpClient == null) { - httpClient = createHttpClient(); - } if (peer == defaultPeer) { return; } HttpGet httpGet = new HttpGet(getPath(peer, URL_PEER_ALIVE)); - boolean isPeerAlive = false; + boolean isPeerAlive; try { - isPeerAlive = executeRequest(httpClient, httpGet); + isPeerAlive = executeRequest(httpClientCache.get(0/*=default timeout*/), httpGet); } catch(TechnicalException e) { this.defaultPeer = null; throw new PeerConnectionException(e); @@ -115,45 +141,43 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean @Override public void close() throws IOException { - if (httpClient instanceof CloseableHttpClient) { - ((CloseableHttpClient)httpClient).close(); - } - else if (httpClient instanceof Closeable) { - ((Closeable)httpClient).close(); - } - httpClient = null; + httpClientCache.clear(); + requestConfigCache.clear(); } public <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass) { - return executeRequest(httpClient, request, resultClass); + return executeRequest(httpClientCache.get(0), request, resultClass); } public <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass) { - return executeRequest(httpClient, request, resultClass, errorClass); + return executeRequest(httpClientCache.get(0), request, resultClass, errorClass); } public <T> T executeRequest(String absolutePath, Class<? extends T> resultClass) { HttpGet httpGet = new HttpGet(getPath(absolutePath)); - return executeRequest(httpClient, httpGet, resultClass); + return executeRequest(httpClientCache.get(0), httpGet, resultClass); } public <T> T executeRequest(Peer peer, String absolutePath, Class<? extends T> resultClass) { HttpGet httpGet = new HttpGet(getPath(peer, absolutePath)); - return executeRequest(httpClient, httpGet, resultClass); + return executeRequest(httpClientCache.get(0), httpGet, resultClass); } - public String getPath(Peer peer, String absolutePath) { - return new StringBuilder().append(peer.getUrl()).append(absolutePath).toString(); + public String getPath(Peer peer, String... absolutePath) { + return pathJoiner.join(peer.getUrl(), + pathJoiner.skipNulls().join(absolutePath)); } - public String getPath(String absolutePath) { + public String getPath(String... absolutePath) { checkDefaultPeer(); - return new StringBuilder().append(defaultPeer.getUrl()).append(absolutePath).toString(); + String pathToAppend = pathJoiner.skipNulls().join(absolutePath); + String result = pathJoiner.join(defaultPeer.getUrl(), pathToAppend); + return result; } public URIBuilder getURIBuilder(URI baseUri, String... path) { - String pathToAppend = Joiner.on('/').skipNulls().join(path); + String pathToAppend = pathJoiner.skipNulls().join(path); int customQueryStartIndex = pathToAppend.indexOf('?'); String customQuery = null; @@ -179,16 +203,20 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean } } - protected HttpClient createHttpClient() { - CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(getRequestConfig()) + protected HttpClient createHttpClient(int timeout) { + CloseableHttpClient httpClient = HttpClients.custom().setDefaultRequestConfig(requestConfigCache.get(timeout)) // .setDefaultCredentialsProvider(getCredentialsProvider()) .build(); return httpClient; } - protected RequestConfig getRequestConfig() { + protected RequestConfig createRequestConfig(int timeout) { // build request config for timeout - return RequestConfig.custom().setSocketTimeout(baseTimeOut).setConnectTimeout(baseTimeOut).build(); + if (timeout <= 0) { + // Use config default timeout + timeout = Configuration.instance().getNetworkTimeout(); + } + return RequestConfig.custom().setSocketTimeout(timeout).setConnectTimeout(timeout).build(); } protected <T> T executeRequest(HttpClient httpClient, HttpUriRequest request, Class<? extends T> resultClass) { @@ -220,7 +248,7 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean } case HttpStatus.SC_UNAUTHORIZED: case HttpStatus.SC_FORBIDDEN: - throw new TechnicalException(I18n.t("duniter4j.client.authentication")); + throw new HttpUnauthorizeException(I18n.t("duniter4j.client.authentication")); case HttpStatus.SC_NOT_FOUND: throw new HttpNotFoundException(I18n.t("duniter4j.client.notFound", request.toString())); case HttpStatus.SC_BAD_REQUEST: @@ -356,4 +384,16 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean throw new TechnicalException(e.getMessage(), e); } } + + public static void closeQuietly(HttpClient httpClient) { + try { + if (httpClient instanceof CloseableHttpClient) { + ((CloseableHttpClient) httpClient).close(); + } else if (httpClient instanceof Closeable) { + ((Closeable) httpClient).close(); + } + } catch(IOException e) { + // silent is gold + } + } } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/WotRemoteServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/WotRemoteServiceImpl.java index 97829efe..07ce3f27 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/WotRemoteServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/bma/WotRemoteServiceImpl.java @@ -516,7 +516,7 @@ public class WotRemoteServiceImpl extends BaseRemoteServiceImpl implements WotRe } } - // Group certifications by [uid, pubKey] and keep last timestamp + // group certifications by [uid, pubKey] and keep last timestamp result = groupByUidAndPubKey(result, true); } @@ -575,7 +575,7 @@ public class WotRemoteServiceImpl extends BaseRemoteServiceImpl implements WotRe } } - // Group certifications by [uid, pubKey] and keep last timestamp + // group certifications by [uid, pubKey] and keep last timestamp result = groupByUidAndPubKey(result, true); return result; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpUnauthorizeException.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpUnauthorizeException.java new file mode 100644 index 00000000..e8590657 --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/exception/HttpUnauthorizeException.java @@ -0,0 +1,49 @@ +package org.duniter.core.client.service.exception; + +/* + * #%L + * UCoin Java :: Core Client API + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.core.exception.BusinessException; + +/** + * Created by eis on 11/02/15. + */ +public class HttpUnauthorizeException extends BusinessException { + + private static final long serialVersionUID = -5260280401144018980L; + + public HttpUnauthorizeException() { + super(); + } + + public HttpUnauthorizeException(String message, Throwable cause) { + super(message, cause); + } + + public HttpUnauthorizeException(String message) { + super(message); + } + + public HttpUnauthorizeException(Throwable cause) { + super(cause); + } +} diff --git a/duniter4j-core-shared/src/main/java/org/duniter/core/util/cache/SimpleCache.java b/duniter4j-core-shared/src/main/java/org/duniter/core/util/cache/SimpleCache.java index b7496dc9..d5de1c5e 100644 --- a/duniter4j-core-shared/src/main/java/org/duniter/core/util/cache/SimpleCache.java +++ b/duniter4j-core-shared/src/main/java/org/duniter/core/util/cache/SimpleCache.java @@ -24,12 +24,17 @@ package org.duniter.core.util.cache; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; /** * Created by eis on 30/03/15. */ public abstract class SimpleCache<K, V> implements Cache<K, V> { + public interface RemoveListener<V> { + void onRemove(V item); + } + private static final long ETERNAL_TIME = -1l; private static final long ILLIMITED_ITEMS_COUNT = -1l; private static final long ITEMS_COUNT_FOR_DEFAULT_CLEANING = 1000; @@ -38,6 +43,7 @@ public abstract class SimpleCache<K, V> implements Cache<K, V> { private final Map<K, Long> mCachedTimes; private final long mCacheTimeInMillis; private final long mCacheMaxItemCount; + private final List<RemoveListener<V>> mRemoveListeners; final Object mutex; @@ -50,11 +56,19 @@ public abstract class SimpleCache<K, V> implements Cache<K, V> { } public SimpleCache(long cacheTimeInMillis, long cacheMaxItemsCount) { - this.mCachedValues = Collections.synchronizedMap(new ConcurrentHashMap<K, V>()); - this.mCachedTimes = new ConcurrentHashMap<K, Long>(); + this.mCachedValues = Collections.synchronizedMap(new ConcurrentHashMap<>()); + this.mCachedTimes = new ConcurrentHashMap<>(); this.mCacheTimeInMillis = cacheTimeInMillis; this.mCacheMaxItemCount = cacheMaxItemsCount; this.mutex = mCachedValues; + this.mRemoveListeners = new CopyOnWriteArrayList<>(); + } + + public void registerRemoveListener(RemoveListener<V> listener) { + mRemoveListeners.add(listener); + } + public void unregisterRemoveListener(RemoveListener<V> listener) { + mRemoveListeners.remove(listener); } public V getIfPresent(K key) { @@ -134,6 +148,9 @@ public abstract class SimpleCache<K, V> implements Cache<K, V> { */ public void clear() { synchronized (mutex) { + if (hasRemoveListeners()) { + mCachedValues.values().forEach(v -> notifyRemoveListeners(v)); + } mCachedValues.clear(); mCachedTimes.clear(); } @@ -181,11 +198,27 @@ public abstract class SimpleCache<K, V> implements Cache<K, V> { } for (K key : keysToRemove) { - mCachedValues.remove(key); + V removedItem = mCachedValues.remove(key); mCachedTimes.remove(key); + + // Notify listeners + notifyRemoveListeners(removedItem); } } + private boolean hasRemoveListeners() { + return mRemoveListeners.size() > 0; + } + + private void notifyRemoveListeners(V removedItem) { + for(RemoveListener listener: mRemoveListeners) { + try { + listener.onRemove(removedItem); + } catch(Throwable t) { + // Silent + } + } + } /** * Reduce size of items: will first remove older items */ diff --git a/duniter4j-es-assembly/pom.xml b/duniter4j-es-assembly/pom.xml index b3acfffe..49f1da51 100644 --- a/duniter4j-es-assembly/pom.xml +++ b/duniter4j-es-assembly/pom.xml @@ -209,13 +209,25 @@ <!-- reuse standalone files --> <then> <delete failonerror="false"> - <fileset dir="${run.es.home}/plugins/${project.groupId}" includes="duniter4j-*.jar" /> + <fileset dir="${run.es.home}/plugins" includes="**/duniter4j-*.jar" /> </delete> - <copy todir="${run.es.home}/plugins/${project.groupId}" overwrite="true"> + <copy todir="${run.es.home}/plugins/duniter4j-es-core" overwrite="true"> + <fileset dir="../duniter4j-core-client/target" includes="duniter4j-*${project.version}.jar"> + </fileset> + <fileset dir="../duniter4j-core-shared/target" includes="duniter4j-*${project.version}.jar"> + </fileset> <fileset dir="../duniter4j-es-core/target" includes="duniter4j-*${project.version}.jar"> </fileset> + </copy> + <copy todir="${run.es.home}/plugins/duniter4j-es-core" overwrite="true"> + <fileset dir="../duniter4j-es-core/target" includes="duniter4j-*${project.version}.jar"> + </fileset> + </copy> + <copy todir="${run.es.home}/plugins/duniter4j-es-user" overwrite="true"> <fileset dir="../duniter4j-es-user/target" includes="duniter4j-*${project.version}.jar"> </fileset> + </copy> + <copy todir="${run.es.home}/plugins/duniter4j-es-gchange" overwrite="true"> <fileset dir="../duniter4j-es-gchange/target" includes="duniter4j-*${project.version}.jar"> </fileset> </copy> diff --git a/duniter4j-es-assembly/src/test/es-home/config/logging.yml b/duniter4j-es-assembly/src/test/es-home/config/logging.yml index 077a8d57..e44874c6 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/logging.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/logging.yml @@ -23,7 +23,7 @@ logger: duniter : DEBUG duniter.security : ERROR duniter.user.event : INFO - duniter.network.p2p: TRACE + duniter.network.p2p: INFO security: DEBUG diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java index 8996b5f9..5fc11369 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/PluginSettings.java @@ -99,6 +99,7 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getNodeBmaHost()); applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getNodeBmaPort())); applicationConfig.setDefaultOption(ConfigurationOption.NODE_PROTOCOL.getKey(), getNodeBmaPort() == 443 ? "https" : "http"); + applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_TIMEOUT.getKey(), String.valueOf(getNetworkTimeout())); try { applicationConfig.parse(new String[]{}); @@ -185,6 +186,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> { return Configuration.instance().getTempDirectory(); } + public int getNetworkTimeout() { + return settings.getAsInt("duniter.network.timeout", 100000 /*10s*/); + } + public boolean isDevMode() { return settings.getAsBoolean("duniter.dev.enable", false); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java new file mode 100644 index 00000000..d0cef017 --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/model/SynchroResult.java @@ -0,0 +1,69 @@ +package org.duniter.elasticsearch.model; + +import java.util.HashMap; +import java.util.Map; + +/** + * Created by blavenie on 30/12/16. + */ +public class SynchroResult { + + private long insertTotal = 0; + private long updateTotal = 0; + private long deleteTotal = 0; + private Map<String, Long> insertHits = new HashMap<>(); + private Map<String, Long> updateHits = new HashMap<>(); + private Map<String, Long> deleteHits = new HashMap<>(); + + public void addInserts(String index, String type, long nbHits) { + insertHits.put(index + "/" + type, getInserts(index, type) + nbHits); + insertTotal += nbHits; + } + + public void addUpdates(String index, String type, long nbHits) { + updateHits.put(index + "/" + type, getUpdates(index, type) + nbHits); + updateTotal += nbHits; + } + + public void addDeletes(String index, String type, long nbHits) { + deleteHits.put(index + "/" + type, getDeletes(index, type) + nbHits); + deleteTotal += nbHits; + } + + public long getInserts(String index, String type) { + return insertHits.getOrDefault(index + "/" + type, 0l); + } + + public long getUpdates(String index, String type) { + return updateHits.getOrDefault(index + "/" + type, 0l); + } + + public long getDeletes(String index, String type) { + return deleteHits.getOrDefault(index + "/" + type, 0l); + } + + public long getInserts() { + return insertTotal; + } + + public long getUpdates() { + return updateTotal; + } + + public long getDeletes() { + return deleteTotal; + } + + public long getTotal() { + return insertTotal + updateTotal + deleteTotal; + } + + + public String toString() { + return new StringBuilder() + .append("inserts [").append(insertTotal).append("]") + .append(", updates [").append(updateTotal).append("]") + .append(", deletes [").append(deleteTotal).append("]") + .toString(); + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostIndexAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostIndexAction.java index c2a0f7fd..9d4fe686 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostIndexAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostIndexAction.java @@ -58,15 +58,15 @@ public abstract class AbstractRestPostIndexAction extends BaseRestHandler { } @Override - protected void handleRequest(final RestRequest request, RestChannel restChannel, Client client) throws Exception { + protected void handleRequest(final RestRequest request, RestChannel channel, Client client) throws Exception { try { String id = indexer.handleJson(request.content().toUtf8()); - restChannel.sendResponse(new BytesRestResponse(OK, id)); + channel.sendResponse(new BytesRestResponse(OK, id)); } catch(DuniterElasticsearchException | BusinessException e) { log.error(e.getMessage(), e); - restChannel.sendResponse(new XContentThrowableRestResponse(request, e)); + channel.sendResponse(new XContentThrowableRestResponse(request, e)); } catch(Exception e) { log.error(e.getMessage(), e); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostMarkAsReadAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostMarkAsReadAction.java index dc782f39..7be26a52 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostMarkAsReadAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostMarkAsReadAction.java @@ -60,7 +60,7 @@ public abstract class AbstractRestPostMarkAsReadAction extends BaseRestHandler { String id = request.param("id"); try { - updater.handleSignature(request.content().toUtf8(), id); + updater.handleSignature(id, request.content().toUtf8()); restChannel.sendResponse(new BytesRestResponse(OK, id)); } catch(DuniterElasticsearchException | BusinessException e) { @@ -74,7 +74,7 @@ public abstract class AbstractRestPostMarkAsReadAction extends BaseRestHandler { public interface JsonReadUpdater { - void handleSignature(String signature, String id) throws DuniterElasticsearchException, BusinessException; + void handleSignature(String id, String signature) throws DuniterElasticsearchException, BusinessException; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostUpdateAction.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostUpdateAction.java index ede1eb5e..de3ff6ed 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostUpdateAction.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/AbstractRestPostUpdateAction.java @@ -23,6 +23,8 @@ package org.duniter.elasticsearch.rest; */ import org.duniter.core.exception.BusinessException; +import org.duniter.core.util.StringUtils; +import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.rest.security.RestSecurityController; import org.duniter.elasticsearch.exception.DuniterElasticsearchException; import org.elasticsearch.client.Client; @@ -60,7 +62,10 @@ public abstract class AbstractRestPostUpdateAction extends BaseRestHandler { String id = request.param("id"); try { - updater.handleJson(request.content().toUtf8(), id); + if (StringUtils.isBlank(id)) { + throw new AccessDeniedException("Bad request (missing id in path)"); + } + updater.handleJson(id, request.content().toUtf8()); restChannel.sendResponse(new BytesRestResponse(OK, id)); } catch(DuniterElasticsearchException | BusinessException e) { @@ -74,7 +79,7 @@ public abstract class AbstractRestPostUpdateAction extends BaseRestHandler { public interface JsonUpdater { - void handleJson(String json, String id) throws DuniterElasticsearchException, BusinessException; + void handleJson(String id, String json) throws DuniterElasticsearchException, BusinessException; } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RedirectionRestRequest.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RedirectionRestRequest.java new file mode 100644 index 00000000..4462863e --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RedirectionRestRequest.java @@ -0,0 +1,137 @@ +package org.duniter.elasticsearch.rest.security; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.rest.RestRequest; + +import java.net.SocketAddress; +import java.util.Map; + +/** + * Created by blavenie on 30/12/16. + */ +public class RedirectionRestRequest extends RestRequest { + + private final RestRequest delegate; + private String path; + + public RedirectionRestRequest(RestRequest delegate, String path) { + super(); + this.delegate = delegate; + this.path = path; + } + + @Override + public Method method() { + return delegate.method(); + } + + @Override + public String uri() { + return delegate.uri(); + } + + @Override + public String rawPath() { + return delegate.rawPath(); + } + + @Override + public boolean hasContent() { + return delegate.hasContent(); + } + + @Override + public BytesReference content() { + return delegate.content(); + } + + @Override + public String header(String name) { + return delegate.header(name); + } + + @Override + public Iterable<Map.Entry<String, String>> headers() { + return delegate.headers(); + } + + @Override + @Nullable + public SocketAddress getRemoteAddress() { + return delegate.getRemoteAddress(); + } + + @Override + @Nullable + public SocketAddress getLocalAddress() { + return delegate.getLocalAddress(); + } + + @Override + public boolean hasParam(String key) { + return delegate.hasParam(key); + } + + @Override + public String param(String key) { + return delegate.param(key); + } + + @Override + public Map<String, String> params() { + return delegate.params(); + } + + @Override + public float paramAsFloat(String key, float defaultValue) { + return delegate.paramAsFloat(key, defaultValue); + } + + @Override + public int paramAsInt(String key, int defaultValue) { + return delegate.paramAsInt(key, defaultValue); + } + + @Override + public long paramAsLong(String key, long defaultValue) { + return delegate.paramAsLong(key, defaultValue); + } + + @Override + public boolean paramAsBoolean(String key, boolean defaultValue) { + return delegate.paramAsBoolean(key, defaultValue); + } + + @Override + public Boolean paramAsBoolean(String key, Boolean defaultValue) { + return delegate.paramAsBoolean(key, defaultValue); + } + + @Override + public TimeValue paramAsTime(String key, TimeValue defaultValue) { + return delegate.paramAsTime(key, defaultValue); + } + + @Override + public ByteSizeValue paramAsSize(String key, ByteSizeValue defaultValue) { + return delegate.paramAsSize(key, defaultValue); + } + + @Override + public String[] paramAsStringArray(String key, String[] defaultValue) { + return delegate.paramAsStringArray(key, defaultValue); + } + + @Override + public String[] paramAsStringArrayOrEmptyIfAll(String key) { + return delegate.paramAsStringArrayOrEmptyIfAll(key); + } + + @Override + public String param(String key, String defaultValue) { + return delegate.param(key, defaultValue); + } +} diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java index 64319346..c804400a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/rest/security/RestSecurityFilter.java @@ -23,11 +23,14 @@ package org.duniter.elasticsearch.rest.security; */ import org.duniter.elasticsearch.PluginSettings; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.rest.*; +import java.util.Map; + import static org.elasticsearch.rest.RestStatus.FORBIDDEN; public class RestSecurityFilter extends RestFilter { @@ -51,10 +54,18 @@ public class RestSecurityFilter extends RestFilter { @Override public void process(RestRequest request, RestChannel channel, RestFilterChain filterChain) throws Exception { + if (request.path().contains("message/record")) { + log.debug("---------------- Redirection ?!"); + + filterChain.continueProcessing(new RedirectionRestRequest(request, "message/inbox"), channel); + return; + } + if (securityController.isAllow(request)) { if (debug) { log.debug(String.format("Allow %s request [%s]", request.method().name(), request.path())); } + filterChain.continueProcessing(request, channel); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index 852643dd..f73d427e 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -148,7 +148,7 @@ public abstract class AbstractService implements Bean { protected String indexDocumentFromJson(String index, String type, String json) { IndexResponse response = client.prepareIndex(index, type) .setSource(json) - .setRefresh(false) + .setRefresh(true) .execute().actionGet(); return response.getId(); } @@ -170,6 +170,7 @@ public abstract class AbstractService implements Bean { protected void updateDocumentFromJson(String index, String type, String id, String json) { // Execute indexBlocksFromNode client.prepareUpdate(index, type, id) + .setRefresh(true) .setDoc(json) .execute().actionGet(); } @@ -231,6 +232,19 @@ public abstract class AbstractService implements Bean { } } + protected boolean isDocumentExists(String index, String type, String id) throws ElasticsearchException { + GetResponse response = client.prepareGet(index, type, id) + .setFetchSource(false) + .execute().actionGet(); + return response.isExists(); + } + + protected void checkDocumentExists(String index, String type, String id) throws ElasticsearchException { + if (!isDocumentExists(index, type, id)) { + throw new NotFoundException(String.format("Document [%s/%s/%s] not exists.", index, type, id)); + } + } + protected String getIssuer(JsonNode actualObj) { return getMandatoryField(actualObj, Record.PROPERTY_ISSUER).asText(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java index 57c9561f..f6218ef8 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java @@ -24,17 +24,20 @@ package org.duniter.elasticsearch.service; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; import org.apache.commons.io.IOUtils; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.ByteArrayEntity; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.service.HttpService; +import org.duniter.core.client.service.exception.HttpUnauthorizeException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.InvalidFormatException; +import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -79,11 +82,62 @@ public abstract class AbstractSynchroService extends AbstractService { return peer; } - protected void importChanges(Peer peer, String index, String type, long sinceTime) { - importChanges(peer, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, sinceTime); + protected long importChangesRemap(SynchroResult result, + Peer peer, + String fromIndex, String fromType, + String toIndex, String toType, + long sinceTime) { + Preconditions.checkNotNull(result); + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(fromIndex); + Preconditions.checkNotNull(fromType); + Preconditions.checkNotNull(toIndex); + Preconditions.checkNotNull(toType); + + return doImportChanges(result, peer, fromIndex, fromType, toIndex, toType, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, sinceTime); } - protected void importChanges(Peer peer, String index, String type, String issuerFieldName, String versionFieldName, long sinceTime) { + protected long importChanges(SynchroResult result, Peer peer, String index, String type, long sinceTime) { + Preconditions.checkNotNull(result); + Preconditions.checkNotNull(peer); + Preconditions.checkNotNull(index); + Preconditions.checkNotNull(type); + + return doImportChanges(result, peer, index, type, index, type, Record.PROPERTY_ISSUER, Record.PROPERTY_TIME, sinceTime); + } + + /* -- private methods -- */ + + private long doImportChanges(SynchroResult result, + Peer peer, + String fromIndex, String fromType, + String toIndex, String toType, + String issuerFieldName, String versionFieldName, long sinceTime) { + + + long offset = 0; + int size = pluginSettings.getIndexBulkSize() / 10; + boolean stop = false; + while(!stop) { + long currentRowCount = doImportChangesAtOffset(result, peer, + fromIndex, fromType, toIndex, toType, + issuerFieldName, versionFieldName, sinceTime, + offset, size); + offset += currentRowCount; + stop = currentRowCount < size; + } + + return offset; // = total rows + } + + private long doImportChangesAtOffset(SynchroResult result, Peer peer, + String fromIndex, String fromType, + String toIndex, String toType, + String issuerFieldName, String versionFieldName, + long sinceTime, + long offset, + int size) { + // Create the search query BytesStreamOutput bos; @@ -102,17 +156,11 @@ public abstract class AbstractSynchroService extends AbstractService { .endObject() .endObject() .endObject() - // currency - /*.startObject("filter") - .startObject("term") - .field("currency", "sou") // todo, filter on configured currency only - .endObject() - .endObject()*/ .endObject() // end: query .endObject() - .field("from", 0) // todo - .field("size", 100) // todo + .field("from", offset) + .field("size", size) .endObject(); builder.flush(); @@ -121,14 +169,19 @@ public abstract class AbstractSynchroService extends AbstractService { } // Execute query - String path = "/" + Joiner.on('/').join(new String[]{index, type, "_search"}); - HttpPost httpPost = new HttpPost(httpService.getPath(peer, "/" + path)); - httpPost.setEntity(new ByteArrayEntity(bos.bytes().array())); - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] Sending POST request: %s", peer, index, type, new String(bos.bytes().array()))); + InputStream response; + try { + HttpPost httpPost = new HttpPost(httpService.getPath(peer, fromIndex, fromType, "_search")); + httpPost.setEntity(new ByteArrayEntity(bos.bytes().array())); + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s/%s] Sending POST request: %s", peer, fromIndex, fromType, new String(bos.bytes().array()))); + } + response = httpService.executeRequest(httpPost, InputStream.class, String.class); + } + catch(HttpUnauthorizeException e) { + logger.error(String.format("[%s] [%s/%s] Unable to access (%s). Skipping data import.", peer, fromIndex, fromType, e.getMessage())); + return 0; } - InputStream response = httpService.executeRequest(httpPost, InputStream.class, String.class); - // Parse response try { @@ -137,24 +190,26 @@ public abstract class AbstractSynchroService extends AbstractService { node = node.get("hits"); int total = node == null ? 0 : node.get("total").asInt(0); if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] total to update: %s", peer, index, type, total)); + logger.debug(String.format("[%s] [%s/%s] total to update: %s", peer, toIndex, toType, total)); } boolean debug = logger.isTraceEnabled(); - if (total > 0) { + long counter = 0; - int batchSize = pluginSettings.getIndexBulkSize(); + long insertHits = 0; + long updateHits = 0; + + if (offset < total) { BulkRequestBuilder bulkRequest = client.prepareBulk(); bulkRequest.setRefresh(true); for (Iterator<JsonNode> hits = node.get("hits").iterator(); hits.hasNext();){ JsonNode hit = hits.next(); - String hitIndex = hit.get("_index").asText(); - String hitType = hit.get("_type").asText(); String id = hit.get("_id").asText(); JsonNode source = hit.get("_source"); + counter++; try { String issuer = source.get(issuerFieldName).asText(); @@ -166,7 +221,7 @@ public abstract class AbstractSynchroService extends AbstractService { throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); } - GetResponse existingDoc = client.prepareGet(index, type, id) + GetResponse existingDoc = client.prepareGet(toIndex, toType, id) .setFields(versionFieldName, issuerFieldName) .execute().actionGet(); @@ -177,11 +232,12 @@ public abstract class AbstractSynchroService extends AbstractService { String json = source.toString(); //readAndVerifyIssuerSignature(json, source); if (debug) { - logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, hitIndex, hitType, id, json)); + logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, toIndex, toType, id, json)); } - bulkRequest.add(client.prepareIndex(hitIndex, hitType, id) + bulkRequest.add(client.prepareIndex(toIndex, toType, id) .setSource(json.getBytes()) ); + insertHits++; } // Existing doc @@ -201,21 +257,23 @@ public abstract class AbstractSynchroService extends AbstractService { String json = source.toString(); //readAndVerifyIssuerSignature(json, source); if (debug) { - logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, hitIndex, hitType, id, json)); + logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, toIndex, toType, id, json)); } - bulkRequest.add(client.prepareIndex(hitIndex, hitType, id) + bulkRequest.add(client.prepareIndex(toIndex, toType, id) .setSource(json.getBytes())); + + updateHits++; } } } catch (InvalidFormatException e) { if (debug) { - logger.debug(String.format("[%s] [%s/%s] %s. Skipping.", peer, index, type, e.getMessage())); + logger.debug(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage())); } // Skipping document (continue) } catch (Exception e) { - logger.warn(String.format("[%s] [%s/%s] %s. Skipping.", peer, index, type, e.getMessage()), e); + logger.warn(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage()), e); // Skipping document (continue) } } @@ -234,7 +292,7 @@ public abstract class AbstractSynchroService extends AbstractService { || missingDocIds.contains(itemResponse.getId()); if (!skip) { if (debug) { - logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, index, type, itemResponse.getId(), itemResponse.getFailureMessage())); + logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); } missingDocIds.add(itemResponse.getId()); } @@ -243,6 +301,12 @@ public abstract class AbstractSynchroService extends AbstractService { } } + // update result stats + result.addInserts(toIndex, toType, insertHits); + result.addUpdates(toIndex, toType, updateHits); + + return counter; + } catch(IOException e) { throw new TechnicalException("Unable to parse search response", e); } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java index 5d584caa..867e0ef4 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/CurrencyService.java @@ -44,6 +44,7 @@ import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.exception.DuplicateIndexIdException; import org.duniter.elasticsearch.exception.InvalidSignatureException; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchPhaseExecutionException; @@ -55,7 +56,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHitField; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java index 2b26e303..4f68be7a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeEvent.java @@ -38,7 +38,6 @@ package org.duniter.elasticsearch.service.changes; limitations under the License. */ -import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonSyntaxException; diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java index 7b5d3bc4..c31a804d 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java @@ -38,6 +38,7 @@ package org.duniter.elasticsearch.service.changes; limitations under the License. */ +import com.google.common.base.Preconditions; import org.duniter.core.util.CollectionUtils; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.logging.ESLogger; @@ -184,6 +185,13 @@ public class ChangeService { } public static void registerListener(ChangeListener listener) { + Preconditions.checkNotNull(listener); + Preconditions.checkNotNull(listener.getId()); + if (LISTENERS.containsKey(listener.getId())) { + throw new IllegalArgumentException("Listener with id [%s] already registered. Id should be unique"); + } + + // Add to list LISTENERS.put(listener.getId(), listener); // Update sources @@ -201,6 +209,15 @@ public class ChangeService { } } + /** + * Usefull when listener sources has changed + * @param listener + */ + public static void refreshListener(ChangeListener listener) { + unregisterListener(listener); + registerListener(listener); + } + public static void unregisterListener(ChangeListener listener) { LISTENERS.remove(listener.getId()); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java index 982a3454..bf21d9dc 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeSource.java @@ -138,4 +138,7 @@ public class ChangeSource { return true; } + public boolean isEmpty() { + return indices == null && types == null && ids == null; + } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java index 8d7b02f6..3bcc36a2 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/websocket/WebSocketChangesEndPoint.java @@ -38,6 +38,8 @@ package org.duniter.elasticsearch.websocket; limitations under the License. */ +import com.google.common.collect.Maps; +import org.apache.commons.collections4.MapUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.changes.ChangeEvent; import org.duniter.elasticsearch.service.changes.ChangeService; @@ -51,11 +53,15 @@ import javax.websocket.server.ServerEndpoint; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; @ServerEndpoint(value = "/_changes") public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ - public static Collection<ChangeSource> SOURCES = null; + public static String PATH_PARAM_INDEX = "index"; + public static String PATH_PARAM_TYPE = "type"; + + public static Collection<ChangeSource> DEFAULT_SOURCES = null; public static class Init { @@ -67,17 +73,19 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ for(String sourceStr : sourcesStr) { sources.add(new ChangeSource(sourceStr)); } - SOURCES = sources; + DEFAULT_SOURCES = sources; } } private final ESLogger log = Loggers.getLogger("duniter.ws.changes"); private Session session; + private Map<String, ChangeSource> sources; @OnOpen public void onOpen(Session session) { log.debug("Connected ... " + session.getId()); this.session = session; + this.sources = null; ChangeService.registerListener(this); } @@ -93,12 +101,13 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ @Override public Collection<ChangeSource> getChangeSources() { - return SOURCES; + if (MapUtils.isEmpty(sources)) return DEFAULT_SOURCES; + return sources.values(); } @OnMessage public void onMessage(String message) { - log.debug("Received message: "+message); + addSourceFilter(message); } @OnClose @@ -114,4 +123,24 @@ public class WebSocketChangesEndPoint implements ChangeService.ChangeListener{ } + /* -- internal methods -- */ + + private void addSourceFilter(String filter) { + + ChangeSource source = new ChangeSource(filter); + if (source.isEmpty()) { + log.debug("Rejecting changes filter (seems to be empty): " + filter); + return; + } + + String sourceKey = source.toString(); + if (sources == null || !sources.containsKey(sourceKey)) { + log.debug("Adding changes filter: " + filter); + if (sources == null) { + sources = Maps.newHashMap(); + } + sources.put(sourceKey, source); + ChangeService.refreshListener(this); + } + } } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketCommentUpdateAction.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketCommentUpdateAction.java index c10f58c8..7872d14d 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketCommentUpdateAction.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketCommentUpdateAction.java @@ -37,7 +37,7 @@ public class RestMarketCommentUpdateAction extends AbstractRestPostUpdateAction MarketService service) { super(settings, controller, client, securityController, MarketService.INDEX, MarketService.RECORD_COMMENT_TYPE, - (json, id) -> service.updateCommentFromJson(json, id)); + (id, json) -> service.updateCommentFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketRecordUpdateAction.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketRecordUpdateAction.java index 9761924f..258dc668 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketRecordUpdateAction.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/market/RestMarketRecordUpdateAction.java @@ -37,7 +37,7 @@ public class RestMarketRecordUpdateAction extends AbstractRestPostUpdateAction { MarketService service) { super(settings, controller, client, securityController, MarketService.INDEX, MarketService.RECORD_TYPE, - (json, id) -> service.updateRecordFromJson(json, id)); + (id, json) -> service.updateRecordFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestRegistryRecordUpdateAction.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestRegistryRecordUpdateAction.java index e77b9425..9942fc7f 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestRegistryRecordUpdateAction.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestRegistryRecordUpdateAction.java @@ -37,7 +37,7 @@ public class RestRegistryRecordUpdateAction extends AbstractRestPostUpdateAction RegistryService service) { super(settings, controller, client, securityController, RegistryService.INDEX, RegistryService.RECORD_TYPE, - (json, id) -> service.updateRecordFromJson(json, id)); + (id, json) -> service.updateRecordFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestregistryCommentUpdateAction.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestregistryCommentUpdateAction.java index 8687f463..464b0f76 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestregistryCommentUpdateAction.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/rest/registry/RestregistryCommentUpdateAction.java @@ -37,7 +37,7 @@ public class RestregistryCommentUpdateAction extends AbstractRestPostUpdateActio RegistryService service) { super(settings, controller, client, securityController, RegistryService.INDEX, RegistryService.RECORD_COMMENT_TYPE, - (json, id) -> service.updateCommentFromJson(json, id)); + (id, json) -> service.updateCommentFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentService.java index 9d0f60e2..143fb074 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentService.java @@ -24,23 +24,32 @@ package org.duniter.elasticsearch.gchange.service; import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonSyntaxException; import org.apache.commons.collections4.MapUtils; import org.duniter.core.client.model.elasticsearch.RecordComment; -import org.duniter.core.client.service.bma.WotRemoteService; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.exception.DocumentNotFoundException; +import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.gchange.PluginSettings; import org.duniter.elasticsearch.gchange.model.MarketRecord; import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; +import org.duniter.elasticsearch.user.service.HistoryService; import org.duniter.elasticsearch.user.service.UserEventService; import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchType; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.TermQueryBuilder; import org.nuiton.i18n.I18n; import java.io.IOException; @@ -53,16 +62,22 @@ public class CommentService extends AbstractService { private UserEventService userEventService; private UserService userService; + private ThreadPool threadPool; + private HistoryService historyService; @Inject public CommentService(Client client, PluginSettings pluginSettings, CryptoService cryptoService, UserService userService, - UserEventService userEventService) { + UserEventService userEventService, + HistoryService historyService, + ThreadPool threadPool) { super("gchange.comment", client, pluginSettings, cryptoService); this.userEventService = userEventService; this.userService = userService; + this.historyService = historyService; + this.threadPool = threadPool; } @@ -70,30 +85,29 @@ public class CommentService extends AbstractService { JsonNode commentObj = readAndVerifyIssuerSignature(json); String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText(); + // Check the record document exists + String recordId = getMandatoryField(commentObj, RecordComment.PROPERTY_RECORD).asText(); + checkDocumentExistsOrDeleted(index, recordType, recordId); + if (logger.isDebugEnabled()) { logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8))); } - String commentId = indexDocumentFromJson(index, type, json); - - // Notify record issuer - notifyRecordIssuerForComment(index, recordType, commentObj, true, commentId); - - return commentId; + return indexDocumentFromJson(index, type, json); } - public void updateCommentFromJson(final String index, final String recordType, final String type, final String json, final String id) { + public void updateCommentFromJson(final String index, final String recordType, final String type, final String id, final String json) { JsonNode commentObj = readAndVerifyIssuerSignature(json); + // Check the record document exists + String recordId = getMandatoryField(commentObj, RecordComment.PROPERTY_RECORD).asText(); + checkDocumentExistsOrDeleted(index, recordType, recordId); + if (logger.isDebugEnabled()) { String issuer = getMandatoryField(commentObj, RecordComment.PROPERTY_ISSUER).asText(); - logger.debug(String.format("Indexing a %s from issuer [%s]", type, issuer.substring(0, 8))); + logger.debug(String.format("[%s] Indexing a %s from issuer [%s] on [%s]", index, type, issuer.substring(0, 8))); } updateDocumentFromJson(index, type, id, json); - - // Notify record issuer - notifyRecordIssuerForComment(index, recordType, commentObj, false, id); - } public XContentBuilder createRecordCommentType(String index, String type) { @@ -132,6 +146,18 @@ public class CommentService extends AbstractService { .field("index", "not_analyzed") .endObject() + // aggregations + .startObject("aggregations") + .field("type", "nested") + .field("dynamic", "true") + .startObject("properties") + .startObject("reply_count") + .field("type", "integer") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + .endObject() .endObject().endObject(); @@ -145,6 +171,20 @@ public class CommentService extends AbstractService { /* -- Internal methods -- */ + // Check the record document exists (or has been deleted) + private void checkDocumentExistsOrDeleted(String index, String type, String id) { + boolean recordExists; + try { + recordExists = isDocumentExists(index, type, id); + } catch (NotFoundException e) { + // Check if exists in delete history + recordExists = historyService.existsInDeleteHistory(index, type, id); + } + if (!recordExists) { + throw new NotFoundException(String.format("Comment refers a non-existent document [%s/%s/%s].", index, type, id)); + } + } + /** * Notify user when new comment */ @@ -180,4 +220,37 @@ public class CommentService extends AbstractService { } } + private void updateCommentAggregations(String index, String type, String id) { + long replyCount = countCommentReplies(index, type, id); + if (replyCount > 0) { + logger.warn("Comment [%s] has %s replies. Need to be updated", id, replyCount); + // TODO update aggregations + } + } + + private long countCommentReplies(String index, String type, String id) { + + // Prepare count request + SearchRequestBuilder searchRequest = client + .prepareSearch(index) + .setTypes(type) + .setFetchSource(false) + .setSearchType(SearchType.QUERY_AND_FETCH) + .setSize(0); + + // Query = filter on reference + TermQueryBuilder query = QueryBuilders.termQuery(RecordComment.PROPERTY_REPLY_TO_JSON, id); + searchRequest.setQuery(query); + + // Execute query + try { + SearchResponse response = searchRequest.execute().actionGet(); + return response.getHits().getTotalHits(); + } + catch(SearchPhaseExecutionException | JsonSyntaxException e) { + // Failed or no item on index + logger.error(String.format("Error while counting comment replies: %s", e.getMessage()), e); + } + return 1; + } } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java new file mode 100644 index 00000000..9b12c559 --- /dev/null +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java @@ -0,0 +1,241 @@ +package org.duniter.elasticsearch.gchange.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.collections4.MapUtils; +import org.duniter.core.client.model.ModelUtils; +import org.duniter.core.client.model.bma.BlockchainBlock; +import org.duniter.core.client.model.bma.jackson.JacksonUtils; +import org.duniter.core.client.model.elasticsearch.RecordComment; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.core.util.CollectionUtils; +import org.duniter.core.util.websocket.WebsocketClientEndpoint; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.exception.DocumentNotFoundException; +import org.duniter.elasticsearch.gchange.model.MarketRecord; +import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes; +import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; +import org.duniter.elasticsearch.user.model.UserEvent; +import org.duniter.elasticsearch.user.model.UserEventCodes; +import org.duniter.elasticsearch.user.service.UserEventService; +import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.nuiton.i18n.I18n; + +import java.io.IOException; +import java.util.*; + +/** + * Created by Benoit on 30/03/2015. + */ +public class CommentUserEventService extends AbstractService implements ChangeService.ChangeListener { + + static { + I18n.n("duniter.market.error.comment.recordNotFound"); + I18n.n("duniter.market.event.newComment"); + I18n.n("duniter.market.event.updateComment"); + + I18n.n("duniter.registry.error.comment.recordNotFound"); + I18n.n("duniter.registry.event.newComment"); + I18n.n("duniter.registry.event.updateComment"); + } + + public final UserService userService; + + public final UserEventService userEventService; + + public final ObjectMapper objectMapper; + + public final List<ChangeSource> changeListenSources; + + public final boolean enable; + + @Inject + public CommentUserEventService(Client client, PluginSettings settings, CryptoService cryptoService, + BlockchainService blockchainService, + UserService userService, + UserEventService userEventService) { + super("duniter.user.event.comment", client, settings, cryptoService); + this.userService = userService; + this.userEventService = userEventService; + this.objectMapper = JacksonUtils.newObjectMapper(); + this.changeListenSources = ImmutableList.of( + new ChangeSource(MarketService.INDEX, MarketService.RECORD_COMMENT_TYPE), + new ChangeSource(RegistryService.INDEX, MarketService.RECORD_COMMENT_TYPE)); + ChangeService.registerListener(this); + + this.enable = pluginSettings.enableBlockchainSync(); + + if (this.enable) { + blockchainService.registerConnectionListener(createConnectionListeners()); + } + } + + @Override + public String getId() { + return "duniter.user.event.comment"; + } + + @Override + public void onChange(ChangeEvent change) { + + + try { + + + switch (change.getOperation()) { + case CREATE: + if (change.getSource() != null) { + RecordComment comment = objectMapper.readValue(change.getSource().streamInput(), RecordComment.class); + processCommentIndex(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment, true/*is new*/); + } + break; + case INDEX: + if (change.getSource() != null) { + RecordComment comment = objectMapper.readValue(change.getSource().streamInput(), RecordComment.class); + processCommentIndex(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment, false/*is new*/); + } + break; + + // on DELETE : remove user event on block (using link + case DELETE: + processCommentDelete(change); + + break; + } + + } + catch(IOException e) { + throw new TechnicalException(String.format("Unable to parse received comment %s", change.getId()), e); + } + + //logger.info("receiveing block change: " + change.toJson()); + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return changeListenSources; + } + + /* -- internal method -- */ + + /** + * Create a listener that notify admin when the Duniter node connection is lost or retrieve + */ + private WebsocketClientEndpoint.ConnectionListener createConnectionListeners() { + return new WebsocketClientEndpoint.ConnectionListener() { + private boolean errorNotified = false; + + @Override + public void onSuccess() { + // Send notify on reconnection + if (errorNotified) { + errorNotified = false; + userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.INFO, UserEventCodes.NODE_BMA_UP.name()) + .setMessage(I18n.n("duniter.event.NODE_BMA_UP"), + pluginSettings.getNodeBmaHost(), + String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getClusterName()) + .build()); + } + } + + @Override + public void onError(Exception e, long lastTimeUp) { + if (errorNotified) return; // already notify + + // Wait 1 min, then notify admin (once) + long now = System.currentTimeMillis() / 1000; + boolean wait = now - lastTimeUp < 60; + if (!wait) { + errorNotified = true; + userEventService.notifyAdmin(UserEvent.newBuilder(UserEvent.EventType.ERROR, UserEventCodes.NODE_BMA_DOWN.name()) + .setMessage(I18n.n("duniter.event.NODE_BMA_DOWN"), + pluginSettings.getNodeBmaHost(), + String.valueOf(pluginSettings.getNodeBmaPort()), + pluginSettings.getClusterName(), + String.valueOf(lastTimeUp)) + .build()); + } + } + }; + } + + private void processCommentIndex(String index, String recordType, String commentId, RecordComment comment, boolean isNewComment) { + + String issuer = comment.getIssuer(); + String recordId = comment.getRecord(); + + // Notify issuer of record (is not same as comment writer) + Map<String, Object> recordFields = getFieldsById(index, recordType, recordId, + MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER); + if (MapUtils.isEmpty(recordFields)) { // record not found + logger.warn(I18n.t(String.format("duniter.%s.error.comment.recordNotFound", index.toLowerCase()), recordId)); + } + String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString(); + + // Get user title + String issuerTitle = userService.getProfileTitle(issuer); + + String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString(); + if (!issuer.equals(recordIssuer)) { + userEventService.notifyUser( + UserEvent.newBuilder(UserEvent.EventType.INFO, GchangeEventCodes.NEW_COMMENT.name()) + .setMessage( + isNewComment ? + String.format("duniter.%s.event.newComment", index.toLowerCase()) : + String.format("duniter.%s.event.updateComment", index.toLowerCase()), + issuer, + issuerTitle != null ? issuerTitle : ModelUtils.minifyPubkey(issuer), + recordTitle + ) + .setRecipient(recordIssuer) + .setReference(index, recordType, recordId) + .setReferenceAnchor(commentId) + .setTime(comment.getTime()) + .build()); + } + + + } + + private void processCommentDelete(ChangeEvent change) { + if (change.getId() == null) return; + + // Delete events that reference this block + userEventService.deleteEventsByReference(new UserEvent.Reference(change.getIndex(), change.getType(), change.getId())); + } + + +} diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java index 85cff3c9..ad85e7df 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/MarketService.java @@ -35,6 +35,8 @@ import org.duniter.elasticsearch.gchange.PluginSettings; import org.duniter.elasticsearch.gchange.model.MarketRecord; import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.service.ServiceLocator; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.service.UserService; import org.duniter.elasticsearch.user.service.UserEventService; @@ -71,12 +73,16 @@ public class MarketService extends AbstractService { public MarketService(Client client, PluginSettings settings, CryptoService cryptoService, CommentService commentService, - WotRemoteService wotRemoteService, - UserEventService userEventService) { + UserEventService userEventService, + ThreadPool threadPool, + final ServiceLocator serviceLocator + ) { super("gchange." + INDEX, client, settings, cryptoService); this.commentService = commentService; - this.wotRemoteService = wotRemoteService; this.userEventService = userEventService; + threadPool.scheduleOnStarted(() -> { + wotRemoteService = serviceLocator.getWotRemoteService(); + }); } /** @@ -149,7 +155,7 @@ public class MarketService extends AbstractService { // Execute indexBlocksFromNode IndexResponse response = indexRequest - .setRefresh(false) + .setRefresh(true) .execute().actionGet(); return response.getId(); @@ -159,16 +165,16 @@ public class MarketService extends AbstractService { return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_TYPE, json); } - public void updateRecordFromJson(String json, String id) { - checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_TYPE, json, id); + public void updateRecordFromJson(String id, String json) { + checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_TYPE, id, json); } public String indexCommentFromJson(String json) { return commentService.indexCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json); } - public void updateCommentFromJson(String json, String id) { - commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json, id); + public void updateCommentFromJson(String id, String json) { + commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, id, json); } public MarketService fillRecordCategories() { diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/RegistryService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/RegistryService.java index 3a1e8a3b..8a8c770c 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/RegistryService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/RegistryService.java @@ -130,16 +130,16 @@ public class RegistryService extends AbstractService { return checkIssuerAndIndexDocumentFromJson(INDEX, RECORD_TYPE, json); } - public void updateRecordFromJson(String json, String id) { - checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_TYPE, json, id); + public void updateRecordFromJson(String id, String json) { + checkIssuerAndUpdateDocumentFromJson(INDEX, RECORD_TYPE, id, json); } public String indexCommentFromJson(String json) { return commentService.indexCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json); } - public void updateCommentFromJson(String json, String id) { - commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, json, id); + public void updateCommentFromJson(String id, String json) { + commentService.updateCommentFromJson(INDEX, RECORD_TYPE, RECORD_COMMENT_TYPE, id, json); } /* -- Internal methods -- */ diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/ServiceModule.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/ServiceModule.java index 72ca1faf..ce56b692 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/ServiceModule.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/ServiceModule.java @@ -31,6 +31,7 @@ public class ServiceModule extends AbstractModule implements Module { bind(RegistryService.class).asEagerSingleton(); bind(CitiesRegistryService.class).asEagerSingleton(); bind(CommentService.class).asEagerSingleton(); + bind(CommentUserEventService.class).asEagerSingleton(); bind(MarketService.class).asEagerSingleton(); bind(SynchroService.class).asEagerSingleton(); } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/SynchroService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/SynchroService.java index a023c322..9dc5144a 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/SynchroService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/SynchroService.java @@ -26,6 +26,7 @@ import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.gchange.PluginSettings; import org.duniter.elasticsearch.gchange.model.Protocol; +import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.AbstractSynchroService; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -44,7 +45,7 @@ public class SynchroService extends AbstractSynchroService { } public void synchronize() { - logger.info("Synchronizing data..."); + logger.info("Synchronizing Gchange data..."); Peer peer = getPeerFromAPI(Protocol.GCHANGE_API); synchronize(peer); } @@ -57,19 +58,23 @@ public class SynchroService extends AbstractSynchroService { logger.info(String.format("[%s] Synchronizing gchange data since %s...", peer.toString(), sinceTime)); - importMarketChanges(peer, sinceTime); - importRegistryChanges(peer, sinceTime); + SynchroResult result = new SynchroResult(); + long time = System.currentTimeMillis(); - logger.info(String.format("[%s] Synchronizing gchange data since %s [OK]", peer.toString(), sinceTime)); + importMarketChanges(result, peer, sinceTime); + importRegistryChanges(result, peer, sinceTime); + + long duration = System.currentTimeMillis() - time; + logger.info(String.format("[%s] Synchronizing gchange data since %s [OK] %s (in %s ms)", peer.toString(), sinceTime, result.toString(), duration)); } - protected void importMarketChanges(Peer peer, long sinceTime) { - importChanges(peer, MarketService.INDEX, MarketService.RECORD_TYPE, sinceTime); - importChanges(peer, MarketService.INDEX, MarketService.RECORD_COMMENT_TYPE, sinceTime); + protected void importMarketChanges(SynchroResult result, Peer peer, long sinceTime) { + importChanges(result, peer, MarketService.INDEX, MarketService.RECORD_TYPE, sinceTime); + importChanges(result, peer, MarketService.INDEX, MarketService.RECORD_COMMENT_TYPE, sinceTime); } - protected void importRegistryChanges(Peer peer, long sinceTime) { - importChanges(peer, RegistryService.INDEX, RegistryService.RECORD_TYPE, sinceTime); - importChanges(peer, RegistryService.INDEX, RegistryService.RECORD_COMMENT_TYPE, sinceTime); + protected void importRegistryChanges(SynchroResult result, Peer peer, long sinceTime) { + importChanges(result, peer, RegistryService.INDEX, RegistryService.RECORD_TYPE, sinceTime); + importChanges(result, peer, RegistryService.INDEX, RegistryService.RECORD_COMMENT_TYPE, sinceTime); } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java index 1b9f5303..5e51af22 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/PluginInit.java @@ -25,12 +25,8 @@ package org.duniter.elasticsearch.user; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; -import org.duniter.elasticsearch.user.service.HistoryService; -import org.duniter.elasticsearch.user.service.MessageService; -import org.duniter.elasticsearch.user.service.SynchroService; -import org.duniter.elasticsearch.user.service.UserService; +import org.duniter.elasticsearch.user.service.*; import org.duniter.elasticsearch.user.model.UserEventCodes; -import org.duniter.elasticsearch.user.service.UserEventService; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -43,7 +39,7 @@ import org.nuiton.i18n.I18n; /** * Created by blavenie on 17/06/16. */ -public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticsearch.PluginInit> { +public class PluginInit extends AbstractLifecycleComponent<PluginInit> { private final PluginSettings pluginSettings; private final ThreadPool threadPool; @@ -110,7 +106,9 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse injector.getInstance(UserService.class) .deleteIndex() .createIndexIfNotExists(); - + injector.getInstance(GroupService.class) + .deleteIndex() + .createIndexIfNotExists(); if (logger.isInfoEnabled()) { logger.info("Reloading all Duniter indices... [OK]"); @@ -123,6 +121,7 @@ public class PluginInit extends AbstractLifecycleComponent<org.duniter.elasticse injector.getInstance(HistoryService.class).createIndexIfNotExists(); injector.getInstance(UserService.class).createIndexIfNotExists(); injector.getInstance(MessageService.class).createIndexIfNotExists(); + injector.getInstance(GroupService.class).createIndexIfNotExists(); if (logger.isInfoEnabled()) { logger.info("Checking Duniter indices... [OK]"); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java index b0eacbd1..7c4275ae 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/RestModule.java @@ -22,10 +22,16 @@ package org.duniter.elasticsearch.user.rest; * #L% */ +import org.duniter.elasticsearch.user.rest.group.RestGroupIndexAction; +import org.duniter.elasticsearch.user.rest.group.RestGroupUpdateAction; import org.duniter.elasticsearch.user.rest.history.RestHistoryDeleteIndexAction; import org.duniter.elasticsearch.user.rest.message.RestMessageInboxIndexAction; -import org.duniter.elasticsearch.user.rest.message.RestMessageMarkAsReadAction; +import org.duniter.elasticsearch.user.rest.message.RestMessageInboxMarkAsReadAction; import org.duniter.elasticsearch.user.rest.message.RestMessageOutboxIndexAction; +import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordGetAction; +import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordIndexAction; +import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordMarkAsReadAction; +import org.duniter.elasticsearch.user.rest.message.compat.RestMessageRecordSearchAction; import org.duniter.elasticsearch.user.rest.user.*; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -42,12 +48,25 @@ public class RestModule extends AbstractModule implements Module { bind(RestUserEventMarkAsReadAction.class).asEagerSingleton(); bind(RestUserEventSearchAction.class).asEagerSingleton(); + // Group + bind(RestGroupIndexAction.class).asEagerSingleton(); + bind(RestGroupUpdateAction.class).asEagerSingleton(); + // History bind(RestHistoryDeleteIndexAction.class).asEagerSingleton(); // Message bind(RestMessageInboxIndexAction.class).asEagerSingleton(); bind(RestMessageOutboxIndexAction.class).asEagerSingleton(); - bind(RestMessageMarkAsReadAction.class).asEagerSingleton(); + bind(RestMessageInboxMarkAsReadAction.class).asEagerSingleton(); + + // Backward compatibility + { + // message/record + bind(RestMessageRecordIndexAction.class).asEagerSingleton(); + bind(RestMessageRecordSearchAction.class).asEagerSingleton(); + bind(RestMessageRecordGetAction.class).asEagerSingleton(); + bind(RestMessageRecordMarkAsReadAction.class).asEagerSingleton(); + } } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupIndexAction.java new file mode 100644 index 00000000..6055efe7 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupIndexAction.java @@ -0,0 +1,44 @@ +package org.duniter.elasticsearch.user.rest.group; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.rest.AbstractRestPostIndexAction; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.user.service.GroupService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestController; + +public class RestGroupIndexAction extends AbstractRestPostIndexAction { + + @Inject + public RestGroupIndexAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + GroupService service) { + super(settings, controller, client, securityController, + GroupService.INDEX, + GroupService.RECORD_TYPE, + json -> service.indexRecordProfileFromJson(json)); + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupUpdateAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupUpdateAction.java new file mode 100644 index 00000000..b9b89b21 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/group/RestGroupUpdateAction.java @@ -0,0 +1,46 @@ +package org.duniter.elasticsearch.user.rest.group; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.rest.AbstractRestPostUpdateAction; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.user.service.GroupService; +import org.duniter.elasticsearch.user.service.UserService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestController; + +public class RestGroupUpdateAction extends AbstractRestPostUpdateAction { + + @Inject + public RestGroupUpdateAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + GroupService service) { + super(settings, controller, client, securityController, + GroupService.INDEX, + GroupService.RECORD_TYPE, + (id, json) -> service.updateRecordFromJson(id, json)); + } + +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxIndexAction.java index c469cba9..0fbc1e00 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxIndexAction.java @@ -38,7 +38,7 @@ public class RestMessageInboxIndexAction extends AbstractRestPostIndexAction { final MessageService service) { super(settings, controller, client, securityController, MessageService.INDEX, - MessageService.RECORD_TYPE, - json -> service.indexRecordFromJson(json)); + MessageService.INBOX_TYPE, + json -> service.indexInboxFromJson(json)); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageMarkAsReadAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxMarkAsReadAction.java similarity index 71% rename from duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageMarkAsReadAction.java rename to duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxMarkAsReadAction.java index 9f38f96f..8aeee2d5 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageMarkAsReadAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageInboxMarkAsReadAction.java @@ -30,15 +30,15 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.rest.RestController; -public class RestMessageMarkAsReadAction extends AbstractRestPostMarkAsReadAction { +public class RestMessageInboxMarkAsReadAction extends AbstractRestPostMarkAsReadAction { @Inject - public RestMessageMarkAsReadAction(Settings settings, RestController controller, Client client, - RestSecurityController securityController, - MessageService messageService) { - super(settings, controller, client, securityController, MessageService.INDEX, MessageService.RECORD_TYPE, - (signature, id) -> { - messageService.markMessageAsRead(signature, id); + public RestMessageInboxMarkAsReadAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + MessageService messageService) { + super(settings, controller, client, securityController, MessageService.INDEX, MessageService.INBOX_TYPE, + (id, signature) -> { + messageService.markMessageAsRead(id, signature); }); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageOutboxIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageOutboxIndexAction.java index e2ac8774..b3fdb054 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageOutboxIndexAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/RestMessageOutboxIndexAction.java @@ -32,6 +32,8 @@ import org.elasticsearch.rest.RestController; public class RestMessageOutboxIndexAction extends AbstractRestPostIndexAction { + + @Inject public RestMessageOutboxIndexAction(Settings settings, RestController controller, Client client, RestSecurityController securityController, @@ -39,6 +41,6 @@ public class RestMessageOutboxIndexAction extends AbstractRestPostIndexAction { super(settings, controller, client, securityController, MessageService.INDEX, MessageService.OUTBOX_TYPE, - json -> service.indexRecordFromJson(json)); + json -> service.indexOuboxFromJson(json)); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordGetAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordGetAction.java new file mode 100644 index 00000000..1986585e --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordGetAction.java @@ -0,0 +1,94 @@ +package org.duniter.elasticsearch.user.rest.message.compat; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.VersionType; +import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.search.fetch.source.FetchSourceContext; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestStatus.NOT_FOUND; +import static org.elasticsearch.rest.RestStatus.OK; + +/** + * /message/record has been replaced by /message/inbox + * @deprecated + */ +@Deprecated +public class RestMessageRecordGetAction extends BaseRestHandler { + + @Inject + public RestMessageRecordGetAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, String.format("%s/%s/{id}", MessageService.INDEX, MessageService.RECORD_TYPE), this); + } + + @Override + protected void handleRequest(final RestRequest request, RestChannel channel, Client client) throws Exception { + GetRequest getRequest = new GetRequest(MessageService.INDEX, MessageService.INBOX_TYPE, request.param("id")); + getRequest.operationThreaded(true); + getRequest.refresh(request.paramAsBoolean("refresh", getRequest.refresh())); + getRequest.routing(request.param("routing")); // order is important, set it after routing, so it will set the routing + getRequest.parent(request.param("parent")); + getRequest.preference(request.param("preference")); + getRequest.realtime(request.paramAsBoolean("realtime", null)); + getRequest.ignoreErrorsOnGeneratedFields(request.paramAsBoolean("ignore_errors_on_generated_fields", false)); + + String sField = request.param("fields"); + if (sField != null) { + String[] sFields = Strings.splitStringByCommaToArray(sField); + if (sFields != null) { + getRequest.fields(sFields); + } + } + + getRequest.version(RestActions.parseVersion(request)); + getRequest.versionType(VersionType.fromString(request.param("version_type"), getRequest.versionType())); + + getRequest.fetchSourceContext(FetchSourceContext.parseFromRestRequest(request)); + + client.get(getRequest, new RestBuilderListener<GetResponse>(channel) { + @Override + public RestResponse buildResponse(GetResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContent(builder, request); + builder.endObject(); + if (!response.isExists()) { + return new BytesRestResponse(NOT_FOUND, builder); + } else { + return new BytesRestResponse(OK, builder); + } + } + }); + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordIndexAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordIndexAction.java new file mode 100644 index 00000000..22858ba1 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordIndexAction.java @@ -0,0 +1,49 @@ +package org.duniter.elasticsearch.user.rest.message.compat; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.rest.AbstractRestPostIndexAction; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestController; + +/** + * /message/record has been replaced by /message/inbox + * @deprecated + */ +@Deprecated +public class RestMessageRecordIndexAction extends AbstractRestPostIndexAction { + + @Inject + public RestMessageRecordIndexAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + final MessageService service) { + super(settings, controller, client, securityController, + MessageService.INDEX, + MessageService.RECORD_TYPE, + json -> service.indexInboxFromJson(json)); + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordMarkAsReadAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordMarkAsReadAction.java new file mode 100644 index 00000000..3d242049 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordMarkAsReadAction.java @@ -0,0 +1,49 @@ +package org.duniter.elasticsearch.user.rest.message.compat; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.rest.AbstractRestPostMarkAsReadAction; +import org.duniter.elasticsearch.rest.security.RestSecurityController; +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.RestController; + +/** + * /message/record has been replaced by /message/inbox + * @deprecated + */ +@Deprecated +public class RestMessageRecordMarkAsReadAction extends AbstractRestPostMarkAsReadAction { + + @Inject + public RestMessageRecordMarkAsReadAction(Settings settings, RestController controller, Client client, + RestSecurityController securityController, + MessageService messageService) { + super(settings, controller, client, securityController, MessageService.INDEX, MessageService.RECORD_TYPE, + (id, signature) -> { + messageService.markMessageAsRead(id, signature); + }); + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordSearchAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordSearchAction.java new file mode 100644 index 00000000..553435af --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/message/compat/RestMessageRecordSearchAction.java @@ -0,0 +1,65 @@ +package org.duniter.elasticsearch.user.rest.message.compat; + +/* + * #%L + * duniter4j-elasticsearch-plugin + * %% + * Copyright (C) 2014 - 2016 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + +import org.duniter.elasticsearch.user.service.MessageService; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.search.RestSearchAction; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; + +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +/** + * /message/record has been replaced by /message/inbox + * @deprecated + */ +@Deprecated +public class RestMessageRecordSearchAction extends BaseRestHandler { + + @Inject + public RestMessageRecordSearchAction(Settings settings, RestController controller, Client client) { + super(settings, controller, client); + controller.registerHandler(GET, String.format("%s/%s/_search", MessageService.INDEX, MessageService.RECORD_TYPE), this); + controller.registerHandler(POST, String.format("%s/%s/_search", MessageService.INDEX, MessageService.RECORD_TYPE), this); + } + + @Override + protected void handleRequest(final RestRequest request, RestChannel channel, Client client) throws Exception { + SearchRequest searchRequest = new SearchRequest(); + BytesReference restContent = RestActions.hasBodyContent(request) ? RestActions.getRestContent(request) : null; + RestSearchAction.parseSearchRequest(searchRequest, request, parseFieldMatcher, restContent); + searchRequest.indices(MessageService.INDEX); // override type + searchRequest.types(MessageService.INBOX_TYPE); // override type + client.search(searchRequest, new RestStatusToXContentListener<>(channel)); + } +} \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserEventMarkAsReadAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserEventMarkAsReadAction.java index 643fa085..3dad8d61 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserEventMarkAsReadAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserEventMarkAsReadAction.java @@ -37,8 +37,8 @@ public class RestUserEventMarkAsReadAction extends AbstractRestPostMarkAsReadAct RestSecurityController securityController, UserEventService userEventService) { super(settings, controller, client, securityController, UserEventService.INDEX, UserEventService.EVENT_TYPE, - (signature, id) -> { - userEventService.markEventAsRead(signature, id); + (id, signature) -> { + userEventService.markEventAsRead(id, signature); }); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserProfileUpdateAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserProfileUpdateAction.java index b33b4f71..89cf3901 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserProfileUpdateAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserProfileUpdateAction.java @@ -39,7 +39,7 @@ public class RestUserProfileUpdateAction extends AbstractRestPostUpdateAction { super(settings, controller, client, securityController, UserService.INDEX, UserService.PROFILE_TYPE, - (json, id) -> service.updateProfileFromJson(json, id)); + (id, json) -> service.updateProfileFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserSettingsUpdateAction.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserSettingsUpdateAction.java index 24c8f5be..793c5a3a 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserSettingsUpdateAction.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/rest/user/RestUserSettingsUpdateAction.java @@ -39,7 +39,7 @@ public class RestUserSettingsUpdateAction extends AbstractRestPostUpdateAction { super(settings, controller, client, securityController, UserService.INDEX, UserService.SETTINGS_TYPE, - (json, id) -> service.updateSettingsFromJson(json, id)); + (id, json) -> service.updateSettingsFromJson(id, json)); } } \ No newline at end of file diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index dec3f835..7af2957d 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -99,6 +99,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang switch (change.getOperation()) { + case CREATE: case INDEX: if (change.getSource() != null) { BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java new file mode 100644 index 00000000..fa276af2 --- /dev/null +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/GroupService.java @@ -0,0 +1,271 @@ +package org.duniter.elasticsearch.user.service; + +/* + * #%L + * UCoin Java Client :: Core API + * %% + * Copyright (C) 2014 - 2015 EIS + * %% + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public + * License along with this program. If not, see + * <http://www.gnu.org/licenses/gpl-3.0.html>. + * #L% + */ + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; +import org.apache.commons.collections4.MapUtils; +import org.duniter.core.client.model.ModelUtils; +import org.duniter.core.client.model.elasticsearch.Record; +import org.duniter.core.client.model.elasticsearch.UserGroup; +import org.duniter.core.client.model.elasticsearch.UserProfile; +import org.duniter.core.exception.TechnicalException; +import org.duniter.core.service.CryptoService; +import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.exception.AccessDeniedException; +import org.duniter.elasticsearch.service.AbstractService; +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * Created by Benoit on 30/03/2015. + */ +public class GroupService extends AbstractService { + + public static final String INDEX = "group"; + public static final String RECORD_TYPE = "record"; + + @Inject + public GroupService(Client client, + PluginSettings settings, + CryptoService cryptoService) { + super("duniter." + INDEX, client, settings,cryptoService); + } + + /** + * Create index need for blockchain registry, if need + */ + public GroupService createIndexIfNotExists() { + try { + if (!existsIndex(INDEX)) { + createIndex(); + } + } + catch(JsonProcessingException e) { + throw new TechnicalException(String.format("Error while creating index [%s]", INDEX)); + } + return this; + } + + /** + * Create index for registry + * @throws JsonProcessingException + */ + public GroupService createIndex() throws JsonProcessingException { + logger.info(String.format("Creating index [%s]", INDEX)); + + CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX); + org.elasticsearch.common.settings.Settings indexSettings = org.elasticsearch.common.settings.Settings.settingsBuilder() + .put("number_of_shards", 3) + .put("number_of_replicas", 1) + //.put("analyzer", createDefaultAnalyzer()) + .build(); + createIndexRequestBuilder.setSettings(indexSettings); + createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType()); + createIndexRequestBuilder.execute().actionGet(); + + return this; + } + + public GroupService deleteIndex() { + deleteIndexIfExists(INDEX); + return this; + } + + public boolean existsIndex() { + return super.existsIndex(INDEX); + } + + /** + * + * Index an record + * @param profileJson + * @return the record id + */ + public String indexRecordProfileFromJson(String profileJson) { + + JsonNode actualObj = readAndVerifyIssuerSignature(profileJson); + String name = getName(actualObj); + + if (logger.isDebugEnabled()) { + logger.debug(String.format("Indexing a user profile from issuer [%s]", name.substring(0, 8))); + } + + IndexResponse response = client.prepareIndex(INDEX, RECORD_TYPE) + .setSource(profileJson) + .setId(name) // always use the name as id + .setRefresh(false) + .execute().actionGet(); + return response.getId(); + } + + /** + * Update a record + * @param recordJson + */ + public ListenableActionFuture<UpdateResponse> updateRecordFromJson(String recordJson, String id) { + + JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); + String name = getName(actualObj); + + if (!Objects.equals(name, id)) { + throw new AccessDeniedException(String.format("Could not update this document: not issuer.")); + } + if (logger.isDebugEnabled()) { + logger.debug(String.format("Updating a group from name [%s]", name)); + } + + return client.prepareUpdate(INDEX, RECORD_TYPE, name) + .setDoc(recordJson) + .execute(); + } + + + + protected String getName(JsonNode actualObj) { + return getMandatoryField(actualObj, UserGroup.PROPERTY_NAME).asText(); + } + + public String getTitleByName(String name) { + + Object title = getFieldById(INDEX, RECORD_TYPE, name, UserGroup.PROPERTY_NAME); + if (title == null) return null; + return title.toString(); + } + + public Map<String, String> getTitlesByNames(Set<String> names) { + + Map<String, Object> titles = getFieldByIds(INDEX, RECORD_TYPE, names, UserGroup.PROPERTY_NAME); + if (MapUtils.isEmpty(titles)) return null; + Map<String, String> result = new HashMap<>(); + titles.entrySet().stream().forEach((entry) -> result.put(entry.getKey(), entry.getValue().toString())); + return result; + } + + /* -- Internal methods -- */ + + + public XContentBuilder createRecordType() { + String stringAnalyzer = pluginSettings.getDefaultStringAnalyzer(); + + try { + XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject(RECORD_TYPE) + .startObject("properties") + + // title + .startObject("title") + .field("type", "string") + .field("analyzer", stringAnalyzer) + .endObject() + + // description + .startObject("description") + .field("type", "string") + .field("analyzer", stringAnalyzer) + .endObject() + + // creationTime + .startObject("creationTime") + .field("type", "integer") + .endObject() + + // time + .startObject("time") + .field("type", "integer") + .endObject() + + // issuer + .startObject("issuer") + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + + // avatar + .startObject("avatar") + .field("type", "attachment") + .startObject("fields") // fields + .startObject("content") // content + .field("index", "no") + .endObject() + .startObject("title") // title + .field("type", "string") + .field("store", "no") + .endObject() + .startObject("author") // author + .field("store", "no") + .endObject() + .startObject("content_type") // content_type + .field("store", "yes") + .endObject() + .endObject() + .endObject() + + // social networks + .startObject("socials") + .field("type", "nested") + .field("dynamic", "false") + .startObject("properties") + .startObject("type") // type + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .startObject("url") // url + .field("type", "string") + .field("index", "not_analyzed") + .endObject() + .endObject() + .endObject() + + // tags + .startObject("tags") + .field("type", "completion") + .field("search_analyzer", "simple") + .field("analyzer", "simple") + .field("preserve_separators", "false") + .endObject() + + .endObject() + .endObject().endObject(); + + return mapping; + } + catch(IOException ioe) { + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe); + } + } + +} diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java index a0005085..5d208585 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/HistoryService.java @@ -25,20 +25,29 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonSyntaxException; import org.duniter.core.client.model.elasticsearch.DeleteRecord; import org.duniter.core.client.model.elasticsearch.MessageRecord; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; +import org.duniter.core.util.StringUtils; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.NotFoundException; import org.duniter.elasticsearch.service.AbstractService; +import org.duniter.elasticsearch.user.model.UserEvent; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; +import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.*; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; import java.io.IOException; @@ -143,6 +152,26 @@ public class HistoryService extends AbstractService { return response.getId(); } + public boolean existsInDeleteHistory(final String index, final String type, final String id) { + // Prepare search request + SearchRequestBuilder searchRequest = client + .prepareSearch(INDEX) + .setTypes(DELETE_TYPE) + .setFetchSource(false) + .setSearchType(SearchType.QUERY_AND_FETCH); + + // Query = filter on index/type/id + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery() + .filter(QueryBuilders.termQuery(DeleteRecord.PROPERTY_INDEX, index)) + .filter(QueryBuilders.termQuery(DeleteRecord.PROPERTY_TYPE, type)) + .filter(QueryBuilders.termQuery(DeleteRecord.PROPERTY_ID, id)); + + searchRequest.setQuery(QueryBuilders.nestedQuery(UserEvent.PROPERTY_REFERENCE, QueryBuilders.constantScoreQuery(boolQuery))); + + // Execute query + SearchResponse response = searchRequest.execute().actionGet(); + return response.getHits().getTotalHits() > 0; + } /* -- Internal methods -- */ diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java index e6cd1aa0..58559972 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/MessageService.java @@ -26,13 +26,11 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import org.duniter.core.client.model.ModelUtils; -import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.BlockchainService; import org.duniter.elasticsearch.user.model.Message; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserEventCodes; @@ -55,9 +53,13 @@ import java.util.Map; public class MessageService extends AbstractService { public static final String INDEX = "message"; - public static final String RECORD_TYPE = "record"; + public static final String INBOX_TYPE = "inbox"; public static final String OUTBOX_TYPE = "outbox"; + @Deprecated + public static final String RECORD_TYPE = "record"; + + private final UserEventService userEventService; @Inject @@ -101,23 +103,22 @@ public class MessageService extends AbstractService { * @throws JsonProcessingException */ public MessageService createIndex() throws JsonProcessingException { - logger.info(String.format("Creating index [%s/%s]", INDEX, RECORD_TYPE)); + logger.info(String.format("Creating index [%s/%s]", INDEX, INBOX_TYPE)); CreateIndexRequestBuilder createIndexRequestBuilder = client.admin().indices().prepareCreate(INDEX); Settings indexSettings = Settings.settingsBuilder() .put("number_of_shards", 2) .put("number_of_replicas", 1) - //.put("analyzer", createDefaultAnalyzer()) .build(); createIndexRequestBuilder.setSettings(indexSettings); - createIndexRequestBuilder.addMapping(RECORD_TYPE, createRecordType()); + createIndexRequestBuilder.addMapping(INBOX_TYPE, createInboxType()); createIndexRequestBuilder.addMapping(OUTBOX_TYPE, createOutboxType()); createIndexRequestBuilder.execute().actionGet(); return this; } - public String indexRecordFromJson(String recordJson) { + public String indexInboxFromJson(String recordJson) { JsonNode actualObj = readAndVerifyIssuerSignature(recordJson); String issuer = getIssuer(actualObj); @@ -128,7 +129,7 @@ public class MessageService extends AbstractService { logger.debug(String.format("Indexing a message from issuer [%s]", issuer.substring(0, 8))); } - IndexResponse response = client.prepareIndex(INDEX, RECORD_TYPE) + IndexResponse response = client.prepareIndex(INDEX, INBOX_TYPE) .setSource(recordJson) .setRefresh(false) .execute().actionGet(); @@ -140,7 +141,7 @@ public class MessageService extends AbstractService { .setRecipient(recipient) .setMessage(I18n.n("duniter.user.event.message.received"), issuer, ModelUtils.minifyPubkey(issuer)) .setTime(time) - .setReference(INDEX, RECORD_TYPE, messageId) + .setReference(INDEX, INBOX_TYPE, messageId) .build()); return messageId; @@ -163,8 +164,8 @@ public class MessageService extends AbstractService { return response.getId(); } - public void markMessageAsRead(String signature, String id) { - Map<String, Object> fields = getMandatoryFieldsById(INDEX, RECORD_TYPE, id, Message.PROPERTY_HASH, Message.PROPERTY_RECIPIENT); + public void markMessageAsRead(String id, String signature) { + Map<String, Object> fields = getMandatoryFieldsById(INDEX, INBOX_TYPE, id, Message.PROPERTY_HASH, Message.PROPERTY_RECIPIENT); String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString(); String hash = fields.get(UserEvent.PROPERTY_HASH).toString(); @@ -174,15 +175,15 @@ public class MessageService extends AbstractService { throw new InvalidSignatureException("Invalid signature: only the recipient can mark an message as read."); } - UpdateRequestBuilder request = client.prepareUpdate(INDEX, RECORD_TYPE, id) + UpdateRequestBuilder request = client.prepareUpdate(INDEX, INBOX_TYPE, id) .setDoc("read_signature", signature); request.execute(); } /* -- Internal methods -- */ - public XContentBuilder createRecordType() { - return createMapping(RECORD_TYPE); + public XContentBuilder createInboxType() { + return createMapping(INBOX_TYPE); } public XContentBuilder createOutboxType() { @@ -235,7 +236,7 @@ public class MessageService extends AbstractService { return mapping; } catch(IOException ioe) { - throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, RECORD_TYPE, ioe.getMessage()), ioe); + throw new TechnicalException(String.format("Error while getting mapping for index [%s/%s]: %s", INDEX, INBOX_TYPE, ioe.getMessage()), ioe); } } } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java index bcb7164e..c0b55796 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/ServiceModule.java @@ -31,8 +31,12 @@ public class ServiceModule extends AbstractModule implements Module { bind(HistoryService.class).asEagerSingleton(); bind(MessageService.class).asEagerSingleton(); + bind(UserService.class).asEagerSingleton(); + bind(GroupService.class).asEagerSingleton(); + bind(UserEventService.class).asEagerSingleton(); + bind(BlockchainUserEventService.class).asEagerSingleton(); bind(SynchroService.class).asEagerSingleton(); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java index 263f0af0..c77368a6 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/SynchroService.java @@ -26,6 +26,7 @@ import org.duniter.core.client.model.elasticsearch.Protocol; import org.duniter.core.client.model.local.Peer; import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginSettings; +import org.duniter.elasticsearch.model.SynchroResult; import org.duniter.elasticsearch.service.ServiceLocator; import org.duniter.elasticsearch.service.AbstractSynchroService; import org.duniter.elasticsearch.threadpool.ThreadPool; @@ -59,19 +60,40 @@ public class SynchroService extends AbstractSynchroService { logger.info(String.format("[%s] Synchronizing user data since %s...", peer.toString(), sinceTime)); - importUserChanges(peer, sinceTime); - importMessageChanges(peer, sinceTime); + SynchroResult result = new SynchroResult(); + long time = System.currentTimeMillis(); - logger.info(String.format("[%s] Synchronizing user data since %s [OK]", peer.toString(), sinceTime)); + importHistoryChanges(result, peer, sinceTime); + importUserChanges(result, peer, sinceTime); + importMessageChanges(result, peer, sinceTime); + importGroupChanges(result, peer, sinceTime); + + long duration = System.currentTimeMillis() - time; + logger.info(String.format("[%s] Synchronizing user data since %s [OK] %s (ins %s ms)", peer.toString(), sinceTime, result.toString(), duration)); + } + + protected void importHistoryChanges(SynchroResult result, Peer peer, long sinceTime) { + importChanges(result, peer, HistoryService.INDEX, HistoryService.DELETE_TYPE, sinceTime); } - protected void importUserChanges(Peer peer, long sinceTime) { - importChanges(peer, UserService.INDEX, UserService.PROFILE_TYPE, sinceTime); - importChanges(peer, UserService.INDEX, UserService.SETTINGS_TYPE, sinceTime); + protected void importUserChanges(SynchroResult result, Peer peer, long sinceTime) { + importChanges(result, peer, UserService.INDEX, UserService.PROFILE_TYPE, sinceTime); + importChanges(result, peer, UserService.INDEX, UserService.SETTINGS_TYPE, sinceTime); } - protected void importMessageChanges(Peer peer, long sinceTime) { - importChanges(peer, MessageService.INDEX, MessageService.RECORD_TYPE, sinceTime); - importChanges(peer, MessageService.INDEX, MessageService.OUTBOX_TYPE, sinceTime); + protected void importMessageChanges(SynchroResult result, Peer peer, long sinceTime) { + // For compat + // TODO: remove this later + importChangesRemap(result, peer, MessageService.INDEX, MessageService.RECORD_TYPE, + MessageService.INDEX, MessageService.INBOX_TYPE, + sinceTime); + + importChanges(result, peer, MessageService.INDEX, MessageService.INBOX_TYPE, sinceTime); + importChanges(result, peer, MessageService.INDEX, MessageService.OUTBOX_TYPE, sinceTime); } + + protected void importGroupChanges(SynchroResult result, Peer peer, long sinceTime) { + importChanges(result, peer, GroupService.INDEX, GroupService.RECORD_TYPE, sinceTime); + } + } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index c61630e2..9e575ebc 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -202,7 +202,7 @@ public class UserEventService extends AbstractService { }); } - public void markEventAsRead(String signature, String id) { + public void markEventAsRead(String id, String signature) { Map<String, Object> fields = getMandatoryFieldsById(INDEX, EVENT_TYPE, id, UserEvent.PROPERTY_HASH, UserEvent.PROPERTY_RECIPIENT); String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString(); diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java index 96b162cc..ed587e26 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserService.java @@ -35,6 +35,7 @@ import org.duniter.core.service.MailService; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.AccessDeniedException; import org.duniter.elasticsearch.service.AbstractService; +import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; @@ -138,7 +139,7 @@ public class UserService extends AbstractService { * Update an user profile * @param profileJson */ - public void updateProfileFromJson(String profileJson, String id) { + public ListenableActionFuture<UpdateResponse> updateProfileFromJson(String profileJson, String id) { JsonNode actualObj = readAndVerifyIssuerSignature(profileJson); String issuer = getIssuer(actualObj); @@ -147,12 +148,12 @@ public class UserService extends AbstractService { throw new AccessDeniedException(String.format("Could not update this document: not issuer.")); } if (logger.isDebugEnabled()) { - logger.debug(String.format("Indexing a user profile from issuer [%s]", issuer.substring(0, 8))); + logger.debug(String.format("Updating a user profile from issuer [%s]", issuer.substring(0, 8))); } - UpdateResponse response = client.prepareUpdate(INDEX, PROFILE_TYPE, issuer) + return client.prepareUpdate(INDEX, PROFILE_TYPE, issuer) .setDoc(profileJson) - .execute().actionGet(); + .execute(); } /** @@ -182,7 +183,7 @@ public class UserService extends AbstractService { * Update user settings * @param settingsJson settings, as JSON string */ - public void updateSettingsFromJson(String settingsJson, String id) { + public ListenableActionFuture<UpdateResponse> updateSettingsFromJson(String id, String settingsJson) { JsonNode actualObj = readAndVerifyIssuerSignature(settingsJson); String issuer = getIssuer(actualObj); @@ -194,9 +195,9 @@ public class UserService extends AbstractService { logger.debug(String.format("Indexing a user settings from issuer [%s]", issuer.substring(0, 8))); } - UpdateResponse response = client.prepareUpdate(INDEX, SETTINGS_TYPE, issuer) + return client.prepareUpdate(INDEX, SETTINGS_TYPE, issuer) .setDoc(settingsJson) - .execute().actionGet(); + .execute(); } -- GitLab