diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java index 74f7c52c806f34d0c7e77cbae911adcdec14fa00..58c92d35bd5a9f68e4a9101365df7c1c30b75777 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/BlockchainBlock.java @@ -25,7 +25,6 @@ package org.duniter.core.client.model.bma; import com.fasterxml.jackson.annotation.JsonGetter; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSetter; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; @@ -69,7 +68,7 @@ public class BlockchainBlock implements Serializable { private Joiner[] actives; private Revoked[] revoked; private String[] excluded; - private String[] certifications; + private Certification[] certifications; private Transaction[] transactions; private String signature; @@ -242,11 +241,11 @@ public class BlockchainBlock implements Serializable { this.excluded = excluded; } - public String[] getCertifications() { + public Certification[] getCertifications() { return certifications; } - public void setCertifications(String[] certifications) { + public void setCertifications(Certification[] certifications) { this.certifications = certifications; } @@ -347,8 +346,8 @@ public class BlockchainBlock implements Serializable { } s += "\ncertifications:"; if (certifications != null) { - for (String c : certifications) { - s += "\n\t" + c; + for (Certification c : certifications) { + s += "\n\t" + c.toString(); } } @@ -404,7 +403,7 @@ public class BlockchainBlock implements Serializable { @Override public String toString() { StringBuilder sb = new StringBuilder() - .append(":").append(publicKey) + .append(publicKey) .append(":").append(signature) .append(":").append(blockUid) .append("").append(userId); @@ -471,7 +470,7 @@ public class BlockchainBlock implements Serializable { public String toString() { StringBuilder sb = new StringBuilder() - .append(":").append(publicKey) + .append(publicKey) .append(":").append(signature) .append(":").append(membershipBlockUid) .append(":").append(idtyBlockUid) @@ -505,13 +504,62 @@ public class BlockchainBlock implements Serializable { public String toString() { StringBuilder sb = new StringBuilder() - .append(":").append(signature) + .append(signature) .append(":").append(userId); return sb.toString(); } } + public static class Certification implements Serializable { + private String fromPubkey; + private String toPubkey; + private String blockId; + private String signature; + + public String getFromPubkey() { + return fromPubkey; + } + + public void setFromPubkey(String fromPubkey) { + this.fromPubkey = fromPubkey; + } + + public String getToPubkey() { + return toPubkey; + } + + public void setToPubkey(String toPubkey) { + this.toPubkey = toPubkey; + } + + public String getSignature() { + return signature; + } + public void setSignature(String signature) { + this.signature = signature; + } + + public String getBlockId() { + return blockId; + } + + public void setBlockId(String blockId) { + this.blockId = blockId; + } + + @Override + public String toString() { + + StringBuilder sb = new StringBuilder() + .append(fromPubkey) + .append(":").append(toPubkey) + .append(":").append(blockId) + .append(":").append(signature); + + return sb.toString(); + } + } @JsonIgnoreProperties(ignoreUnknown = true) public static class Transaction implements Serializable { diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/CertificationDeserializer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/CertificationDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..542130abf1f05b46b3be48cdb7272d27b395672a --- /dev/null +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/CertificationDeserializer.java @@ -0,0 +1,38 @@ +package org.duniter.core.client.model.bma.jackson; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.google.gson.JsonParseException; +import org.apache.commons.lang3.StringUtils; +import org.duniter.core.client.model.bma.BlockchainBlock; + +import java.io.IOException; + +/** + * Created by blavenie on 07/12/16. + */ +public class CertificationDeserializer extends JsonDeserializer<BlockchainBlock.Certification> { + @Override + public BlockchainBlock.Certification deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { + String certificationStr = jp.getText(); + if (StringUtils.isBlank(certificationStr)) { + return null; + } + + String[] parts = certificationStr.split(":"); + if (parts.length != 4) { + throw new JsonParseException(String.format("Bad format for BlockchainBlock.Certification. Should have 4 parts, but found %s.", parts.length)); + } + + BlockchainBlock.Certification result = new BlockchainBlock.Certification(); + int i = 0; + + result.setFromPubkey(parts[i++]); + result.setToPubkey(parts[i++]); + result.setBlockId(parts[i++]); + result.setSignature(parts[i++]); + + return result; + } +} \ No newline at end of file diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java index dfc44541799de3170512cb0d841242abd0d900db..8f7214f1c73e2d1bb579942dc8b4ea4d83be48b8 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/JacksonUtils.java @@ -15,11 +15,17 @@ public abstract class JacksonUtils extends SimpleModule { // Configure deserializer SimpleModule module = new SimpleModule(); + + // Blockchain module.addDeserializer(BlockchainBlock.Identity.class, new IdentityDeserializer()); module.addDeserializer(BlockchainBlock.Joiner.class, new JoinerDeserializer()); module.addDeserializer(BlockchainBlock.Revoked.class, new RevokedDeserializer()); + module.addDeserializer(BlockchainBlock.Certification.class, new CertificationDeserializer()); + + // Network module.addDeserializer(NetworkPeering.Endpoint.class, new EndpointDeserializer()); + objectMapper.registerModule(module); return objectMapper; diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/RevokedDeserializer.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/RevokedDeserializer.java index ab7661f511f7aa9a87a1f0c3409f52786e17c53e..5a9f7787e6df874023d16887d36dc5ccad38a04e 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/RevokedDeserializer.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/model/bma/jackson/RevokedDeserializer.java @@ -15,21 +15,21 @@ import java.io.IOException; public class RevokedDeserializer extends JsonDeserializer<BlockchainBlock.Revoked> { @Override public BlockchainBlock.Revoked deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException { - String identityStr = jp.getText(); - if (StringUtils.isBlank(identityStr)) { + String str = jp.getText(); + if (StringUtils.isBlank(str)) { return null; } - String[] identityParts = identityStr.split(":"); - if (identityParts.length != 2) { - throw new JsonParseException(String.format("Bad format for BlockchainBlock.Revoked. Should have 4 parts, but found %s.", identityParts.length)); + String[] parts = str.split(":"); + if (parts.length != 2) { + throw new JsonParseException(String.format("Bad format for BlockchainBlock.Revoked. Should have 2 parts, but found %s.", parts.length)); } BlockchainBlock.Revoked result = new BlockchainBlock.Revoked(); int i = 0; - result.setSignature(identityParts[i++]); - result.setUserId(identityParts[i++]); + result.setSignature(parts[i++]); + result.setUserId(parts[i++]); return result; } diff --git a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java index f40c8aab73b8e0b08c7bd4891c17ae93912e42d2..ac46d141a7a8d8a9bdfe0dc5c9c91124bb1cb8f7 100644 --- a/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java +++ b/duniter4j-core-client/src/main/java/org/duniter/core/client/service/HttpServiceImpl.java @@ -35,6 +35,7 @@ import org.apache.http.client.methods.HttpUriRequest; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; import org.duniter.core.beans.InitializingBean; import org.duniter.core.client.config.Configuration; import org.duniter.core.client.model.bma.Error; @@ -150,7 +151,9 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean } public <T> T executeRequest(HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass) { - return executeRequest(httpClientCache.get(0), request, resultClass, errorClass); + //return executeRequest(httpClientCache.get(0), request, resultClass, errorClass); + return executeRequest( createHttpClient(0), request, resultClass, errorClass); + } public <T> T executeRequest(String absolutePath, Class<? extends T> resultClass) { @@ -227,7 +230,7 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean protected <T> T executeRequest(HttpClient httpClient, HttpUriRequest request, Class<? extends T> resultClass, Class<?> errorClass) { T result = null; - if (log.isDebugEnabled()) { + if (debug) { log.debug("Executing request : " + request.getRequestLine()); } @@ -235,15 +238,19 @@ public class HttpServiceImpl implements HttpService, Closeable, InitializingBean try { response = httpClient.execute(request); - if (log.isDebugEnabled()) { + if (debug) { log.debug("Received response : " + response.getStatusLine()); } switch (response.getStatusLine().getStatusCode()) { case HttpStatus.SC_OK: { - result = (T) parseResponse(response, resultClass); - - response.getEntity().consumeContent(); + if (resultClass == null || resultClass.equals(HttpResponse.class)) { + result = (T)response; + } + else { + result = (T) parseResponse(response, resultClass); + EntityUtils.consume(response.getEntity()); + } break; } case HttpStatus.SC_UNAUTHORIZED: diff --git a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml index 4fd157adf2cc1bd3aeb45f7ee7f6979f735cbec2..9f6a6d72475d825809e40a5d2275112f4101bef4 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/elasticsearch.yml @@ -127,10 +127,10 @@ duniter.blockchain.sync.enable: true # #duniter.host: cgeek.fr #duniter.port: 9330 -#duniter.host: test-net.duniter.fr -#duniter.port: 9201 -duniter.host: 192.168.0.28 -duniter.port: 21378 +duniter.host: test-net.duniter.fr +duniter.port: 9201 +#duniter.host: 192.168.0.28 +#duniter.port: 21378 # # ---------------------------------- Duniter4j security ------------------------- # @@ -139,7 +139,8 @@ duniter.keyring.password: def # Enable security, to disable HTTP access to the default ES admin API # -duniter.security.enable: true +#duniter.security.enable: true +duniter.security.enable: false # # Security token prefix (default: 'duniter-') # diff --git a/duniter4j-es-assembly/src/test/es-home/config/logging.yml b/duniter4j-es-assembly/src/test/es-home/config/logging.yml index e44874c64958a80b5508845964a9201d30bdea6b..ec1d38e7ebbfc88372cdca706c3a38636f796976 100644 --- a/duniter4j-es-assembly/src/test/es-home/config/logging.yml +++ b/duniter4j-es-assembly/src/test/es-home/config/logging.yml @@ -19,11 +19,12 @@ logger: org.duniter: INFO org.duniter.elasticsearch: DEBUG + org.duniter.core.client.service: DEBUG duniter : DEBUG duniter.security : ERROR duniter.user.event : INFO - duniter.network.p2p: INFO + duniter.network.p2p: TRACE security: DEBUG diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java index f73d427eac6f4dc0990e437d1dafcf557fce9bac..fc0fdef148d4459324afb5806d40c12d67bcf677 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractService.java @@ -31,6 +31,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import com.google.gson.JsonSyntaxException; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.concurrent.ConcurrentException; import org.duniter.core.beans.Bean; import org.duniter.core.client.model.elasticsearch.Record; import org.duniter.core.exception.TechnicalException; @@ -70,6 +71,7 @@ import org.nuiton.i18n.I18n; import java.io.*; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.regex.Pattern; /** @@ -535,7 +537,8 @@ public abstract class AbstractService implements Bean { protected void flushDeleteBulk(final String index, final String type, BulkRequestBuilder bulkRequest) { if (bulkRequest.numberOfActions() > 0) { - BulkResponse bulkResponse = bulkRequest.get(); + + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); // If failures, continue but save missing blocks if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java index f6218ef80b545e20f20e05790506a013a418cacc..4a82e385ee766255633262d2c89c87df6d4a0048 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/AbstractSynchroService.java @@ -169,149 +169,147 @@ public abstract class AbstractSynchroService extends AbstractService { } // Execute query - InputStream response; + JsonNode node; try { HttpPost httpPost = new HttpPost(httpService.getPath(peer, fromIndex, fromType, "_search")); httpPost.setEntity(new ByteArrayEntity(bos.bytes().array())); if (logger.isDebugEnabled()) { logger.debug(String.format("[%s] [%s/%s] Sending POST request: %s", peer, fromIndex, fromType, new String(bos.bytes().array()))); } - response = httpService.executeRequest(httpPost, InputStream.class, String.class); + // Parse response + node = httpService.executeRequest(httpPost, JsonNode.class, String.class); } catch(HttpUnauthorizeException e) { logger.error(String.format("[%s] [%s/%s] Unable to access (%s). Skipping data import.", peer, fromIndex, fromType, e.getMessage())); return 0; } + catch(TechnicalException e) { + throw new TechnicalException("Unable to parse search response", e); + } - // Parse response - try { - JsonNode node = objectMapper.readTree(response); + node = node.get("hits"); + int total = node == null ? 0 : node.get("total").asInt(0); + if (logger.isDebugEnabled()) { + logger.debug(String.format("[%s] [%s/%s] total to update: %s", peer, toIndex, toType, total)); + } - node = node.get("hits"); - int total = node == null ? 0 : node.get("total").asInt(0); - if (logger.isDebugEnabled()) { - logger.debug(String.format("[%s] [%s/%s] total to update: %s", peer, toIndex, toType, total)); - } + boolean debug = logger.isTraceEnabled(); - boolean debug = logger.isTraceEnabled(); + long counter = 0; - long counter = 0; + long insertHits = 0; + long updateHits = 0; - long insertHits = 0; - long updateHits = 0; + if (offset < total) { - if (offset < total) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + bulkRequest.setRefresh(true); - BulkRequestBuilder bulkRequest = client.prepareBulk(); - bulkRequest.setRefresh(true); + for (Iterator<JsonNode> hits = node.get("hits").iterator(); hits.hasNext();){ + JsonNode hit = hits.next(); + String id = hit.get("_id").asText(); + JsonNode source = hit.get("_source"); + counter++; - for (Iterator<JsonNode> hits = node.get("hits").iterator(); hits.hasNext();){ - JsonNode hit = hits.next(); - String id = hit.get("_id").asText(); - JsonNode source = hit.get("_source"); - counter++; + try { + String issuer = source.get(issuerFieldName).asText(); + if (StringUtils.isBlank(issuer)) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", issuerFieldName)); + } + Long version = source.get(versionFieldName).asLong(); + if (version == null) { + throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); + } - try { - String issuer = source.get(issuerFieldName).asText(); - if (StringUtils.isBlank(issuer)) { - throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", issuerFieldName)); - } - Long version = source.get(versionFieldName).asLong(); - if (version == null) { - throw new InvalidFormatException(String.format("Invalid format: missing or null %s field.", versionFieldName)); + GetResponse existingDoc = client.prepareGet(toIndex, toType, id) + .setFields(versionFieldName, issuerFieldName) + .execute().actionGet(); + + boolean doInsert = !existingDoc.isExists(); + + // Insert (new doc) + if (doInsert) { + String json = source.toString(); + //readAndVerifyIssuerSignature(json, source); + if (debug) { + logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, toIndex, toType, id, json)); } + bulkRequest.add(client.prepareIndex(toIndex, toType, id) + .setSource(json.getBytes()) + ); + insertHits++; + } - GetResponse existingDoc = client.prepareGet(toIndex, toType, id) - .setFields(versionFieldName, issuerFieldName) - .execute().actionGet(); + // Existing doc + else { - boolean doInsert = !existingDoc.isExists(); + // Check same issuer + String existingIssuer = (String)existingDoc.getFields().get(issuerFieldName).getValue(); + if (!Objects.equals(issuer, existingIssuer)) { + throw new InvalidFormatException(String.format("Invalid document: not same [%s].", issuerFieldName)); + } - // Insert (new doc) - if (doInsert) { + // Check version + Long existingVersion = ((Number)existingDoc.getFields().get(versionFieldName).getValue()).longValue(); + boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); + + if (doUpdate) { String json = source.toString(); //readAndVerifyIssuerSignature(json, source); if (debug) { - logger.trace(String.format("[%s] [%s/%s] insert _id=%s\n%s", peer, toIndex, toType, id, json)); + logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, toIndex, toType, id, json)); } bulkRequest.add(client.prepareIndex(toIndex, toType, id) - .setSource(json.getBytes()) - ); - insertHits++; - } - - // Existing doc - else { - - // Check same issuer - String existingIssuer = (String)existingDoc.getFields().get(issuerFieldName).getValue(); - if (!Objects.equals(issuer, existingIssuer)) { - throw new InvalidFormatException(String.format("Invalid document: not same [%s].", issuerFieldName)); - } - - // Check version - Long existingVersion = ((Number)existingDoc.getFields().get(versionFieldName).getValue()).longValue(); - boolean doUpdate = (existingVersion == null || version > existingVersion.longValue()); - - if (doUpdate) { - String json = source.toString(); - //readAndVerifyIssuerSignature(json, source); - if (debug) { - logger.trace(String.format("[%s] [%s/%s] update _id=%s\n%s", peer, toIndex, toType, id, json)); - } - bulkRequest.add(client.prepareIndex(toIndex, toType, id) - .setSource(json.getBytes())); + .setSource(json.getBytes())); - updateHits++; - } + updateHits++; } - - } catch (InvalidFormatException e) { - if (debug) { - logger.debug(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage())); - } - // Skipping document (continue) } - catch (Exception e) { - logger.warn(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage()), e); - // Skipping document (continue) + + } catch (InvalidFormatException e) { + if (debug) { + logger.debug(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage())); } + // Skipping document (continue) + } + catch (Exception e) { + logger.warn(String.format("[%s] [%s/%s] %s. Skipping.", peer, toIndex, toType, e.getMessage()), e); + // Skipping document (continue) } + } + + if (bulkRequest.numberOfActions() > 0) { + + // Flush the bulk if not empty + BulkResponse bulkResponse = bulkRequest.get(); + Set<String> missingDocIds = new LinkedHashSet<>(); - if (bulkRequest.numberOfActions() > 0) { - - // Flush the bulk if not empty - BulkResponse bulkResponse = bulkRequest.get(); - Set<String> missingDocIds = new LinkedHashSet<>(); - - // If failures, continue but save missing blocks - if (bulkResponse.hasFailures()) { - // process failures by iterating through each bulk response item - for (BulkItemResponse itemResponse : bulkResponse) { - boolean skip = !itemResponse.isFailed() - || missingDocIds.contains(itemResponse.getId()); - if (!skip) { - if (debug) { - logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); - } - missingDocIds.add(itemResponse.getId()); + // If failures, continue but save missing blocks + if (bulkResponse.hasFailures()) { + // process failures by iterating through each bulk response item + for (BulkItemResponse itemResponse : bulkResponse) { + boolean skip = !itemResponse.isFailed() + || missingDocIds.contains(itemResponse.getId()); + if (!skip) { + if (debug) { + logger.debug(String.format("[%s] [%s/%s] could not process _id=%s: %s. Skipping.", peer, toIndex, toType, itemResponse.getId(), itemResponse.getFailureMessage())); } + missingDocIds.add(itemResponse.getId()); } } } } + } - // update result stats - result.addInserts(toIndex, toType, insertHits); - result.addUpdates(toIndex, toType, updateHits); + // update result stats + result.addInserts(toIndex, toType, insertHits); + result.addUpdates(toIndex, toType, updateHits); - return counter; + return counter; - } catch(IOException e) { - throw new TechnicalException("Unable to parse search response", e); - } + /*} finally { - IOUtils.closeQuietly(response); - } + //IOUtils.closeQuietly(response); + }*/ } } diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java index 5ee621cf4cbbf64e4cac8b4b70400fa6f7196300..e6bc12cd43e83ed71577c5dc3193e9b081a15e7a 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/BlockchainService.java @@ -815,7 +815,7 @@ public class BlockchainService extends AbstractService { if (!processedBlockNumbers.contains(itemNumber)) { // Add to bulk bulkRequest.add(client.prepareIndex(currencyName, BLOCK_TYPE, String.valueOf(itemNumber)) - .setRefresh(false) + .setRefresh(false) // recommended for heavy indexing .setSource(blockAsJson) ); processedBlockNumbers.add(itemNumber); @@ -935,7 +935,7 @@ public class BlockchainService extends AbstractService { } // Index the missing block - indexBlockFromJson(currencyName, blockNumber, blockAsJson.getBytes(), false, true/*wait*/); + indexBlockFromJson(currencyName, blockNumber, blockAsJson.getBytes(), false/*refresh*/, true/*wait*/); // Remove this block number from the final missing list newMissingBlocks.remove(blockNumber); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java index 9612444ee283c20f76bf5e3f4c609ff6c48533fa..fd377ed5be4acb49d245acff3519979cbc9470d0 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/ServiceModule.java @@ -37,6 +37,7 @@ import org.duniter.core.service.CryptoService; import org.duniter.elasticsearch.PluginInit; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; @@ -47,6 +48,7 @@ public class ServiceModule extends AbstractModule implements Module { // common services bind(PluginSettings.class).asEagerSingleton(); + bind(ThreadPool.class).asEagerSingleton(); bind(PluginInit.class).asEagerSingleton(); bind(ChangeService.class).asEagerSingleton(); diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java index c31a804d39a2107d95665d47eb64fca4027a1cbe..2e078f3f3635e626a88d6df20e930ff8df271743 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/service/changes/ChangeService.java @@ -102,6 +102,12 @@ public class ChangeService { addChange(change); } + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + + return delete; + } + @Override public void postDelete(Engine.Delete delete) { if (!hasListener(indexName, delete.type(), delete.id())) { diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ScheduledActionFuture.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ScheduledActionFuture.java new file mode 100644 index 0000000000000000000000000000000000000000..45819a977e09bd3df2c7580688e0b9cda3a5076c --- /dev/null +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ScheduledActionFuture.java @@ -0,0 +1,102 @@ +package org.duniter.elasticsearch.threadpool; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class ScheduledActionFuture<T> implements ActionFuture<T> { + + private final ScheduledFuture<T> delegate; + + public ScheduledActionFuture(ScheduledFuture<T> delegate) { + this.delegate = delegate; + } + + @Override + public T actionGet() { + try { + return get(); + } catch (InterruptedException e) { + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw rethrowExecutionException(e); + } + } + + @Override + public T actionGet(String timeout) { + return actionGet(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".actionGet.timeout")); + } + + @Override + public T actionGet(long timeoutMillis) { + return actionGet(timeoutMillis, TimeUnit.MILLISECONDS); + } + + @Override + public T actionGet(TimeValue timeout) { + return actionGet(timeout.millis(), TimeUnit.MILLISECONDS); + } + + @Override + public T actionGet(long timeout, TimeUnit unit) { + try { + return get(timeout, unit); + } catch (TimeoutException e) { + throw new ElasticsearchTimeoutException(e.getMessage()); + } catch (InterruptedException e) { + throw new IllegalStateException("Future got interrupted", e); + } catch (ExecutionException e) { + throw rethrowExecutionException(e); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return delegate.cancel(mayInterruptIfRunning); + } + + @Override + public boolean isCancelled() { + return delegate.isCancelled(); + } + + @Override + public boolean isDone() { + return delegate.isDone(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return delegate.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return delegate.get(timeout, unit); + } + + static RuntimeException rethrowExecutionException(ExecutionException e) { + if (e.getCause() instanceof ElasticsearchException) { + ElasticsearchException esEx = (ElasticsearchException) e.getCause(); + Throwable root = esEx.unwrapCause(); + if (root instanceof ElasticsearchException) { + return (ElasticsearchException) root; + } else if (root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } else if (e.getCause() instanceof RuntimeException) { + return (RuntimeException) e.getCause(); + } else { + return new UncategorizedExecutionException("Failed execution", e); + } + } +} \ No newline at end of file diff --git a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java index 0be7bc3a2a4ab1d1b70fb3b010a9c8a77d9b49c9..bc19dcca85bbef81425b3d2c274596538bca3d8c 100644 --- a/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java +++ b/duniter4j-es-core/src/main/java/org/duniter/elasticsearch/threadpool/ThreadPool.java @@ -39,6 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsAbortPolicy; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.common.util.concurrent.LoggingRunnable; import org.elasticsearch.transport.TransportService; import org.nuiton.i18n.I18n; @@ -52,20 +53,26 @@ import java.util.concurrent.*; public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { private ScheduledThreadPoolExecutor scheduler = null; - private Injector injector; - private ESLogger logger = Loggers.getLogger("threadpool"); + private final Injector injector; + private final ESLogger logger = Loggers.getLogger("threadpool"); + + private final org.elasticsearch.threadpool.ThreadPool delegate; private final List<Runnable> afterStartedCommands; @Inject public ThreadPool(Settings settings, - Injector injector + Injector injector, + org.elasticsearch.threadpool.ThreadPool esThreadPool ) { super(settings); this.injector = injector; this.afterStartedCommands = Lists.newArrayList(); - this.scheduler = new ScheduledThreadPoolExecutor(1, EsExecutors.daemonThreadFactory(settings, "duniter4j-scheduler"), new EsAbortPolicy()); + this.delegate = esThreadPool; + + int availableProcessors = EsExecutors.boundedNumberOfProcessors(settings); + this.scheduler = new ScheduledThreadPoolExecutor(availableProcessors, EsExecutors.daemonThreadFactory(settings, "duniter-scheduler"), new EsAbortPolicy()); this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); this.scheduler.setRemoveOnCancelPolicy(true); @@ -78,9 +85,7 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { if (!afterStartedCommands.isEmpty()) { scheduleOnStarted(() -> { - for (Runnable command: afterStartedCommands) { - command.run(); - } + afterStartedCommands.forEach(command -> command.run()); this.afterStartedCommands.clear(); }); } @@ -88,15 +93,10 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { public void doStop(){ scheduler.shutdown(); - // TODO : cancel all aiting jobs } public void doClose() {} - public ScheduledExecutorService scheduler() { - return this.scheduler; - } - /** * Schedules an rest when node is started (all services and modules ready) * @@ -132,19 +132,36 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { * @param command the rest to take * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled */ - public ScheduledFuture<?> schedule(Runnable command) { - return scheduler.schedule(new LoggingRunnable(command), 0, TimeUnit.MILLISECONDS); + public ScheduledActionFuture<?> schedule(Runnable command) { + return schedule(command, new TimeValue(0)); } /** * Schedules an rest that runs on the scheduler thread, after a delay. * * @param command the rest to take - * @param interval the delay interval + * @param name @see {@link org.elasticsearch.threadpool.ThreadPool.Names} + * @param delay the delay interval + * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled + */ + public ScheduledActionFuture<?> schedule(Runnable command, String name, TimeValue delay) { + if (name == null) { + return new ScheduledActionFuture<>(scheduler.schedule(new LoggingRunnable(logger, command), delay.millis(), TimeUnit.MILLISECONDS)); + } + return new ScheduledActionFuture<>(delegate.schedule(delay, + name, + command)); + } + + /** + * Schedules an rest that runs on the scheduler thread, after a delay. + * + * @param command the rest to take + * @param delay the delay interval * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled */ - public ScheduledFuture<?> schedule(Runnable command, TimeValue interval) { - return scheduler.schedule(new LoggingRunnable(command), interval.millis(), TimeUnit.MILLISECONDS); + public ScheduledActionFuture<?> schedule(Runnable command, TimeValue delay) { + return schedule(command, null, delay); } /** @@ -154,14 +171,14 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { * @param interval the delay interval * @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled */ - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) { - return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS); + public ScheduledActionFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) { + return new ScheduledActionFuture<>(delegate.scheduleWithFixedDelay(command, interval)); } /* -- protected methods -- */ - protected <T extends LifecycleComponent<T>> ScheduledFuture<?> scheduleAfterServiceState(Class<T> waitingServiceClass, + protected <T extends LifecycleComponent<T>> ScheduledActionFuture<?> scheduleAfterServiceState(Class<T> waitingServiceClass, final Lifecycle.State waitingState, final Runnable job) { Preconditions.checkNotNull(waitingServiceClass); @@ -216,39 +233,4 @@ public class ThreadPool extends AbstractLifecycleComponent<ThreadPool> { return canContinue; } - /* -- internal methods -- */ - - class LoggingRunnable implements Runnable { - - private final Runnable runnable; - - LoggingRunnable(Runnable runnable) { - this.runnable = runnable; - } - - @Override - public void run() { - try { - runnable.run(); - } catch (Throwable t) { - logger.warn("failed to run {}", t, runnable.toString()); - throw t; - } - } - - @Override - public int hashCode() { - return runnable.hashCode(); - } - - @Override - public boolean equals(Object obj) { - return runnable.equals(obj); - } - - @Override - public String toString() { - return "[threaded] " + runnable.toString(); - } - } } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/model/event/GchangeEventCodes.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/model/event/GchangeEventCodes.java index 37bbd68db892a64c22df5d4bdbaee087d49728ae..2cdf6e3ab78e07d80bc85d893bb83deeaeef88cb 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/model/event/GchangeEventCodes.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/model/event/GchangeEventCodes.java @@ -5,5 +5,6 @@ package org.duniter.elasticsearch.gchange.model.event; */ public enum GchangeEventCodes { - NEW_COMMENT + NEW_COMMENT, + UPDATE_COMMENT } diff --git a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java index 9b12c5592d826eb969266c793e04d080c7a3706c..2605a9a74aaf6774ff2f58c520225ebf66c10913 100644 --- a/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java +++ b/duniter4j-es-gchange/src/main/java/org/duniter/elasticsearch/gchange/service/CommentUserEventService.java @@ -23,21 +23,16 @@ package org.duniter.elasticsearch.gchange.service; */ -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.apache.commons.collections4.MapUtils; import org.duniter.core.client.model.ModelUtils; -import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.jackson.JacksonUtils; import org.duniter.core.client.model.elasticsearch.RecordComment; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; -import org.duniter.core.util.CollectionUtils; import org.duniter.core.util.websocket.WebsocketClientEndpoint; import org.duniter.elasticsearch.PluginSettings; -import org.duniter.elasticsearch.exception.DocumentNotFoundException; import org.duniter.elasticsearch.gchange.model.MarketRecord; import org.duniter.elasticsearch.gchange.model.event.GchangeEventCodes; import org.duniter.elasticsearch.service.AbstractService; @@ -118,13 +113,13 @@ public class CommentUserEventService extends AbstractService implements ChangeSe case CREATE: if (change.getSource() != null) { RecordComment comment = objectMapper.readValue(change.getSource().streamInput(), RecordComment.class); - processCommentIndex(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment, true/*is new*/); + processCreateComment(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment); } break; case INDEX: if (change.getSource() != null) { RecordComment comment = objectMapper.readValue(change.getSource().streamInput(), RecordComment.class); - processCommentIndex(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment, false/*is new*/); + processUpdateComment(change.getIndex(), MarketService.RECORD_TYPE, change.getId(), comment); } break; @@ -139,8 +134,6 @@ public class CommentUserEventService extends AbstractService implements ChangeSe catch(IOException e) { throw new TechnicalException(String.format("Unable to parse received comment %s", change.getId()), e); } - - //logger.info("receiveing block change: " + change.toJson()); } @Override @@ -192,7 +185,7 @@ public class CommentUserEventService extends AbstractService implements ChangeSe }; } - private void processCommentIndex(String index, String recordType, String commentId, RecordComment comment, boolean isNewComment) { + private void processCreateComment(String index, String recordType, String commentId, RecordComment comment) { String issuer = comment.getIssuer(); String recordId = comment.getRecord(); @@ -213,9 +206,50 @@ public class CommentUserEventService extends AbstractService implements ChangeSe userEventService.notifyUser( UserEvent.newBuilder(UserEvent.EventType.INFO, GchangeEventCodes.NEW_COMMENT.name()) .setMessage( - isNewComment ? - String.format("duniter.%s.event.newComment", index.toLowerCase()) : - String.format("duniter.%s.event.updateComment", index.toLowerCase()), + String.format("duniter.%s.event.newComment", index.toLowerCase()), + issuer, + issuerTitle != null ? issuerTitle : ModelUtils.minifyPubkey(issuer), + recordTitle + ) + .setRecipient(recordIssuer) + .setReference(index, recordType, recordId) + .setReferenceAnchor(commentId) + .setTime(comment.getTime()) + .build()); + } + } + + /** + * Same as processCreateComment(), but with other code and message. + * + * @param index + * @param recordType + * @param commentId + * @param comment + */ + private void processUpdateComment(String index, String recordType, String commentId, RecordComment comment) { + + String issuer = comment.getIssuer(); + String recordId = comment.getRecord(); + + // Notify issuer of record (is not same as comment writer) + Map<String, Object> recordFields = getFieldsById(index, recordType, recordId, + MarketRecord.PROPERTY_TITLE, MarketRecord.PROPERTY_ISSUER); + if (MapUtils.isEmpty(recordFields)) { // record not found + logger.warn(I18n.t(String.format("duniter.%s.error.comment.recordNotFound", index.toLowerCase()), recordId)); + return; // no event to emit + } + String recordIssuer = recordFields.get(MarketRecord.PROPERTY_ISSUER).toString(); + + // Get user title + String issuerTitle = userService.getProfileTitle(issuer); + + String recordTitle = recordFields.get(MarketRecord.PROPERTY_TITLE).toString(); + if (!issuer.equals(recordIssuer)) { + userEventService.notifyUser( + UserEvent.newBuilder(UserEvent.EventType.INFO, GchangeEventCodes.UPDATE_COMMENT.name()) + .setMessage( + String.format("duniter.%s.event.updateComment", index.toLowerCase()), issuer, issuerTitle != null ? issuerTitle : ModelUtils.minifyPubkey(issuer), recordTitle diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java index 7f7ac356cc002e5824eb1a3f6a9a0c60c10dc24c..8ff7163726074deb8b743007a0e085a7a9dbc7dd 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/model/UserEventCodes.java @@ -40,6 +40,10 @@ public enum UserEventCodes { TX_SENT, TX_RECEIVED, + // CERTIFICATION + CERT_SENT, + CERT_RECEIVED, + // Message MESSAGE_RECEIVED diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java index 7af2957dc5140b38ccaae533699af6f28f994be8..b39102a081f74c08e61d3270d4b4baaeaf444395 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/BlockchainUserEventService.java @@ -24,10 +24,8 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import org.apache.commons.collections4.MapUtils; import org.duniter.core.client.model.ModelUtils; import org.duniter.core.client.model.bma.BlockchainBlock; import org.duniter.core.client.model.bma.jackson.JacksonUtils; @@ -57,13 +55,14 @@ public class BlockchainUserEventService extends AbstractService implements Chang public static final String DEFAULT_PUBKEYS_SEPARATOR = ", "; + private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); + public final UserService userService; public final UserEventService userEventService; public final ObjectMapper objectMapper; - public final List<ChangeSource> changeListenSources; public final boolean enable; @@ -76,7 +75,6 @@ public class BlockchainUserEventService extends AbstractService implements Chang this.userService = userService; this.userEventService = userEventService; this.objectMapper = JacksonUtils.newObjectMapper(); - this.changeListenSources = ImmutableList.of(new ChangeSource("*", BlockchainService.BLOCK_TYPE)); ChangeService.registerListener(this); this.enable = pluginSettings.enableBlockchainSync(); @@ -94,16 +92,25 @@ public class BlockchainUserEventService extends AbstractService implements Chang @Override public void onChange(ChangeEvent change) { + // Skip _id=current + if(change.getId() == "current") return; try { - switch (change.getOperation()) { - case CREATE: + // on create + case CREATE: // create + if (change.getSource() != null) { + BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); + processCreateBlock(block); + } + break; + + // on update case INDEX: if (change.getSource() != null) { BlockchainBlock block = objectMapper.readValue(change.getSource().streamInput(), BlockchainBlock.class); - processBlockIndex(block); + processUpdateBlock(block); } break; @@ -124,7 +131,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang @Override public Collection<ChangeSource> getChangeSources() { - return changeListenSources; + return CHANGE_LISTEN_SOURCES; } /* -- internal method -- */ @@ -171,7 +178,7 @@ public class BlockchainUserEventService extends AbstractService implements Chang }; } - private void processBlockIndex(BlockchainBlock block) { + private void processCreateBlock(BlockchainBlock block) { // Joiners if (CollectionUtils.isNotEmpty(block.getJoiners())) { for (BlockchainBlock.Joiner joiner: block.getJoiners()) { @@ -199,6 +206,23 @@ public class BlockchainUserEventService extends AbstractService implements Chang processTx(block, tx); } } + + // Certifications + if (CollectionUtils.isNotEmpty(block.getCertifications())) { + for (BlockchainBlock.Certification cert: block.getCertifications()) { + processCertification(block, cert); + } + } + } + + private void processUpdateBlock(BlockchainBlock block) { + + // Delete events that reference this block + userEventService.deleteEventsByReference(new UserEvent.Reference(block.getCurrency(), BlockchainService.BLOCK_TYPE, String.valueOf(block.getNumber()))) + .actionGet(); + + processCreateBlock(block); + } private void processTx(BlockchainBlock block, BlockchainBlock.Transaction tx) { @@ -231,6 +255,25 @@ public class BlockchainUserEventService extends AbstractService implements Chang // TODO : indexer la TX dans un index/type spécifique ? } + private void processCertification(BlockchainBlock block, BlockchainBlock.Certification certification) { + String sender = certification.getFromPubkey(); + String receiver = certification.getToPubkey(); + + // Received + String senderName = userService.getProfileTitle(sender); + if (senderName == null) { + senderName = ModelUtils.minifyPubkey(sender); + } + notifyUserEvent(block, receiver, UserEventCodes.CERT_RECEIVED, I18n.n("duniter.user.event.cert.received"), sender, senderName); + + // Sent + String receiverName = userService.getProfileTitle(receiver); + if (receiverName == null) { + receiverName = ModelUtils.minifyPubkey(receiver); + } + notifyUserEvent(block, sender, UserEventCodes.CERT_SENT, I18n.n("duniter.user.event.cert.sent"), receiver, receiverName); + } + private void notifyUserEvent(BlockchainBlock block, String pubkey, UserEventCodes code, String message, String... params) { UserEvent event = UserEvent.newBuilder(UserEvent.EventType.INFO, code.name()) .setRecipient(pubkey) @@ -251,4 +294,5 @@ public class BlockchainUserEventService extends AbstractService implements Chang } + } diff --git a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java index 9e575ebc839721978171f8432e6e7d215ecce5b2..ec6d857dc76712ea107dd70ea2ae4db7642dde56 100644 --- a/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java +++ b/duniter4j-es-user/src/main/java/org/duniter/elasticsearch/user/service/UserEventService.java @@ -26,6 +26,7 @@ package org.duniter.elasticsearch.user.service; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.gson.JsonSyntaxException; import org.duniter.core.exception.TechnicalException; import org.duniter.core.service.CryptoService; @@ -36,20 +37,23 @@ import org.duniter.core.util.crypto.KeyPair; import org.duniter.elasticsearch.PluginSettings; import org.duniter.elasticsearch.exception.InvalidSignatureException; import org.duniter.elasticsearch.service.AbstractService; -import org.duniter.elasticsearch.service.BlockchainService; +import org.duniter.elasticsearch.service.changes.ChangeEvent; +import org.duniter.elasticsearch.service.changes.ChangeService; +import org.duniter.elasticsearch.service.changes.ChangeSource; import org.duniter.elasticsearch.threadpool.ThreadPool; import org.duniter.elasticsearch.user.model.UserEvent; import org.duniter.elasticsearch.user.model.UserProfile; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.BoolQueryBuilder; @@ -58,14 +62,13 @@ import org.elasticsearch.search.SearchHit; import org.nuiton.i18n.I18n; import java.io.IOException; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; +import java.util.*; /** * Created by Benoit on 30/03/2015. */ -public class UserEventService extends AbstractService { +public class UserEventService extends AbstractService implements ChangeService.ChangeListener { + public interface UserEventListener { String getId(); @@ -75,8 +78,11 @@ public class UserEventService extends AbstractService { public static final String INDEX = "user"; public static final String EVENT_TYPE = "event"; + private static final Map<String, UserEventListener> LISTENERS = new HashMap<>(); + private static final List<ChangeSource> CHANGE_LISTEN_SOURCES = ImmutableList.of(new ChangeSource(INDEX, EVENT_TYPE)); + public static void registerListener(UserEventListener listener) { synchronized (LISTENERS) { LISTENERS.put(listener.getId(), listener); @@ -100,7 +106,6 @@ public class UserEventService extends AbstractService { final PluginSettings pluginSettings, final CryptoService cryptoService, final MailService mailService, - final BlockchainService blockchainService, final ThreadPool threadPool) { super("duniter.user.event", client, pluginSettings, cryptoService); this.mailService = mailService; @@ -112,6 +117,7 @@ public class UserEventService extends AbstractService { logger.trace("Mail disable"); } + ChangeService.registerListener(this); } @@ -119,23 +125,55 @@ public class UserEventService extends AbstractService { * Notify cluster admin */ public void notifyAdmin(UserEvent event) { - // async - //threadPool.schedule(() -> { - doNotifyAdmin(event); - //}); + Preconditions.checkNotNull(event); + + UserProfile adminProfile; + if (StringUtils.isNotBlank(nodePubkey)) { + adminProfile = getUserProfile(nodePubkey, UserProfile.PROPERTY_EMAIL, UserProfile.PROPERTY_LOCALE); + } + else { + adminProfile = new UserProfile(); + } + + // Add new event to index + Locale locale = StringUtils.isNotBlank(adminProfile.getLocale()) ? + new Locale(adminProfile.getLocale()) : + I18n.getDefaultLocale(); + if (StringUtils.isNotBlank(nodePubkey)) { + event.setRecipient(nodePubkey); + indexEvent(locale, event); + } + + // Send email to admin + String adminEmail = StringUtils.isNotBlank(adminProfile.getEmail()) ? + adminProfile.getEmail() : + pluginSettings.getMailAdmin(); + if (StringUtils.isNotBlank(adminEmail)) { + String subjectPrefix = pluginSettings.getMailSubjectPrefix(); + sendEmail(adminEmail, + I18n.l(locale, "duniter4j.event.subject."+event.getType().name(), subjectPrefix), + event.getLocalizedMessage(locale)); + } } /** * Notify a user */ - public void notifyUser(UserEvent event) { - // async - threadPool.schedule(() -> { - doNotifyUser(event); - }, TimeValue.timeValueMillis(500)); + public ListenableActionFuture<IndexResponse> notifyUser(UserEvent event) { + Preconditions.checkNotNull(event); + Preconditions.checkNotNull(event.getRecipient()); + + // Get user profile locale + UserProfile userProfile = getUserProfile(event.getRecipient(), + UserProfile.PROPERTY_EMAIL, UserProfile.PROPERTY_TITLE, UserProfile.PROPERTY_LOCALE); + + Locale locale = userProfile.getLocale() != null ? new Locale(userProfile.getLocale()) : null; + + // Add new event to index + return indexEvent(locale, event); } - public String indexEvent(Locale locale, UserEvent event) { + public ListenableActionFuture<IndexResponse> indexEvent(Locale locale, UserEvent event) { Preconditions.checkNotNull(event.getRecipient()); Preconditions.checkNotNull(event.getType()); Preconditions.checkNotNull(event.getCode()); @@ -167,11 +205,11 @@ public class UserEventService extends AbstractService { } - public String indexEvent(String eventJson) { + public ListenableActionFuture<IndexResponse> indexEvent(String eventJson) { return indexEvent(eventJson, true); } - public String indexEvent(String eventJson, boolean checkSignature) { + public ListenableActionFuture<IndexResponse> indexEvent(String eventJson, boolean checkSignature) { if (checkSignature) { JsonNode jsonNode = readAndVerifyIssuerSignature(eventJson); @@ -184,25 +222,21 @@ public class UserEventService extends AbstractService { logger.trace(eventJson); } - IndexResponse response = client.prepareIndex(INDEX, EVENT_TYPE) + return client.prepareIndex(INDEX, EVENT_TYPE) .setSource(eventJson) .setRefresh(false) - .execute().actionGet(); - - return response.getId(); + .execute(); } - public void deleteEventsByReference(final UserEvent.Reference reference) { + public ActionFuture<?> deleteEventsByReference(final UserEvent.Reference reference) { Preconditions.checkNotNull(reference); Preconditions.checkNotNull(reference.getIndex()); Preconditions.checkNotNull(reference.getType()); - threadPool.schedule(() -> { - doDeleteEventsByReference(reference); - }); + return threadPool.schedule(() -> doDeleteEventsByReference(reference)); } - public void markEventAsRead(String id, String signature) { + public ListenableActionFuture<UpdateResponse> markEventAsRead(String id, String signature) { Map<String, Object> fields = getMandatoryFieldsById(INDEX, EVENT_TYPE, id, UserEvent.PROPERTY_HASH, UserEvent.PROPERTY_RECIPIENT); String recipient = fields.get(UserEvent.PROPERTY_RECIPIENT).toString(); @@ -214,9 +248,9 @@ public class UserEventService extends AbstractService { throw new InvalidSignatureException("Invalid signature: only the recipient can mark an event as read."); } - UpdateRequestBuilder request = client.prepareUpdate(INDEX, EVENT_TYPE, id) - .setDoc("read_signature", signature); - request.execute(); + return client.prepareUpdate(INDEX, EVENT_TYPE, id) + .setDoc("read_signature", signature) + .execute(); } /* -- Internal methods -- */ @@ -358,76 +392,7 @@ public class UserEventService extends AbstractService { return CryptoUtils.encodeBase58(nodeKeyPair.getPubKey()); } - /** - * Notify cluster admin - */ - public void doNotifyAdmin(UserEvent event) { - - UserProfile adminProfile; - if (StringUtils.isNotBlank(nodePubkey)) { - adminProfile = getUserProfile(nodePubkey, UserProfile.PROPERTY_EMAIL, UserProfile.PROPERTY_LOCALE); - } - else { - adminProfile = new UserProfile(); - } - - // Add new event to index - Locale locale = StringUtils.isNotBlank(adminProfile.getLocale()) ? - new Locale(adminProfile.getLocale()) : - I18n.getDefaultLocale(); - if (StringUtils.isNotBlank(nodePubkey)) { - event.setRecipient(nodePubkey); - indexEventAndNotifyListener(locale, event); - } - - // Send email to admin - String adminEmail = StringUtils.isNotBlank(adminProfile.getEmail()) ? - adminProfile.getEmail() : - pluginSettings.getMailAdmin(); - if (StringUtils.isNotBlank(adminEmail)) { - String subjectPrefix = pluginSettings.getMailSubjectPrefix(); - sendEmail(adminEmail, - I18n.l(locale, "duniter4j.event.subject."+event.getType().name(), subjectPrefix), - event.getLocalizedMessage(locale)); - } - } - - /** - * Notify a user - */ - private void doNotifyUser(final UserEvent event) { - Preconditions.checkNotNull(event.getRecipient()); - - // Get user profile locale - UserProfile userProfile = getUserProfile(event.getRecipient(), - UserProfile.PROPERTY_EMAIL, UserProfile.PROPERTY_TITLE, UserProfile.PROPERTY_LOCALE); - - Locale locale = userProfile.getLocale() != null ? new Locale(userProfile.getLocale()) : null; - - // Add new event to index - indexEventAndNotifyListener(locale, event); - } - - private String indexEventAndNotifyListener(final Locale locale, final UserEvent event) { - // Add new event to index - final String eventId = indexEvent(locale, event); - final UserEvent eventCopy = new UserEvent(event); - eventCopy.setId(eventId); - - // Notify listeners - threadPool.schedule(() -> { - synchronized (LISTENERS) { - LISTENERS.values().stream().forEach(listener -> { - if (event.getRecipient().equals(listener.getPubkey())) { - listener.onEvent(eventCopy); - } - }); - } - }); - - return eventId; - } private void doDeleteEventsByReference(final UserEvent.Reference reference) { @@ -503,4 +468,62 @@ public class UserEventService extends AbstractService { throw new TechnicalException("Unable to serialize UserEvent object", e); } } + + @Override + public String getId() { + return "duniter.user.event"; + } + + @Override + public Collection<ChangeSource> getChangeSources() { + return CHANGE_LISTEN_SOURCES; + } + + @Override + public void onChange(ChangeEvent change) { + + try { + + switch (change.getOperation()) { + // on create + // on update + case INDEX: + case CREATE: // create + if (change.getSource() != null) { + UserEvent event = objectMapper.readValue(change.getSource().streamInput(), UserEvent.class); + processEventCreateOrUpdate(change.getId(), event); + } + break; + + // on delete + case DELETE: + // Do not propagate deletion + + break; + } + + } + catch(IOException e) { + throw new TechnicalException(String.format("Unable to parse received block %s", change.getId()), e); + } + + } + + private void processEventCreateOrUpdate(final String eventId, final UserEvent event) { + + event.setId(eventId); + + // Notify listeners + threadPool.schedule(() -> { + synchronized (LISTENERS) { + LISTENERS.values().stream().forEach(listener -> { + if (event.getRecipient().equals(listener.getPubkey())) { + listener.onEvent(event); + } + }); + } + }); + + } + } diff --git a/duniter4j-es-user/src/main/misc/curl_test.sh b/duniter4j-es-user/src/main/misc/curl_test.sh index 439419ccb733359b388c9e50a757456d4df2d427..6a8ba33ecdee671cddbc60785124fce404969128 100755 --- a/duniter4j-es-user/src/main/misc/curl_test.sh +++ b/duniter4j-es-user/src/main/misc/curl_test.sh @@ -1,5 +1,6 @@ #!/bin/sh +echo "--- COUNT query --- " curl -XPOST "http://127.0.0.1:9200/user/event/_count?pretty" -d' { query: { @@ -19,3 +20,70 @@ curl -XPOST "http://127.0.0.1:9200/user/event/_count?pretty" -d' _source: false }' +echo "--- IDS query --- " +curl -XPOST "http://127.0.0.1:9200/market/comment/_search?pretty" -d' +{ + query: { + terms: { + _id: [ "AVlA2v8sW3j-KIPA7pm8" ] + } + }, + from: 0, + size: 100 +}' + +echo "--- COUNT query --- " +curl -XPOST "http://127.0.0.1:9200/market/comment/_search?pretty" -d' +{ + query: { + terms: { + reply_to: ["AVlEmFhF1r62y3TgqdyR"] + } + }, + "aggs" : { + "reply_tos" : { + "terms" : { "field" : "reply_to" } + } + }, + size: 0 +}' + +echo "--- COUNT + GET query --- " +curl -XPOST "http://127.0.0.1:9200/market/comment/_search?pretty" -d' +{ + query: { + terms: { + record: [ "AVk_pr_49ItF-SEayNy1" ] + } + }, + "aggs" : { + "reply_tos" : { + "terms" : { "field" : "reply_to" } + } + }, + sort : [ + { "time" : {"order" : "desc"}} + ], + from: 0, + size: 5 +}' + +echo "--- GET user event --- " +curl -XPOST "http://127.0.0.1:9200/user/event/_search?pretty" -d' +{ + query: { + bool: { + filter: [ + {term: { recipient: "5ocqzyDMMWf1V8bsoNhWb1iNwax1e9M7VTUN6navs8of"}}, + {term: { code: "TX_RECEIVED"}} + ] + } + }, + sort : [ + { "time" : {"order" : "desc"}} + ], + from: 0, + size: 3 +}' + + diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties index d5b454c0ffd555e237b519485c09d4f238ba9418..8f195477edea3868c3113728a90ac029334d1de0 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_en_GB.properties @@ -2,6 +2,8 @@ duniter.event.NODE_BMA_DOWN= duniter.event.NODE_BMA_UP= duniter.event.NODE_STARTED=Node started on cluster Duniter4j ES [%s] duniter.user.event.active= +duniter.user.event.cert.received= +duniter.user.event.cert.sent= duniter.user.event.join= duniter.user.event.leave= duniter.user.event.message.received= diff --git a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties index 8e74a2b7152d1c12ceee321cf20dfd7ceea6501e..c41fd1df1a30d20fc71986df1e34189634261047 100644 --- a/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties +++ b/duniter4j-es-user/src/main/resources/i18n/duniter4j-es-user_fr_FR.properties @@ -1,6 +1,8 @@ duniter.event.NODE_BMA_DOWN=Noeud Duniter [%1$s\:%2$s] non joignable, depuis le noeud ES API [%3$s]. Dernière connexion à %4$d. Indexation de blockchain en attente. duniter.event.NODE_BMA_UP=Noeud Duniter [%1$s\:%2$s] à nouveau accessible. duniter.event.NODE_STARTED=Noeud ES API démarré sur le cluster Duniter [%1$s] +duniter.user.event.cert.received=%2$s vous a certifié (certification prise en compte). +duniter.user.event.cert.sent=Votre certification de %2$s a été pris en compte. duniter.user.event.message.received=Vous avez reçu un message de %2$s duniter.user.event.ms.active=Votre adhésion comme membre a bien été renouvellée duniter.user.event.ms.join=Vous êtes maintenant membre de la monnaie