Skip to content
Snippets Groups Projects
Commit ef2f556a authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[es-core] start synchro when exists some peers on the expected endpoint API

parent 0362cd90
No related branches found
No related tags found
No related merge requests found
...@@ -22,9 +22,11 @@ package org.duniter.core.client.dao; ...@@ -22,9 +22,11 @@ package org.duniter.core.client.dao;
* #L% * #L%
*/ */
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peer;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Created by blavenie on 29/12/15. * Created by blavenie on 29/12/15.
...@@ -41,4 +43,5 @@ public interface PeerDao extends EntityDao<String, Peer> { ...@@ -41,4 +43,5 @@ public interface PeerDao extends EntityDao<String, Peer> {
void updatePeersAsDown(String currencyId, long maxUpTime); void updatePeersAsDown(String currencyId, long maxUpTime);
boolean hasPeersUpWithApi(String currencyId, Set<EndpointApi> api);
} }
...@@ -23,6 +23,7 @@ package org.duniter.core.client.dao.mem; ...@@ -23,6 +23,7 @@ package org.duniter.core.client.dao.mem;
*/ */
import org.duniter.core.client.dao.PeerDao; import org.duniter.core.client.dao.PeerDao;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peer;
import org.duniter.core.util.Preconditions; import org.duniter.core.util.Preconditions;
...@@ -114,4 +115,15 @@ public class MemoryPeerDaoImpl implements PeerDao { ...@@ -114,4 +115,15 @@ public class MemoryPeerDaoImpl implements PeerDao {
}); });
} }
@Override
public boolean hasPeersUpWithApi(String currencyId, Set<EndpointApi> api) {
return getPeersByCurrencyId(currencyId)
.stream()
.anyMatch(p ->
api.contains(EndpointApi.valueOf(p.getApi())) &&
p.getStats() != null &&
Peer.PeerStatus.UP.equals(p.getStats().getStatus())
);
}
} }
...@@ -53,12 +53,14 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi ...@@ -53,12 +53,14 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
private Pattern bmasPattern; private Pattern bmasPattern;
private Pattern ws2pPattern; private Pattern ws2pPattern;
private Pattern otherApiPattern; private Pattern otherApiPattern;
private boolean debug;
public EndpointDeserializer() { public EndpointDeserializer() {
bmaPattern = Pattern.compile(BMA_API_REGEXP); this.bmaPattern = Pattern.compile(BMA_API_REGEXP);
bmasPattern = Pattern.compile(BMAS_API_REGEXP); this.bmasPattern = Pattern.compile(BMAS_API_REGEXP);
ws2pPattern = Pattern.compile(WS2P_API_REGEXP); this.ws2pPattern = Pattern.compile(WS2P_API_REGEXP);
otherApiPattern = Pattern.compile(OTHER_API_REGEXP); this.otherApiPattern = Pattern.compile(OTHER_API_REGEXP);
this.debug = log.isDebugEnabled();
} }
@Override @Override
...@@ -103,7 +105,12 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi ...@@ -103,7 +105,12 @@ public class EndpointDeserializer extends JsonDeserializer<NetworkPeering.Endpoi
return endpoint; return endpoint;
} catch(Exception e) { } catch(Exception e) {
// Log unknown API (and continue = will skip this endpoint) // Log unknown API (and continue = will skip this endpoint)
log.warn("Unable to deserialize endpoint: unknown api [" + api + "]"); if (debug) {
log.warn("Unable to deserialize endpoint: unknown api [" + api + "]", e); // link the exception
}
else {
log.warn("Unable to deserialize endpoint: unknown api [" + api + "]");
}
} }
} }
......
...@@ -23,8 +23,10 @@ package org.duniter.elasticsearch.dao.impl; ...@@ -23,8 +23,10 @@ package org.duniter.elasticsearch.dao.impl;
*/ */
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import org.duniter.core.client.model.bma.EndpointApi;
import org.duniter.core.client.model.local.Peer; import org.duniter.core.client.model.local.Peer;
import org.duniter.core.exception.TechnicalException; import org.duniter.core.exception.TechnicalException;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions; import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils; import org.duniter.core.util.StringUtils;
import org.duniter.elasticsearch.dao.AbstractDao; import org.duniter.elasticsearch.dao.AbstractDao;
...@@ -50,6 +52,7 @@ import org.elasticsearch.search.aggregations.metrics.max.Max; ...@@ -50,6 +52,7 @@ import org.elasticsearch.search.aggregations.metrics.max.Max;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.Set;
/** /**
* Created by blavenie on 29/12/15. * Created by blavenie on 29/12/15.
...@@ -286,6 +289,29 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao { ...@@ -286,6 +289,29 @@ public class PeerDaoImpl extends AbstractDao implements PeerDao {
} }
@Override
public boolean hasPeersUpWithApi(String currencyId, Set<EndpointApi> api) {
SearchRequestBuilder searchRequest = client.prepareSearch(currencyId)
.setFetchSource(false)
.setTypes(TYPE)
.setSize(0);
// Query = filter on lastUpTime
BoolQueryBuilder query = QueryBuilders.boolQuery();
if (CollectionUtils.isNotEmpty(api)) {
query.minimumNumberShouldMatch(api.size());
api.forEach(a -> query.should(QueryBuilders.termQuery(Peer.PROPERTY_API, a.name())));
}
query.must(QueryBuilders.nestedQuery(Peer.PROPERTY_STATS, QueryBuilders.constantScoreQuery(QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Peer.PROPERTY_STATS + "." + Peer.Stats.PROPERTY_STATUS, Peer.PeerStatus.UP.name())))));
searchRequest.setQuery(query);
SearchResponse response = searchRequest.execute().actionGet();
return response.getHits() != null && response.getHits().getTotalHits() > 0;
}
@Override @Override
public XContentBuilder createTypeMapping() { public XContentBuilder createTypeMapping() {
try { try {
......
...@@ -192,6 +192,14 @@ public abstract class AbstractService implements Bean { ...@@ -192,6 +192,14 @@ public abstract class AbstractService implements Bean {
} }
if (!cryptoService.verify(recordJson, signature, issuer)) { if (!cryptoService.verify(recordJson, signature, issuer)) {
if (recordJson.contains("\"socials\":[]")) {
recordJson = recordJson.replaceAll(",\"socials\":\\[\\]", "");
if (cryptoService.verify(recordJson, signature, issuer)) {
return; // ok
}
}
throw new InvalidSignatureException("Invalid signature of JSON string"); throw new InvalidSignatureException("Invalid signature of JSON string");
} }
......
...@@ -158,7 +158,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S ...@@ -158,7 +158,7 @@ public abstract class AbstractSynchroAction extends AbstractService implements S
} }
try { try {
if (trace) { if (trace) {
logger.trace(String.format("%s [WS] Processing new change event...", logPrefix)); logger.trace(String.format("%s Processing new change event...", logPrefix));
} }
JsonNode source = ChangeEvents.readTree(changeEvent.getSource()); JsonNode source = ChangeEvents.readTree(changeEvent.getSource());
......
...@@ -237,24 +237,33 @@ public class SynchroService extends AbstractService { ...@@ -237,24 +237,33 @@ public class SynchroService extends AbstractService {
if (CollectionUtils.isEmpty(currencyIds)) return false; if (CollectionUtils.isEmpty(currencyIds)) return false;
for (String currencyId: currencyIds) { for (String currencyId: currencyIds) {
Long lastUpTime = peerDao.getMaxLastUpTime(currencyId); boolean hasSome = peerDao.hasPeersUpWithApi(currencyId, peerApiFilters);
if (lastUpTime != null) return true; if (hasSome) return true;
} }
return false; return false;
} }
protected boolean waitPeersReady() throws InterruptedException{ protected boolean waitPeersReady() throws InterruptedException{
int tryCounter = 0; final int sleepTime = 10 * 1000 /*10s*/;
int maxWaitingDuration = 5 * 6 * sleepTime; // 5 min
int waitingDuration = 0;
while (!isReady() && !hasSomePeers()) { while (!isReady() && !hasSomePeers()) {
// Wait 10s // Wait 10s
Thread.sleep(10 * 1000); Thread.sleep(sleepTime);
tryCounter++; waitingDuration += sleepTime;
if (tryCounter == 6 /*1 min wait*/) { if (waitingDuration >= maxWaitingDuration) {
logger.warn("Could not start data synchronisation. No Peer found."); logger.warn(String.format("Could not start data synchronisation. No Peer found (after waiting %s min).", waitingDuration/60/1000));
return false; // stop here return false; // stop here
} }
} }
// Wait again, to make sure all peers have been saved by NetworkService
Thread.sleep(sleepTime*2);
return true; return true;
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment