diff --git a/pom.xml b/pom.xml index f8299fe73cb6937f87bc28449dfe4f1aaeeb3eef..e30ab408eaf6e14b20b7cddb86ae5c2421709c80 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ <gson.version>2.2.2</gson.version> <kalium.version>0.4.0</kalium.version> <scrypt.version>1.4.0</scrypt.version> + <!--<elasticsearch.version>2.2.0</elasticsearch.version>--> <elasticsearch.version>2.1.1</elasticsearch.version> <jna.version>4.1.0</jna.version> @@ -39,7 +40,6 @@ <spring-security.version>4.0.2.RELEASE</spring-security.version> <aspectj.version>1.8.7</aspectj.version> - <yuicompressor-maven-plugin.version>1.3.0</yuicompressor-maven-plugin.version> <htmlcompressor-maven-plugin.version>1.3</htmlcompressor-maven-plugin.version> <servlet-api.version>2.5</servlet-api.version> @@ -123,16 +123,6 @@ <url>https://github.com/ucoin-io/ucoinj/issues</url> </issueManagement> - <repositories> - <repository> - <id>wicketstuff-core-releases</id> - <url>https://oss.sonatype.org/content/repositories/releases</url> - <releases> - <enabled>true</enabled> - </releases> - </repository> - </repositories> - <dependencyManagement> <dependencies> @@ -335,8 +325,13 @@ <version>${jetty.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>javax.websocket</groupId> + <artifactId>javax.websocket-api</artifactId> + <version>1.1</version> + </dependency> - <!-- NaCL lib --> + <!-- NaCL lib --> <dependency> <groupId>org.abstractj.kalium</groupId> <artifactId>kalium</artifactId> @@ -557,9 +552,21 @@ </pluginManagement> </build> - <!-- Repositories needed to find the dependencies <repositories> <repository> <id>adagio-public-group</id> <url>http://nexus.e-is.pro/nexus/content/groups/ucoinj</url> - <snapshots> <enabled>true</enabled> <checksumPolicy>fail</checksumPolicy> </snapshots> <releases> <enabled>true</enabled> <checksumPolicy>fail</checksumPolicy> </releases> - </repository> </repositories> --> + <!-- Repositories needed to find the dependencies --> + <repositories> + <repository> + <id>ucoinj-public-group</id> + <url>http://nexus.e-is.pro/nexus/content/groups/ucoinj</url> + <snapshots> + <enabled>true</enabled> + <checksumPolicy>fail</checksumPolicy> + </snapshots> + <releases> + <enabled>true</enabled> + <checksumPolicy>fail</checksumPolicy> + </releases> + </repository> + </repositories> <pluginRepositories> <!-- Need for javascript minify maven plugin (yuicompressor-maven-plugin) --> diff --git a/ucoinj-cesium/src/main/cesium b/ucoinj-cesium/src/main/cesium index 76ab7adf4186c32b5083dd1ea82ae3e81ab602df..7fe6f85bd6150ce16c3a1d894836d6d757274175 160000 --- a/ucoinj-cesium/src/main/cesium +++ b/ucoinj-cesium/src/main/cesium @@ -1 +1 @@ -Subproject commit 76ab7adf4186c32b5083dd1ea82ae3e81ab602df +Subproject commit 7fe6f85bd6150ce16c3a1d894836d6d757274175 diff --git a/ucoinj-core-client/pom.xml b/ucoinj-core-client/pom.xml index 12d898c6d1fb87649ac1b000c7bfa6d64c9a6170..08207ba9d02a864d776f06f8f8b1e3fed3930589 100644 --- a/ucoinj-core-client/pom.xml +++ b/ucoinj-core-client/pom.xml @@ -33,6 +33,23 @@ <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>javax.websocket</groupId> + <artifactId>javax.websocket-api</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.glassfish.tyrus</groupId> + <artifactId>tyrus-client</artifactId> + <version>1.12</version> + </dependency> + + <dependency> + <groupId>org.glassfish.tyrus</groupId> + <artifactId>tyrus-container-grizzly-client</artifactId> + <version>1.12</version> + </dependency> <dependency> <groupId>org.apache.commons</groupId> diff --git a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/bma/BlockchainBlock.java b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/bma/BlockchainBlock.java index 4e15b92dbe6a4442e0772dc03fc47700e06d74af..41a4d5625d1702392bdb157e340635c3695fec19 100644 --- a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/bma/BlockchainBlock.java +++ b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/bma/BlockchainBlock.java @@ -25,6 +25,7 @@ package io.ucoin.ucoinj.core.client.model.bma; import java.io.Serializable; +import java.math.BigInteger; /** * A block from the blockchain. @@ -43,7 +44,7 @@ public class BlockchainBlock implements Serializable { private Integer time; private Integer medianTime; private Integer membersCount; - private Long monetaryMass; + private BigInteger monetaryMass; private String currency; private String issuer; private String signature; @@ -111,11 +112,11 @@ public class BlockchainBlock implements Serializable { this.membersCount = membersCount; } - public Long getMonetaryMass() { + public BigInteger getMonetaryMass() { return monetaryMass; } - public void setMonetaryMass(Long monetaryMass) { + public void setMonetaryMass(BigInteger monetaryMass) { this.monetaryMass = monetaryMass; } diff --git a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/local/Peer.java b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/local/Peer.java index ea69c7570ebf20018fe25085f8388c47eaedc019..1848939beff04028baceda9db6adedc9bc73161a 100644 --- a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/local/Peer.java +++ b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/model/local/Peer.java @@ -82,10 +82,16 @@ public class Peer implements LocalEntity, Serializable { } public String toString() { +/* return new StringBuilder().append("url=").append(url).append(",") .append("host=").append(host).append(",") .append("port=").append(port) .toString(); +*/ + return new StringBuilder().append(host) + .append(":") + .append(port) + .toString(); } @Override diff --git a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteService.java b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteService.java index 616682ce888b4f5bfa60096b17537260ea444653..75d0c6a850d6f995873aaec1baa469167a7dad35 100644 --- a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteService.java +++ b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteService.java @@ -33,7 +33,11 @@ import io.ucoin.ucoinj.core.client.model.local.Wallet; import io.ucoin.ucoinj.core.client.service.exception.PubkeyAlreadyUsedException; import io.ucoin.ucoinj.core.client.service.exception.UidAlreadyUsedException; import io.ucoin.ucoinj.core.client.service.exception.UidMatchAnotherPubkeyException; +import io.ucoin.ucoinj.core.exception.TechnicalException; +import io.ucoin.ucoinj.core.util.websocket.WebsocketClientEndpoint; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; public interface BlockchainRemoteService extends Service { @@ -210,4 +214,9 @@ public interface BlockchainRemoteService extends Service { */ Map<Integer, Long> getUDs(long currencyId, long startOffset); + void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler); + + void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler); + + } \ No newline at end of file diff --git a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceImpl.java b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceImpl.java index c93b99dad1c7308786754aea64d435beac6e81c6..a24867a817aacfa7454007647f350a8a8472038a 100644 --- a/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceImpl.java +++ b/ucoinj-core-client/src/main/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceImpl.java @@ -21,15 +21,16 @@ package io.ucoin.ucoinj.core.client.service.bma; * <http://www.gnu.org/licenses/gpl-3.0.html>. * #L% */ + import io.ucoin.ucoinj.core.client.config.Configuration; +import io.ucoin.ucoinj.core.client.model.bma.BlockchainBlock; +import io.ucoin.ucoinj.core.client.model.bma.BlockchainMemberships; +import io.ucoin.ucoinj.core.client.model.bma.BlockchainParameters; import io.ucoin.ucoinj.core.client.model.bma.gson.JsonArrayParser; import io.ucoin.ucoinj.core.client.model.local.Currency; import io.ucoin.ucoinj.core.client.model.local.Identity; import io.ucoin.ucoinj.core.client.model.local.Peer; import io.ucoin.ucoinj.core.client.model.local.Wallet; -import io.ucoin.ucoinj.core.client.model.bma.BlockchainBlock; -import io.ucoin.ucoinj.core.client.model.bma.BlockchainMemberships; -import io.ucoin.ucoinj.core.client.model.bma.BlockchainParameters; import io.ucoin.ucoinj.core.client.service.ServiceLocator; import io.ucoin.ucoinj.core.client.service.exception.HttpBadRequestException; import io.ucoin.ucoinj.core.client.service.exception.PubkeyAlreadyUsedException; @@ -37,10 +38,12 @@ import io.ucoin.ucoinj.core.client.service.exception.UidAlreadyUsedException; import io.ucoin.ucoinj.core.client.service.exception.UidMatchAnotherPubkeyException; import io.ucoin.ucoinj.core.exception.TechnicalException; import io.ucoin.ucoinj.core.service.CryptoService; +import io.ucoin.ucoinj.core.util.CollectionUtils; import io.ucoin.ucoinj.core.util.ObjectUtils; import io.ucoin.ucoinj.core.util.StringUtils; import io.ucoin.ucoinj.core.util.cache.Cache; import io.ucoin.ucoinj.core.util.cache.SimpleCache; +import io.ucoin.ucoinj.core.util.websocket.WebsocketClientEndpoint; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; @@ -48,11 +51,11 @@ import org.apache.http.message.BasicNameValuePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.*; public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implements BlockchainRemoteService { @@ -88,6 +91,8 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement // Cache on blockchain parameters private Cache<Long, BlockchainParameters> mParametersCache; + private Map<URI, WebsocketClientEndpoint> blockWsEndPoints = new HashMap<>(); + public BlockchainRemoteServiceImpl() { super(); } @@ -102,6 +107,18 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement initCaches(); } + @Override + public void close() throws IOException { + super.close(); + + if (blockWsEndPoints.size() != 0) { + for (WebsocketClientEndpoint clientEndPoint: blockWsEndPoints.values()) { + clientEndPoint.close(); + } + blockWsEndPoints.clear(); + } + } + @Override public BlockchainParameters getParameters(long currencyId, boolean useCache) { if (!useCache) { @@ -486,6 +503,36 @@ public class BlockchainRemoteServiceImpl extends BaseRemoteServiceImpl implement return result; } + @Override + public void addNewBlockListener(long currencyId, WebsocketClientEndpoint.MessageHandler messageHandler) { + Peer peer = peerService.getActivePeerByCurrencyId(currencyId); + addNewBlockListener(peer, messageHandler); + } + + @Override + public void addNewBlockListener(Peer peer, WebsocketClientEndpoint.MessageHandler messageHandler) { + + try { + URI wsBlockURI = new URI(String.format("ws://%s:%s/ws/block", + peer.getHost(), + peer.getPort())); + + // Get the websocket, or open new one if not exists + WebsocketClientEndpoint wsClientEndPoint = blockWsEndPoints.get(wsBlockURI); + if (wsClientEndPoint == null || wsClientEndPoint.isClosed()) { + wsClientEndPoint = new WebsocketClientEndpoint(wsBlockURI); + blockWsEndPoints.put(wsBlockURI, wsClientEndPoint); + } + + // add listener + wsClientEndPoint.addMessageHandler(messageHandler); + + } catch (URISyntaxException ex) { + throw new TechnicalException("could not create URI need for web socket on block: " + ex.getMessage()); + } + + } + /* -- Internal methods -- */ /** diff --git a/ucoinj-core-client/src/test/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceTest.java b/ucoinj-core-client/src/test/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceTest.java index 1c7ed5b46e026250c9b67b21555c4ca57adf778b..9684b0844f290ff3ff2e05f31232ef8382aa61c2 100644 --- a/ucoinj-core-client/src/test/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceTest.java +++ b/ucoinj-core-client/src/test/java/io/ucoin/ucoinj/core/client/service/bma/BlockchainRemoteServiceTest.java @@ -34,6 +34,7 @@ import io.ucoin.ucoinj.core.client.model.Member; import io.ucoin.ucoinj.core.client.model.bma.gson.GsonUtils; import io.ucoin.ucoinj.core.client.model.local.Peer; import io.ucoin.ucoinj.core.client.service.ServiceLocator; +import io.ucoin.ucoinj.core.util.websocket.WebsocketClientEndpoint; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,9 +48,12 @@ public class BlockchainRemoteServiceTest { private BlockchainRemoteService service; + private boolean isWebSocketNewBlockReceived; + @Before public void setUp() { service = ServiceLocator.instance().getBlockchainRemoteService(); + isWebSocketNewBlockReceived = false; } @Test @@ -102,6 +106,28 @@ public class BlockchainRemoteServiceTest { } + @Test + public void addNewBlockListener() throws Exception { + + isWebSocketNewBlockReceived = false; + + service.addNewBlockListener(createTestPeer(), new WebsocketClientEndpoint.MessageHandler() { + @Override + public void handleMessage(String message) { + BlockchainBlock block = GsonUtils.newBuilder().create().fromJson(message, BlockchainBlock.class); + log.debug("Received block #" + block.getNumber()); + isWebSocketNewBlockReceived = true; + } + }); + + int count = 0; + while(!isWebSocketNewBlockReceived) { + Thread.sleep(1000); // wait 1s + count++; + Assert.assertTrue("No block received from WebSocket, after 10s", count<10); + } + } + /* -- Internal methods -- */ protected Peer createTestPeer() { diff --git a/ucoinj-core-client/src/test/resources/log4j.properties b/ucoinj-core-client/src/test/resources/log4j.properties index b828a73a8a4971029a286063bc17c9d38973bc4a..aaa242d703bd3b591811389c3cd33fdf279c21ff 100644 --- a/ucoinj-core-client/src/test/resources/log4j.properties +++ b/ucoinj-core-client/src/test/resources/log4j.properties @@ -10,7 +10,8 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p (%c:%L) - %m%n # ucoin levels log4j.logger.io.ucoin.ucoinj=INFO #log4j.logger.io.ucoin.ucoinj.core.client.service=DEBUG -log4j.appender.io.ucoin.ucoinj.core.beans=WARN +log4j.logger.io.ucoin.ucoinj.core.client.service.bma=DEBUG +log4j.logger.io.ucoin.ucoinj.core.beans=WARN log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.file=ucoin-client.log diff --git a/ucoinj-core-client/src/test/resources/ucoinj-core-client-test.properties b/ucoinj-core-client/src/test/resources/ucoinj-core-client-test.properties index ed9ab39ef7d714d2a7b2c2230519744ad6f8e4fc..0774f09e68a257a381edb39debc29254a00018dd 100644 --- a/ucoinj-core-client/src/test/resources/ucoinj-core-client-test.properties +++ b/ucoinj-core-client/src/test/resources/ucoinj-core-client-test.properties @@ -1,9 +1,10 @@ #ucoinj.node.host=metab.ucoin.io -ucoinj.node.host=metab.ucoin.fr +#ucoinj.node.host=metab.ucoin.fr +ucoinj.node.host=192.168.0.28 ucoinj.node.port=9201 ucoinj.node.elasticsearch.host=localhost -ucoinj.node.elasticsearch.port=8080 +ucoinj.node.elasticsearch.port=9200 #ucoinj.node.elasticsearch.rest.host=www.data.ucoin.fr #ucoinj.node.elasticsearch.rest.port=80 diff --git a/ucoinj-core-shared/pom.xml b/ucoinj-core-shared/pom.xml index 5be84e84b630a51ce538a4594dc601e4ad95981a..86d94fd2500d8341101a3494db0656390d6feb8f 100644 --- a/ucoinj-core-shared/pom.xml +++ b/ucoinj-core-shared/pom.xml @@ -45,6 +45,12 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + + <dependency> + <groupId>javax.websocket</groupId> + <artifactId>javax.websocket-api</artifactId> + </dependency> + <!--dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> diff --git a/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/CommandLinesUtils.java b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/CommandLinesUtils.java index faee6f2792f26b8531d10598172bc171e9662e40..afb9a021645b06451b566910985601b528a6e170 100644 --- a/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/CommandLinesUtils.java +++ b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/CommandLinesUtils.java @@ -43,10 +43,7 @@ public class CommandLinesUtils { String inputValue = null; while (inputValue == null) { System.out.print(message.trim()); - if (StringUtils.isNotEmpty(defaultValue)) { - System.out.print(String.format(" [%s]", defaultValue)); - } - System.out.print(": "); + System.out.print("\n"); inputValue = scanIn.nextLine(); if (StringUtils.isBlank(inputValue)) { // A default exists: use it diff --git a/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/websocket/WebsocketClientEndpoint.java b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/websocket/WebsocketClientEndpoint.java new file mode 100644 index 0000000000000000000000000000000000000000..84744898a1b5c852788c04b368a6b8b58f3e378e --- /dev/null +++ b/ucoinj-core-shared/src/main/java/io/ucoin/ucoinj/core/util/websocket/WebsocketClientEndpoint.java @@ -0,0 +1,134 @@ +package io.ucoin.ucoinj.core.util.websocket; + +import com.google.common.collect.Lists; +import io.ucoin.ucoinj.core.util.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.websocket.*; +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.List; + +/** + * ChatServer Client + * + * @author Jiji_Sasidharan + */ +@ClientEndpoint +public class WebsocketClientEndpoint implements Closeable { + + private static final Logger log = LoggerFactory.getLogger(WebsocketClientEndpoint.class); + + private Session userSession = null; + private List<MessageHandler> messageHandlers = Lists.newArrayList(); + private final URI endpointURI; + + public WebsocketClientEndpoint(URI endpointURI) { + this.endpointURI = endpointURI; + try { + WebSocketContainer container = ContainerProvider.getWebSocketContainer(); + container.connectToServer(this, endpointURI); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + if (log.isDebugEnabled()) { + log.debug(String.format("[%s] Closing WebSocket session...", endpointURI)); + } + userSession.close(); + userSession = null; + } + + /** + * Callback hook for Connection open events. + * + * @param userSession the userSession which is opened. + */ + @OnOpen + public void onOpen(Session userSession) { + if (log.isDebugEnabled()) { + log.debug(String.format("Opening WebSocket... [%s]", endpointURI)); + } + this.userSession = userSession; + } + + /** + * Callback hook for Connection close events. + * + * @param userSession the userSession which is getting closed. + * @param reason the reason for connection close + */ + @OnClose + public void onClose(Session userSession, CloseReason reason) { + if (log.isDebugEnabled()) { + log.debug(String.format("Closing WebSocket... [%s]", endpointURI)); + } + this.userSession = null; + } + + /** + * Callback hook for Message Events. This method will be invoked when a client send a message. + * + * @param message The text message + */ + @OnMessage + public void onMessage(String message) { + synchronized (messageHandlers) { + if (CollectionUtils.isNotEmpty(messageHandlers)) { + if (log.isDebugEnabled()) { + log.debug("[%s] Received message: " + message); + } + + for (MessageHandler messageHandler : messageHandlers) { + try { + messageHandler.handleMessage(message); + } catch (Exception e) { + log.error(String.format("[%s] Error during message handling: %s", endpointURI, e.getMessage()), e); + } + } + } + } + } + + /** + * register message handler + * + * @param msgHandler + */ + public void addMessageHandler(MessageHandler msgHandler) { + synchronized (messageHandlers) { + this.messageHandlers.add(msgHandler); + } + } + + /** + * Send a message. + * + * @param message + */ + public void sendMessage(String message) { + this.userSession.getAsyncRemote().sendText(message); + } + + /** + * Is closed ? + * @return + */ + public boolean isClosed() { + return (userSession == null); + } + /** + * Message handler. + * + * @author Jiji_Sasidharan + */ + public static interface MessageHandler { + + public void handleMessage(String message); + } +} \ No newline at end of file diff --git a/ucoinj-elasticsearch-plugin/pom.xml b/ucoinj-elasticsearch-plugin/pom.xml index 9aff332d094922a67e4277e3ac489e8b41478a2b..bf37f2d6af80444accee8d0b8a769676bcb97239 100644 --- a/ucoinj-elasticsearch-plugin/pom.xml +++ b/ucoinj-elasticsearch-plugin/pom.xml @@ -19,6 +19,37 @@ <artifactId>elasticsearch</artifactId> <scope>compile</scope> </dependency> + + <!-- JNA --> + <dependency> + <groupId>net.java.dev.jna</groupId> + <artifactId>jna</artifactId> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.xbib.elasticsearch.plugin</groupId> + <artifactId>elasticsearch-transport-websocket</artifactId> + <version>1.4.0.0</version> + </dependency> + + <!-- Unit tests --> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + <scope>test</scope> + </dependency> + </dependencies> <build> diff --git a/ucoinj-elasticsearch-plugin/src/test/java/io/ucoin/ucoinj/elasticsearch/plugin/StartES.java b/ucoinj-elasticsearch-plugin/src/test/java/io/ucoin/ucoinj/elasticsearch/plugin/StartES.java new file mode 100644 index 0000000000000000000000000000000000000000..7f1b2c6652a7509855ba26eeaa6a9294cd612417 --- /dev/null +++ b/ucoinj-elasticsearch-plugin/src/test/java/io/ucoin/ucoinj/elasticsearch/plugin/StartES.java @@ -0,0 +1,44 @@ +package io.ucoin.ucoinj.elasticsearch.plugin; + +import com.google.common.collect.Lists; +import org.elasticsearch.bootstrap.Elasticsearch; + +import java.io.File; +import java.util.Arrays; +import java.util.List; + +/** + * Created by blavenie on 02/02/16. + */ +public class StartES { + + public StartES() { + + } + + public static void main(String args[]) { + List<String> argList = Lists.newArrayList(); + if (args != null && args.length > 0) { + argList.addAll(Arrays.asList(args)); + } + + // path.data + String pathData = null; + if (argList.size() == 1) { + pathData = argList.get(0); + argList.remove(pathData); + } + else { + pathData = System.getProperty("tmp.dir") + File.separator + "elasticsearch-plugin-unit-test"; + } + + System.setProperty("es.path.home", "src/test/es-home"); + System.setProperty("es.path.data", pathData + File.separator + "data"); + System.setProperty("es.http.enable", "true"); + + argList.add("start"); + + Elasticsearch.main(argList.toArray(new String[argList.size()])); + + } +} diff --git a/ucoinj-elasticsearch-plugin/src/test/resources/log4j.properties b/ucoinj-elasticsearch-plugin/src/test/resources/log4j.properties new file mode 100644 index 0000000000000000000000000000000000000000..8a39e9b5e79ede37b62a300e7409f1393956eb46 --- /dev/null +++ b/ucoinj-elasticsearch-plugin/src/test/resources/log4j.properties @@ -0,0 +1,22 @@ +### +# Global logging configuration +log4j.rootLogger=ERROR, stdout, file + +# Console output +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p (%c:%L) - %m%n + +# ucoin levels +log4j.logger.io.ucoin.ucoinj=INFO +#log4j.logger.io.ucoin.ucoinj.core.client.service=DEBUG + +log4j.appender.file=org.apache.log4j.RollingFileAppender +log4j.appender.file.file=target/ucoinj-elasticsearch-plugin.log +log4j.appender.file.MaxFileSize=10MB +log4j.appender.file.MaxBackupIndex=4 + +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %5p %c - %m%n + + diff --git a/ucoinj-elasticsearch/pom.xml b/ucoinj-elasticsearch/pom.xml index 46cf05f6e906a857388b2403f17e7e9041e8029c..fe1f98fd0d927e886bc5e19d7b56c5c8679642d6 100644 --- a/ucoinj-elasticsearch/pom.xml +++ b/ucoinj-elasticsearch/pom.xml @@ -28,6 +28,7 @@ io.ucoin.ucoinj.elasticsearch.Main </maven.jar.main.class> + <ucoinj-elasticsearch.config>${project.basedir}/src/test/resources/ucoinj-elasticsearch-test.properties</ucoinj-elasticsearch.config> </properties> <dependencies> @@ -258,6 +259,12 @@ <mainClass>${exec.mainClass}</mainClass> <classpathScope>${exec.classpathScope}</classpathScope> <commandlineArgs>start</commandlineArgs> + <systemProperties> + <property > + <key>ucoinj-elasticsearch.config</key> + <value>${ucoinj-elasticsearch.config}</value> + </property> + </systemProperties> </configuration> </execution> </executions> diff --git a/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.bat b/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.bat index 0b27e5e532697080f733c376caee4aa1e55e070f..d2486eebda815a79a8b87a153270ed35e1102522 100644 --- a/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.bat +++ b/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.bat @@ -11,6 +11,7 @@ set APP_BASEDIR=%CD% set JAVA_COMMAND=%JAVA_HOME%\bin\java set APP_LOG_FILE=%APP_BASEDIR%\data\${project.artifactId}-${project.version}.log set JAVA_OPTS=-Xmx1G +set APP_CONF_FILE=%APP_BASEDIR%\ucoinj.config if not exist "%JAVA_HOME%" goto no_java @@ -31,7 +32,7 @@ echo . set OLDDIR=%CD% cd /d %~dp0% -call "%JAVA_COMMAND%" %JAVA_OPTS% "-Ducoinj.log.file=%APP_LOG_FILE%" -Djna.nosys=true -jar ${project.build.finalName}.${project.packaging} %1 %2 %3 %4 %5 %6 %7 %8 %9 +call "%JAVA_COMMAND%" %JAVA_OPTS% "-Ducoinj.log.file=%APP_LOG_FILE%" ""-Ducoinj-elasticsearch.config=%APP_CONF_FILE%" -Djna.nosys=true -jar ${project.build.finalName}.${project.packaging} %1 %2 %3 %4 %5 %6 %7 %8 %9 set exitcode=%ERRORLEVEL% echo Stop with exitcode: %exitcode% cd %OLDDIR% diff --git a/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.sh b/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.sh index 87e0688ae9be02e7e3f5495aff1276a62c42af66..eca5d4529b000ff4f36435a955c3883322c11ef0 100644 --- a/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.sh +++ b/ucoinj-elasticsearch/src/main/assembly/min/ucoinj-elasticsearch.sh @@ -5,7 +5,8 @@ export JAVA_HOME=/usr/lib/jvm/default-java export APP_BASEDIR=$(pwd) export JAVA_COMMAND=$JAVA_HOME/bin/java -export APP_LOG_FILE=$APP_BASEDIR/data/${project.artifactId}-${project.version}.log +export APP_LOG_FILE=$APP_BASEDIR/logs/${project.artifactId}-${project.version}.log +export APP_CONF_FILE=$APP_BASEDIR/ucoinj.config cd $APP_BASEDIR @@ -25,7 +26,7 @@ if [ -d $JAVA_HOME ]; then echo "launch java" echo "java command: $JAVA_COMMAND" - $JAVA_COMMAND $MEMORY $APP_JVM_OPTS -Ducoinj.log.file=$APP_LOG_FILE -Djna.nosys=true -jar ${project.build.finalName}.${project.packaging} $* + $JAVA_COMMAND $MEMORY $APP_JVM_OPTS -Ducoinj.log.file=$APP_LOG_FILE -Ducoinj-elasticsearch.config=$APP_CONF_FILE -Djna.nosys=true -jar ${project.build.finalName}.${project.packaging} $* exitcode=$? echo "Stop ${project.name} with exitcode: $exitcode" exit $exitcode diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/Main.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/Main.java index 604a940b86c217e99fae003dca900554e228b918..de9d52aeff757a18b3bb3ec51e7c59d2bbaee791 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/Main.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/Main.java @@ -25,7 +25,6 @@ package io.ucoin.ucoinj.elasticsearch; import com.google.common.collect.Lists; import io.ucoin.ucoinj.core.exception.TechnicalException; -import io.ucoin.ucoinj.core.util.CollectionUtils; import io.ucoin.ucoinj.core.util.CommandLinesUtils; import io.ucoin.ucoinj.core.util.StringUtils; import io.ucoin.ucoinj.elasticsearch.config.Configuration; @@ -43,11 +42,19 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.util.*; public class Main { + private static String TITLE_SEPARATOR_LINE = "************************************************\n"; + private static String TITLE_EMPTY_LINE = "*\n"; + private static String TITLE = TITLE_SEPARATOR_LINE + + TITLE_EMPTY_LINE + + "* %s\n" // title + + TITLE_EMPTY_LINE + + "* %s\n" // sub-title + + TITLE_EMPTY_LINE + TITLE_SEPARATOR_LINE; + private static final Logger log = LoggerFactory.getLogger(Main.class); public static void main(String[] args) { @@ -69,7 +76,7 @@ public class Main { arguments.removeAll(Arrays.asList(ConfigurationAction.HELP.aliases)); // Could override config file name (useful for dev) - String configFile = "ucoinj.config"; + String configFile = "ucoinj-elasticsearch.config"; if (System.getProperty(configFile) != null) { configFile = System.getProperty(configFile); configFile = configFile.replaceAll("\\\\", "/"); @@ -114,8 +121,13 @@ public class Main { // If scheduling is running, wait quit instruction if (!quit) { + while (!quit) { - String userInput = CommandLinesUtils.readInput("*** uCoinj :: Elasticsearch successfully started *** >> To quit, press [Q] or enter\n", "Q", true); + String userInput = CommandLinesUtils.readInput( + String.format(TITLE, + "uCoinj :: Elasticsearch successfully started", + ">> To quit, press [Q] or [enter]"), + "Q", true); quit = StringUtils.isNotBlank(userInput) && "Q".equalsIgnoreCase(userInput); } } diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java index 3f2f9709603586a82c6e0c0cd38c318af61b420d..8b968b296bff9a55e2cc23539105ccd0e4e0563c 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/action/IndexerAction.java @@ -25,8 +25,10 @@ package io.ucoin.ucoinj.elasticsearch.action; */ import io.ucoin.ucoinj.core.client.model.bma.BlockchainParameters; +import io.ucoin.ucoinj.core.client.model.bma.gson.GsonUtils; import io.ucoin.ucoinj.core.client.model.local.Peer; import io.ucoin.ucoinj.core.client.service.bma.BlockchainRemoteService; +import io.ucoin.ucoinj.core.util.websocket.WebsocketClientEndpoint; import io.ucoin.ucoinj.elasticsearch.config.Configuration; import io.ucoin.ucoinj.elasticsearch.service.BlockIndexerService; import io.ucoin.ucoinj.elasticsearch.service.ServiceLocator; @@ -38,18 +40,28 @@ public class IndexerAction { /* Logger */ private static final Logger log = LoggerFactory.getLogger(IndexerAction.class); - public void indexLastBlocks() { + public void indexation() { - boolean async = ServiceLocator.instance().getElasticSearchService().isNodeInstance(); + final boolean async = ServiceLocator.instance().getElasticSearchService().isNodeInstance(); Runnable runnable = new Runnable() { @Override public void run() { Configuration config = Configuration.instance(); - Peer peer = checkConfigAndGetPeer(config); - BlockIndexerService blockIndexerService = ServiceLocator.instance().getBlockIndexerService(); - + final Peer peer = checkConfigAndGetPeer(config); + final BlockIndexerService blockIndexerService = ServiceLocator.instance().getBlockIndexerService(); blockIndexerService.indexLastBlocks(peer); + + if (async) { + ServiceLocator.instance().getBlockchainRemoteService().addNewBlockListener(peer, new WebsocketClientEndpoint.MessageHandler() { + @Override + public void handleMessage(String message) { + String currencyName = GsonUtils.getValueFromJSONAsString(message, "currency"); + blockIndexerService.indexBlockAsJson(peer, message, true /*refresh*/, true /*wait*/); + blockIndexerService.indexCurrentBlockAsJson(currencyName, message, true /*wait*/); + } + }); + } } }; @@ -64,7 +76,7 @@ public class IndexerAction { } } - public void resetAllBlocks() { + public void resetData() { BlockchainRemoteService blockchainService = ServiceLocator.instance().getBlockchainRemoteService(); BlockIndexerService indexerService = ServiceLocator.instance().getBlockIndexerService(); Configuration config = Configuration.instance(); diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java index 8434a9eb3340b6bf044de67bd96f0bb8e01bee91..080319072c99262b492cacb2e033d31fa516eac6 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/Configuration.java @@ -208,6 +208,19 @@ public class Configuration { return result; } + /** @return {@link ConfigurationOption#PLUGINS_DIRECTORY} value */ + public File getPluginsDirectory() { + File result = applicationConfig.getOptionAsFile(ConfigurationOption.PLUGINS_DIRECTORY.getKey()); + return result; + } + + /** @return {@link ConfigurationOption#MODE} value */ + public boolean isFullMode() { + String launchMode = applicationConfig.getOption(ConfigurationOption.LAUNCH_MODE.getKey()); + return "full".equals(launchMode); + } + + public ApplicationConfig getApplicationConfig() { return applicationConfig; } diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java index 13572ec11812976ecd7bbb8213dfbd7ecbbb326e..4ad8e536e250bcac5d6ba86039b9f58ab0ff20a3 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationAction.java @@ -35,9 +35,9 @@ public enum ConfigurationAction implements ConfigActionDef { START(NodeAction.class.getName() + "#start", "start"), - INDEX_BLOCKS(IndexerAction.class.getName() + "#indexLastBlocks", "index"), + INDEX_BLOCKS(IndexerAction.class.getName() + "#indexation", "index"), - RESET_BLOCKS(IndexerAction.class.getName() + "#resetAllBlocks", "reset-data"); + RESET_BLOCKS(IndexerAction.class.getName() + "#resetData", "reset-data"); public String action; public String[] aliases; diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java index f502fecb716f6134b89b35bd8921a81263fa8e3b..2cb40ca9a704eb41d4b2d378ce340f191e08b099 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/config/ConfigurationOption.java @@ -59,6 +59,18 @@ public enum ConfigurationOption implements ConfigOptionDef { "${ucoinj.basedir}/data", File.class), + PLUGINS_DIRECTORY( + "ucoinj.plugins.directory", + n("ucoinj.config.option.plugins.directory.description"), + "${ucoinj.basedir}/plugins", + File.class), + + LAUNCH_MODE( + "ucoinj.launch.mode", + n("ucoinj.config.option.launch.mode.description"), + "dev", + String.class), + I18N_DIRECTORY( "ucoinj.i18n.directory", n("ucoinj.config.option.i18n.directory.description"), diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java index 9c44d15c696c1b5c08872ce6363d7bb8d8a218dc..165955558bcb0b6abfeced3cf68783045c653631 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BaseIndexerService.java @@ -23,13 +23,11 @@ package io.ucoin.ucoinj.elasticsearch.service; */ -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.ucoin.ucoinj.core.beans.Bean; import io.ucoin.ucoinj.core.beans.InitializingBean; import io.ucoin.ucoinj.core.exception.TechnicalException; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder; -import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.client.Client; diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BlockIndexerService.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BlockIndexerService.java index 963376a15233e93ddfd40ccc279d9bedeb0d0401..831da9d84ab9bdf217277b0d7c9af5692b5dce2c 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BlockIndexerService.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/BlockIndexerService.java @@ -85,6 +85,8 @@ public class BlockIndexerService extends BaseIndexerService { public static final String INDEX_TYPE_BLOCK = "block"; + public static final String INDEX_BLOCK_CURRENT_ID = "current"; + private static final int SYNC_MISSING_BLOCK_MAX_RETRY = 5; private CurrencyIndexerService currencyIndexerService; @@ -132,8 +134,8 @@ public class BlockIndexerService extends BaseIndexerService { BlockchainParameters parameter = blockchainService.getParameters(peer); if (parameter == null) { progressionModel.setStatus(ProgressionModel.Status.FAILED); - log.error(String.format("Could not connect to node [%s:%s]", - config.getNodeBmaHost(), config.getNodeBmaPort())); + log.error(String.format("Could not connect to node [%s]", + peer.getUrl())); return; } String currencyName = parameter.getCurrency(); @@ -159,6 +161,11 @@ public class BlockIndexerService extends BaseIndexerService { if (currentBlock != null) { int maxBlockNumber = currentBlock.getNumber(); + // DEV mode + if (!config.isFullMode()) { + maxBlockNumber = 5000; + } + // Get the last indexed block number int startNumber = 0; @@ -185,7 +192,6 @@ public class BlockIndexerService extends BaseIndexerService { ? indexBlocksUsingBulk(peer, currencyName, startNumber, maxBlockNumber, progressionModel) : indexBlocksNoBulk(peer, currencyName, startNumber, maxBlockNumber, progressionModel); - // If some blocks are missing, try to get it using other peers if (CollectionUtils.isNotEmpty(missingBlocks)) { progressionModel.setTask(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.otherPeers.task", currencyName)); @@ -203,7 +209,7 @@ public class BlockIndexerService extends BaseIndexerService { } else { if (log.isDebugEnabled()) { - log.debug(String.format("Current block from peer [%s:%s] is #%s. Index is up to date.", peer.getHost(), peer.getPort(), maxBlockNumber)); + log.debug(String.format("Current block from peer [%s] is #%s. Index is up to date.", peer.getUrl(), maxBlockNumber)); } progressionModel.setStatus(ProgressionModel.Status.SUCCESS); } @@ -346,18 +352,48 @@ public class BlockIndexerService extends BaseIndexerService { // Preparing indexation IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) .setId(String.valueOf(number)) + .setRefresh(refresh) + .setSource(json); + + // Execute indexation + if (!wait) { + indexRequest.execute(); + } + else { + indexRequest.execute().actionGet(); + } + } + + /** + * + * @param json block as json + * @param refresh is a existing block ? + * @param wait need to wait until processed ? + */ + public void indexBlockAsJson(Peer peer, String json, boolean refresh, boolean wait) { + ObjectUtils.checkNotNull(json); + ObjectUtils.checkArgument(json.length() > 0); + + JsonAttributeParser blockNumberParser = new JsonAttributeParser("number"); + JsonAttributeParser blockCurrencyParser = new JsonAttributeParser("currency"); + + String currencyName = blockCurrencyParser.getValueAsString(json); + int number = blockNumberParser.getValueAsInt(json); + + log.info(I18n.t("ucoinj.blockIndexerService.indexBlock", currencyName, peer, number)); + + // Preparing indexation + IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) + .setId(String.valueOf(number)) + .setRefresh(refresh) .setSource(json); // Execute indexation if (!wait) { - indexRequest - .setRefresh(refresh) - .execute(); + indexRequest.execute(); } else { - indexRequest - .setRefresh(refresh) - .execute().actionGet(); + indexRequest.execute().actionGet(); } } @@ -375,21 +411,23 @@ public class BlockIndexerService extends BaseIndexerService { // WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String) String json = gson.toJson(currentBlock); - indexCurrentBlockAsJson(currentBlock.getCurrency(), json.getBytes(), true, wait); + indexCurrentBlockAsJson(currentBlock.getCurrency(), json, wait); } /** * * @param currencyName * @param currentBlockJson block as JSON + * @pram wait need to wait until block processed ? */ - public void indexCurrentBlockAsJson(String currencyName, byte[] currentBlockJson, boolean refresh, boolean wait) { + public void indexCurrentBlockAsJson(String currencyName, String currentBlockJson, boolean wait) { ObjectUtils.checkNotNull(currentBlockJson); - ObjectUtils.checkArgument(currentBlockJson.length > 0); + ObjectUtils.checkArgument(currentBlockJson.length() > 0); // Preparing indexation IndexRequestBuilder indexRequest = getClient().prepareIndex(currencyName, INDEX_TYPE_BLOCK) - .setId("current") + .setId(INDEX_BLOCK_CURRENT_ID) + .setRefresh(true) .setSource(currentBlockJson); // Execute indexation @@ -397,9 +435,7 @@ public class BlockIndexerService extends BaseIndexerService { boolean acceptedInPool = false; while(!acceptedInPool) try { - indexRequest - .setRefresh(refresh) - .execute(); + indexRequest.execute(); acceptedInPool = true; } catch(EsRejectedExecutionException e) { @@ -413,9 +449,7 @@ public class BlockIndexerService extends BaseIndexerService { } } else { - indexRequest - .setRefresh(refresh) - .execute().actionGet(); + indexRequest.execute().actionGet(); } } @@ -484,7 +518,7 @@ public class BlockIndexerService extends BaseIndexerService { } public BlockchainBlock getCurrentBlock(String currencyName) { - return getBlockByIdStr(currencyName, "current"); + return getBlockByIdStr(currencyName, INDEX_BLOCK_CURRENT_ID); } /* -- Internal methods -- */ @@ -517,6 +551,11 @@ public class BlockIndexerService extends BaseIndexerService { .field("type", "string") .endObject() + // membersChanges + .startObject("monetaryMass") + .field("type", "string") + .endObject() + // identities: //.startObject("identities") //.endObject() @@ -607,13 +646,13 @@ public class BlockIndexerService extends BaseIndexerService { if (progressionModel.isCancel()) { progressionModel.setStatus(ProgressionModel.Status.STOPPED); if (log.isInfoEnabled()) { - log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.stopped")); + log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.stopped", peer)); } return missingBlockNumbers; } // Report progress - reportIndexBlocksProgress(progressionModel, firstNumber, lastNumber, curNumber); + reportIndexBlocksProgress(progressionModel, currencyName, peer, firstNumber, lastNumber, curNumber); } try { @@ -623,7 +662,7 @@ public class BlockIndexerService extends BaseIndexerService { // If last block if (curNumber == lastNumber - 1) { // update the current block - indexCurrentBlockAsJson(currencyName, blockAsJson.getBytes(), true, true /*wait*/); + indexCurrentBlockAsJson(currencyName, blockAsJson, true /*wait*/); } } catch(Throwable t) { @@ -649,7 +688,7 @@ public class BlockIndexerService extends BaseIndexerService { if (progressionModel.isCancel()) { progressionModel.setStatus(ProgressionModel.Status.STOPPED); if (log.isInfoEnabled()) { - log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.stopped")); + log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.stopped", currencyName, peer.getUrl())); } return missingBlockNumbers; } @@ -675,6 +714,8 @@ public class BlockIndexerService extends BaseIndexerService { // Process received blocks else { + + List<Integer> processedBlockNumbers = Lists.newArrayList(); BulkRequestBuilder bulkRequest = client.prepareBulk(); for (String blockAsJson : blocksAsJson) { int itemNumber = blockNumberParser.getValueAsInt(blockAsJson); @@ -684,15 +725,18 @@ public class BlockIndexerService extends BaseIndexerService { batchFirstNumber = itemNumber; } - // Add to bulk - bulkRequest.add(client.prepareIndex(currencyName, INDEX_TYPE_BLOCK, String.valueOf(itemNumber)) - .setRefresh(false) - .setSource(blockAsJson) - ); + if (!processedBlockNumbers.contains(itemNumber)) { + // Add to bulk + bulkRequest.add(client.prepareIndex(currencyName, INDEX_TYPE_BLOCK, String.valueOf(itemNumber)) + .setRefresh(false) + .setSource(blockAsJson) + ); + processedBlockNumbers.add(itemNumber); + } // If last block : also update the current block if (itemNumber == lastNumber) { - bulkRequest.add(client.prepareIndex(currencyName, INDEX_TYPE_BLOCK, "current") + bulkRequest.add(client.prepareIndex(currencyName, INDEX_TYPE_BLOCK, INDEX_BLOCK_CURRENT_ID) .setRefresh(true) .setSource(blockAsJson) ); @@ -709,7 +753,7 @@ public class BlockIndexerService extends BaseIndexerService { // process failures by iterating through each bulk response item for (BulkItemResponse itemResponse : bulkResponse) { boolean skip = !itemResponse.isFailed() - || Objects.equal("current", itemResponse.getId()) + || Objects.equal(INDEX_BLOCK_CURRENT_ID, itemResponse.getId()) || missingBlockNumbers.contains(Integer.parseInt(itemResponse.getId())); if (!skip) { int itemNumber = Integer.parseInt(itemResponse.getId()); @@ -724,7 +768,7 @@ public class BlockIndexerService extends BaseIndexerService { } // Report progress - reportIndexBlocksProgress(progressionModel, firstNumber, lastNumber, batchFirstNumber); + reportIndexBlocksProgress(progressionModel, currencyName, peer, firstNumber, lastNumber, batchFirstNumber); } @@ -769,7 +813,7 @@ public class BlockIndexerService extends BaseIndexerService { for(Peer childPeer: otherPeers) { if (log.isInfoEnabled()) { - log.info(String.format("Trying to get missing blocks from other peer [%s:%s]...", childPeer.getHost(), childPeer.getPort())); + log.info(String.format("[%s] Trying to get missing blocks from other peer [%s]...", currencyName, childPeer)); } try { for(String blockNumberStr: ImmutableSet.copyOf(sortedMissingBlocks)) { @@ -799,7 +843,7 @@ public class BlockIndexerService extends BaseIndexerService { String blockAsJson = blockchainRemoteService.getBlockAsJson(childPeer, blockNumber); if (StringUtils.isNotBlank(blockAsJson)) { if (debug) { - log.trace("Found missing block #%s on peer [%s:%s].", blockNumber, childPeer.getHost(), childPeer.getPort()); + log.debug(String.format("Found missing block #%s on peer [%s].", blockNumber, childPeer)); } // Index the missing block @@ -820,7 +864,7 @@ public class BlockIndexerService extends BaseIndexerService { } catch(TechnicalException e) { if (debug) { - log.debug("Error while getting blocks from peer [%s:%s]: %s. Skipping this peer.", childPeer.getHost(), childPeer.getPort(), e.getMessage()); + log.debug(String.format("Error while getting blocks from peer [%s]: %s. Skipping this peer.", childPeer), e.getMessage()); } continue; // skip this peer @@ -855,13 +899,13 @@ public class BlockIndexerService extends BaseIndexerService { return indexMissingBlocksFromOtherPeers(peer, newCurrentBlock, newMissingBlocks, tryCounter); } - protected void reportIndexBlocksProgress(ProgressionModel progressionModel, int firstNumber, int lastNumber, int curNumber) { + protected void reportIndexBlocksProgress(ProgressionModel progressionModel, String currencyName, Peer peer, int firstNumber, int lastNumber, int curNumber) { int pct = (curNumber - firstNumber) * 100 / (lastNumber - firstNumber); progressionModel.setCurrent(pct); - progressionModel.setMessage(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.progress", curNumber, lastNumber, pct)); + progressionModel.setMessage(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.progress", currencyName, peer, curNumber, lastNumber, pct)); if (log.isInfoEnabled()) { - log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.progress", curNumber, lastNumber, pct)); + log.info(I18n.t("ucoinj.blockIndexerService.indexLastBlocks.progress", currencyName, peer, curNumber, lastNumber, pct)); } } diff --git a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/ElasticSearchService.java b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/ElasticSearchService.java index 5a309988187160a833a1641074bc222380e755d3..ac23afc8d55b160a05a48f4bc8c1018b486accc1 100644 --- a/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/ElasticSearchService.java +++ b/ucoinj-elasticsearch/src/main/java/io/ucoin/ucoinj/elasticsearch/service/ElasticSearchService.java @@ -187,6 +187,7 @@ public class ElasticSearchService implements Bean,InitializingBean, Closeable { .put("http.host", config.getHost()) .put("path.home", config.getBasedir()) .put("path.data", config.getDataDirectory()) + .put("path.plugins", config.getPluginsDirectory()) .build(); // Create a node builder diff --git a/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_en_GB.properties b/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_en_GB.properties index a3777902aa14800607f06081ce3bd4cba7949170..ae447d1c84a83379f4838ccbd6f593890e7a8ab8 100644 --- a/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_en_GB.properties +++ b/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_en_GB.properties @@ -1,8 +1,9 @@ ucoinj-elasticsearch.config= -ucoinj.blockIndexerService.indexLastBlocks.otherPeers.task= -ucoinj.blockIndexerService.indexLastBlocks.progress= -ucoinj.blockIndexerService.indexLastBlocks.stopped= -ucoinj.blockIndexerService.indexLastBlocks.task= +ucoinj.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s +ucoinj.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers +ucoinj.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... +ucoinj.blockIndexerService.indexLastBlocks.stopped=[%s] [%s] Indexing last block - stopped +ucoinj.blockIndexerService.indexLastBlocks.task=Indexing last blocks of [%s] from peer [%s\:%s]... ucoinj.config.option.basedir.description= ucoinj.config.option.cache.directory.description= ucoinj.config.option.data.directory.description= @@ -36,6 +37,7 @@ ucoinj.config.option.node.elasticsearch.rest.url.description= ucoinj.config.option.node.host.description= ucoinj.config.option.node.port.description= ucoinj.config.option.node.protocol.description= +ucoinj.config.option.plugins.directory.description= ucoinj.config.option.taskExecutor.queueCapacity.description= ucoinj.config.option.tasks.queueCapacity.description= ucoinj.config.option.tmp.directory.description= diff --git a/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_fr_FR.properties b/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_fr_FR.properties index 181d65ac6132de305adecbb8685f58cc1f1f8190..1b9bcb1fd8123e19040043581ab97d51d00bc698 100644 --- a/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_fr_FR.properties +++ b/ucoinj-elasticsearch/src/main/resources/i18n/ucoinj-elasticsearch_fr_FR.properties @@ -1,8 +1,9 @@ ucoinj-elasticsearch.config= +ucoinj.blockIndexerService.indexBlock=[%s] [%s] Indexing block \#%s ucoinj.blockIndexerService.indexLastBlocks.otherPeers.task=Indexing missing blocks of [%s] from other peers -ucoinj.blockIndexerService.indexLastBlocks.progress=Indexing block \#%s / %s (%s%%)... -ucoinj.blockIndexerService.indexLastBlocks.stopped=Indexing last block - stopped -ucoinj.blockIndexerService.indexLastBlocks.task=Indexing last blocks of [%s] from peer [%s\:%s] +ucoinj.blockIndexerService.indexLastBlocks.progress=[%s] [%s] Indexing block \#%s / %s (%s%%)... +ucoinj.blockIndexerService.indexLastBlocks.stopped=[%s] [%s] Indexing last block - stopped +ucoinj.blockIndexerService.indexLastBlocks.task=Indexing last blocks of [%s] from peer [%s\:%s]... ucoinj.config.option.basedir.description= ucoinj.config.option.cache.directory.description= ucoinj.config.option.data.directory.description= @@ -36,6 +37,7 @@ ucoinj.config.option.node.elasticsearch.rest.url.description= ucoinj.config.option.node.host.description= ucoinj.config.option.node.port.description= ucoinj.config.option.node.protocol.description= +ucoinj.config.option.plugins.directory.description= ucoinj.config.option.taskExecutor.queueCapacity.description= ucoinj.config.option.tasks.queueCapacity.description= ucoinj.config.option.tmp.directory.description= diff --git a/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-localhost-node.properties b/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-localhost-node.properties new file mode 100644 index 0000000000000000000000000000000000000000..260a6299accefae4b5d80ac1e850b6122f35e2c3 --- /dev/null +++ b/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-localhost-node.properties @@ -0,0 +1,12 @@ +ucoinj.node.host=metab.ucoin.fr +ucoinj.node.port=9201 + +ucoinj.elasticsearch.embedded.enable=false +ucoinj.elasticsearch.local=fals +ucoinj.elasticsearch.http.enable=false +ucoinj.elasticsearch.cluster.name=ucoinj-elacticsearch-test + +#ucoinj.elasticsearch.cluster.name=ucoinj-elacticsearch + +ucoinj.elasticsearch.host=localhost +ucoinj.elasticsearch.port=9300 diff --git a/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-test.properties b/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-test.properties index 5e30d7dc793d2b07b59bafe0e970fe4d23d708de..eb9dbf279a433e34fd18601ebfe10fa809ebb0e0 100644 --- a/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-test.properties +++ b/ucoinj-elasticsearch/src/test/resources/ucoinj-elasticsearch-test.properties @@ -1,10 +1,13 @@ ucoinj.node.host=metab.ucoin.fr ucoinj.node.port=9201 +ucoinj.basedir=target/es-home + +ucoinj.elasticsearch.data ucoinj.elasticsearch.embedded.enable=true -ucoinj.elasticsearch.local=true -ucoinj.elasticsearch.http.enable=false -ucoinj.elasticsearch.cluster.name=ucoinj-elacticsearch-test +ucoinj.elasticsearch.local=false +ucoinj.elasticsearch.http.enable=true +ucoinj.elasticsearch.cluster.name=ucoinj-elasticsearch #ucoinj.elasticsearch.cluster.name=ucoinj-elacticsearch