Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Showing 3805 additions and 84 deletions.
(Diffs for four files are not shown inline: each source diff is too large to display.)
LICENSE (GNU GPL v3 → GNU AGPL v3):

-                    GNU GENERAL PUBLIC LICENSE
-                       Version 3, 29 June 2007
+                   GNU AFFERO GENERAL PUBLIC LICENSE
+                      Version 3, 19 November 2007
 Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
 Everyone is permitted to copy and distribute verbatim copies
@@ -7,17 +7,15 @@
                             Preamble
-  The GNU General Public License is a free, copyleft license for
-software and other kinds of works.
+  The GNU Affero General Public License is a free, copyleft license for
+software and other kinds of works, specifically designed to ensure
+cooperation with the community in the case of network server software.
   The licenses for most software and other practical works are designed
 to take away your freedom to share and change the works.  By contrast,
-the GNU General Public License is intended to guarantee your freedom to
+our General Public Licenses are intended to guarantee your freedom to
 share and change all versions of a program--to make sure it remains free
-software for all its users.  We, the Free Software Foundation, use the
-GNU General Public License for most of our software; it applies also to
-any other work released this way by its authors.  You can apply it to
-your programs, too.
+software for all its users.
   When we speak of free software, we are referring to freedom, not
 price.  Our General Public Licenses are designed to make sure that you
@@ -26,44 +24,34 @@ them if you wish), that you receive source code or can get it if you
 want it, that you can change the software or use pieces of it in new
 free programs, and that you know you can do these things.
-  To protect your rights, we need to prevent others from denying you
-these rights or asking you to surrender the rights.  Therefore, you have
-certain responsibilities if you distribute copies of the software, or if
-you modify it: responsibilities to respect the freedom of others.
-
-  For example, if you distribute copies of such a program, whether
-gratis or for a fee, you must pass on to the recipients the same
-freedoms that you received.  You must make sure that they, too, receive
-or can get the source code.  And you must show them these terms so they
-know their rights.
-
-  Developers that use the GNU GPL protect your rights with two steps:
-(1) assert copyright on the software, and (2) offer you this License
-giving you legal permission to copy, distribute and/or modify it.
-
-  For the developers' and authors' protection, the GPL clearly explains
-that there is no warranty for this free software.  For both users' and
-authors' sake, the GPL requires that modified versions be marked as
-changed, so that their problems will not be attributed erroneously to
-authors of previous versions.
-
-  Some devices are designed to deny users access to install or run
-modified versions of the software inside them, although the manufacturer
-can do so.  This is fundamentally incompatible with the aim of
-protecting users' freedom to change the software.  The systematic
-pattern of such abuse occurs in the area of products for individuals to
-use, which is precisely where it is most unacceptable.  Therefore, we
-have designed this version of the GPL to prohibit the practice for those
-products.  If such problems arise substantially in other domains, we
-stand ready to extend this provision to those domains in future versions
-of the GPL, as needed to protect the freedom of users.
-
-  Finally, every program is threatened constantly by software patents.
-States should not allow patents to restrict development and use of
-software on general-purpose computers, but in those that do, we wish to
-avoid the special danger that patents applied to a free program could
-make it effectively proprietary.  To prevent this, the GPL assures that
-patents cannot be used to render the program non-free.
+  Developers that use our General Public Licenses protect your rights
+with two steps: (1) assert copyright on the software, and (2) offer
+you this License which gives you legal permission to copy, distribute
+and/or modify the software.
+
+  A secondary benefit of defending all users' freedom is that
+improvements made in alternate versions of the program, if they
+receive widespread use, become available for other developers to
+incorporate.  Many developers of free software are heartened and
+encouraged by the resulting cooperation.  However, in the case of
+software used on network servers, this result may fail to come about.
+The GNU General Public License permits making a modified version and
+letting the public access it on a server without ever releasing its
+source code to the public.
+
+  The GNU Affero General Public License is designed specifically to
+ensure that, in such cases, the modified source code becomes available
+to the community.  It requires the operator of a network server to
+provide the source code of the modified version running there to the
+users of that server.  Therefore, public use of a modified version, on
+a publicly accessible server, gives the public access to the source
+code of the modified version.
+
+  An older license, called the Affero General Public License and
+published by Affero, was designed to accomplish similar goals.  This is
+a different license, not a version of the Affero GPL, but Affero has
+released a new version of the Affero GPL which permits relicensing under
+this license.
   The precise terms and conditions for copying, distribution and
 modification follow.
@@ -72,7 +60,7 @@ modification follow.
   0. Definitions.
-  "This License" refers to version 3 of the GNU General Public License.
+  "This License" refers to version 3 of the GNU Affero General Public License.
   "Copyright" also means copyright-like laws that apply to other kinds of
 works, such as semiconductor masks.
@@ -549,35 +537,45 @@ to collect a royalty for further conveying from those to whom you convey
 the Program, the only way you could satisfy both those terms and this
 License would be to refrain entirely from conveying the Program.
-  13. Use with the GNU Affero General Public License.
+  13. Remote Network Interaction; Use with the GNU General Public License.
+
+  Notwithstanding any other provision of this License, if you modify the
+Program, your modified version must prominently offer all users
+interacting with it remotely through a computer network (if your version
+supports such interaction) an opportunity to receive the Corresponding
+Source of your version by providing access to the Corresponding Source
+from a network server at no charge, through some standard or customary
+means of facilitating copying of software.  This Corresponding Source
+shall include the Corresponding Source for any work covered by version 3
+of the GNU General Public License that is incorporated pursuant to the
+following paragraph.
   Notwithstanding any other provision of this License, you have
 permission to link or combine any covered work with a work licensed
-under version 3 of the GNU Affero General Public License into a single
+under version 3 of the GNU General Public License into a single
 combined work, and to convey the resulting work.  The terms of this
 License will continue to apply to the part which is the covered work,
-but the special requirements of the GNU Affero General Public License,
-section 13, concerning interaction through a network will apply to the
-combination as such.
+but the work with which it is combined will remain governed by version
+3 of the GNU General Public License.
   14. Revised Versions of this License.
-  The Free Software Foundation may publish revised and/or new versions of
-the GNU General Public License from time to time.  Such new versions will
-be similar in spirit to the present version, but may differ in detail to
-address new problems or concerns.
+  The Free Software Foundation may publish revised and/or new versions of
+the GNU Affero General Public License from time to time.  Such new versions
+will be similar in spirit to the present version, but may differ in detail to
+address new problems or concerns.
   Each version is given a distinguishing version number.  If the
-Program specifies that a certain numbered version of the GNU General
+Program specifies that a certain numbered version of the GNU Affero General
 Public License "or any later version" applies to it, you have the
 option of following the terms and conditions either of that numbered
 version or of any later version published by the Free Software
 Foundation.  If the Program does not specify a version number of the
-GNU General Public License, you may choose any version ever published
+GNU Affero General Public License, you may choose any version ever published
 by the Free Software Foundation.
   If the Program specifies that a proxy can decide which future
-versions of the GNU General Public License can be used, that proxy's
+versions of the GNU Affero General Public License can be used, that proxy's
 public statement of acceptance of a version permanently authorizes you
 to choose that version for the Program.
@@ -635,40 +633,29 @@ the "copyright" line and a pointer to where the full notice is found.
     Copyright (C) <year>  <name of author>
     This program is free software: you can redistribute it and/or modify
-    it under the terms of the GNU General Public License as published by
+    it under the terms of the GNU Affero General Public License as published by
     the Free Software Foundation, either version 3 of the License, or
     (at your option) any later version.
     This program is distributed in the hope that it will be useful,
     but WITHOUT ANY WARRANTY; without even the implied warranty of
     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-    GNU General Public License for more details.
+    GNU Affero General Public License for more details.
-    You should have received a copy of the GNU General Public License
+    You should have received a copy of the GNU Affero General Public License
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 Also add information on how to contact you by electronic and paper mail.
-  If the program does terminal interaction, make it output a short
-notice like this when it starts in an interactive mode:
-
-    <program>  Copyright (C) <year>  <name of author>
-    This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
-    This is free software, and you are welcome to redistribute it
-    under certain conditions; type `show c' for details.
-
-The hypothetical commands `show w' and `show c' should show the appropriate
-parts of the General Public License.  Of course, your program's commands
-might be different; for a GUI interface, you would use an "about box".
+  If your software can interact with users remotely through a computer
+network, you should also make sure that it provides a way for users to
+get its source.  For example, if your program is a web application, its
+interface could display a "Source" link that leads users to an archive
+of the code.  There are many ways you could offer source, and different
+solutions will be better for different programs; see section 13 for the
+specific requirements.
   You should also get your employer (if you work as a programmer) or school,
 if any, to sign a "copyright disclaimer" for the program, if necessary.
-For more information on this, and how to apply and follow the GNU GPL, see
+For more information on this, and how to apply and follow the GNU AGPL, see
 <http://www.gnu.org/licenses/>.
-
-  The GNU General Public License does not permit incorporating your program
-into proprietary programs.  If your program is a subroutine library, you
-may consider it more useful to permit linking proprietary applications with
-the library.  If this is what you want to do, use the GNU Lesser General
-Public License instead of this License.  But first, please read
-<http://www.gnu.org/philosophy/why-not-lgpl.html>.
pom.xml:

@@ -2,18 +2,19 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
-    <groupId>org.duniter</groupId>
-    <artifactId>duniter4j</artifactId>
-    <version>0.3.5-SNAPSHOT</version>
+    <groupId>org.duniter.cesium</groupId>
+    <artifactId>cesium-plus-pod</artifactId>
+    <version>1.11.0</version>
   </parent>
-  <groupId>org.duniter</groupId>
-  <artifactId>duniter4j-core-client</artifactId>
+  <artifactId>cesium-plus-pod-core</artifactId>
   <packaging>jar</packaging>
-  <name>Duniter4j :: Core Client API</name>
+  <name>Cesium+ pod :: Core plugin</name>
+  <description>An ElasticSearch plugin that can index data from a Duniter currency</description>
   <properties>
-    <i18n.bundleOutputName>duniter4j-core-client-i18n</i18n.bundleOutputName>
+    <!-- i18n configuration -->
+    <i18n.bundleOutputName>${project.artifactId}-i18n</i18n.bundleOutputName>
     <i18n.generateCsvFile>true</i18n.generateCsvFile>
     <i18n.bundleCsvFile>
       ${maven.gen.dir}/resources/META-INF/${i18n.bundleOutputName}.csv
@@ -21,94 +22,116 @@
     <config.i18nBundleName>${i18n.bundleOutputName}</config.i18nBundleName>
   </properties>
   <dependencies>
     <dependency>
       <groupId>org.duniter</groupId>
-      <artifactId>duniter4j-core-shared</artifactId>
+      <artifactId>duniter4j-core-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>${project.parent.artifactId}-model</artifactId>
       <version>${project.version}</version>
     </dependency>
     <!-- LOGGING DEPENDENCIES - SLF4J -->
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
     <dependency>
-      <groupId>javax.websocket</groupId>
-      <artifactId>javax.websocket-api</artifactId>
-      <scope>runtime</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>jcl-over-slf4j</artifactId>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <optional>true</optional>
     </dependency>
-    <dependency>
-      <groupId>org.glassfish.tyrus</groupId>
-      <artifactId>tyrus-client</artifactId>
-      <version>1.12</version>
-    </dependency>
-    <dependency>
-      <groupId>org.glassfish.tyrus</groupId>
-      <artifactId>tyrus-container-grizzly-client</artifactId>
-      <version>1.12</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-collections4</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpclient</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpcore</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.httpcomponents</groupId>
-      <artifactId>httpmime</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.nuiton</groupId>
-      <artifactId>nuiton-config</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.nuiton.i18n</groupId>
-      <artifactId>nuiton-i18n</artifactId>
-    </dependency>
     <dependency>
-      <groupId>com.google.code.gson</groupId>
-      <artifactId>gson</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>de.grundid.opendatalab</groupId>
-      <artifactId>geojson-jackson</artifactId>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
     </dependency>
-    <!-- Unit test -->
+    <!-- Elastic Search -->
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <scope>test</scope>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <scope>test</scope>
+      <groupId>com.sun.mail</groupId>
+      <artifactId>jakarta.mail</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
+      <optional>true</optional>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Antlr String template -->
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>ST4</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <!-- JNA (need for OS shutdown hook) -->
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>net.java.dev.jna</groupId>
+      <artifactId>jna-platform</artifactId>
+      <scope>provided</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.java.dev.jna</groupId>
+          <artifactId>jna</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- Websocket -->
+    <dependency>
+      <groupId>javax.websocket</groupId>
+      <artifactId>javax.websocket-api</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <!-- Unit test -->
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>${project.parent.artifactId}-test</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
     </dependency>
   </dependencies>
   <build>
-    <testResources>
-      <testResource>
-        <directory>src/test/resources</directory>
+    <resources>
+      <resource>
+        <directory>src/main/filtered-resources</directory>
+        <filtering>true</filtering>
+        <includes>
+          <include>*.config</include>
+          <include>**/*.properties</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>src/main/resources</directory>
         <filtering>false</filtering>
-      </testResource>
-    </testResources>
+      </resource>
+    </resources>
     <plugins>
       <plugin>
         <groupId>org.nuiton.i18n</groupId>
@@ -142,6 +165,31 @@
         </execution>
       </executions>
     </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>assembly-plugin</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <attach>true</attach>
+              <appendAssemblyId>false</appendAssemblyId>
+              <finalName>${project.artifactId}-${project.version}</finalName>
+              <descriptors>
+                <descriptor>${basedir}/src/main/assembly/plugin.xml</descriptor>
+              </descriptors>
+              <skipAssembly>${assembly.skip}</skipAssembly>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

Third-party licenses file (generated; new file):
# Generated by org.codehaus.mojo.license.AddThirdPartyMojo
#-------------------------------------------------------------------------------
# Already used licenses in project :
# - ASL, version 2
# - Apache 2.0
# - Apache License 2.0
# - Apache License Version 2.0
# - BSD License
# - BSD licence
# - CC0 1.0 Universal
# - CDDL
# - CDDL+GPL
# - COMMON DEVELOPMENT AND DISTRIBUTION LICENSE (CDDL) Version 1.0
# - Common Development and Distribution License (CDDL) v1.0
# - Dual license consisting of the CDDL v1.1 and GPL v2
# - Eclipse Public License 1.0
# - GPLv2+CE
# - General Public License (GPL) v3
# - Indiana University Extreme! Lab Software License, vesion 1.1.1
# - LGPL, version 2.1
# - Lesser General Public License (LGPL) v 3.0
# - Lesser General Public License (LPGL)
# - Lesser General Public License (LPGL) v 2.1
# - Lesser General Public License (LPGL) version 3.0
# - MIT License
# - New BSD License
# - Public Domain, per Creative Commons CC0
# - The Apache Software License, Version 2.0
#-------------------------------------------------------------------------------
# Please fill the missing licenses for dependencies :
#
#
#Fri May 18 18:26:32 CEST 2018
commons-primitives--commons-primitives--1.0=The Apache Software License, Version 2.0
org.antlr--ST4--4.1=BSD License
\ No newline at end of file
Assembly descriptor (src/main/assembly/plugin.xml):

@@ -15,8 +15,6 @@
       <useProjectArtifact>true</useProjectArtifact>
       <useTransitiveFiltering>true</useTransitiveFiltering>
       <excludes>
-        <exclude>org.elasticsearch:elasticsearch</exclude>
-        <exclude>net.java.dev.jna:jna</exclude>
         <exclude>com.fasterxml.jackson.core:jackson-core</exclude>
         <exclude>log4j:log4j</exclude>
       </excludes>
Configuration defaults (duniter4j .config properties):

-app.name=duniter4j
-duniter4j.config.path=sqqs
 duniter4j.version=${project.version}
 duniter4j.site.url=${project.url}
 duniter4j.inceptionYear=${project.inceptionYear}
log4j.properties:

-###
 # Global logging configuration
 log4j.rootLogger=ERROR, stdout, file
-#log4j.rootLogger=ERROR, stdout
 # Console output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p (%c:%L) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p %m%n
-# ucoin levels
+# Duniter4j levels
 log4j.logger.org.duniter=INFO
-#log4j.logger.org.duniter.core.client=DEBUG
 #log4j.logger.org.duniter.core.client.service=DEBUG
-log4j.logger.org.duniter.core.client.service.bma=DEBUG
-log4j.logger.org.duniter.core.beans=WARN
+log4j.logger.org.duniter.elasticsearch=DEBUG
+log4j.logger.org.duniter.core.client.model.bma.Endpoints=ERROR
+# Other frameworks levels
+log4j.logger.org.apache.http=ERROR
+log4j.logger.org.nuiton.util=WARN
+log4j.logger.org.nuiton.config=WARN
+log4j.logger.org.nuiton.converter=WARN
+log4j.logger.org.nuiton.i18n=ERROR
+log4j.logger.org.elasticsearch=WARN
+#log4j.logger.org.elasticsearch=INFO
 log4j.appender.file=org.apache.log4j.RollingFileAppender
-log4j.appender.file.file=ucoin-client.log
+log4j.appender.file.file=${duniter4j.log.file}
 log4j.appender.file.MaxFileSize=10MB
 log4j.appender.file.MaxBackupIndex=4
 log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %5p %c - %m%n
+log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %5p (%c:%L) - [%t] %m%n
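The file appender now resolves its path from the ${duniter4j.log.file} variable. A minimal sketch, assuming the value is supplied as a Java system property (log4j 1.x expands ${...} placeholders from system properties when they are not defined in the file); the path below is hypothetical:

// Sketch only: the property must be set before log4j configures itself.
public class LogFileSketch {
    public static void main(String[] args) {
        System.setProperty("duniter4j.log.file", "/tmp/cesium-plus-pod.log"); // hypothetical path
        org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger("org.duniter");
        log.info("file appender writes to " + System.getProperty("duniter4j.log.file"));
    }
}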
Plugin descriptor (plugin.properties):

-name=duniter4j-elasticsearch
+name=cesium-plus-pod-core
-description=Plugin for Duniter node indexation
+description=Core plugin for Cesium+ pod
 version=${project.version}
 site=false
 jvm=true
 classname=org.duniter.elasticsearch.Plugin
-java.version=1.7
+java.version=1.8
-elasticsearch.version=2.3.3
+elasticsearch.version=2.4.6
-isolated=true
+isolated=false
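For context, a rough sketch of how an Elasticsearch 2.x node consumes such a descriptor: it is packaged as plugin-descriptor.properties, and the node reflectively instantiates the declared classname, preferring a constructor that takes Settings (which is what the Plugin class below provides). This is an illustration, not the actual Elasticsearch loader code:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;
import org.elasticsearch.common.settings.Settings;

class PluginLoadSketch {
    static Object load(Path pluginDir, Settings settings) throws Exception {
        Properties descriptor = new Properties();
        try (InputStream in = Files.newInputStream(pluginDir.resolve("plugin-descriptor.properties"))) {
            descriptor.load(in);
        }
        // isolated=false (above) means the plugin shares its classloader with other plugins
        Class<?> pluginClass = Class.forName(descriptor.getProperty("classname"));
        try {
            return pluginClass.getConstructor(Settings.class).newInstance(settings); // preferred
        } catch (NoSuchMethodException e) {
            return pluginClass.getConstructor().newInstance(); // fallback: no-arg constructor
        }
    }
}

The descriptor's classname points at the new Plugin class: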
package org.duniter.elasticsearch;
/*
* #%L
* duniter4j-elasticsearch-plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.google.common.collect.Lists;
import org.duniter.elasticsearch.dao.DaoModule;
import org.duniter.elasticsearch.rest.RestModule;
import org.duniter.elasticsearch.script.BlockchainTxCountScriptFactory;
import org.duniter.elasticsearch.security.SecurityModule;
import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.ServiceModule;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.duniter.elasticsearch.websocket.WebSocketModule;
import org.duniter.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.duniter.elasticsearch.http.WebSocketServerModule;
import org.elasticsearch.http.HttpServerModule;
import org.elasticsearch.script.ScriptModule;
import java.util.Collection;
public class Plugin extends org.elasticsearch.plugins.Plugin {
private ESLogger logger;
private boolean enable;
private boolean enableWs;
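    // Settings read in the constructor below:
    //   "duniter.enable"    - master switch for the whole plugin (default: true)
    //   "duniter.ws.enable" - websocket support, defaults to the master switch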
@Inject public Plugin(Settings settings) {
this.logger = Loggers.getLogger("duniter.core", settings, new String[0]);
this.enable = settings.getAsBoolean("duniter.enable", true);
this.enableWs = settings.getAsBoolean("duniter.ws.enable", this.enable);
}
@Override
public String name() {
return "cesium-plus-pod-core";
}
@Override
public String description() {
return "Duniter Core Plugin";
}
public void onModule(ScriptModule scriptModule) {
// TODO: in ES v5+, see example here :
// https://github.com/imotov/elasticsearch-native-script-example/blob/60a390f77f2fb25cb89d76de5071c52207a57b5f/src/main/java/org/elasticsearch/examples/nativescript/plugin/NativeScriptExamplesPlugin.java
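        // Native scripts run inside the node; once registered, search requests can invoke
        // this one by its name ("txcount") with script lang "native".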
scriptModule.registerScript("txcount", BlockchainTxCountScriptFactory.class);
}
public void onModule(HttpServerModule httpServerModule) {
if (this.enableWs) httpServerModule.setHttpServerTransport(NettyHttpServerTransport.class, "cesium-plus-core");
}
@Override
public Collection<Module> nodeModules() {
if (!enable) {
logger.warn(description() + " has been disabled.");
return Lists.newArrayList();
}
Collection<Module> modules = Lists.newArrayList();
modules.add(new SecurityModule());
modules.add(new RestModule());
// Websocket
if (this.enableWs) {
modules.add(new WebSocketServerModule());
modules.add(new WebSocketModule());
}
modules.add(new DaoModule());
modules.add(new ServiceModule());
return modules;
}
@Override
public Collection<Class<? extends LifecycleComponent>> nodeServices() {
if (!enable) return Lists.newArrayList();
Collection<Class<? extends LifecycleComponent>> components = Lists.newArrayList();
components.add(PluginSettings.class);
components.add(ThreadPool.class);
components.add(PluginInit.class);
return components;
}
/* -- protected methods -- */
}
package org.duniter.elasticsearch;
/*
* #%L
* Duniter4j :: ElasticSearch Plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.commons.io.IOUtils;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.model.local.Identity;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.service.MailService;
import org.duniter.core.util.Beans;
import org.duniter.core.util.Preconditions;
import org.duniter.elasticsearch.dao.*;
import org.duniter.elasticsearch.rest.security.RestSecurityController;
import org.duniter.elasticsearch.service.*;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ScheduledActionFuture;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestRequest;
import java.io.Closeable;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* Created by blavenie on 17/06/16.
*/
public class PluginInit extends AbstractLifecycleComponent<PluginInit> {
public static final String CURRENCY_NAME_REGEXP = "[a-zA-Z0-9_-]+";
private final PluginSettings pluginSettings;
private final ThreadPool threadPool;
private final Injector injector;
private final ESLogger logger;
@Inject
public PluginInit(Settings settings, PluginSettings pluginSettings, ThreadPool threadPool, final Injector injector) {
super(settings);
this.logger = Loggers.getLogger("duniter.core", settings, new String[0]);
this.pluginSettings = pluginSettings;
this.threadPool = threadPool;
this.injector = injector;
}
@Override
protected void doStart() {
// Configure HTTP API access rules (run once)
threadPool.scheduleOnStarted(this::defineHttpAccessRules);
// Each time the node is the master
threadPool.onMasterStart(() -> {
logger.info(String.format("Starting core jobs..."
+ " blockchain {block: %s, peer: %s, pending: %s},"
+ " p2p {synchro: %s, websocket: %s, emit_peering: %s},"
+ " security {enable: %s, quota: %s},"
+ " doc_stats {enable: %s}",
pluginSettings.enableBlockchainIndexation(),
pluginSettings.enableBlockchainPeerIndexation(),
pluginSettings.enablePendingMembershipIndexation(),
pluginSettings.enableSynchro(),
pluginSettings.enableSynchroWebsocket(),
pluginSettings.enablePeering(),
pluginSettings.enableSecurity(),
pluginSettings.enableQuota(),
pluginSettings.enableDocStats()));
// Create indices (once)
createIndices();
threadPool.scheduleOnClusterReady(() -> {
// Start blockchain indexation (and wait)
startIndexBlockchain().map(ScheduledActionFuture::actionGet);
// Start synchro (and wait)
startSynchro().map(ScheduledActionFuture::actionGet);
// Start publish peering (and wait)
startPublishingPeer().map(ScheduledActionFuture::actionGet);
// Start doc stats (and wait)
startDocStatistics().map(ScheduledActionFuture::actionGet);
// Migrate old data (and wait)
startDataMigration().actionGet();
logger.info("Starting core jobs... [OK]");
});
});
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
protected void defineHttpAccessRules() {
// Access rules for the blockchain indices
if (pluginSettings.enableBlockchainIndexation()) {
// Add access security rules for the currency indices
injector.getInstance(RestSecurityController.class)
// Add access to currencies/record index
.allowIndexType(RestRequest.Method.GET,
CurrencyExtendRepository.INDEX,
CurrencyExtendRepository.RECORD_TYPE)
.allowPostSearchIndexType(
CurrencyExtendRepository.INDEX,
CurrencyExtendRepository.RECORD_TYPE)
// Add access to <currency>/block index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
BlockRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
BlockRepository.TYPE)
// Add access to <currency>/blockStat index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
BlockStatRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
BlockStatRepository.TYPE)
// Add access to <currency>/peer index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
PeerExtendRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
PeerExtendRepository.TYPE)
// Add access to <currency>/movement index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
MovementRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
MovementRepository.TYPE)
// Add access to <currency>/member index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
MemberRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
MemberRepository.TYPE)
// Add access to <currency>/pending index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
PendingMembershipRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
PendingMembershipRepository.TYPE)
// Add access to <currency>/synchro index
.allowIndexType(RestRequest.Method.GET,
CURRENCY_NAME_REGEXP,
SynchroExecutionRepository.TYPE)
.allowPostSearchIndexType(
CURRENCY_NAME_REGEXP,
SynchroExecutionRepository.TYPE);
}
// Allow scroll search (needed by synchro from other peers)
injector.getInstance(RestSecurityController.class)
.allow(RestRequest.Method.POST, "^/_search/scroll$")
.allow(RestRequest.Method.DELETE, "^/_search/scroll$"); // WARN: never authorize URLs like /_search/scroll/all (= deletes ALL scrolls)
// Add access to document/stats index
if (pluginSettings.enableDocStats()) {
injector.getInstance(RestSecurityController.class)
.allowIndexType(RestRequest.Method.GET,
DocStatRepository.INDEX,
DocStatRepository.TYPE)
.allowPostSearchIndexType(
DocStatRepository.INDEX,
DocStatRepository.TYPE);
}
// Add access to search request log
if (pluginSettings.enableQuota()) {
injector.getInstance(RestSecurityController.class)
.allowPostSearchIndexType(
RequestLogRepository.INDEX,
RequestLogRepository.TYPE);
}
}
protected ScheduledActionFuture<?> createIndices() {
checkMasterNode();
// Reload All indices
if (pluginSettings.reloadAllIndices()) {
if (logger.isWarnEnabled()) {
logger.warn("Reloading indices...");
}
injector.getInstance(CurrencyService.class)
.deleteIndex()
.createIndexIfNotExists();
if (pluginSettings.enableDocStats()) {
injector.getInstance(DocStatService.class)
.deleteIndex()
.createIndexIfNotExists();
}
if (pluginSettings.enableQuota() && pluginSettings.logRejectedRequests()) {
injector.getInstance(RequestLogService.class)
.deleteIndex()
.createIndexIfNotExists();
}
if (logger.isInfoEnabled()) {
logger.info("Reloading indices [OK]");
}
}
else if (pluginSettings.enableBlockchainIndexation() && pluginSettings.reloadBlockchainIndices() && pluginSettings.reloadBlockchainIndicesFrom() <= 0) {
if (logger.isWarnEnabled()) {
logger.warn("/!\\ Reloading blockchain indices...");
}
injector.getInstance(CurrencyService.class)
.deleteIndex()
.createIndexIfNotExists();
if (logger.isInfoEnabled()) {
logger.info("Reloading blockchain indices [OK]");
}
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Checking indices...");
}
injector.getInstance(CurrencyService.class)
.createIndexIfNotExists();
if (pluginSettings.enableDocStats()) {
injector.getInstance(DocStatService.class)
.createIndexIfNotExists();
}
if (pluginSettings.enableQuota() && pluginSettings.logRejectedRequests()) {
injector.getInstance(RequestLogService.class)
.createIndexIfNotExists();
if (pluginSettings.enableDocStats()) {
injector.getInstance(DocStatService.class)
.registerIndex(RequestLogRepository.INDEX, RequestLogRepository.TYPE);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Checking indices [OK]");
}
}
// IMPORTANT: make sure cluster is ready (=all indices created)
return threadPool.scheduleOnClusterReady(() -> {});
}
protected void checkMasterNode() {
Preconditions.checkArgument(threadPool.isMasterNode(), "Node must be the master node to execute this job");
}
protected void reloadBlockchainIfNeed(Peer peer) {
// Reload the blockchain
if (pluginSettings.reloadBlockchainIndices()) {
// If partial reload (from a block)
if (pluginSettings.reloadBlockchainIndicesFrom() > 0) {
// Delete blocks in range [from, to]
if (pluginSettings.reloadBlockchainIndicesTo() >= pluginSettings.reloadBlockchainIndicesFrom()) {
logger.warn(String.format("[%s] /!\\ Re-indexing blockchain range [%s-%s]...",
peer.getCurrency(),
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo()));
injector.getInstance(BlockchainService.class)
.deleteRange(peer.getCurrency(),
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo());
}
else {
logger.warn(String.format("[%s] /!\\ Re-indexing blockchain from block #%s...", peer.getCurrency(), pluginSettings.reloadBlockchainIndicesFrom()));
injector.getInstance(BlockchainService.class)
.deleteFrom(peer.getCurrency(), pluginSettings.reloadBlockchainIndicesFrom());
}
}
// Reindex range
if (pluginSettings.reloadBlockchainIndicesFrom() > 0 &&
pluginSettings.reloadBlockchainIndicesTo() >= pluginSettings.reloadBlockchainIndicesFrom()) {
// Wait for the cluster to finish the deletion, then reindex
threadPool.scheduleOnClusterReady(() -> {
injector.getInstance(BlockchainService.class)
.indexBlocksRange(peer,
pluginSettings.reloadBlockchainIndicesFrom(),
pluginSettings.reloadBlockchainIndicesTo());
})
.actionGet();
}
}
}
protected Optional<ScheduledActionFuture<?>> startIndexBlockchain() {
if (!pluginSettings.enableBlockchainIndexation()) return Optional.empty(); // Skip
checkMasterNode();
try {
// Index the currency
Peer peer = pluginSettings.checkAndGetDuniterPeer();
Currency currency = createCurrencyFromPeer(peer).actionGet();
// Reload some blockchain blocks
// TODO: move this reload feature into an admin REST service?
reloadBlockchainIfNeed(peer);
// Start indexation from peer
List<Closeable> closeable = Lists.newArrayList();
closeable.add(startIndexBlockchainFromPeer(peer));
// Auto switch peer
if (pluginSettings.enableBlockchainPeerIndexation()) {
final String currencyId = currency.getId();
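                // Staleness threshold heuristic: twice the expected duration of the medianTime window,
                // i.e. 2 * avgGenTime (seconds per block) * medianTimeBlocks (number of blocks), in seconds.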
final long maxBlockchainAge = 2L * currency.getParameters().getAvgGenTime() * currency.getParameters().getMedianTimeBlocks();
//long maxBlockchainAge = 10;
threadPool.scheduleAtFixedRate(() -> {
if (this.isBlockchainOlderThan(currencyId, maxBlockchainAge)) {
logger.warn("Peer seems to be NOT synchronized (age > 1 hour). Trying to switch to another UP peer");
Peer newPeer = this.findSyncPeer(currencyId, maxBlockchainAge).orElse(null);
if (newPeer != null && !pluginSettings.sameAsClusterPeer(newPeer)) {
// Close previous jobs
IOUtils.closeQuietly(closeable.toArray(new Closeable[0]));
closeable.clear();
logger.info("Restarting blockchain indexation on a new peer: {}", newPeer.toString());
// Define the main peer for this currency
injector.getInstance(PeerService.class)
.setCurrencyMainPeer(currencyId, newPeer);
// Restart, on the new peer
closeable.add(startIndexBlockchainFromPeer(newPeer));
} else {
logger.warn("Cannot find another peer to connect. Retrying in {}s...", maxBlockchainAge);
}
}
},
// initial delay
maxBlockchainAge,
// then re-check at this fixed interval
maxBlockchainAge, TimeUnit.SECONDS);
}
return Optional.of(threadPool.scheduleOnClusterReady(() -> {}));
}
catch (Throwable e) {
// Log, then retry in 10s
logger.error("Failed during start of blockchain indexation. Retrying in 10s...", e);
return Optional.of(threadPool.schedule(this::startIndexBlockchain, 10, TimeUnit.SECONDS));
}
}
protected Closeable startIndexBlockchainFromPeer(Peer peer){
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
if (logger.isInfoEnabled()) {
logger.info(String.format("[%s] Indexing blockchain...", peer.getCurrency()));
}
final List<Closeable> jobs = Lists.newArrayList();
// Index blocks (and listen for new blocks)
jobs.add(startIndexBlocks(peer));
// Index WoT members
jobs.add(startIndexMembers(peer));
// Start listening for pending memberships
jobs.add(startIndexPendingMemberships(peer));
// Index peers (and listen for new peers)
jobs.add(startIndexPeers(peer));
// Return a closeable
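        // Closing the composite stops every started job at once (used by the auto-switch logic above)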
return () -> IOUtils.closeQuietly(
jobs.stream()
.filter(Objects::nonNull)
.toArray(Closeable[]::new)
);
}
protected ScheduledActionFuture<Currency> createCurrencyFromPeer(Peer peer) throws Throwable {
Preconditions.checkNotNull(peer);
Preconditions.checkArgument(pluginSettings.enableBlockchainIndexation());
Currency currency;
try {
// Index (or refresh) node's currency
currency = injector.getInstance(CurrencyService.class)
.indexCurrencyFromPeer(peer, true);
} catch (Throwable e) {
logger.error(String.format("Error while indexing currency: %s", e.getMessage()), e);
throw e;
}
final String currencyId = currency.getId();
peer.setCurrency(currencyId);
// Define the main peer for this currency (will fill a cache in PeerService)
injector.getInstance(PeerService.class)
.setCurrencyMainPeer(currencyId, peer);
// Wait for the end of currency index creation
final Currency result = currency;
return threadPool.scheduleOnClusterReady(() -> result);
}
protected Closeable startIndexBlocks(Peer peer) {
if (!pluginSettings.enableBlockchainIndexation()) return null; // Skip
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
try {
// Index blocks from the given peer
Closeable stoppable = injector.getInstance(BlockchainService.class)
// Index last blocks
.indexLastBlocks(peer)
// Listen for new blocks
.listenAndIndexNewBlock(peer);
if (logger.isInfoEnabled()) logger.info(String.format("[%s] Indexing blockchain [OK]", peer.getCurrency()));
// Stop when this node is no longer the master
threadPool.scheduleOnMasterFirstStop(stoppable);
return stoppable;
} catch (Throwable e) {
logger.error(String.format("[%s] Indexing blockchain error: %s", peer.getCurrency(), e.getMessage()), e);
throw e;
}
}
protected Closeable startIndexMembers(Peer peer) {
if (!pluginSettings.enableBlockchainIndexation()) return null; // Skip
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
// Add stats on members
if (pluginSettings.enableDocStats()) {
DocStatService docStatService = injector.getInstance(DocStatService.class);
// Is member
{
QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Identity.Fields.IS_MEMBER, true))
);
String queryName = Joiner.on('_').join(peer.getCurrency(), Strings.toUnderscoreCase(Identity.Fields.IS_MEMBER));
docStatService.registerIndex(peer.getCurrency(), MemberRepository.TYPE, queryName, query, null);
}
}
try {
// Index WoT members
WotService wotService = injector.getInstance(WotService.class)
.indexMembers(peer.getCurrency());
// Listen for new blocks, to update members
Closeable stop = wotService.listenAndIndexMembers(peer.getCurrency());
// Stop listening if the master stops
threadPool.scheduleOnMasterFirstStop(stop);
return stop;
} catch (Throwable e) {
logger.error(String.format("[%s] Indexing WoT members error: %s", peer.getCurrency(), e.getMessage()), e);
throw e;
}
}
protected Closeable startIndexPeers(Peer peer) {
if (!pluginSettings.enableBlockchainPeerIndexation()) return null; // Skip
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
// Add stats on UP peers, per API
if (pluginSettings.enableDocStats()) {
DocStatService docStatService = injector.getInstance(DocStatService.class);
QueryBuilder statusQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Peer.Fields.STATS + "." + Peer.Stats.Fields.STATUS, Peer.PeerStatus.UP.name()));
// Peers UP
{
QueryBuilder query = QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.nestedQuery(Peer.Fields.STATS, statusQuery)));
String queryName = Joiner.on('_').join(peer.getCurrency(), PeerExtendRepository.TYPE, Peer.PeerStatus.UP.name()).toLowerCase();
docStatService.registerIndex(peer.getCurrency(), PeerExtendRepository.TYPE, queryName, query, null);
}
// Peers UP, per API
Arrays.stream(EndpointApi.values())
.forEach(api -> {
BoolQueryBuilder apiQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Peer.Fields.API, api.name()));
QueryBuilder query = QueryBuilders.constantScoreQuery(apiQuery
.must(QueryBuilders.nestedQuery(Peer.Fields.STATS, statusQuery)));
String queryName = Joiner.on('_').join(peer.getCurrency(), PeerExtendRepository.TYPE, api.name()).toLowerCase();
docStatService.registerIndex(peer.getCurrency(), PeerExtendRepository.TYPE, queryName, query, null);
});
}
try {
// Index peers (and listen for new peers)
logger.info(String.format("[%s] Indexing peers...", peer.getCurrency()));
PeerService peerService = injector.getInstance(PeerService.class)
.indexPeers(peer);
Closeable stop = peerService.listenAndIndexPeers(peer);
// Stop listening if the master stops
threadPool.scheduleOnMasterFirstStop(stop);
return stop;
} catch (Throwable e) {
logger.error(String.format("[%s] Indexing blockchain peers error: %s", peer.getCurrency(), e.getMessage()), e);
throw e;
}
}
protected Closeable startIndexPendingMemberships(Peer peer) {
if (!pluginSettings.enablePendingMembershipIndexation()) return null; // Skip
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
// Register stats on indices
if (pluginSettings.enableDocStats()) {
injector.getInstance(DocStatService.class)
.registerIndex(peer.getCurrency(), PendingMembershipRepository.TYPE);
}
try {
// Index pending memberships (scheduled job)
ScheduledActionFuture<?> job = injector.getInstance(PendingMembershipService.class)
.indexFromPeer(peer)
.startScheduling();
Closeable close = () -> job.cancel(true);
// Stop listening if the master stops
threadPool.scheduleOnMasterFirstStop(close);
return close;
} catch (Throwable e) {
logger.error(String.format("[%s] Indexing blockchain peers error: %s", peer.getCurrency(), e.getMessage()), e);
throw e;
}
}
protected Optional<ScheduledActionFuture<?>> startSynchro() {
checkMasterNode();
// Start synchro, if enabled in config
if (pluginSettings.enableSynchro()) {
ScheduledActionFuture<?> future = injector.getInstance(SynchroService.class)
.startScheduling();
// Stop the action when the master stops
threadPool.scheduleOnMasterFirstStop((Runnable) () -> future.cancel(true));
return Optional.of(future);
}
return Optional.empty();
}
protected Optional<ScheduledActionFuture<?>> startPublishingPeer() {
checkMasterNode();
// Start publishing peering documents to the network, if enabled in config
if (pluginSettings.enablePeering()) {
Optional<ScheduledActionFuture<?>> optionalFuture = injector.getInstance(NetworkService.class)
.startPublishingPeerDocumentToNetwork()
// Stop listening if the master stops
.map(future -> {
threadPool.scheduleOnMasterFirstStop((Runnable) () -> future.cancel(true));
return future;
});
return optionalFuture;
}
return Optional.empty();
}
protected Optional<ScheduledActionFuture<?>> startDocStatistics() {
// Start doc stats, if enabled in config
if (pluginSettings.enableDocStats()) {
// Add index [currency/record] to stats
final DocStatService docStatService = injector
.getInstance(DocStatService.class)
.registerIndex(CurrencyExtendRepository.INDEX, CurrencyExtendRepository.RECORD_TYPE);
ScheduledActionFuture<?> future = docStatService.startScheduling();
// Stop listening if the master stops
threadPool.scheduleOnMasterFirstStop((Runnable)() -> future.cancel(true));
return Optional.of(future);
}
return Optional.empty();
}
protected ScheduledActionFuture<?> startDataMigration() {
return threadPool.scheduleOnClusterReady(() -> {
// Start migration (if needed)
injector.getInstance(DocStatService.class)
.startDataMigration();
});
}
protected boolean isBlockchainOlderThan(String currencyId, long maxAgeInSeconds) {
final BlockchainService blockchainService = injector.getInstance(BlockchainService.class);
long blockchainAgeSec = System.currentTimeMillis() / 1000 - blockchainService.getCurrentTime(currencyId);
return blockchainAgeSec > maxAgeInSeconds;
}
protected Optional<Peer> findSyncPeer(String currencyId, long minBlockchainAge) {
BlockchainRemoteService blockchainRemoteService = injector.getInstance(BlockchainRemoteService.class);
BlockchainService blockchainService = injector.getInstance(BlockchainService.class);
List<Peer> peers = injector.getInstance(PeerService.class)
.getUpPeersByApis(currencyId, EndpointApi.BASIC_MERKLED_API, EndpointApi.BMAS);
Comparator<Peer> lastUpTimeComparator = Comparator.comparing((p) -> p.getStats() != null ? p.getStats().getLastUpTime() : 0);
return Beans.getStream(peers)
// Sort (most recent first)
.sorted(lastUpTimeComparator.reversed())
.filter(peer -> {
try {
BlockchainBlock current = blockchainRemoteService.getCurrentBlock(peer);
if (current == null || !currencyId.equals(current.getCurrency())) return false;
return blockchainService.getBlockAge(current) < minBlockchainAge;
} catch (Exception e) {
return false; // Skip (e.g. node is down)
}
}).findFirst();
}
}
package org.duniter.elasticsearch;
/*
* #%L
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.config.ConfigurationOption;
import org.duniter.core.client.config.ConfigurationProvider;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.crypto.CryptoUtils;
import org.duniter.core.util.crypto.KeyPair;
import org.duniter.elasticsearch.i18n.I18nInitializer;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.nuiton.config.ApplicationConfig;
import org.nuiton.config.ApplicationConfigHelper;
import org.nuiton.config.ApplicationConfigProvider;
import org.nuiton.config.ArgumentsParserException;
import org.nuiton.i18n.I18n;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import static org.nuiton.i18n.I18n.t;
/**
* Access to configuration options
* @author Benoit Lavenier <benoit.lavenier@e-is.pro>
* @since 1.0
*/
public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
private static KeyPair nodeKeyPair;
private static boolean isRandomNodeKeyPair;
private static String nodePubkey;
private static List<String> i18nBundleNames = new CopyOnWriteArrayList<>(); // Default
private static boolean isI18nStarted = false;
private static Peer duniterPeer;
private Optional<Peer> clusterPeer;
private List<Peer> clusterPeerEndpoints;
private Set<String> adminAndModeratorPubkeys;
private String softwareDefaultVersion;
private final CryptoService cryptoService;
protected final Settings settings;
/**
* Delegate application config.
*/
protected final ApplicationConfig applicationConfig;
protected final org.duniter.core.client.config.Configuration clientConfig;
@Inject
public PluginSettings(org.elasticsearch.common.settings.Settings settings,
CryptoService cryptoService) {
super(settings);
this.settings = settings;
this.cryptoService = cryptoService;
this.applicationConfig = new ApplicationConfig("duniter4j.config");
// Cascade the application config to the client module
clientConfig = new org.duniter.core.client.config.Configuration(this.applicationConfig);
Configuration.setInstance(clientConfig);
// Set the default bundle name
addI18nBundleName(getI18nBundleName());
// Allow redefining the user API label
String apiLabel = settings.get("duniter.core.api");
if (StringUtils.isNotBlank(apiLabel)) {
EndpointApi.ES_CORE_API.setLabel(apiLabel);
}
// Init the version
softwareDefaultVersion = getPackageVersion();
}
@Override
protected void doStart() {
// get all config providers
Set<ApplicationConfigProvider> providers =
ImmutableSet.of(new ConfigurationProvider());
// load all default options
ApplicationConfigHelper.loadAllDefaultOption(applicationConfig,
providers);
// Override default Duniter4j options
String baseDir = settings.get("path.home");
applicationConfig.setConfigFileName("duniter4j.config");
applicationConfig.setDefaultOption(ConfigurationOption.BASEDIR.getKey(), baseDir);
applicationConfig.setDefaultOption(ConfigurationOption.NODE_HOST.getKey(), getDuniterNodeHost());
applicationConfig.setDefaultOption(ConfigurationOption.NODE_PORT.getKey(), String.valueOf(getDuniterNodePort()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_TIMEOUT.getKey(), String.valueOf(getNetworkTimeout()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS.getKey(), String.valueOf(getNetworkMaxConnections()));
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_MAX_CONNECTIONS_PER_ROUTE.getKey(), String.valueOf(getNetworkMaxConnectionsPerRoute()));
// Make sure peerUpMaxAge (ms) >= 'duniter.p2p.peering.interval' (s)
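// (e.g. with the default interval of 3600 s, the published value is 3 600 000 ms, so a lower default peerUpMaxAge is raised to it)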
{
int peerUpMaxAgeMs = Integer.parseInt(ConfigurationOption.NETWORK_PEER_UP_MAX_AGE.getDefaultValue());
int publishPeeringMs = getPeeringInterval() * 1000;
if (peerUpMaxAgeMs < publishPeeringMs) {
applicationConfig.setDefaultOption(ConfigurationOption.NETWORK_PEER_UP_MAX_AGE.getKey(), String.valueOf(publishPeeringMs));
}
}
try {
applicationConfig.parse(new String[]{});
} catch (ArgumentsParserException e) {
throw new TechnicalException(t("duniter4j.config.parse.error"), e);
}
File appBasedir = applicationConfig.getOptionAsFile(
ConfigurationOption.BASEDIR.getKey());
if (appBasedir == null) {
appBasedir = new File("");
}
if (!appBasedir.isAbsolute()) {
appBasedir = new File(appBasedir.getAbsolutePath());
}
if (appBasedir.getName().equals("..")) {
appBasedir = appBasedir.getParentFile().getParentFile();
}
if (appBasedir.getName().equals(".")) {
appBasedir = appBasedir.getParentFile();
}
applicationConfig.setOption(
ConfigurationOption.BASEDIR.getKey(),
appBasedir.getAbsolutePath());
// Init i18n
try {
initI18n();
}
catch(IOException e) {
logger.error(String.format("Could not init i18n: %s", e.getMessage()), e);
}
// Init Http client logging
System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.Log4JLogger");
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
public Settings getSettings() {
return settings;
}
/* -- settings on App -- */
public String getSoftwareName() {
return settings.get("duniter.software.name", "cesium-plus-pod");
}
public String getSoftwareVersion() {
return settings.get("duniter.software.version", softwareDefaultVersion);
}
public void setSoftwareDefaultVersion(String defaultVersion) {
softwareDefaultVersion = defaultVersion;
}
/* -- settings on cluster -- */
public String getClusterName() {
return settings.get("cluster.name", "?");
}
public String getClusterRemoteHost() {
return settings.get("cluster.remote.host");
}
public int getClusterRemotePort() {
return settings.getAsInt("cluster.remote.port", 80);
}
public boolean getClusterRemoteUseSsl() {
return settings.getAsBoolean("cluster.remote.useSsl", getClusterRemotePort() == 443);
}
public String getClusterRemoteUrlOrNull() {
return getClusterPeer()
.map(Peer::getUrl)
.orElse(null);
}
public Optional<Peer> getClusterPeer() {
if (clusterPeer == null) {
if (StringUtils.isBlank(getClusterRemoteHost())) {
clusterPeer = Optional.empty();
}
else {
clusterPeer = Optional.of(Peer.builder().host(getClusterRemoteHost())
.port(getClusterRemotePort())
.useSsl(getClusterRemoteUseSsl())
.pubkey(getNodePubkey())
.build());
}
}
return clusterPeer;
}
/**
* Test if a peer is the same as the cluster peer (same host and port)
* @param aPeer
* @return
*/
public boolean sameAsClusterPeer(Peer aPeer) {
return aPeer != null && getClusterPeer().map(clusterPeer -> clusterPeer.getHost().equalsIgnoreCase(aPeer.getHost())
&& clusterPeer.getPort() == aPeer.getPort())
.orElse(false);
}
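/**
* Build one Peer per published endpoint API, using the cluster's public host:port.
* Each endpoint gets a deterministic id: the hash of its serialized form.
*/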
public List<Peer> getClusterPeerEndpoints() {
if (this.clusterPeerEndpoints != null) return clusterPeerEndpoints;
Set<String> endpointApis = getPeeringPublishedApis();
if (StringUtils.isBlank(getClusterRemoteHost()) || CollectionUtils.isEmpty(endpointApis)) {
this.clusterPeerEndpoints = ImmutableList.of();
}
else {
// Make sure node has a pubkey
initNodeKeyring();
this.clusterPeerEndpoints = endpointApis.stream().map(api -> {
Peer p = Peer.builder()
.host(getClusterRemoteHost())
.port(getClusterRemotePort())
.useSsl(getClusterRemoteUseSsl())
.pubkey(getNodePubkey())
.api(api)
.build();
String hash = cryptoService.hash(p.toString());
p.setHash(hash);
p.setId(hash);
if (logger.isDebugEnabled()) {
logger.debug(String.format("[%s] Computed hash to identify this endpoint: %", p.toString(), hash));
}
return p;
}).collect(Collectors.toList());
}
return this.clusterPeerEndpoints;
}
/* -- Settings on Duniter node (with BMA API) -- */
public String getDuniterNodeHost() {
return settings.get("duniter.host", "g1.duniter.org");
}
public int getDuniterNodePort() {
return settings.getAsInt("duniter.port", 10901);
}
public boolean getDuniterNodeUseSsl() {
return settings.getAsBoolean("duniter.useSsl", getDuniterNodePort() == 443);
}
/* -- Other settings -- */
public String getCoreEnpointApi() {
return EndpointApi.ES_CORE_API.label();
}
public boolean isIndexBulkEnable() {
return settings.getAsBoolean("duniter.bulk.enable", true);
}
public int getIndexBulkSize() {
return settings.getAsInt("duniter.bulk.size", 500);
}
public int getSynchroBulkSize() {
return settings.getAsInt("duniter.p2p.bulk.size", Math.min(getIndexBulkSize(), 250));
}
public int getNodeForkResyncWindow() {
return settings.getAsInt("duniter.fork.resync.window", 100);
}
public String getDefaultStringAnalyzer() {
return settings.get("duniter.string.analyzer", "english");
}
public boolean reloadAllIndices() {
return settings.getAsBoolean("duniter.indices.reload", false);
}
public boolean enableBlockchainIndexation() {
return settings.getAsBoolean("duniter.blockchain.enable", true);
}
public boolean enableMovementIndexation() {
return enableBlockchainIndexation() && settings.getAsBoolean("duniter.blockchain.movement.enable", true);
}
public String[] getMovementIncludesComment() {
return settings.getAsArray("duniter.blockchain.movement.includes.comment", null/*no inclusion*/);
}
public String[] getMovementExcludesComment() {
return settings.getAsArray("duniter.blockchain.movement.excludes.comment", null/*no exclusion*/);
}
public boolean enableBlockchainPeerIndexation() {
return settings.getAsBoolean("duniter.blockchain.peer.enable", enableBlockchainIndexation());
}
public boolean enablePendingMembershipIndexation() {
return settings.getAsBoolean("duniter.blockchain.membership.pending.enable", enableBlockchainIndexation());
}
public boolean reloadBlockchainIndices() {
return settings.getAsBoolean("duniter.blockchain.reload", false);
}
public int reloadBlockchainIndicesFrom() {
return settings.getAsInt("duniter.blockchain.reload.from", 0);
}
public int reloadBlockchainIndicesTo() {
return settings.getAsInt("duniter.blockchain.reload.to", -1);
}
public File getTempDirectory() {
return Configuration.instance().getTempDirectory();
}
public int getNetworkTimeout() {
return settings.getAsInt("duniter.network.timeout", 20000 /*20s*/);
}
public int getNetworkMaxConnections() {
return settings.getAsInt("duniter.network.maxConnections", 100);
}
public int getNetworkMaxConnectionsPerRoute() {
return settings.getAsInt("duniter.network.maxConnectionsPerRoute", 5);
}
public boolean enableSynchro() {
return settings.getAsBoolean("duniter.p2p.enable", true);
}
public boolean enableSynchroWebsocket() {
return settings.getAsBoolean("duniter.p2p.ws.enable", true);
}
public boolean enablePeering() {
return this.settings.getAsBoolean("duniter.p2p.peering.enable", enableSynchro());
}
/**
* Peer endpoint APIs to index (into the '_currency_/peer' index)
* @return
*/
public Set<String> getPeerIndexedApis() {
String[] includeApis = settings.getAsArray("duniter.p2p.peer.indexedApis");
if (ArrayUtils.isNotEmpty(includeApis)) return ImmutableSet.copyOf(includeApis);
// By default: getPeeringPublishedApis + getPeeringTargetedApis
Set<String> defaults = Sets.newHashSet(
EndpointApi.BASIC_MERKLED_API.label(),
EndpointApi.BMAS.label(),
EndpointApi.WS2P.label(),
EndpointApi.GVA.label(),
EndpointApi.GVASUB.label()
);
// Add targeted APIs
Set<String> peeringTargetedApis = getPeeringTargetedApis();
if (peeringTargetedApis != null) defaults.addAll(peeringTargetedApis);
// Add published APIs
Set<String> peeringPublishedApis = getPeeringPublishedApis();
if (peeringPublishedApis != null) defaults.addAll(peeringPublishedApis);
return defaults;
}
/**
* Endpoint APIs to publish in the emitted peer document. By default, each plugin defines its own API.
* @return
*/
public Set<String> getPeeringPublishedApis() {
String[] targetedApis = settings.getAsArray("duniter.p2p.peering.publishedApis");
if (ArrayUtils.isEmpty(targetedApis)) return null;
return ImmutableSet.copyOf(targetedApis);
}
/**
* Targeted APIs to send the peer document to.
* Each target should accept a POST request to '/network/peering' (like a Duniter node, but it can also be a pod)
* @return
*/
public Set<String> getPeeringTargetedApis() {
String[] targetedApis = settings.getAsArray("duniter.p2p.peering.targetedApis", new String[]{
getCoreEnpointApi()
});
if (ArrayUtils.isEmpty(targetedApis)) return null;
return ImmutableSet.copyOf(targetedApis);
}
/**
* Interval (in seconds) between publications of the peer document
* @return
*/
public int getPeeringInterval() {
return this.settings.getAsInt("duniter.p2p.peering.interval", 3600 /*=1h*/);
}
/**
* Time to live (in seconds) of the peers cache
* @return
*/
public int getPeersCacheTimeToLive() {
return this.settings.getAsInt("duniter.p2p.peers.cache.timeToLive", 600 /*=10min*/);
}
public boolean fullResyncAtStartup() {
return settings.getAsBoolean("duniter.p2p.fullResyncAtStartup", false);
}
public int getSynchroTimeOffset() {
return settings.getAsInt("duniter.p2p.peerTimeOffset", 60*60/*=1hour*/);
}
public String[] getSynchroIncludesEndpoints() {
return settings.getAsArray("duniter.p2p.includes.endpoints");
}
public String[] getSynchroIncludesPubkeys() {
return settings.getAsArray("duniter.p2p.includes.pubkeys");
}
public boolean enableSynchroDiscovery() {
return settings.getAsBoolean("duniter.p2p.discovery.enable", true);
}
public boolean isDevMode() {
return settings.getAsBoolean("duniter.dev.enable", false);
}
public int getNodeRetryCount() {
return settings.getAsInt("duniter.retry.count", 5);
}
/**
* Time before retry (in millis)
* @return
*/
public int getNodeRetryWaitDuration() {
return settings.getAsInt("duniter.retry.waitDuration", 5000);
}
public String getShareBaseUrl() {
return settings.get("duniter.share.base.url");
}
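/**
* Check the configured Duniter node (host and port are mandatory) and build, once,
* the corresponding Peer. Exits the process when the configuration is invalid.
*/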
public Peer checkAndGetDuniterPeer() {
if (duniterPeer != null) return duniterPeer;
if (StringUtils.isBlank(getDuniterNodeHost())) {
logger.error("ERROR: node host is required");
System.exit(-1);
return null;
}
if (getDuniterNodePort() <= 0) {
logger.error("ERROR: node port is required");
System.exit(-1);
return null;
}
this.duniterPeer = Peer.builder()
.host(getDuniterNodeHost())
.port(getDuniterNodePort())
.useSsl(getDuniterNodeUseSsl())
.build();
return duniterPeer;
}
public String getKeyringSalt() {
return settings.get("duniter.keyring.salt");
}
public String getKeyringPassword() {
return settings.get("duniter.keyring.password");
}
public String getKeyringPublicKey() {
return settings.get("duniter.keyring.pub");
}
public String getKeyringSecretKey() {
return settings.get("duniter.keyring.sec");
}
public boolean enableSecurity() {
return settings.getAsBoolean("duniter.security.enable", true);
}
public boolean enableQuota() {
return settings.getAsBoolean("duniter.security.quota.enable", enableSecurity());
}
public boolean logRejectedRequests() {
return settings.getAsBoolean("duniter.security.quota.log.enable", enableQuota());
}
public String[] getIpWhiteList() {
return settings.getAsArray("duniter.security.whitelist", new String[] {"127.0.0.1", "::1"});
}
public String[] getIpBlackList() {
return settings.getAsArray("duniter.security.blacklist", new String[] {});
}
public int getDocumentTimeMaxPastDelta() {
return settings.getAsInt("duniter.document.time.maxPastDelta", 7200); // in seconds = 2h
}
public int getDocumentTimeMaxFutureDelta() {
return settings.getAsInt("duniter.document.time.maxFutureDelta", 600); // in seconds = 10min
}
public boolean allowDocumentModerationByAdmin() {
return settings.getAsBoolean("duniter.document.moderators.admin", true); //
}
public String[] getDocumentModeratorsPubkeys() {
return this.settings.getAsArray("duniter.document.moderators.pubkeys");
}
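/**
* Union (computed once) of the configured moderator pubkeys and, when admin
* moderation is enabled and the node keypair is not random, the node's own pubkey.
*/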
public Set<String> getDocumentAdminAndModeratorsPubkeys() {
if (adminAndModeratorPubkeys == null) {
ImmutableSet.Builder<String> moderators = ImmutableSet.builder();
if (!isRandomNodeKeypair() && allowDocumentModerationByAdmin()) {
moderators.add(getNodePubkey());
}
adminAndModeratorPubkeys = moderators.add(getDocumentModeratorsPubkeys()).build();
}
return adminAndModeratorPubkeys;
}
public String getWebSocketHost() {
return settings.get("network.host", "localhost");
}
public String getWebSocketPort() {
return settings.get("duniter.ws.port");
}
public boolean getWebSocketEnable() {
return settings.getAsBoolean("duniter.ws.enable", Boolean.TRUE);
}
public String[] getWebSocketChangesListenSource() {
return settings.getAsArray("duniter.ws.changes.listenSource", new String[]{"*"});
}
public boolean enableDocStats() {
return settings.getAsBoolean("duniter.stats.enable", true);
}
/* protected methods */
protected void initI18n() throws IOException {
//if (I18n.getDefaultLocale() != null) return; // already init
// --------------------------------------------------------------------//
// init i18n
// --------------------------------------------------------------------//
File i18nDirectory = clientConfig.getI18nDirectory();
if (i18nDirectory.exists()) {
// clean i18n cache
FileUtils.cleanDirectory(i18nDirectory);
}
FileUtils.forceMkdir(i18nDirectory);
if (logger.isDebugEnabled()) {
logger.debug("I18N directory: " + i18nDirectory);
}
Locale i18nLocale = clientConfig.getI18nLocale();
if (logger.isInfoEnabled()) {
logger.info(String.format("Starts i18n with locale [%s] at [%s]",
i18nLocale, i18nDirectory));
}
if (logger.isDebugEnabled()) {
logger.debug(String.format("Using I18n Bundles: %s",getI18nBundleNames()));
}
I18n.init(new I18nInitializer(i18nDirectory, getI18nBundleNames()),
i18nLocale);
isI18nStarted = true;
}
protected void reloadI18n() {
try {
I18n.close();
initI18n();
}
catch(IOException e) {
logger.error("Could not reload I18n");
}
}
protected String getI18nBundleName() {
return "cesium-plus-pod-core-i18n";
}
protected String[] getI18nBundleNames() {
return i18nBundleNames.toArray(new String[i18nBundleNames.size()]);
}
public void addI18nBundleName(String i18nBundleName) {
if (!this.i18nBundleNames.contains(i18nBundleName)) {
this.i18nBundleNames.add(i18nBundleName);
if (isI18nStarted) {
reloadI18n();
}
}
}
public Locale getI18nLocale() {
return clientConfig.getI18nLocale();
}
/**
* Get the default software version from the package MANIFEST: the implementation version (if any), else the specification version
*/
protected String getPackageVersion() {
// Override application version
Package currentPackage = this.getClass().getPackage();
String newVersion = currentPackage.getImplementationVersion();
if (newVersion == null) {
newVersion = currentPackage.getSpecificationVersion();
}
return newVersion;
}
public KeyPair getNodeKeypair() {
initNodeKeyring();
return this.nodeKeyPair;
}
public boolean isRandomNodeKeypair() {
initNodeKeyring();
return this.isRandomNodeKeyPair;
}
public String getNodePubkey() {
initNodeKeyring();
return this.nodePubkey;
}
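/**
* Lazily initialize the node keyring: derive the keypair from the configured
* salt/password when present, otherwise fall back to a random (non-persistent) keypair.
*/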
protected synchronized void initNodeKeyring() {
if (this.nodeKeyPair != null) return;
if (StringUtils.isNotBlank(getKeyringSalt()) &&
StringUtils.isNotBlank(getKeyringPassword())) {
this.nodeKeyPair = cryptoService.getKeyPair(getKeyringSalt(), getKeyringPassword());
this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey());
this.isRandomNodeKeyPair = false;
}
else {
// Use a random keypair
this.nodeKeyPair = cryptoService.getRandomKeypair();
this.nodePubkey = CryptoUtils.encodeBase58(this.nodeKeyPair.getPubKey());
this.isRandomNodeKeyPair = true;
logger.warn(String.format("No keyring in config. salt/password (or keyring) is need to signed user event documents. Will use a generated key [%s]", this.nodePubkey));
if (logger.isDebugEnabled()) {
logger.debug(String.format(" salt: " + getKeyringSalt().replaceAll(".", "*")));
logger.debug(String.format("password: " + getKeyringPassword().replaceAll(".", "*")));
}
}
}
}
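/*
 * Illustrative sketch only: the option keys read by the getters above come from the
 * node settings (typically elasticsearch.yml). Values below are the documented
 * defaults, not a recommendation:
 *
 *   duniter.host: g1.duniter.org
 *   duniter.port: 10901
 *   duniter.keyring.salt: <salt>            # with duniter.keyring.password, used to sign documents
 *   duniter.p2p.peering.interval: 3600      # seconds between peer document publications
 *   duniter.security.whitelist: ["127.0.0.1", "::1"]
 */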
package org.duniter.elasticsearch.beans;
/*-
* #%L
* Duniter4j :: ElasticSearch Core plugin
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import org.duniter.core.beans.Bean;
import org.duniter.core.beans.BeanCreationException;
import org.duniter.core.beans.BeanFactory;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Injector;
/**
* Created by blavenie on 31/03/17.
*/
public class ESBeanFactory extends BeanFactory {
private Injector injector = null;
private ThreadPool threadPool = null;
@Inject
public void setInjector(Injector injector) {
this.injector = injector;
}
@Inject
public void setThreadPool(ThreadPool threadPool) {
this.threadPool = threadPool;
}
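/**
* Defer the bean's initialization until the cluster is ready, and inject its
* ES dependencies right away (see ThreadPool#scheduleOnClusterReady).
*/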
@Override
protected <S extends Bean> void initBean(S bean) {
threadPool.scheduleOnClusterReady(() -> super.initBean(bean));
if (injector != null) {
injector.injectMembers(bean);
}
}
@Override
protected <S extends Bean> S newBean(Class<S> clazz) {
try {
return super.newBean(clazz);
}
catch(BeanCreationException e) {
// try using injector, if exists
if (injector != null) {
return injector.getBinding(clazz).getProvider().get();
}
throw e;
}
}
}
package org.duniter.elasticsearch.client;
/*-
* #%L
* Duniter4j :: ElasticSearch Core plugin
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import org.duniter.core.beans.Bean;
import org.duniter.core.model.IEntity;
import org.duniter.elasticsearch.dao.handler.StringReaderHandler;
import org.duniter.elasticsearch.threadpool.CompletableActionFuture;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import java.io.File;
import java.io.InputStream;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Created by blavenie on 03/04/17.
*/
public interface Duniter4jClient extends Bean, Client {
boolean existsIndex(String index);
void deleteIndexIfExists(String indexName);
Object getFieldById(String index, String type, String docId, String fieldName);
Map<String, Object> getFieldByIds(String index, String type, Set<String> ids, String fieldName);
Map<String, Object> getFieldsById(String index, String type, String docId, String... fieldNames);
<T> T getTypedFieldById(String index, String type, String docId, String fieldName);
Map<String, Object> getMandatoryFieldsById(String index, String type, String docId, String... fieldNames);
<T> T getMandatoryTypedFieldById(String index, String type, String docId, String fieldName);
String indexDocumentFromJson(String index, String type, String json);
void updateDocumentFromJson(String index, String type, String id, String json);
void checkSameDocumentField(String index, String type, String id, String fieldName, String expectedvalue) throws ElasticsearchException;
void checkSameDocumentIssuer(String index, String type, String id, String expectedIssuer);
boolean isDocumentExists(String index, String type, String id) throws ElasticsearchException;
void checkDocumentExists(String index, String type, String id) throws ElasticsearchException;
/**
* Retrieve a document by id (safe mode)
* @param docId
* @return
*/
<T extends Object> T getSourceByIdOrNull(String index, String type, String docId, Class<T> classOfT, String... fieldNames);
/**
* Retrieve a document by id
* @param docId
* @return
*/
<T extends Object> T getSourceById(String index, String type, String docId, Class<T> classOfT, String... fieldNames);
<T extends Object> Map<String, T> getSourcesByIds(String index, String type, Set<String> docIds, Class<T> classOfT, String... fieldNames);
<C extends IEntity<String>> C readSourceOrNull(SearchHit searchHit, Class<? extends C> clazz);
void bulkFromClasspathFile(String classpathFile, String indexName, String indexType);
void bulkFromClasspathFile(String classpathFile, String indexName, String indexType, StringReaderHandler handler);
void bulkFromFile(File file, String indexName, String indexType);
void bulkFromFile(File file, String indexName, String indexType, StringReaderHandler handler);
void bulkFromStream(InputStream is, String indexName, String indexType);
void bulkFromStream(InputStream is, String indexName, String indexType, StringReaderHandler handler);
void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest);
void flushBulk(BulkRequestBuilder bulkRequest);
BulkRequestBuilder bulkDeleteFromSearch(String index,
String type,
SearchRequestBuilder searchRequest,
BulkRequestBuilder bulkRequest,
int bulkSize,
boolean flushAll);
<T extends ActionResponse> Optional<T> safeExecuteRequest(ActionRequestBuilder<?, T, ?> request, boolean wait);
<T extends ActionResponse> ListenableActionFuture<T> safeExecuteRequest(ActionRequestBuilder<?, T, ?> request);
ScheduledThreadPoolExecutor scheduler();
}
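/*
 * Usage sketch (illustrative; the instance is normally obtained through injection):
 *
 *   Duniter4jClient client = ...; // injected
 *   if (!client.existsIndex("currency")) { ... }
 *   String docId = client.indexDocumentFromJson("currency", "peer", json);
 *   Peer peer = client.getSourceByIdOrNull("currency", "peer", docId, Peer.class);
 */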
package org.duniter.elasticsearch.client;
/*
* #%L
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.elasticsearch.model.Record;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.model.IEntity;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.handler.StringReaderHandler;
import org.duniter.elasticsearch.exception.AccessDeniedException;
import org.duniter.elasticsearch.exception.NotFoundException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountRequest;
import org.elasticsearch.action.count.CountRequestBuilder;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.exists.ExistsRequest;
import org.elasticsearch.action.exists.ExistsRequestBuilder;
import org.elasticsearch.action.exists.ExistsResponse;
import org.elasticsearch.action.explain.ExplainRequest;
import org.elasticsearch.action.explain.ExplainRequestBuilder;
import org.elasticsearch.action.explain.ExplainResponse;
import org.elasticsearch.action.fieldstats.FieldStatsRequest;
import org.elasticsearch.action.fieldstats.FieldStatsRequestBuilder;
import org.elasticsearch.action.fieldstats.FieldStatsResponse;
import org.elasticsearch.action.get.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.delete.DeleteIndexedScriptResponse;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.get.GetIndexedScriptResponse;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequest;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptRequestBuilder;
import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptResponse;
import org.elasticsearch.action.percolate.*;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.suggest.SuggestRequest;
import org.elasticsearch.action.suggest.SuggestRequestBuilder;
import org.elasticsearch.action.suggest.SuggestResponse;
import org.elasticsearch.action.termvectors.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.*;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Created by Benoit on 08/04/2015.
*/
public class Duniter4jClientImpl implements Duniter4jClient {
private final ESLogger logger;
private final Client client;
private final org.duniter.elasticsearch.threadpool.ThreadPool threadPool;
@Inject
public Duniter4jClientImpl(Client client, Settings settings, org.duniter.elasticsearch.threadpool.ThreadPool threadPool) {
super();
this.logger = Loggers.getLogger("duniter.client", settings, new String[0]);
this.client = client;
this.threadPool = threadPool;
}
@Override
public boolean existsIndex(String indexes) {
IndicesExistsRequestBuilder requestBuilder = client.admin().indices().prepareExists(indexes);
IndicesExistsResponse response = requestBuilder.execute().actionGet();
return response.isExists();
}
@Override
public void deleteIndexIfExists(String indexName){
if (!existsIndex(indexName)) {
return;
}
if (logger.isInfoEnabled()) {
logger.info(String.format("Deleting index [%s]", indexName));
}
DeleteIndexRequestBuilder deleteIndexRequestBuilder = client.admin().indices().prepareDelete(indexName);
deleteIndexRequestBuilder.execute().actionGet();
}
@Override
public String indexDocumentFromJson(String index, String type, String json) {
IndexResponse response = client.prepareIndex(index, type)
.setSource(json)
.setRefresh(true)
.execute().actionGet();
return response.getId();
}
@Override
public void updateDocumentFromJson(String index, String type, String id, String json) {
// Execute the update request
safeExecuteRequest(client.prepareUpdate(index, type, id)
.setRefresh(true)
.setDoc(json), true);
}
@Override
public void checkSameDocumentField(String index, String type, String id, String fieldName, String expectedValue) throws ElasticsearchException {
GetResponse response = client.prepareGet(index, type, id)
.setFields(fieldName)
.execute().actionGet();
boolean failed = !response.isExists();
if (failed) {
throw new NotFoundException(String.format("Document [%s/%s/%s] not exists.", index, type, id));
} else {
String docValue = (String)response.getFields().get(fieldName).getValue();
if (!Objects.equals(expectedValue, docValue)) {
throw new AccessDeniedException(String.format("Could not delete this document: not same [%s].", fieldName));
}
}
}
@Override
public boolean isDocumentExists(String index, String type, String id) throws ElasticsearchException {
GetResponse response = client.prepareGet(index, type, id)
.setFetchSource(false)
.execute().actionGet();
return response.isExists();
}
@Override
public void checkDocumentExists(String index, String type, String id) throws ElasticsearchException {
if (!isDocumentExists(index, type, id)) {
throw new NotFoundException(String.format("Document [%s/%s/%s] not exists.", index, type, id));
}
}
@Override
public void checkSameDocumentIssuer(String index, String type, String id, String expectedIssuer) {
String issuer = getMandatoryFieldsById(index, type, id, Record.Fields.ISSUER).get(Record.Fields.ISSUER).toString();
if (!ObjectUtils.equals(expectedIssuer, issuer)) {
throw new AccessDeniedException("Not same issuer");
}
}
/**
* Retrieve some fields from a document by id, and check that every requested field is present
* @param index
* @param type
* @param docId
* @param fieldNames
* @return
*/
@Override
public Map<String, Object> getMandatoryFieldsById(String index, String type, String docId, String... fieldNames) {
Map<String, Object> fields = getFieldsById(index, type, docId, fieldNames);
if (MapUtils.isEmpty(fields)) throw new NotFoundException(String.format("Document [%s/%s/%s] does not exist.", index, type, docId));
Arrays.stream(fieldNames).forEach((fieldName) -> {
if (!fields.containsKey(fieldName)) throw new NotFoundException(String.format("Document [%s/%s/%s] should have the mandatory field [%s].", index, type, docId, fieldName));
});
return fields;
}
/**
* Retrieve some fields from a document by id
* @param docId
* @return
*/
@Override
public Map<String, Object> getFieldsById(String index, String type, String docId, String... fieldNames) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery().ids(docId));
searchRequest.addFields(fieldNames);
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
if (response.getHits().getTotalHits() == 0) return null;
Map<String, Object> result = new HashMap<>();
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
Map<String, SearchHitField> hitFields = searchHit.getFields();
for(String fieldName: hitFields.keySet()) {
result.put(fieldName, hitFields.get(fieldName).getValue());
}
break;
}
return result;
}
catch(SearchPhaseExecutionException e) {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve fields [%s] for id [%s]",
index, type,
Joiner.on(',').join(fieldNames).toString(),
docId), e);
}
}
/**
* Retrieve a field from a set of documents by ids
* @param index
* @param type
* @param ids
* @param fieldName
* @return
*/
@Override
public Map<String, Object> getFieldByIds(String index, String type, Set<String> ids, String fieldName) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setTypes(type)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery().ids(ids));
searchRequest.addFields(fieldName);
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
Map<String, Object> result = new HashMap<>();
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
Map<String, SearchHitField> hitFields = searchHit.getFields();
if (hitFields.get(fieldName) != null) {
result.put(searchHit.getId(), hitFields.get(fieldName).getValue());
}
}
return result;
}
catch(SearchPhaseExecutionException e) {
// Failed or no item on index
throw new TechnicalException(String.format("[%s/%s] Unable to retrieve field [%s] for ids [%s]",
index, type, fieldName,
Joiner.on(',').join(ids).toString()), e);
}
}
/**
* Retrieve a field from a document id
* @param docId
* @return
*/
@Override
public Object getFieldById(String index, String type, String docId, String fieldName) {
Map<String, Object> result = getFieldsById(index, type, docId, fieldName);
if (MapUtils.isEmpty(result)) {
return null;
}
return result.get(fieldName);
}
@Override
public <T> T getTypedFieldById(String index, String type, String docId, String fieldName) {
return (T)getFieldById(index, type, docId, fieldName);
}
@Override
public <T> T getMandatoryTypedFieldById(String index, String type, String docId, String fieldName) {
Object result = getFieldById(index, type, docId, fieldName);
if (result == null) {
throw new NotFoundException(String.format("Document [%s/%s/%s] missing value for mandatory field [%s].", index, type, docId, fieldName));
}
return (T)result;
}
/**
* Retrieve a document by id (safe mode)
* @param docId
* @return
*/
@Override
public <T extends Object> T getSourceByIdOrNull(String index, String type, String docId, Class<T> classOfT, String... fieldNames) {
try {
return getSourceById(index, type, docId, classOfT, fieldNames);
}
catch(TechnicalException e) {
return null; // not found
}
}
/**
* Retrieve a document by id
* @param docId
* @return
*/
@Override
public <T extends Object> T getSourceById(String index, String type, String docId, Class<T> classOfT, String... fieldNames) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setSearchType(SearchType.QUERY_AND_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docId));
if (CollectionUtils.isNotEmpty(fieldNames)) {
searchRequest.setFetchSource(fieldNames, null);
}
else {
searchRequest.setFetchSource(true); // full source
}
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
if (response.getHits().getTotalHits() == 0) return null;
// Read query result
SearchHit[] searchHits = response.getHits().getHits();
ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper();
for (SearchHit searchHit : searchHits) {
if (searchHit.source() != null) {
return objectMapper.readValue(searchHit.source(), classOfT);
}
break;
}
return null;
}
catch(SearchPhaseExecutionException | IOException e) {
// Failed to get source
throw new TechnicalException(String.format("[%s/%s] Error while getting [%s]",
index, type,
docId), e);
}
}
/**
* Retrieve several documents by ids
* @param docIds
* @return
*/
@Override
public <T extends Object> Map<String, T> getSourcesByIds(String index, String type, Set<String> docIds, Class<T> classOfT, String... fieldNames) {
// Prepare request
SearchRequestBuilder searchRequest = client
.prepareSearch(index)
.setSearchType(SearchType.QUERY_AND_FETCH);
searchRequest.setQuery(QueryBuilders.idsQuery(type).ids(docIds));
if (CollectionUtils.isNotEmpty(fieldNames)) {
searchRequest.setFetchSource(fieldNames, null);
}
else {
searchRequest.setFetchSource(true); // full source
}
Map<String, T> result = new HashMap<>();
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
if (response.getHits().getTotalHits() == 0) return result;
// Read query result
ObjectMapper objectMapper = JacksonUtils.getThreadObjectMapper();
for (SearchHit searchHit : response.getHits().getHits()) {
if (searchHit.source() != null) {
result.put(searchHit.getId(), objectMapper.readValue(searchHit.source(), classOfT));
}
}
return result;
}
catch(SearchPhaseExecutionException | IOException e) {
// Failed to get source
throw new TechnicalException(String.format("[%s/%s] Error while getting id=%s",
index, type,
docIds.toString()), e);
}
}
@Override
public <C extends IEntity<String>> C readSourceOrNull(SearchHit searchHit, Class<? extends C> clazz) {
try {
C value = JacksonUtils.getThreadObjectMapper().readValue(searchHit.getSourceRef().streamInput(), clazz);
value.setId(searchHit.getId());
return value;
}
catch(IOException e) {
logger.warn(String.format("Unable to deserialize source [%s/%s/%s] into [%s]: %s", searchHit.getIndex(), searchHit.getType(), searchHit.getId(), clazz.getName(), e.getMessage()));
return null;
}
}
@Override
public void bulkFromClasspathFile(String classpathFile, String indexName, String indexType) {
bulkFromClasspathFile(classpathFile, indexName, indexType, null);
}
@Override
public void bulkFromClasspathFile(String classpathFile, String indexName, String indexType, StringReaderHandler handler) {
InputStream is = null;
try {
is = getClass().getClassLoader().getResourceAsStream(classpathFile);
if (is == null) {
throw new TechnicalException(String.format("Could not retrieve data file [%s] need to fill index [%s]: ", classpathFile, indexName));
}
bulkFromStream(is, indexName, indexType, handler);
}
finally {
if (is != null) {
try {
is.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
}
@Override
public void bulkFromFile(File file, String indexName, String indexType) {
bulkFromFile(file, indexName, indexType, null);
}
@Override
public void bulkFromFile(File file, String indexName, String indexType, StringReaderHandler handler) {
Preconditions.checkNotNull(file);
Preconditions.checkArgument(file.exists());
InputStream is = null;
try {
is = new BufferedInputStream(new FileInputStream(file));
bulkFromStream(is, indexName, indexType, handler);
}
catch(FileNotFoundException e) {
throw new TechnicalException(String.format("[%s] Could not find file %s", indexName, file.getPath()), e);
}
finally {
if (is != null) {
try {
is.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
}
@Override
public void bulkFromStream(InputStream is, String indexName, String indexType) {
bulkFromStream(is, indexName, indexType, null);
}
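/**
* Read the stream line by line (bulk NDJSON: alternating action and source lines),
* let the optional handler rewrite each line, then submit everything as one bulk request.
*/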
@Override
public void bulkFromStream(InputStream is, String indexName, String indexType, StringReaderHandler handler) {
Preconditions.checkNotNull(is);
BulkRequest bulkRequest = Requests.bulkRequest();
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(is));
String line = br.readLine();
StringBuilder builder = new StringBuilder();
while(line != null) {
line = line.trim();
if (StringUtils.isNotBlank(line)) {
if (logger.isTraceEnabled()) {
logger.trace(String.format("[%s] Add to bulk: %s", indexName, line));
}
if (handler != null) {
line = handler.onReadLine(line.trim());
}
builder.append(line).append('\n');
}
line = br.readLine();
}
byte[] data = builder.toString().getBytes();
bulkRequest.add(new BytesArray(data), indexName, indexType, false);
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e);
}
finally {
if (br != null) {
try {
br.close();
}
catch(IOException e) {
// Silent is gold
}
}
}
try {
client.bulk(bulkRequest).actionGet();
} catch(Exception e) {
throw new TechnicalException(String.format("[%s] Error while inserting rows into %s", indexName, indexType), e);
}
}
@Override
public void flushDeleteBulk(final String index, final String type, final BulkRequestBuilder bulkRequest) {
if (bulkRequest.numberOfActions() > 0) {
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
// If failures, log them and continue
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
for (BulkItemResponse itemResponse : bulkResponse) {
boolean skip = !itemResponse.isFailed();
if (!skip) {
logger.debug(String.format("[%s/%s] Error while deleting doc [%s]: %s. Skipping this deletion.", index, type, itemResponse.getId(), itemResponse.getFailureMessage()));
}
}
}
}
}
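/**
* Execute the bulk request; on partial failure, log each failed doc id once and continue.
*/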
@Override
public void flushBulk(final BulkRequestBuilder bulkRequest) {
if (bulkRequest.numberOfActions() == 0) return; // Nothing to flush
// Flush the bulk
BulkResponse bulkResponse = bulkRequest.get();
Set<String> missingDocIds = new LinkedHashSet<>();
// If failures, continue but remember failed doc ids (log each only once)
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
for (BulkItemResponse itemResponse : bulkResponse) {
boolean skip = !itemResponse.isFailed()
|| missingDocIds.contains(itemResponse.getId());
if (!skip) {
logger.error(String.format("[%s/%s] could not process _id=%s: %s. Skipping.",
itemResponse.getIndex(), itemResponse.getType(), itemResponse.getId(), itemResponse.getFailureMessage()));
missingDocIds.add(itemResponse.getId());
}
}
}
}
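/**
* Page through the search results ('bulkSize' hits at a time), adding one delete per hit.
* The bulk is flushed every 'bulkSize' actions, and the remainder is flushed when 'flushAll' is set.
*/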
@Override
public BulkRequestBuilder bulkDeleteFromSearch(final String index,
final String type,
final SearchRequestBuilder searchRequest,
BulkRequestBuilder bulkRequest,
final int bulkSize,
final boolean flushAll) {
// Execute query, while there is some data
try {
int counter = 0;
boolean loop = true;
searchRequest.setSize(bulkSize);
SearchResponse response = searchRequest.execute().actionGet();
// Execute query, while there is some data
do {
// Read response
SearchHit[] searchHits = response.getHits().getHits();
for (SearchHit searchHit : searchHits) {
// Add deletion to bulk
bulkRequest.add(
client.prepareDelete(index, type, searchHit.getId())
);
counter++;
// Flush the bulk if not empty
if ((bulkRequest.numberOfActions() % bulkSize) == 0) {
flushDeleteBulk(index, type, bulkRequest);
bulkRequest = client.prepareBulk();
}
}
// Stop when all hits have been processed
if (counter == 0 || counter >= response.getHits().getTotalHits()) {
loop = false;
}
// Prepare next iteration
else {
searchRequest.setFrom(counter);
response = searchRequest.execute().actionGet();
}
} while(loop);
// last flush
if (flushAll && (bulkRequest.numberOfActions() % bulkSize) != 0) {
flushDeleteBulk(index, type, bulkRequest);
}
} catch (SearchPhaseExecutionException e) {
// Failed or no item on index
logger.error(String.format("Error while deleting by reference: %s. Skipping deletions.", e.getMessage()), e);
}
return bulkRequest;
}
/* delegate methods */
@Override
public AdminClient admin() {
return client.admin();
}
@Override
public ActionFuture<IndexResponse> index(IndexRequest request) {
return client.index(request);
}
@Override
public void index(IndexRequest request, ActionListener<IndexResponse> listener) {
client.index(request, listener);
}
@Override
public IndexRequestBuilder prepareIndex() {
return client.prepareIndex();
}
@Override
public ActionFuture<UpdateResponse> update(UpdateRequest request) {
return client.update(request);
}
@Override
public void update(UpdateRequest request, ActionListener<UpdateResponse> listener) {
client.update(request, listener);
}
@Override
public UpdateRequestBuilder prepareUpdate() {
return client.prepareUpdate();
}
@Override
public UpdateRequestBuilder prepareUpdate(String index, String type, String id) {
return client.prepareUpdate(index, type, id);
}
@Override
public IndexRequestBuilder prepareIndex(String index, String type) {
return client.prepareIndex(index, type);
}
@Override
public IndexRequestBuilder prepareIndex(String index, String type, @Nullable String id) {
return client.prepareIndex(index, type, id);
}
@Override
public ActionFuture<DeleteResponse> delete(DeleteRequest request) {
return client.delete(request);
}
@Override
public void delete(DeleteRequest request, ActionListener<DeleteResponse> listener) {
client.delete(request, listener);
}
@Override
public DeleteRequestBuilder prepareDelete() {
return client.prepareDelete();
}
@Override
public DeleteRequestBuilder prepareDelete(String index, String type, String id) {
return client.prepareDelete(index, type, id);
}
@Override
public ActionFuture<BulkResponse> bulk(BulkRequest request) {
return client.bulk(request);
}
@Override
public void bulk(BulkRequest request, ActionListener<BulkResponse> listener) {
client.bulk(request, listener);
}
@Override
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
@Override
public ActionFuture<GetResponse> get(GetRequest request) {
return client.get(request);
}
@Override
public void get(GetRequest request, ActionListener<GetResponse> listener) {
client.get(request, listener);
}
@Override
public GetRequestBuilder prepareGet() {
return client.prepareGet();
}
@Override
public GetRequestBuilder prepareGet(String index, @Nullable String type, String id) {
return client.prepareGet(index, type, id);
}
@Override
public PutIndexedScriptRequestBuilder preparePutIndexedScript() {
return client.preparePutIndexedScript();
}
@Override
public PutIndexedScriptRequestBuilder preparePutIndexedScript(@Nullable String scriptLang, String id, String source) {
return client.preparePutIndexedScript(scriptLang, id, source);
}
@Override
public void deleteIndexedScript(DeleteIndexedScriptRequest request, ActionListener<DeleteIndexedScriptResponse> listener) {
client.deleteIndexedScript(request, listener);
}
@Override
public ActionFuture<DeleteIndexedScriptResponse> deleteIndexedScript(DeleteIndexedScriptRequest request) {
return client.deleteIndexedScript(request);
}
@Override
public DeleteIndexedScriptRequestBuilder prepareDeleteIndexedScript() {
return client.prepareDeleteIndexedScript();
}
@Override
public DeleteIndexedScriptRequestBuilder prepareDeleteIndexedScript(@Nullable String scriptLang, String id) {
return client.prepareDeleteIndexedScript(scriptLang, id);
}
@Override
public void putIndexedScript(PutIndexedScriptRequest request, ActionListener<PutIndexedScriptResponse> listener) {
client.putIndexedScript(request, listener);
}
@Override
public ActionFuture<PutIndexedScriptResponse> putIndexedScript(PutIndexedScriptRequest request) {
return client.putIndexedScript(request);
}
@Override
public GetIndexedScriptRequestBuilder prepareGetIndexedScript() {
return client.prepareGetIndexedScript();
}
@Override
public GetIndexedScriptRequestBuilder prepareGetIndexedScript(@Nullable String scriptLang, String id) {
return client.prepareGetIndexedScript(scriptLang, id);
}
@Override
public void getIndexedScript(GetIndexedScriptRequest request, ActionListener<GetIndexedScriptResponse> listener) {
client.getIndexedScript(request, listener);
}
@Override
public ActionFuture<GetIndexedScriptResponse> getIndexedScript(GetIndexedScriptRequest request) {
return client.getIndexedScript(request);
}
@Override
public ActionFuture<MultiGetResponse> multiGet(MultiGetRequest request) {
return client.multiGet(request);
}
@Override
public void multiGet(MultiGetRequest request, ActionListener<MultiGetResponse> listener) {
client.multiGet(request, listener);
}
@Override
public MultiGetRequestBuilder prepareMultiGet() {
return client.prepareMultiGet();
}
@Override
@Deprecated
public ActionFuture<CountResponse> count(CountRequest request) {
return client.count(request);
}
@Override
@Deprecated
public void count(CountRequest request, ActionListener<CountResponse> listener) {
client.count(request, listener);
}
@Override
@Deprecated
public CountRequestBuilder prepareCount(String... indices) {
return client.prepareCount(indices);
}
@Override
@Deprecated
public ActionFuture<ExistsResponse> exists(ExistsRequest request) {
return client.exists(request);
}
@Override
@Deprecated
public void exists(ExistsRequest request, ActionListener<ExistsResponse> listener) {
client.exists(request, listener);
}
@Override
@Deprecated
public ExistsRequestBuilder prepareExists(String... indices) {
return client.prepareExists(indices);
}
@Override
public ActionFuture<SuggestResponse> suggest(SuggestRequest request) {
return client.suggest(request);
}
@Override
public void suggest(SuggestRequest request, ActionListener<SuggestResponse> listener) {
client.suggest(request, listener);
}
@Override
public SuggestRequestBuilder prepareSuggest(String... indices) {
return client.prepareSuggest(indices);
}
@Override
public ActionFuture<SearchResponse> search(SearchRequest request) {
return client.search(request);
}
@Override
public void search(SearchRequest request, ActionListener<SearchResponse> listener) {
client.search(request, listener);
}
@Override
public SearchRequestBuilder prepareSearch(String... indices) {
return client.prepareSearch(indices);
}
@Override
public ActionFuture<SearchResponse> searchScroll(SearchScrollRequest request) {
return client.searchScroll(request);
}
@Override
public void searchScroll(SearchScrollRequest request, ActionListener<SearchResponse> listener) {
client.searchScroll(request, listener);
}
@Override
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
return client.prepareSearchScroll(scrollId);
}
@Override
public ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
return client.multiSearch(request);
}
@Override
public void multiSearch(MultiSearchRequest request, ActionListener<MultiSearchResponse> listener) {
client.multiSearch(request, listener);
}
@Override
public MultiSearchRequestBuilder prepareMultiSearch() {
return client.prepareMultiSearch();
}
@Override
public ActionFuture<TermVectorsResponse> termVectors(TermVectorsRequest request) {
return client.termVectors(request);
}
@Override
public void termVectors(TermVectorsRequest request, ActionListener<TermVectorsResponse> listener) {
client.termVectors(request, listener);
}
@Override
public TermVectorsRequestBuilder prepareTermVectors() {
return client.prepareTermVectors();
}
@Override
public TermVectorsRequestBuilder prepareTermVectors(String index, String type, String id) {
return client.prepareTermVectors(index, type, id);
}
@Override
@Deprecated
public ActionFuture<TermVectorsResponse> termVector(TermVectorsRequest request) {
return client.termVector(request);
}
@Override
@Deprecated
public void termVector(TermVectorsRequest request, ActionListener<TermVectorsResponse> listener) {
client.termVector(request, listener);
}
@Override
@Deprecated
public TermVectorsRequestBuilder prepareTermVector() {
return client.prepareTermVector();
}
@Override
@Deprecated
public TermVectorsRequestBuilder prepareTermVector(String index, String type, String id) {
return client.prepareTermVector(index, type, id);
}
@Override
public ActionFuture<MultiTermVectorsResponse> multiTermVectors(MultiTermVectorsRequest request) {
return client.multiTermVectors(request);
}
@Override
public void multiTermVectors(MultiTermVectorsRequest request, ActionListener<MultiTermVectorsResponse> listener) {
client.multiTermVectors(request, listener);
}
@Override
public MultiTermVectorsRequestBuilder prepareMultiTermVectors() {
return client.prepareMultiTermVectors();
}
@Override
public ActionFuture<PercolateResponse> percolate(PercolateRequest request) {
return client.percolate(request);
}
@Override
public void percolate(PercolateRequest request, ActionListener<PercolateResponse> listener) {
client.percolate(request, listener);
}
@Override
public PercolateRequestBuilder preparePercolate() {
return client.preparePercolate();
}
@Override
public ActionFuture<MultiPercolateResponse> multiPercolate(MultiPercolateRequest request) {
return client.multiPercolate(request);
}
@Override
public void multiPercolate(MultiPercolateRequest request, ActionListener<MultiPercolateResponse> listener) {
client.multiPercolate(request, listener);
}
@Override
public MultiPercolateRequestBuilder prepareMultiPercolate() {
return client.prepareMultiPercolate();
}
@Override
public ExplainRequestBuilder prepareExplain(String index, String type, String id) {
return client.prepareExplain(index, type, id);
}
@Override
public ActionFuture<ExplainResponse> explain(ExplainRequest request) {
return client.explain(request);
}
@Override
public void explain(ExplainRequest request, ActionListener<ExplainResponse> listener) {
client.explain(request, listener);
}
@Override
public ClearScrollRequestBuilder prepareClearScroll() {
return client.prepareClearScroll();
}
@Override
public ActionFuture<ClearScrollResponse> clearScroll(ClearScrollRequest request) {
return client.clearScroll(request);
}
@Override
public void clearScroll(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener) {
client.clearScroll(request, listener);
}
@Override
public FieldStatsRequestBuilder prepareFieldStats() {
return client.prepareFieldStats();
}
@Override
public ActionFuture<FieldStatsResponse> fieldStats(FieldStatsRequest request) {
return client.fieldStats(request);
}
@Override
public void fieldStats(FieldStatsRequest request, ActionListener<FieldStatsResponse> listener) {
client.fieldStats(request, listener);
}
@Override
public Settings settings() {
return client.settings();
}
@Override
public Headers headers() {
return client.headers();
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> ActionFuture<Response> execute(Action<Request, Response, RequestBuilder> action, Request request) {
return client.execute(action, request);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
client.execute(action, request, listener);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> RequestBuilder prepareExecute(Action<Request, Response, RequestBuilder> action) {
return client.prepareExecute(action);
}
public ThreadPool threadPool() {
return client.threadPool();
}
public ScheduledThreadPoolExecutor scheduler() {
return (ScheduledThreadPoolExecutor)client.threadPool().scheduler();
}
public void close() {
client.close();
}
public <T extends ActionResponse> Optional<T> safeExecuteRequest(ActionRequestBuilder<?, T, ?> request, boolean wait) {
// Execute in a pool
ListenableActionFuture<T> actionFuture = safeExecuteRequest(request);
// Wait for the action to complete, if needed
if (wait && actionFuture != null) {
return Optional.of(actionFuture.actionGet());
}
return Optional.empty();
}
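/**
* Submit the request, retrying while the ES thread pool rejects the execution:
* wait 2 s between attempts and give up after 30 attempts (about 1 minute).
*/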
public <T extends ActionResponse> ListenableActionFuture<T> safeExecuteRequest(ActionRequestBuilder<?, T, ?> request) {
// Execute in a pool
ListenableActionFuture<T> actionFuture = null;
boolean acceptedInPool = false;
long retryCounter = 0;
while(!acceptedInPool) {
try {
retryCounter++;
actionFuture = request.execute();
acceptedInPool = true;
}
catch(EsRejectedExecutionException e) {
// Too many attempts: cancel
if (retryCounter >= 30 ) {
logger.error("Thread pool full (after waiting 1min). Cancelling an insert or update operation.");
throw new TechnicalException("Thread pool full (after waiting 1min). Cancelling an insert or update operation.");
}
// not accepted, so wait
try {
logger.debug("Thread pool seems busy: waiting 2s and retry...");
Thread.sleep(2000); // 2s
}
catch(InterruptedException e2) {
// silent
}
}
}
return actionFuture;
}
}
package org.duniter.elasticsearch.dao;
/*
* #%L
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.fasterxml.jackson.core.JsonProcessingException;
import org.duniter.core.exception.TechnicalException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
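/*
* A minimal subclass sketch (the "city" index is hypothetical; a real implementation
* would provide settings and mappings in createIndex()):
*
*   public class CityIndexRepository extends AbstractIndexRepository<CityIndexRepository> {
*       public CityIndexRepository() {
*           super("city");
*       }
*       @Override
*       protected void createIndex() throws JsonProcessingException {
*           client.admin().indices().prepareCreate(getIndex()).execute().actionGet();
*       }
*   }
*/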
/**
* Created by Benoit on 08/04/2015.
*/
public abstract class AbstractIndexRepository<T extends IndexRepository> extends AbstractRepository implements IndexRepository<T> {
private final String index;
public AbstractIndexRepository(String index) {
super("duniter.dao."+index);
this.index = index;
}
/**
* Create the index, with its settings and type mappings
* @throws JsonProcessingException if the index configuration cannot be serialized
*/
protected abstract void createIndex() throws JsonProcessingException;
@Override
public String getIndex() {
return index;
}
@Override
public T createIndexIfNotExists() {
try {
if (!client.existsIndex(index)) {
createIndex();
}
}
catch(JsonProcessingException e) {
throw new TechnicalException(String.format("Error while creating index [%s]", index), e);
}
return (T)this;
}
@Override
public T deleteIndex() {
client.deleteIndexIfExists(index);
return (T)this;
}
@Override
public boolean existsIndex() {
return client.existsIndex(index);
}
/* -- protected methods -- */
protected void deleteIndexIfExists(){
if (!client.existsIndex(index)) {
return;
}
if (logger.isInfoEnabled()) {
logger.info(String.format("Deleting index [%s]", index));
}
DeleteIndexRequestBuilder deleteIndexRequestBuilder = client.admin().indices().prepareDelete(index);
deleteIndexRequestBuilder.execute().actionGet();
}
}
package org.duniter.elasticsearch.dao;
/*
* #%L
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.fasterxml.jackson.core.JsonProcessingException;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.Preconditions;
import org.duniter.elasticsearch.dao.handler.StringReaderHandler;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilder;
import java.io.File;
import java.io.InputStream;
import java.util.Map;
/**
* Created by Benoit on 08/04/2015.
*/
public abstract class AbstractIndexTypeRepository<T extends IndexTypeRepository>
extends AbstractRepository
implements IndexTypeRepository<T, String> {
private final String index;
private final String type;
public AbstractIndexTypeRepository(String index, String type) {
super("duniter.dao."+index);
this.index = index;
this.type = type;
}
/**
* Create the index, with its settings and type mappings
* @throws JsonProcessingException if the index configuration cannot be serialized
*/
protected abstract void createIndex() throws JsonProcessingException;
@Override
public String getIndex() {
return index;
}
@Override
public String getType() {
return type;
}
@Override
public T createIndexIfNotExists() {
try {
if (!client.existsIndex(index)) {
createIndex();
}
}
catch(JsonProcessingException e) {
throw new TechnicalException(String.format("Error while creating index [%s]", index), e);
}
return (T)this;
}
@Override
public T deleteIndex() {
client.deleteIndexIfExists(index);
return (T)this;
}
@Override
public boolean existsById(String docId) {
return client.isDocumentExists(index, type, docId);
}
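/*
* Basic CRUD helpers, delegating to the shared client on this repository's [index/type].
* Usage sketch (the JSON documents are hypothetical):
*
*   String id = repository.create("{\"name\": \"Rennes\"}"); // returns the generated id
*   repository.update(id, "{\"name\": \"Rennes (FR)\"}");
*   repository.deleteById(id);
*/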
public String create(final String json) {
return client.indexDocumentFromJson(index, type, json);
}
public void update(final String id, final String json) {
client.updateDocumentFromJson(index, type, id, json);
}
public String indexDocumentFromJson(String json) {
return client.indexDocumentFromJson(index, type, json);
}
public void updateDocumentFromJson(String id, String json) {
client.updateDocumentFromJson(index, type, id, json);
}
/**
* Retrieve a single field of a document
* @param docId the document id
* @param fieldName the name of the field to fetch
* @return the field value
*/
public Object getFieldById(String docId, String fieldName) {
return client.getFieldById(index, type, docId, fieldName);
}
public <T> T getTypedFieldById(String docId, String fieldName) {
return client.getTypedFieldById(index, type, docId, fieldName);
}
@Override
public Map<String, Object> getMandatoryFieldsById(String docId, String... fieldNames) {
return client.getMandatoryFieldsById(index, type, docId, fieldNames);
}
@Override
public Map<String, Object> getFieldsById(String docId, String... fieldNames) {
return client.getFieldsById(index, type, docId, fieldNames);
}
/**
* Retrieve a document by id (safe mode: returns null instead of failing when the document cannot be read)
* @param docId the document id
* @return the document source, or null
*/
public <T extends Object> T getSourceByIdOrNull(String docId, Class<T> classOfT, String... fieldNames) {
return client.getSourceByIdOrNull(index, type, docId, classOfT, fieldNames);
}
/**
* Retrieve a document by id
* @param docId the document id
* @return the document source, mapped to the given class
*/
public <T extends Object> T getSourceById(String docId, Class<T> classOfT, String... fieldNames) {
return client.getSourceById(index, type, docId, classOfT, fieldNames);
}
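/*
* Bulk loading helpers: read ES bulk actions from the classpath, a file or a stream,
* optionally transforming the content line by line through a StringReaderHandler.
*/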
public void bulkFromClasspathFile(String classpathFile) {
client.bulkFromClasspathFile(classpathFile, index, type, null);
}
public void bulkFromClasspathFile(String classpathFile, StringReaderHandler handler) {
client.bulkFromClasspathFile(classpathFile, index, type, handler);
}
public void bulkFromFile(File file) {
client.bulkFromFile(file, index, type, null);
}
public void bulkFromFile(File file, StringReaderHandler handler) {
client.bulkFromFile(file, index, type, handler);
}
public void bulkFromStream(InputStream is) {
client.bulkFromStream(is, index, type, null);
}
public void bulkFromStream(InputStream is, StringReaderHandler handler) {
client.bulkFromStream(is, index, type, handler);
}
public void flushDeleteBulk(BulkRequestBuilder bulkRequest) {
client.flushDeleteBulk(index, type, bulkRequest);
}
@Override
public boolean existsIndex() {
return client.existsIndex(index);
}
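/*
* Asynchronous variants: requests go through safeExecuteRequest(), which retries while the
* thread pool is busy. With wait == false the call returns before the document is indexed,
* and since per-request refresh is disabled it may not be searchable immediately.
*/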
public void create(String json, boolean wait) {
Preconditions.checkNotNull(json);
// Execute (no per-request refresh: rely on the index refresh interval)
client.safeExecuteRequest(client.prepareIndex(getIndex(), getType())
.setRefresh(false)
.setSource(json), wait);
}
public void update(String id, String json, boolean wait) {
Preconditions.checkNotNull(json);
// Execute (no per-request refresh: rely on the index refresh interval)
client.safeExecuteRequest(client.prepareUpdate(getIndex(), getType(), id)
.setRefresh(false)
.setDoc(json), wait);
}
public long count() {
return count(null);
}
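/*
* Counting is implemented as a search with size 0: only totalHits is read, so no document
* source is fetched from the shards.
*/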
public long count(QueryBuilder query) {
// Prepare count request
SearchRequestBuilder searchRequest = client
.prepareSearch(getIndex())
.setTypes(getType())
.setFetchSource(false)
.setSearchType(SearchType.QUERY_AND_FETCH)
.setSize(0);
// Query
if (query != null) {
searchRequest.setQuery(query);
}
// Execute query
try {
SearchResponse response = searchRequest.execute().actionGet();
return response.getHits().getTotalHits();
}
catch(SearchPhaseExecutionException e) {
// Query failed, or the index has no document yet
logger.error(String.format("Error while counting documents in [%s/%s]: %s", index, type, e.getMessage()), e);
}
return 0;
}
@Override
public void deleteById(final String id) {
Preconditions.checkNotNull(id);
client.prepareDelete(index, type, id).execute().actionGet();
}
}
package org.duniter.elasticsearch.dao;
/*
* #%L
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* #L%
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.duniter.core.service.CryptoService;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.client.Duniter4jClient;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.SearchHit;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created by Benoit on 08/04/2015.
*/
public abstract class AbstractRepository implements Bean {
protected final String loggerName;
protected ESLogger logger;
protected Duniter4jClient client;
protected CryptoService cryptoService;
protected PluginSettings pluginSettings;
protected ObjectMapper objectMapper;
public AbstractRepository(String loggerName) {
super();
this.loggerName = loggerName;
}
@Inject
public void setClient(Duniter4jClient client) {
this.client = client;
}
@Inject
public void setCryptoService(CryptoService cryptoService) {
this.cryptoService = cryptoService;
}
@Inject
public void setPluginSettings(PluginSettings pluginSettings) {
this.pluginSettings = pluginSettings;
this.logger = Loggers.getLogger(loggerName, pluginSettings.getSettings(), new String[0]);
}
/* -- protected methods -- */
protected ObjectMapper getObjectMapper() {
if (objectMapper == null) {
objectMapper = JacksonUtils.newObjectMapper();
}
return objectMapper;
}
protected <C> List<C> toList(SearchResponse response, final Function<SearchHit, C> mapper) {
if (response.getHits() == null || response.getHits().getTotalHits() == 0) return ImmutableList.of();
return Arrays.stream(response.getHits().getHits())
.map(mapper)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
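/*
* Paginate through all the hits of a request using from/size windows. Every page is fetched
* eagerly before the stream is returned; for very large result sets a scroll query would be
* cheaper.
*/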
protected Stream<SearchHit> toStream(SearchRequestBuilder request, int size) {
int from = 0;
long total = -1;
request.setSize(size);
Stream<SearchHit> result = null;
do {
SearchResponse response = request.execute().actionGet();
// Create the result stream on the first page, then concat each following page
result = (result == null) ?
Arrays.stream(response.getHits().getHits()) :
Stream.concat(result, Arrays.stream(response.getHits().getHits()));
// Prepare the next page
from += size;
request.setFrom(from);
if (total == -1) total = (response.getHits() != null) ? response.getHits().getTotalHits() : 0;
}
while(from < total);
return result;
}
protected Stream<SearchHit> toStream(SearchResponse response) {
if (response.getHits() == null || response.getHits().getTotalHits() == 0) return Stream.empty();
return Arrays.stream(response.getHits().getHits());
}
protected <C> List<C> toList(SearchResponse response, Class<? extends C> clazz) {
final ObjectMapper objectMapper = getObjectMapper();
return toList(response, hit -> readValueOrNull(objectMapper, hit, clazz));
}
protected <C> List<C> toList(SearchRequestBuilder request, Class<? extends C> clazz) {
final List<C> result = Lists.newArrayList();
final ObjectMapper objectMapper = getObjectMapper();
int size = this.pluginSettings.getIndexBulkSize();
long total = -1;
int from = 0;
request.setSize(size).setFrom(from);
do {
SearchResponse response = client.safeExecuteRequest(request).actionGet();
toStream(response)
.map(hit -> readValueOrNull(objectMapper, hit, clazz))
.filter(Objects::nonNull)
.forEach(result::add);
from += size;
request.setFrom(from);
if (total == -1) total = response.getHits().getTotalHits();
} while(from < total);
return result;
}
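/*
* Deserialization sketch (the Movie class and the match-all query are hypothetical):
*
*   List<Movie> movies = toList(
*       client.prepareSearch("movies").setTypes("movie")
*           .setQuery(QueryBuilders.matchAllQuery()),
*       Movie.class);
*/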
protected <C> C readValueOrNull(ObjectMapper objectMapper, SearchHit hit, Class<C> clazz) {
try {
return objectMapper.readValue(hit.getSourceRef().streamInput(), clazz);
}
catch(IOException e) {
logger.warn(String.format("Unable to deserialize source [%s/%s/%s] into [%s]: %s", hit.getIndex(), hit.getType(), hit.getId(), clazz.getName(), e.getMessage()));
return null;
}
}
protected Set<String> executeAndGetIds(SearchResponse response) {
return toStream(response).map(SearchHit::getId).collect(Collectors.toSet());
}
protected Set<String> executeAndGetIds(SearchRequestBuilder request) {
Set<String> result = Sets.newHashSet();
int size = this.pluginSettings.getIndexBulkSize();
request.setSize(size);
long total = -1;
int from = 0;
do {
request.setFrom(from);
SearchResponse response = request.execute().actionGet();
toStream(response).forEach(hit -> result.add(hit.getId()));
if (total == -1) total = response.getHits().getTotalHits();
from += size;
} while (from < total);
return result;
}
}