Skip to content
Snippets Groups Projects
Commit 3d777769 authored by Benoit Lavenier's avatar Benoit Lavenier
Browse files

[enh] Synchro: Add api ES_SUBSCRIPTION_API

parent 41ae8b35
No related branches found
No related tags found
No related merge requests found
...@@ -88,16 +88,18 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu ...@@ -88,16 +88,18 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency()); Preconditions.checkNotNull(peer.getCurrency());
Preconditions.checkNotNull(peer.getId()); Preconditions.checkNotNull(peer.getId());
Preconditions.checkNotNull(peer.getApi());
BoolQueryBuilder query = QueryBuilders.boolQuery() BoolQueryBuilder query = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(SynchroExecution.PROPERTY_PEER, peer.getId())); .filter(QueryBuilders.termQuery(SynchroExecution.PROPERTY_PEER, peer.getId()))
.filter(QueryBuilders.termQuery(SynchroExecution.PROPERTY_API, peer.getApi()));
SearchResponse response = client.prepareSearch(peer.getCurrency()) SearchResponse response = client.prepareSearch(peer.getCurrency())
.setTypes(TYPE) .setTypes(TYPE)
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(query) .setQuery(query)
.setFetchSource(true) .setFetchSource(true)
.setFrom(0).setSize(1) .setSize(1)
.addSort(SynchroExecution.PROPERTY_TIME, SortOrder.DESC) .addSort(SynchroExecution.PROPERTY_TIME, SortOrder.DESC)
.get(); .get();
...@@ -126,6 +128,12 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu ...@@ -126,6 +128,12 @@ public class SynchroExecutionDaoImpl extends AbstractDao implements SynchroExecu
.field("index", "not_analyzed") .field("index", "not_analyzed")
.endObject() .endObject()
// peer
.startObject(SynchroExecution.PROPERTY_API)
.field("type", "string")
.field("index", "not_analyzed")
.endObject()
// issuer // issuer
.startObject(SynchroExecution.PROPERTY_ISSUER) .startObject(SynchroExecution.PROPERTY_ISSUER)
.field("type", "string") .field("type", "string")
......
...@@ -6,11 +6,13 @@ public class SynchroExecution extends Record { ...@@ -6,11 +6,13 @@ public class SynchroExecution extends Record {
public static final String PROPERTY_CURRENCY = "currency"; public static final String PROPERTY_CURRENCY = "currency";
public static final String PROPERTY_PEER = "peer"; public static final String PROPERTY_PEER = "peer";
public static final String PROPERTY_API = "api";
public static final String PROPERTY_RESULT = "result"; public static final String PROPERTY_RESULT = "result";
private String currency; private String currency;
private String peer; private String peer;
private String api;
private SynchroResult result; private SynchroResult result;
public String getCurrency() { public String getCurrency() {
...@@ -36,4 +38,12 @@ public class SynchroExecution extends Record { ...@@ -36,4 +38,12 @@ public class SynchroExecution extends Record {
public void setResult(SynchroResult result) { public void setResult(SynchroResult result) {
this.result = result; this.result = result;
} }
public String getApi() {
return api;
}
public void setApi(String api) {
this.api = api;
}
} }
...@@ -303,8 +303,6 @@ public class SynchroService extends AbstractService { ...@@ -303,8 +303,6 @@ public class SynchroService extends AbstractService {
protected boolean waitPeersReady() throws InterruptedException{ protected boolean waitPeersReady() throws InterruptedException{
final int sleepTime = 10 * 1000 /*10s*/; final int sleepTime = 10 * 1000 /*10s*/;
int maxWaitingDuration = 5 * 6 * sleepTime; // 5 min int maxWaitingDuration = 5 * 6 * sleepTime; // 5 min
int waitingDuration = 0; int waitingDuration = 0;
while (!isReady() && !hasSomePeers()) { while (!isReady() && !hasSomePeers()) {
...@@ -326,7 +324,6 @@ public class SynchroService extends AbstractService { ...@@ -326,7 +324,6 @@ public class SynchroService extends AbstractService {
protected long getLastExecutionTime(Peer peer) { protected long getLastExecutionTime(Peer peer) {
Preconditions.checkNotNull(peer); Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getId());
try { try {
SynchroExecution execution = synchroExecutionDao.getLastExecution(peer); SynchroExecution execution = synchroExecutionDao.getLastExecution(peer);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment