Skip to content
Snippets Groups Projects
Commit 0362cd90 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[es-core] Add Peer.epId for WS2P endpoint

[es-core] Synchro now listen peers changes, throw WebSocket
parent 6d89fb3b
No related branches found
No related tags found
No related merge requests found
Showing
with 563 additions and 211 deletions
......@@ -27,6 +27,7 @@ public enum EndpointApi {
BASIC_MERKLED_API,
BMAS,
BMATOR,
WS2P,
ES_CORE_API,
ES_USER_API,
ES_SUBSCRIPTION_API,
......
......@@ -127,6 +127,7 @@ public class NetworkPeering implements Serializable {
public String ipv4;
public String ipv6;
public Integer port;
public String id;
public EndpointApi getApi() {
return api;
......@@ -168,9 +169,18 @@ public class NetworkPeering implements Serializable {
this.port = port;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
@Override
public String toString() {
String s = "api=" + api.name() + "\n" +
(id != null ? ("id=" + id + "\n") : "" ) +
"dns=" + dns + "\n" +
"ipv4=" + ipv4 + "\n" +
"ipv6=" + ipv6 + "\n" +
......
......@@ -46,15 +46,18 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
public static final String EP_END_REGEXP = "(?:[ ]+([a-z0-9-_]+[.][a-z0-9-_.]*))?(?:[ ]+([0-9.]+))?(?:[ ]+([0-9a-f:]+))?(?:[ ]+([0-9]+))$";
public static final String BMA_API_REGEXP = "^BASIC_MERKLED_API" + EP_END_REGEXP;
public static final String BMAS_API_REGEXP = "^BMAS" + EP_END_REGEXP;
public static final String WS2P_API_REGEXP = "^WS2P[ ]+([a-z0-9]+)[ ]+" + EP_END_REGEXP;
public static final String OTHER_API_REGEXP = "^([A-Z_-]+)" + EP_END_REGEXP;
private Pattern bmaPattern;
private Pattern bmasPattern;
private Pattern ws2pPattern;
private Pattern otherApiPattern;
public EndpointDeserializer() {
bmaPattern = Pattern.compile(BMA_API_REGEXP);
bmasPattern = Pattern.compile(BMAS_API_REGEXP);
ws2pPattern = Pattern.compile(WS2P_API_REGEXP);
otherApiPattern = Pattern.compile(OTHER_API_REGEXP);
}
......@@ -81,6 +84,15 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
return endpoint;
}
// WS2P API
mather = ws2pPattern.matcher(ept);
if (mather.matches()) {
endpoint.api = EndpointApi.WS2P;
endpoint.id = mather.group(1);
parseDefaultFormatEndPoint(mather, endpoint, 2);
return endpoint;
}
// Other API
mather = otherApiPattern.matcher(ept);
if (mather.matches()) {
......
......@@ -22,10 +22,7 @@ package org.duniter.core.client.model.bma.jackson;
* #L%
*/
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationConfig;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.NetworkPeering;
......
......@@ -47,6 +47,7 @@ public class Peer implements LocalEntity<String>, Serializable {
private String ipv4;
private String ipv6;
private Integer port;
private String epId;
private Boolean useSsl;
private String pubkey;
private String hash;
......@@ -101,6 +102,11 @@ public class Peer implements LocalEntity<String>, Serializable {
return this;
}
public Builder setEpId(String epId) {
this.epId = epId;
return this;
}
public Builder setHost(String host) {
Preconditions.checkNotNull(host);
if (InetAddressUtils.isIPv4Address(host)) {
......@@ -135,6 +141,9 @@ public class Peer implements LocalEntity<String>, Serializable {
if (source.port != null) {
setPort(source.port);
}
if (StringUtils.isNotBlank(source.id)) {
setEpId(source.id);
}
return this;
}
......@@ -144,6 +153,9 @@ public class Peer implements LocalEntity<String>, Serializable {
(port == 443 || this.api == EndpointApi.BMAS.name());
String api = this.api != null ? this.api : EndpointApi.BASIC_MERKLED_API.name();
Peer ep = new Peer(api, dns, ipv4, ipv6, port, useSsl);
if (StringUtils.isNotBlank(this.epId)) {
ep.setEpId(this.epId);
}
if (StringUtils.isNotBlank(this.currency)) {
ep.setCurrency(this.currency);
}
......@@ -165,6 +177,7 @@ public class Peer implements LocalEntity<String>, Serializable {
public static final String PROPERTY_DNS = "dns";
public static final String PROPERTY_IPV4 = "ipv4";
public static final String PROPERTY_IPV6 = "ipv6";
public static final String PROPERTY_EP_ID = "epId";
public static final String PROPERTY_STATS = "stats";
private String id;
......@@ -173,6 +186,7 @@ public class Peer implements LocalEntity<String>, Serializable {
private String dns;
private String ipv4;
private String ipv6;
private String epId;
private String url;
private String host;
......@@ -308,6 +322,14 @@ public class Peer implements LocalEntity<String>, Serializable {
init();
}
public String getEpId() {
return epId;
}
public void setEpId(String epId) {
this.epId = epId;
}
public boolean isUseSsl() {
return useSsl;
}
......@@ -354,6 +376,9 @@ public class Peer implements LocalEntity<String>, Serializable {
if (api != null) {
joiner.add(api);
}
if (epId != null) {
joiner.add(epId);
}
if (dns != null) {
joiner.add(dns);
}
......
......@@ -45,7 +45,6 @@ public class DateUtils {
Calendar cal = new GregorianCalendar();
cal.setTimeInMillis(System.currentTimeMillis());
cal.add(Calendar.HOUR, 1);
cal.add(Calendar.HOUR_OF_DAY, 1);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
......
package org.duniter.core.util;
import java.util.Iterator;
import java.util.PrimitiveIterator;
public class PrimitiveIterators {
public interface OfLong extends PrimitiveIterator.OfLong {
long current();
}
private static OfLong nullLongSequence = new OfLong() {
@Override
public long nextLong() {
return 0;
}
@Override
public boolean hasNext() {
return false;
}
public long current() {
return 0;
}
};
public static class LongSequence implements OfLong {
long value = 0;
@Override
public long nextLong() {
return value++;
}
@Override
public boolean hasNext() {
return true;
}
@Override
public long current() {
return value;
}
}
public static OfLong newLongSequence() {
return new LongSequence();
}
public static OfLong nullLongSequence() {
return nullLongSequence;
}
}
......@@ -30,7 +30,7 @@ import org.duniter.elasticsearch.service.BlockchainService;
import org.duniter.elasticsearch.service.CurrencyService;
import org.duniter.elasticsearch.service.DocStatService;
import org.duniter.elasticsearch.service.PeerService;
import org.duniter.elasticsearch.service.synchro.SynchroService;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
......
......@@ -222,6 +222,10 @@ public class PluginSettings extends AbstractLifecycleComponent<PluginSettings> {
return settings.getAsBoolean("duniter.synchro.enable", true);
}
public boolean enableSynchroWebsocket() {
return settings.getAsBoolean("duniter.synchro.ws.enable", true);
}
public int getSynchroTimeOffset() {
return settings.getAsInt("duniter.synchro.timeOffset", 60*60/*=1hour*/);
}
......
......@@ -326,6 +326,12 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao {
.field("type", "string")
.endObject()
// epId
.startObject(Peer.PROPERTY_EP_ID)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// stats
.startObject(Peer.PROPERTY_STATS)
.field("type", "nested")
......
......@@ -65,9 +65,8 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
Preconditions.checkNotNull(execution.getTime());
Preconditions.checkArgument(execution.getTime() > 0);
// Serialize into JSON
// WARN: must use GSON, to have same JSON result (e.g identities and joiners field must be converted into String)
try {
// Serialize into JSON
String json = getObjectMapper().writeValueAsString(execution);
// Preparing indexBlocksFromNode
......@@ -77,7 +76,7 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
// Execute indexBlocksFromNode
indexRequest
.setRefresh(true)
.execute();
.execute().actionGet();
}
catch(JsonProcessingException e) {
throw new TechnicalException(e);
......@@ -153,7 +152,8 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
// result
.startObject(SynchroExecution.PROPERTY_RESULT)
.field("type", "nested")
//.field("dynamic", "false")
.field("dynamic", "false")
.startObject("properties")
// inserts
.startObject(SynchroResult.PROPERTY_INSERTS)
......@@ -170,6 +170,11 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
.field("type", "long")
.endObject()
// deletes
.startObject(SynchroResult.PROPERTY_INVALID_SIGNATURES)
.field("type", "long")
.endObject()
.endObject()
.endObject()
......
......@@ -36,6 +36,7 @@ public class SynchroResult implements Serializable {
public static final String PROPERTY_INSERTS = "inserts";
public static final String PROPERTY_UPDATES = "updates";
public static final String PROPERTY_DELETES = "deletes";
public static final String PROPERTY_INVALID_SIGNATURES = "invalidSignatures";
private long insertTotal = 0;
private long updateTotal = 0;
......
......@@ -52,7 +52,11 @@ public class PeerService extends AbstractService {
private PeerDao peerDao;
private ThreadPool threadPool;
private List<String> includeEndpointApis = Lists.newArrayList(EndpointApi.BASIC_MERKLED_API.name(), EndpointApi.BMAS.name());
// Define endpoint API to include
private List<String> includeEndpointApis = Lists.newArrayList(
EndpointApi.BASIC_MERKLED_API.name(),
EndpointApi.BMAS.name(),
EndpointApi.WS2P.name());
@Inject
public PeerService(Duniter4jClient client, PluginSettings settings, ThreadPool threadPool,
......@@ -77,6 +81,12 @@ public class PeerService extends AbstractService {
return this;
}
public PeerService addIncludeEndpointApi(EndpointApi api) {
Preconditions.checkNotNull(api);
addIncludeEndpointApi(api.name());
return this;
}
public PeerService indexPeers(Peer peer) {
try {
......
......@@ -35,7 +35,7 @@ import org.duniter.core.service.MailService;
import org.duniter.elasticsearch.PluginInit;
import org.duniter.elasticsearch.PluginSettings;
import org.duniter.elasticsearch.service.changes.ChangeService;
import org.duniter.elasticsearch.service.synchro.SynchroService;
import org.duniter.elasticsearch.synchro.SynchroService;
import org.duniter.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
......
......@@ -38,18 +38,10 @@ package org.duniter.elasticsearch.service.changes;
limitations under the License.
*/
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.duniter.core.exception.TechnicalException;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.joda.time.DateTime;
import java.io.IOException;
public class ChangeEvent {
private final String id;
private final String index;
......@@ -111,53 +103,9 @@ public class ChangeEvent {
return source;
}
public String toJson() {
try {
XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput());
builder.startObject()
.field("_index", getIndex())
.field("_type", getType())
.field("_id", getId())
.field("_timestamp", getTimestamp())
.field("_version", getVersion())
.field("_operation", getOperation().toString());
if (getSource() != null) {
builder.rawField("_source", getSource());
}
builder.endObject();
return builder.string();
} catch (IOException e) {
throw new TechnicalException("Error while generating JSON from change event", e);
}
}
public static ChangeEvent fromJson(String json) {
try {
ObjectMapper objectMapper = new ObjectMapper();
JsonNode actualObj = objectMapper.readTree(json);
String index = actualObj.get("_index").asText();
String type = actualObj.get("_type").asText();
String id = actualObj.get("_id").asText();
DateTime timestamp = new DateTime(actualObj.get("_timestamp").asLong());
ChangeEvent.Operation operation = ChangeEvent.Operation.valueOf(actualObj.get("_operation").asText());
long version = actualObj.get("_version").asLong();
JsonNode sourceNode = actualObj.get("_source");
BytesReference source = null;
if (sourceNode != null) {
// TODO : fill bytes reference from source
//source =
@JsonIgnore
public boolean hasSource() {
return source != null;
}
ChangeEvent event = new ChangeEvent(index, type, id, timestamp, operation, version, source);
return event;
} catch (IOException e) {
throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e);
}
}
public ChangeEvent clone(ChangeEvent event, boolean withSource) {
return new ChangeEvent(this, withSource);
}
}
package org.duniter.elasticsearch.service.changes;
/*
* #%L
* Duniter4j :: ElasticSearch Plugin
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
/*
Copyright 2015 ForgeRock AS
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.duniter.core.exception.TechnicalException;
import org.duniter.elasticsearch.exception.InvalidFormatException;
import org.duniter.elasticsearch.util.bytes.BytesJsonNode;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.joda.time.DateTime;
import java.io.IOException;
public class ChangeEvents {
private ChangeEvents() {
// helper class
}
public static ChangeEvent fromJson(String json) {
return fromJson(new ObjectMapper(), json);
}
public static ChangeEvent fromJson(ObjectMapper objectMapper, String json) {
try {
JsonNode actualObj = objectMapper.readTree(json);
String index = actualObj.get("_index").asText();
String type = actualObj.get("_type").asText();
String id = actualObj.get("_id").asText();
DateTime timestamp = new DateTime(actualObj.get("_timestamp").asLong());
ChangeEvent.Operation operation = ChangeEvent.Operation.valueOf(actualObj.get("_operation").asText());
long version = actualObj.get("_version").asLong();
JsonNode sourceNode = actualObj.get("_source");
BytesReference source = null;
if (sourceNode != null) {
source = new BytesJsonNode(sourceNode);
}
return new ChangeEvent(index, type, id, timestamp, operation, version, source);
} catch (IOException e) {
throw new InvalidFormatException("Invalid record JSON: " + e.getMessage(), e);
}
}
public static String toJson(ChangeEvent event) {
try {
XContentBuilder builder = new XContentBuilder(JsonXContent.jsonXContent, new BytesStreamOutput());
builder.startObject()
.field("_index", event.getIndex())
.field("_type", event.getType())
.field("_id", event.getId())
.field("_timestamp", event.getTimestamp())
.field("_version", event.getVersion())
.field("_operation", event.getOperation().toString());
if (event.hasSource()) {
builder.rawField("_source", event.getSource());
}
builder.endObject();
return builder.string();
} catch (IOException e) {
throw new TechnicalException("Error while generating JSON from change event", e);
}
}
public static JsonNode readTree(BytesReference source) throws IOException {
if (source == null) return null;
if (source instanceof BytesJsonNode) {
// Avoid new deserialization
return ((BytesJsonNode) source).toJsonNode();
}
return new ObjectMapper().readTree(source.streamInput());
}
public static JsonNode readTree(ObjectMapper objectMapper, BytesReference source) throws IOException {
if (source == null) return null;
if (source instanceof BytesJsonNode) {
// Avoid new deserialization
return ((BytesJsonNode) source).toJsonNode();
}
return objectMapper.readTree(source.streamInput());
}
public static <T> T readValue(BytesReference source, Class<T> clazz) throws IOException {
if (source == null) return null;
return new ObjectMapper().readValue(source.streamInput(), clazz);
}
}
......@@ -99,14 +99,17 @@ public class ChangeSource {
return types;
}
public void addIndex(String index){
public ChangeSource addIndex(String index){
this.indices.add(index);
return this;
}
public void addType(String type){
public ChangeSource addType(String type){
this.types.add(type);
return this;
}
public void addId(String id){
public ChangeSource addId(String id){
this.ids.add(id);
return this;
}
public String toString() {
......@@ -159,4 +162,17 @@ public class ChangeSource {
public boolean isEmpty() {
return indices == null && types == null && ids == null;
}
public void merge(ChangeSource s) {
if (s == null) return;
if (CollectionUtils.isNotEmpty(s.getIndices())) {
indices.addAll(s.getIndices());
}
if (CollectionUtils.isNotEmpty(s.getTypes())) {
types.addAll(s.getTypes());
}
if (CollectionUtils.isNotEmpty(s.getIds())) {
ids.addAll(s.getIds());
}
}
}
package org.duniter.elasticsearch.service.synchro;
package org.duniter.elasticsearch.synchro;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer;
import org.duniter.elasticsearch.model.SynchroResult;
import org.duniter.elasticsearch.service.changes.ChangeEvent;
import org.duniter.elasticsearch.service.changes.ChangeSource;
public interface SynchroAction {
EndpointApi getEndPointApi();
ChangeSource getChangeSource();
void handleSynchronize(Peer peer,
long fromTime,
SynchroResult result);
void handleChange(Peer peer, ChangeEvent changeEvent);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment