Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • clients/cesium-grp/cesium-plus-pod
  • clients/java/duniter4j
  • ji_emme/duniter4j
  • dvermd/cesium-plus-pod
  • okayotanoka/cesium-plus-pod
  • pokapow/cesium-plus-pod
  • pini-gh/cesium-plus-pod
7 results
Show changes
Showing
with 1803 additions and 392 deletions
package org.duniter.core.client.service.exception;
/*
* #%L
* UCoin Java :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.core.client.model.bma.Error;
import org.duniter.core.exception.TechnicalException;
/**
* Created by eis on 11/02/15.
*/
public class HttpTimeoutException extends TechnicalException{
private static final long serialVersionUID = -5260280401104018980L;
public HttpTimeoutException() {
super();
}
public HttpTimeoutException(String message, Throwable cause) {
super(message, cause);
}
public HttpTimeoutException(String message) {
super(message);
}
public HttpTimeoutException(Error error) {
super(error.getMessage());
setCode(error.getUcode());
}
public HttpTimeoutException(Throwable cause) {
super(cause);
}
}
package org.duniter.elasticsearch.exception;
package org.duniter.core.client.service.exception;
/*
* #%L
* UCoin Java Client :: Core API
* UCoin Java :: Core Client API
* %%
* Copyright (C) 2014 - 2015 EIS
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
......@@ -22,29 +22,28 @@ package org.duniter.elasticsearch.exception;
* #L%
*/
import org.duniter.core.exception.BusinessException;
import org.elasticsearch.rest.RestStatus;
/**
* Created by Benoit on 03/04/2015.
* Created by eis on 11/02/15.
*/
public class NotFoundException extends DuniterElasticsearchException{
public class HttpUnauthorizeException extends BusinessException {
public NotFoundException(Throwable cause) {
super(cause);
private static final long serialVersionUID = -5260280401144018980L;
public HttpUnauthorizeException() {
super();
}
public NotFoundException(String msg, Object... args) {
super(msg, args);
public HttpUnauthorizeException(String message, Throwable cause) {
super(message, cause);
}
public NotFoundException(String msg, Throwable cause, Object... args) {
super(msg, args, cause);
public HttpUnauthorizeException(String message) {
super(message);
}
@Override
public RestStatus status() {
return RestStatus.NOT_FOUND;
public HttpUnauthorizeException(Throwable cause) {
super(cause);
}
}
......@@ -2,7 +2,7 @@ package org.duniter.core.client.service.exception;
/*
* #%L
* UCoin Java Client :: Core API
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
......
......@@ -25,9 +25,7 @@ package org.duniter.core.client.service.local;
import org.duniter.core.beans.Service;
import org.duniter.core.client.model.local.Currency;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
/**
* Created by eis on 07/02/15.
......@@ -36,58 +34,22 @@ public interface CurrencyService extends Service {
Currency save(final Currency currency);
List<Currency> getCurrencies(long accountId);
Iterable<Currency> findAll();
Currency getCurrencyById(long currencyId);
/**
* Return a (cached) currency name, by id
* @param currencyId
* @return
*/
String getCurrencyNameById(long currencyId);
/**
* Return a currency id, by name
* @param currencyName
* @return
*/
Long getCurrencyIdByName(String currencyName);
Optional<Currency> findById(String id);
/**
* Return a (cached) list of currency ids
* @return
*/
Set<Long> getCurrencyIds();
/**
* Return a (cached) number of registered currencies
* @return
*/
int getCurrencyCount();
/**
* Fill all cache need for currencies
* @param context
*/
void loadCache(long accountId);
Iterable<String> findAllIds();
/**
* Return the value of the last universal dividend
* @param currencyId
* Return number of registered currencies
* @return
*/
long getLastUD(long currencyId);
long count();
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
Map<Integer, Long> refreshAndGetUD(long currencyId, long lastSyncBlockNumber);
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
Map<Integer, Long> getAllUD(long currencyId);
boolean existsById(String id);
}
......@@ -23,18 +23,17 @@ package org.duniter.core.client.service.local;
*/
import org.duniter.core.beans.InitializingBean;
import org.duniter.core.client.dao.CurrencyDao;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.repositories.CurrencyRepository;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.cache.Cache;
import org.duniter.core.util.cache.SimpleCache;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
......@@ -43,14 +42,9 @@ import java.util.Set;
*/
public class CurrencyServiceImpl implements CurrencyService, InitializingBean {
private Cache<String, Optional<Currency>> mCurrencyCache;
private static final long UD_CACHE_TIME_MILLIS = 5 * 60 * 1000; // = 5 min
private Cache<Long, Currency> mCurrencyCache;
private Cache<Long, Long> mUDCache;
private BlockchainRemoteService blockchainRemoteService;
private CurrencyDao currencyDao;
private CurrencyRepository<Currency> currencyRepository;
public CurrencyServiceImpl() {
super();
......@@ -58,191 +52,88 @@ public class CurrencyServiceImpl implements CurrencyService, InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService();
currencyDao = ServiceLocator.instance().getBean(CurrencyDao.class);
currencyRepository = ServiceLocator.instance().getBean(CurrencyRepository.class);
// Load cache from account
long accountId = ServiceLocator.instance().getDataContext().getAccountId();
if (accountId != -1) {
loadCache(accountId);
}
// Load cache
initCaches();
}
@Override
public void close() throws IOException {
currencyDao = null;
blockchainRemoteService = null;
currencyRepository = null;
}
public Currency save(final Currency currency) {
ObjectUtils.checkNotNull(currency);
ObjectUtils.checkArgument(StringUtils.isNotBlank(currency.getCurrencyName()));
ObjectUtils.checkArgument(StringUtils.isNotBlank(currency.getFirstBlockSignature()));
ObjectUtils.checkNotNull(currency.getMembersCount());
ObjectUtils.checkArgument(currency.getMembersCount().intValue() >= 0);
ObjectUtils.checkNotNull(currency.getLastUD());
ObjectUtils.checkArgument(currency.getLastUD().longValue() > 0);
ObjectUtils.checkArgument((currency.getAccount() != null && currency.getAccount().getId() != null)
|| currency.getAccountId() != null, "One of 'currency.account.id' or 'currency.accountId' is mandatory.");
Currency result;
// Create
if (currency.getId() == null) {
result = currencyDao.create(currency);
// Update the cache (if already initialized)
if (mCurrencyCache != null) {
mCurrencyCache.put(currency.getId(), currency);
}
}
// or update
else {
currencyDao.update(currency);
result = currency;
Preconditions.checkNotNull(currency);
Preconditions.checkArgument(StringUtils.isNotBlank(currency.getId()));
Preconditions.checkArgument(StringUtils.isNotBlank(currency.getFirstBlockSignature()));
Preconditions.checkNotNull(currency.getMembersCount());
Preconditions.checkArgument(currency.getMembersCount() >= 0);
Preconditions.checkNotNull(currency.getDividend());
Preconditions.checkArgument(currency.getDividend() > 0);
Currency result = currencyRepository.save(currency);
// Update the cache (if already initialized)
if (mCurrencyCache != null) {
mCurrencyCache.put(currency.getId(), Optional.of(currency));
}
return result;
}
public List<Currency> getCurrencies(long accountId) {
return currencyDao.getCurrencies(accountId);
public Iterable<Currency> findAll() {
return currencyRepository.findAll();
}
public Currency getCurrencyById(long currencyId) {
return mCurrencyCache.get(currencyId);
public Optional<Currency> findById(String id) {
return mCurrencyCache.get(id);
}
/**
* Return a (cached) currency name, by id
* @param currencyId
* Return a (cached) list of currency ids
* @return
*/
public String getCurrencyNameById(long currencyId) {
Currency currency = mCurrencyCache.getIfPresent(currencyId);
if (currency == null) {
return null;
}
return currency.getCurrencyName();
}
public Iterable<String> findAllIds() {
Set<String> ids = mCurrencyCache.keySet();
if (CollectionUtils.isNotEmpty(ids)) return ids;
/**
* Return a currency id, by name
* @param currencyName
* @return
*/
public Long getCurrencyIdByName(String currencyName) {
ObjectUtils.checkArgument(StringUtils.isNotBlank(currencyName));
// Search from currencies
for (Map.Entry<Long, Currency> entry : mCurrencyCache.entrySet()) {
Currency currency = entry.getValue();
if (ObjectUtils.equals(currencyName, currency.getCurrencyName())) {
return entry.getKey();
}
}
return null;
return currencyRepository.findAllIds();
}
/**
* Return a (cached) list of currency ids
* Return a (cached) number of registered currencies
* @return
*/
public Set<Long> getCurrencyIds() {
return mCurrencyCache.keySet();
public long count() {
return currencyRepository.count();
}
/**
* Return a (cached) number of registered currencies
* @return
*/
public int getCurrencyCount() {
return mCurrencyCache.entrySet().size();
@Override
public boolean existsById(String id) {
return currencyRepository.existsById(id);
}
/**
* Fill all cache need for currencies
* @param accountId
* Fill allOfToList cache need for currencies
*/
public void loadCache(long accountId) {
if (mCurrencyCache == null || mUDCache == null) {
// Create and fill the currency cache
List<Currency> currencies = getCurrencies(accountId);
if (mCurrencyCache == null) {
mCurrencyCache = new SimpleCache<Long, Currency>() {
@Override
public Currency load(Long currencyId) {
return currencyDao.getById(currencyId);
}
};
// Fill the cache
for (Currency currency : currencies) {
mCurrencyCache.put(currency.getId(), currency);
protected void initCaches() {
// Create and fill the currency cache
if (mCurrencyCache == null) {
mCurrencyCache = new SimpleCache<String, Optional<Currency>>() {
@Override
public Optional<Currency> load(String id) {
return currencyRepository.findById(id);
}
}
};
// Create the UD cache
if (mUDCache == null) {
mUDCache = new SimpleCache<Long, Long>(UD_CACHE_TIME_MILLIS) {
@Override
public Long load(final Long currencyId) {
// Retrieve the last UD from the blockchain
final long lastUD = blockchainRemoteService.getLastUD(currencyId);
// Update currency
Currency currency = getCurrencyById(currencyId);
if (!ObjectUtils.equals(currency.getLastUD(), lastUD)) {
currency.setLastUD(lastUD);
currencyDao.update(currency);
}
return lastUD;
}
};
// Load cache
for (Currency currency: findAll()) {
mCurrencyCache.put(currency.getId(), Optional.of(currency));
}
}
}
/**
* Return the value of the last universal dividend
* @param currencyId
* @return
*/
public long getLastUD(long currencyId) {
return mUDCache.get(currencyId);
}
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
public Map<Integer, Long> refreshAndGetUD(long currencyId, long lastSyncBlockNumber) {
// Retrieve new UDs from blockchain
Map<Integer, Long> newUDs = blockchainRemoteService.getUDs(currencyId, lastSyncBlockNumber + 1);
// If any, insert new into DB
if (newUDs != null && newUDs.size() > 0) {
currencyDao.insertUDs(currencyId, newUDs);
}
return getAllUD(currencyId);
}
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
public Map<Integer, Long> getAllUD(long currencyId) {
return currencyDao.getAllUD(currencyId);
}
}
package org.duniter.core.client.dao;
package org.duniter.core.client.service.local;
/*
* #%L
......@@ -22,64 +22,39 @@ package org.duniter.core.client.dao;
* #L%
*/
import org.duniter.core.beans.Bean;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.beans.Service;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
/**
* Created by eis on 07/02/15.
*/
public interface CurrencyDao extends Bean, EntityDao<Currency> {
Currency create(final Currency currency);
Currency update(final Currency currency);
void remove(final Currency currency);
List<Currency> getCurrencies(long accountId);
public interface DividendService extends Service {
/**
* Return a (cached) currency name, by id
* Return the value of the last universal dividend
* @param currencyId
* @return
*/
String getCurrencyNameById(long currencyId);
/**
* Return a currency id, by name
* @param currencyName
* @return
*/
Long getCurrencyIdByName(String currencyName);
/**
* Return a (cached) list of blockchain ids
* @return
*/
Set<Long> getCurrencyIds();
Optional<Long> findLastDividendByCurrency(String currency);
/**
* Return a (cached) number of registered currencies
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
int getCurrencyCount();
Map<Integer, Long> refreshAndGetDividends(String currency, long lastSyncBlockNumber);
/**
* Return the value of the last universal dividend
* @param currencyId
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
long getLastUD(long currencyId);
Map<Integer, Long> findAllDividendsByCurrency(String currency);
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
* Update the last currency dividend
* @param currency
* @param dividend
*/
Map<Integer, Long> getAllUD(long currencyId);
void insertUDs(Long currencyId, Map<Integer, Long> newUDs);
void updateLastDividendByCurrency(String currency, Long dividend);
}
package org.duniter.core.client.service.local;
/*
* #%L
* UCoin Java :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.apache.commons.collections4.MapUtils;
import org.duniter.core.beans.InitializingBean;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.model.local.Dividend;
import org.duniter.core.client.repositories.CurrencyRepository;
import org.duniter.core.client.repositories.DividendRepository;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.util.Beans;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.cache.Cache;
import org.duniter.core.util.cache.SimpleCache;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* Created by eis on 14/04/21.
*/
public class DividendServiceImpl implements DividendService, InitializingBean {
private static final long UD_CACHE_TIME_MILLIS = 5 * 60 * 1000; // = 5 min
private Cache<String, Long> lastUdByCurrencyCache;
private BlockchainRemoteService blockchainRemoteService;
private DividendRepository dividendRepository;
private CurrencyRepository<Currency> currencyRepository;
public DividendServiceImpl() {
super();
}
@Override
public void afterPropertiesSet() throws Exception {
blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService();
currencyRepository = ServiceLocator.instance().getBean(CurrencyRepository.class);
dividendRepository = ServiceLocator.instance().getBean(DividendRepository.class);
// Load cache
initCaches();
}
@Override
public void close() throws IOException {
blockchainRemoteService = null;
currencyRepository = null;
dividendRepository = null;
}
/**
* Fill allOfToList cache need for currencies
*/
public void initCaches() {
// Create the UD cache
if (lastUdByCurrencyCache == null) {
lastUdByCurrencyCache = new SimpleCache<String, Long>(UD_CACHE_TIME_MILLIS) {
@Override
public Long load(final String currency) {
// Retrieve the last UD from the blockchain
final Long lastUD = blockchainRemoteService.getLastDividend(currency);
// Update currency
if (lastUD != null) {
currencyRepository.findById(currency)
.filter(currencyEntity -> !ObjectUtils.equals(currencyEntity.getDividend(), lastUD))
.ifPresent(currencyEntity -> {
currencyEntity.setDividend(lastUD);
currencyRepository.save(currencyEntity);
});
}
return lastUD;
}
};
}
}
/**
* Return the value of the last universal dividend
* @param currency
* @return
*/
public Optional<Long> findLastDividendByCurrency(String currency) {
return Optional.ofNullable(lastUdByCurrencyCache.get(currency));
}
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
public Map<Integer, Long> refreshAndGetDividends(String currency, long lastSyncBlockNumber) {
// Retrieve new UDs from blockchain
Map<Integer, Long> newUDs = blockchainRemoteService.getUDs(currency, lastSyncBlockNumber + 1);
// If any, insert new into DB
if (MapUtils.isNotEmpty(newUDs)) {
List<Dividend> dividends = newUDs.entrySet().stream()
.map(e -> Dividend.builder()
.number(e.getKey())
.dividend(e.getValue())
.build()).collect(Collectors.toList());
dividendRepository.saveAll(dividends);
// Update currency's last UD
Long lastDividend = dividends.stream().max(Comparator.comparing(Dividend::getNumber)).get().getDividend();
updateLastDividendByCurrency(currency, lastDividend);
}
// Return the full list
return findAllDividendsByCurrency(currency);
}
/**
* Return a map of UD (key=blockNumber, value=amount)
* @return
*/
public Map<Integer, Long> findAllDividendsByCurrency(String currency) {
return Beans.getStream(dividendRepository.findAllByCurrency(currency))
.collect(Collectors.toMap(Dividend::getNumber, Dividend::getDividend));
}
@Override
public void updateLastDividendByCurrency(String currency, Long dividend) {
currencyRepository.findById(currency)
.filter(currencyEntity -> !ObjectUtils.equals(currencyEntity.getDividend(), dividend))
.ifPresent(currencyEntity -> {
currencyEntity.setDividend(dividend);
currencyRepository.save(currencyEntity);
});
// Update UD cache
lastUdByCurrencyCache.put(currency, dividend);
}
}
package org.duniter.core.client.service.local;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.duniter.core.beans.Service;
import org.duniter.core.client.model.local.Peer;
import java.io.Closeable;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.Predicate;
/**
* Created by blavenie on 20/03/17.
*/
public interface NetworkService extends Service {
interface PeersChangeListener {
void onChanges(Collection<Peer> peers);
}
interface RefreshPeerListener {
void onRefresh(Peer peer);
}
class Sort {
public SortType sortType;
public boolean sortAsc;
}
class Filter {
public FilterType filterType;
public Peer.PeerStatus filterStatus;
public Boolean filterSsl;
public List<String> filterEndpoints;
public String currency;
public Integer minBlockNumber;
}
enum SortType {
UID,
PUBKEY,
API,
HARDSHIP,
BLOCK_NUMBER
}
enum FilterType {
MEMBER, // Only members peers
MIRROR // Only mirror peers
}
List<Peer> getPeers(Peer mainPeer);
List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort);
List<Peer> getPeers(Peer mainPeer, Filter filter, Sort sort, ExecutorService pool);
CompletableFuture<List<Peer>> getPeersAsync(Peer mainPeer, List<String> filterEndpoints, ExecutorService pool) throws ExecutionException, InterruptedException;
List<Peer> fillPeerStatsConsensus(final List<Peer> peers);
Predicate<Peer> peerFilter(Filter filter);
Comparator<Peer> peerComparator(Sort sort);
Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener);
Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener,
final Filter filter, final Sort sort, final boolean autoreconnect,
final ExecutorService executor);
CompletableFuture<List<Peer>> refreshPeersAsync(final Peer mainPeer, final List<Peer> peers, final ExecutorService pool);
String getVersion(final Peer peer);
}
package org.duniter.core.client.service.local;
/*
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.model.bma.*;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.model.local.Peers;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.client.service.bma.BaseRemoteServiceImpl;
import org.duniter.core.client.service.bma.BlockchainRemoteService;
import org.duniter.core.client.service.bma.NetworkRemoteService;
import org.duniter.core.client.service.bma.WotRemoteService;
import org.duniter.core.client.service.exception.HttpConnectException;
import org.duniter.core.client.service.exception.HttpNotFoundException;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.*;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.concurrent.CompletableFutures;
import org.duniter.core.util.http.DnsUtils;
import org.duniter.core.util.http.InetAddressUtils;
import org.duniter.core.util.websocket.WebsocketClientEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Created by blavenie on 20/03/17.
*/
public class NetworkServiceImpl extends BaseRemoteServiceImpl implements NetworkService {
private static final Logger log = LoggerFactory.getLogger(NetworkServiceImpl.class);
private static final String PEERS_UPDATE_LOCK_NAME = "Peers update";
private final static String BMA_URL_STATUS = "/node/summary";
private final static String BMA_URL_BLOCKCHAIN_CURRENT = "/blockchain/current";
private final static String BMA_URL_BLOCKCHAIN_HARDSHIP = "/blockchain/hardship/";
private NetworkRemoteService networkRemoteService;
private WotRemoteService wotRemoteService;
private BlockchainRemoteService blockchainRemoteService;
private Configuration config;
private final LockManager lockManager = new LockManager(4, 10);
private PeerService peerService;
private List<RefreshPeerListener> refreshPeerListeners = Lists.newArrayList();
public NetworkServiceImpl() {
}
public NetworkServiceImpl(NetworkRemoteService networkRemoteService,
WotRemoteService wotRemoteService,
BlockchainRemoteService blockchainRemoteService,
PeerService peerService) {
this();
this.networkRemoteService = networkRemoteService;
this.wotRemoteService = wotRemoteService;
this.blockchainRemoteService = blockchainRemoteService;
this.peerService = peerService;
}
@Override
public void afterPropertiesSet() {
super.afterPropertiesSet();
this.networkRemoteService = ServiceLocator.instance().getNetworkRemoteService();
this.wotRemoteService = ServiceLocator.instance().getWotRemoteService();
this.blockchainRemoteService = ServiceLocator.instance().getBlockchainRemoteService();
this.config = Configuration.instance();
this.peerService = ServiceLocator.instance().getPeerService();
}
@Override
public List<Peer> getPeers(Peer firstPeer) {
BlockchainBlock current = blockchainRemoteService.getCurrentBlock(firstPeer);
// Default filter
Filter filterDef = new Filter();
filterDef.filterType = null;
filterDef.filterStatus = Peer.PeerStatus.UP;
filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.label(),
EndpointApi.BMAS.label(),
EndpointApi.WS2P.label(),
EndpointApi.GVA.label(),
EndpointApi.GVASUB.label());
filterDef.minBlockNumber = current.getNumber() - 100;
return getPeers(firstPeer, filterDef, null);
}
@Override
public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort) {
return getPeers(mainPeer, filter, sort, null);
}
@Override
public List<Peer> getPeers(final Peer mainPeer, Filter filter, Sort sort, ExecutorService executor) {
try {
return getPeersAsync(mainPeer, (filter != null ? filter.filterEndpoints : null), executor)
.thenApplyAsync(this::fillPeerStatsConsensus)
.thenApplyAsync(peers -> peers.stream()
// Filter on currency
.filter(peer -> mainPeer.getCurrency() == null || ObjectUtils.equals(mainPeer.getCurrency(), peer.getCurrency()))
// filter, then sort
.filter(peerFilter(filter))
.sorted(peerComparator(sort))
.collect(Collectors.toList()))
.thenApplyAsync(this::logPeers)
.get();
} catch (InterruptedException | ExecutionException e) {
throw new TechnicalException("Error while loading peers: " + e.getMessage(), e);
}
}
@Override
public Predicate<Peer> peerFilter(final Filter filter) {
return peer -> applyPeerFilter(peer, filter);
}
@Override
public Comparator<Peer> peerComparator(final Sort sort) {
return Comparator.comparing(peer -> computePeerStatsScore(peer, sort), Comparator.reverseOrder());
}
@Override
public CompletableFuture<List<Peer>> getPeersAsync(final Peer mainPeer, List<String> filterEndpoints, ExecutorService executor) throws ExecutionException, InterruptedException {
Preconditions.checkNotNull(mainPeer);
log.debug("Loading network peers...");
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
return CompletableFuture.supplyAsync(() -> loadPeerLeafs(mainPeer, filterEndpoints), pool)
.thenApply(peers -> peers.stream()
// Replace by main peer, if same URL
.map(peer -> {
if (mainPeer.getUrl().equals(peer.getUrl())) {
// Update properties
mainPeer.setPubkey(peer.getPubkey());
mainPeer.setHash(peer.getHash());
mainPeer.setCurrency(peer.getCurrency());
mainPeer.setPeering(peer.getPeering());
// reuse instance
return mainPeer;
}
return peer;
})
// Exclude peer on intranet (not routable) addresses
.filter(peer -> DnsUtils.isInternetHostName(peer.getHost()) || InetAddressUtils.isInternetAddress(peer.getHost()))
.collect(Collectors.toList())
)
.thenCompose(peers -> this.refreshPeersAsync(mainPeer, peers, pool));
}
public CompletableFuture<Peer> refreshPeerAsync(final Peer peer,
final Map<String, String> memberUids,
final List<Ws2pHead> ws2pHeads,
final BlockchainDifficulties difficulties,
final ExecutorService pool) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing peer status", peer.toString()));
CompletableFuture<Peer> result;
// WS2P: refresh using heads
if (Peers.hasWs2pEndpoint(peer)) {
result = CompletableFuture.supplyAsync(() -> fillWs2pPeer(peer, memberUids, ws2pHeads, difficulties), pool);
}
// BMA or ES_CORE
else if (Peers.hasBmaEndpoint(peer) || Peers.hasEsCoreEndpoint(peer)) {
result = CompletableFuture.allOf(
CompletableFuture.supplyAsync(() -> fillNodeSummary(peer), pool),
CompletableFuture.supplyAsync(() -> fillCurrentBlock(peer), pool)
)
.thenApply((v) -> peer)
.exceptionally(throwable -> {
peer.getStats().setStatus(Peer.PeerStatus.DOWN);
if(!(throwable instanceof HttpConnectException)) {
Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
peer.getStats().setError(cause.getMessage());
if (log.isDebugEnabled()) {
if (log.isTraceEnabled()) log.debug(String.format("[%s] is DOWN: %s", peer, cause.getMessage()), cause);
else log.debug(String.format("[%s] is DOWN: %s", peer, cause.getMessage()));
}
}
else if (log.isTraceEnabled()) log.debug(String.format("[%s] is DOWN", peer));
return peer;
})
.thenApplyAsync(p -> {
String uid = StringUtils.isNotBlank(p.getPubkey()) ? memberUids.get(p.getPubkey()) : null;
p.getStats().setUid(uid);
if (p.getStats().isReacheable() && Peers.hasBmaEndpoint(p)) {
// Hardship
if (StringUtils.isNotBlank(uid)) {
fillHardship(p);
}
}
return p;
})
.exceptionally(throwable -> {
peer.getStats().setHardshipLevel(0);
return peer;
});
}
// Unknown API: just return the peer
else {
result = CompletableFuture.completedFuture(peer);
}
// No listeners: return result
if (CollectionUtils.isEmpty(refreshPeerListeners)) {
return result;
}
// Executing listeners
return result.thenApplyAsync(p -> CompletableFuture.allOf(
refreshPeerListeners.stream()
.map(l -> CompletableFuture.runAsync(() -> l.onRefresh(peer), pool))
.toArray(CompletableFuture[]::new)
)
.exceptionally(e -> {
if (log.isDebugEnabled()) log.error(String.format("[%s] Refresh peer listeners error: %s", peer, e.getMessage()), e);
else log.error(String.format("[%s] Refresh peer listeners error: %s", peer, e.getMessage()));
return null;
}))
// Return the peer, as result
.thenApply(v -> peer);
}
public Peer fillWs2pPeer(final Peer peer,
final Map<String, String> memberUids,
final List<Ws2pHead> ws2pHeads,
final BlockchainDifficulties difficulties) {
if (log.isDebugEnabled()) log.debug(String.format("[%s] Refreshing WS2P peer status", peer.toString()));
if (StringUtils.isBlank(peer.getPubkey()) || StringUtils.isBlank(peer.getEpId())) return peer;
Ws2pHead ws2pHead = ws2pHeads.stream().filter(head ->
peer.getPubkey().equals(head.getPubkey())
&& peer.getEpId().equals(head.getWs2pid()
)
).findFirst().orElse(null);
Peer.Stats stats = peer.getStats();
if (ws2pHead != null) {
if (ws2pHead.getBlock() != null) {
String[] blockParts = ws2pHead.getBlock().split("-");
if (blockParts.length == 2) {
stats.setBlockNumber(Integer.parseInt(blockParts[0]));
stats.setBlockHash(blockParts[1]);
}
}
stats.setSoftware(ws2pHead.getSoftware());
stats.setVersion(ws2pHead.getSoftwareVersion());
}
else {
stats.setStatus(Peer.PeerStatus.DOWN);
}
// Set uid
String uid = memberUids.get(peer.getPubkey());
stats.setUid(uid);
if (uid != null) {
Integer difficulty = 0;
if (stats.getBlockNumber() == null || (stats.getBlockNumber().intValue()+1 == difficulties.getBlock().intValue())) {
difficulty = Stream.of(difficulties.getLevels())
.filter(d -> uid.equals(d.getUid()))
.map(d -> d.getLevel())
.filter(Objects::nonNull)
.findFirst()
// Could not known hardship, so fill 0 if member (=can compute)
.orElse(new Integer(0));
}
stats.setHardshipLevel(difficulty);
}
else {
stats.setHardshipLevel(null);
}
return peer;
}
public CompletableFuture<List<Peer>> refreshPeersAsync(final Peer mainPeer,final List<Peer> peers, final ExecutorService pool) {
if (CollectionUtils.isEmpty(peers)) return CompletableFuture.completedFuture(ImmutableList.of());
CompletableFuture<Map<String, String>> memberUidsFuture = CompletableFuture.supplyAsync(() -> wotRemoteService.getMembersUids(mainPeer), pool);
CompletableFuture<List<Ws2pHead>> ws2pHeadsFuture = CompletableFuture.supplyAsync(() -> networkRemoteService.getWs2pHeads(mainPeer), pool);
CompletableFuture<BlockchainDifficulties> difficultiesFuture = CompletableFuture.supplyAsync(() -> blockchainRemoteService.getDifficulties(mainPeer), pool);
return CompletableFuture.allOf(memberUidsFuture, ws2pHeadsFuture, difficultiesFuture)
// Refresh all endpoints
.thenApply(v -> {
final Map<String, String> memberUids = memberUidsFuture.join();
final List<Ws2pHead> ws2pHeads = ws2pHeadsFuture.join();
final BlockchainDifficulties difficulties = difficultiesFuture.join();
return peers.stream().map(peer ->
refreshPeerAsync(peer, memberUids, ws2pHeads, difficulties, pool))
.collect(Collectors.toList());
})
.thenCompose(CompletableFutures::allOfToList);
}
public List<Peer> fillPeerStatsConsensus(final List<Peer> peers) {
if (CollectionUtils.isEmpty(peers)) return peers;
final Map<String,Long> peerCountByBuid = peers.stream()
.filter(peer -> Peers.isReacheable(peer) && Peers.hasDuniterEndpoint(peer))
.map(Peers::buid)
.filter(Objects::nonNull)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
// Compute main consensus buid
Optional<Map.Entry<String, Long>> maxPeerCountEntry = peerCountByBuid.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getValue, Comparator.reverseOrder()))
.findFirst();
final String mainBuid = maxPeerCountEntry.isPresent() ? maxPeerCountEntry.get().getKey() : null;
// Compute total of UP peers
final Long peersUpTotal = peerCountByBuid.values().stream().mapToLong(Long::longValue).sum();
// Compute pct by buid
final Map<String, Double> buidsPct = peerCountByBuid.keySet().stream()
.collect(Collectors.toMap(
buid -> buid,
buid -> (peerCountByBuid.get(buid).doubleValue() * 100 / peersUpTotal)));
// Set consensus stats
peers.forEach(peer -> {
Peer.Stats stats = peer.getStats();
String buid = Peers.buid(stats);
// Set consensus stats on each peers
if (buid != null && Peers.hasDuniterEndpoint(peer)) {
boolean isMainConsensus = buid.equals(mainBuid);
stats.setMainConsensus(isMainConsensus);
boolean isForkConsensus = !isMainConsensus && peerCountByBuid.containsKey(buid) && peerCountByBuid.get(buid) > 1;
stats.setForkConsensus(isForkConsensus);
stats.setConsensusPct(isMainConsensus || isForkConsensus ? buidsPct.get(buid) : 0d);
}
});
return peers;
}
public Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener) {
BlockchainParameters parameters = blockchainRemoteService.getParameters(mainPeer);
fillCurrentBlock(mainPeer);
// Default filter
Filter filterDef = new Filter();
filterDef.filterType = null;
filterDef.filterStatus = Peer.PeerStatus.UP;
filterDef.filterEndpoints = ImmutableList.of(EndpointApi.BASIC_MERKLED_API.label(), EndpointApi.BMAS.label(), EndpointApi.WS2P.label());
filterDef.currency = parameters.getCurrency();
// Skip node on an old fork
if (mainPeer.getStats().getBlockNumber() != null) {
filterDef.minBlockNumber = mainPeer.getStats().getBlockNumber() - 100;
}
// Default sort
Sort sortDef = new Sort();
sortDef.sortType = null;
return addPeersChangeListener(mainPeer, listener, filterDef, sortDef, true, null);
}
public Closeable addPeersChangeListener(final Peer mainPeer, final PeersChangeListener listener,
final Filter filter, final Sort sort, final boolean autoreconnect,
final ExecutorService executor) {
final String currency = filter != null && filter.currency != null ? filter.currency :
blockchainRemoteService.getParameters(mainPeer).getCurrency();
final Set<String> knownBlocks = Sets.newHashSet();
final Predicate<Peer> peerFilter = peerFilter(filter);
final Comparator<Peer> peerComparator = peerComparator(sort);
final ExecutorService pool = (executor != null) ? executor : ForkJoinPool.commonPool();
final int peerUpMaxAgeInMs = config.getPeerUpMaxAge();
// Refreshing one peer (e.g. received from WS)
Consumer<List<Peer>> updateKnownBlocks = (updatedPeers) ->
knownBlocks.addAll(updatedPeers.stream().map(Peers::buid).collect(Collectors.toSet()))
;
// Load all peers
Runnable loadAllPeers = () -> {
try {
if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME, 1, TimeUnit.MINUTES)) {
try {
long now = System.currentTimeMillis();
List<Peer> result = getPeers(mainPeer, filter, sort, pool);
// Mark old peers as DOWN
long minUpTimeInMs = (System.currentTimeMillis() - peerUpMaxAgeInMs);
knownBlocks.clear();
updateKnownBlocks.accept(result);
// Save update peers
peerService.save(currency, result);
// Set old peers as DOWN (with a delay)
peerService.updatePeersAsDown(currency, minUpTimeInMs, filter.filterEndpoints);
long duration = System.currentTimeMillis() - now;
// If took more than 2 min => warning
if (duration /1000/60 > 2) {
log.warn(String.format("Refreshing peers took %s seconds", Math.round(duration/1000)));
}
// Send full list listener
listener.onChanges(result);
} catch (Exception e) {
log.error("Error while loading all peers: " + e.getMessage(), e);
} finally {
lockManager.unlock(PEERS_UPDATE_LOCK_NAME);
}
}
else {
log.debug("Could not acquire lock for reloading all peers. Skipping.");
}
} catch (InterruptedException e) {
log.warn("Stopping reloading all peers: " + e.getMessage());
}
};
// Refreshing one peer (e.g. received from WS)
Consumer<NetworkPeers.Peer> refreshPeerConsumer = (bmaPeer) -> {
if (lockManager.tryLock(PEERS_UPDATE_LOCK_NAME)) {
try {
final List<Peer> newPeers = new ArrayList<>();
addEndpointsAsPeers(bmaPeer, newPeers, null, filter.filterEndpoints);
refreshPeersAsync(mainPeer, newPeers, executor)
.thenAccept(refreshedPeers -> {
if (CollectionUtils.isEmpty(refreshedPeers)) return;
// Get the full list
final Map<String, Peer> knownPeers = peerService.getPeersByCurrencyId(currency)
.stream()
.filter(peerFilter)
.collect(Collectors.toMap(Peer::toString, Function.identity()));
// filter, to keep only existing peer, or expected by filter
List<Peer> changedPeers = refreshedPeers.stream()
.filter(refreshedPeer -> {
String peerId = refreshedPeer.toString();
boolean exists = knownPeers.containsKey(peerId);
if (exists){
knownPeers.remove(peerId);
}
// If include, add it to full list
boolean include = peerFilter.test(refreshedPeer);
if (include) {
knownPeers.put(peerId, refreshedPeer);
}
return include;
}).collect(Collectors.toList());
// If something changes
if (CollectionUtils.isNotEmpty(changedPeers)) {
List<Peer> result = Lists.newArrayList(knownPeers.values());
fillPeerStatsConsensus(result);
result.sort(peerComparator);
updateKnownBlocks.accept(changedPeers);
// Save updated peers
peerService.save(currency, changedPeers);
listener.onChanges(result);
}
});
} catch (Exception e) {
log.error("Error while refreshing a peer: " + e.getMessage(), e);
} finally {
lockManager.unlock(PEERS_UPDATE_LOCK_NAME);
}
}
};
// Manage new block event
WebsocketClientEndpoint.MessageListener blockListener = json -> {
log.debug("Received new block event");
try {
BlockchainBlock block = readValue(json, BlockchainBlock.class);
String blockBuid = BlockchainBlocks.buid(block);
boolean isNewBlock = (blockBuid != null && !knownBlocks.contains(blockBuid));
// If new block + wait 3s for network propagation
if (isNewBlock) {
schedule(loadAllPeers, pool, 3000/*waiting 3s, for block propagation*/);
}
} catch(IOException e) {
log.error("Could not parse peer received by WS: " + e.getMessage(), e);
}
};
WebsocketClientEndpoint wsBlockEndpoint = blockchainRemoteService.addBlockListener(mainPeer, blockListener, autoreconnect);
// Manage new peer event
WebsocketClientEndpoint.MessageListener peerListsner = json -> {
log.debug("Received new peer event");
try {
final NetworkPeers.Peer bmaPeer = readValue(json, NetworkPeers.Peer.class);
if (!lockManager.isLocked(PEERS_UPDATE_LOCK_NAME)) {
pool.submit(() -> refreshPeerConsumer.accept(bmaPeer));
}
} catch(IOException e) {
log.error("Could not parse peer received by WS: " + e.getMessage(), e);
}
};
WebsocketClientEndpoint wsPeerEndpoint = networkRemoteService.addPeerListener(mainPeer, peerListsner, autoreconnect);
// Default action: Load all peers
pool.submit(loadAllPeers);
// Return the tear down logic
return () -> {
wsBlockEndpoint.unregisterListener(blockListener);
wsPeerEndpoint.unregisterListener(peerListsner);
};
}
public String getVersion(final Peer peer) {
return getVersion(getNodeSummary(peer));
}
public JsonNode getNodeSummary(final Peer peer) {
return get(peer, BMA_URL_STATUS);
}
public String getVersion(JsonNode json) {
json = json.get("duniter");
if (json.isMissingNode()) throw new TechnicalException(String.format("Invalid format of [%s] response", BMA_URL_STATUS));
json = json.get("version");
if (json.isMissingNode()) throw new TechnicalException(String.format("No version attribute found in [%s] response", BMA_URL_STATUS));
return json.asText();
}
public String getSoftware(JsonNode json) {
json = json.get("duniter");
if (json.isMissingNode()) throw new TechnicalException(String.format("Invalid format of [%s] response", BMA_URL_STATUS));
json = json.get("software");
if (json.isMissingNode()) throw new TechnicalException(String.format("No software attribute found in [%s] response", BMA_URL_STATUS));
return json.asText();
}
public NetworkService addRefreshPeerListener(RefreshPeerListener listener) {
refreshPeerListeners.add(listener);
return this;
}
public NetworkService removeRefreshPeerListener(RefreshPeerListener listener) {
refreshPeerListeners.remove(listener);
return this;
}
/* -- protected methods -- */
protected List<Peer> loadPeerLeafs(Peer peer, List<String> filterEndpoints) {
List<String> leaves = networkRemoteService.getPeersLeaves(peer);
if (CollectionUtils.isEmpty(leaves)) return ImmutableList.of();
CryptoService cryptoService = ServiceLocator.instance().getCryptoService();
// If less than 100 node, get it in ONE call
if (leaves.size() <= 2000) {
List<Peer> peers = networkRemoteService.getPeers(peer);
if (CollectionUtils.isEmpty(peers)) return ImmutableList.of();
return peers.stream()
// Filter on endpoints - fix #18
.filter(peerEp -> CollectionUtils.isEmpty(filterEndpoints)
|| StringUtils.isBlank(peerEp.getApi())
|| filterEndpoints.contains(peerEp.getApi()))
// Compute the hash
.map(peerEp -> {
String hash = Peers.computeHash(peerEp, cryptoService);
peerEp.setHash(hash);
return peerEp;
}).collect(Collectors.toList());
}
// Get it by multiple call on /network/peering?leaf=
List<Peer> result = Lists.newArrayList();
int offset = 0;
int count = Constants.Config.MAX_SAME_REQUEST_COUNT;
while (offset < leaves.size()) {
if (offset + count > leaves.size()) count = leaves.size() - offset;
loadPeerLeafs(peer, result, leaves, offset, count, filterEndpoints);
offset += count;
try {
Thread.sleep(1000); // wait 1 s
} catch (InterruptedException e) {
// stop
offset = leaves.size();
}
}
return result;
}
protected void loadPeerLeafs(Peer requestedPeer, List<Peer> result, List<String> leaves, int offset, int count, List<String> filterEndpoints) {
for (int i = offset; i< offset + count; i++) {
String leaf = leaves.get(i);
try {
NetworkPeers.Peer peer = networkRemoteService.getPeerLeaf(requestedPeer, leaf);
addEndpointsAsPeers(peer, result, leaf, filterEndpoints);
} catch(HttpNotFoundException hnfe) {
log.debug("Peer not found for leaf=" + leaf);
// skip
} catch(TechnicalException e) {
log.warn("Error while getting peer leaf=" + leaf, e.getMessage());
// skip
}
}
}
protected void addEndpointsAsPeers(NetworkPeers.Peer peer, List<Peer> result, String hash, List<String> filterEndpoints) {
if (CollectionUtils.isNotEmpty(peer.getEndpoints())) {
for (NetworkPeering.Endpoint ep: peer.getEndpoints()) {
if (ep != null && ep.getApi() != null) {
Peer peerEp = Peer.builder()
.currency(peer.getCurrency())
.hash(hash)
.pubkey(peer.getPubkey())
.endpoint(ep)
.build();
// Filter on endpoints - fix #18
if (CollectionUtils.isEmpty(filterEndpoints)
|| StringUtils.isBlank(peerEp.getApi())
|| filterEndpoints.contains(peerEp.getApi())) {
result.add(peerEp);
}
}
}
}
}
protected boolean applyPeerFilter(Peer peer, Filter filter) {
if (filter == null) return true;
Peer.Stats stats = peer.getStats();
// Filter member or mirror
if (filter.filterType != null && (
(filter.filterType == FilterType.MEMBER && StringUtils.isBlank(stats.getUid()))
|| (filter.filterType == FilterType.MIRROR && StringUtils.isNotBlank(stats.getUid()))
)) {
return false;
}
// Filter on endpoints
if (CollectionUtils.isNotEmpty(filter.filterEndpoints)
&& (StringUtils.isBlank(peer.getApi())
|| !filter.filterEndpoints.contains(peer.getApi()))) {
return false;
}
// Filter on status
if (filter.filterStatus != null && filter.filterStatus != stats.getStatus()) {
return false;
}
// Filter on SSL
if (filter.filterSsl != null && filter.filterSsl != peer.isUseSsl()) {
return false;
}
// Filter block number
if (filter.minBlockNumber != null && (stats.getBlockNumber() == null || stats.getBlockNumber() < filter.minBlockNumber)) {
return false;
}
return true;
}
protected Peer fillNodeSummary(final Peer peer) {
// Skip if no BMA, BMAS or ES_CORE_API
if (!Peers.hasBmaEndpoint(peer) && !Peers.hasEsCoreEndpoint(peer)) return peer;
JsonNode summary = getNodeSummary(peer);
peer.getStats().setVersion(getVersion(summary));
peer.getStats().setSoftware(getSoftware(summary));
return peer;
}
protected Peer fillCurrentBlock(final Peer peer) {
// Skip if no BMA, BMAS or ES_CORE_API
if (!Peers.hasBmaEndpoint(peer) && !Peers.hasEsCoreEndpoint(peer)) return peer;
JsonNode json = get(peer, BMA_URL_BLOCKCHAIN_CURRENT);
String currency = json.has("currency") ? json.get("currency").asText() : null;
peer.setCurrency(currency);
Integer number = json.has("number") ? json.get("number").asInt() : null;
peer.getStats().setBlockNumber(number);
String hash = json.has("hash") ? json.get("hash").asText() : null;
peer.getStats().setBlockHash(hash);
Long medianTime = json.has("medianTime") ? json.get("medianTime").asLong() : null;
peer.getStats().setMedianTime(medianTime);
if (log.isTraceEnabled()) {
log.trace(String.format("[%s] current block [%s-%s]", peer, number, hash));
}
return peer;
}
protected Peer fillHardship(final Peer peer) {
if (StringUtils.isBlank(peer.getPubkey())) return peer;
JsonNode json = get(peer, BMA_URL_BLOCKCHAIN_HARDSHIP + peer.getPubkey());
Integer level = json.has("level") ? json.get("level").asInt() : null;
peer.getStats().setHardshipLevel(level);
return peer;
}
protected JsonNode get(final Peer peer, String path) {
return executeRequest(peer, path, JsonNode.class);
}
/**
* Log allOfToList peers found
*/
protected List<Peer> logPeers(final List<Peer> peers) {
if (!log.isDebugEnabled()) return peers;
if (CollectionUtils.isEmpty(peers)) {
log.debug("No peers found.");
}
else {
log.debug(String.format("Found %s peers", peers.size()));
if (log.isTraceEnabled()) {
peers.forEach(peerFound -> {
if (peerFound.getStats().getStatus() == Peer.PeerStatus.DOWN) {
String error = peerFound.getStats().getError();
log.trace(String.format(" [%s] status is %s %s",
peerFound.toString(),
Peer.PeerStatus.DOWN.name(),
error != null ? (":" + error) : ""));
} else {
log.trace(String.format(" [%s] status %s: [v%s] block [%s]", peerFound.toString(),
peerFound.getStats().getStatus().name(),
peerFound.getStats().getVersion(),
peerFound.getStats().getBlockNumber()
));
}
});
}
}
return peers;
}
protected double computePeerStatsScore(Peer peer, Sort sort) {
double score = 0;
Peer.Stats stats = peer.getStats();
if (sort != null && sort.sortType != null) {
long specScore = 0;
specScore += (sort.sortType == SortType.UID ? computeScoreAlphaValue(stats.getUid(), 3, sort.sortAsc) : 0);
specScore += (sort.sortType == SortType.PUBKEY ? computeScoreAlphaValue(peer.getPubkey(), 3, sort.sortAsc) : 0);
specScore += (sort.sortType == SortType.API ?
(peer.isUseSsl() ? (sort.sortAsc ? 1 : -1) :
(Peers.hasEndPointAPI(peer, EndpointApi.ES_USER_API) ? (sort.sortAsc ? 0.5 : -0.5) : 0)) : 0);
specScore += (sort.sortType == SortType.HARDSHIP ? (stats.getHardshipLevel() != null ? (sort.sortAsc ? (10000-stats.getHardshipLevel()) : stats.getHardshipLevel()): 0) : 0);
specScore += (sort.sortType == SortType.BLOCK_NUMBER ? (stats.getBlockNumber() != null ? (sort.sortAsc ? (1000000000 - stats.getBlockNumber()) : stats.getBlockNumber()) : 0) : 0);
score += (10000000000L * specScore);
}
score += (1000000000 * (stats.getStatus() == Peer.PeerStatus.UP ? 1 : 0));
score += (100000000 * (stats.isMainConsensus() ? 1 : 0));
score += (1000000 * (stats.isForkConsensus() ? stats.getConsensusPct() : 0));
score += (100 * (stats.getHardshipLevel() != null ? (10000-stats.getHardshipLevel()) : 0));
score += /* 1 */(peer.getPubkey() != null ? computeScoreAlphaValue(peer.getPubkey(), 2, true) : 0);
return score;
}
protected int computeScoreAlphaValue(String value, int nbChars, boolean asc) {
if (StringUtils.isBlank(value)) return 0;
int score = 0;
value = value.toLowerCase();
if (nbChars > value.length()) {
nbChars = value.length();
}
score += (int)value.charAt(0);
for (int i=1; i < nbChars; i++) {
score += Math.pow(0.001, i) * value.charAt(i);
}
return asc ? (1000 - score) : score;
}
protected void schedule(Runnable command, ExecutorService pool, long delayInMs) {
if (pool instanceof ScheduledExecutorService) {
((ScheduledExecutorService)pool).schedule(command, delayInMs, TimeUnit.MILLISECONDS);
}
else if (delayInMs <= 0) {
pool.submit(command);
}
else {
pool.submit(() -> {
try {
Thread.sleep(delayInMs);
command.run();
} catch (InterruptedException e) {
}
});
}
}
}
......@@ -25,6 +25,7 @@ package org.duniter.core.client.service.local;
import org.duniter.core.beans.Service;
import org.duniter.core.client.model.local.Peer;
import java.util.Collection;
import java.util.List;
/**
......@@ -34,26 +35,32 @@ public interface PeerService extends Service {
Peer save(final Peer peer);
Peer getPeerById(long peerId);
/**
* Return a (cached) active peer, by currency id
* @param currencyId
* @return
*/
Peer getActivePeerByCurrencyId(long currencyId);
Peer getActivePeerByCurrency(String currency);
/**
* Return a (cached) peer list, by currency id
* Save the active (default) peer, for a given currency id
* @param currencyId
* @return
* @param peer
*/
List<Peer> getPeersByCurrencyId(long currencyId);
void setCurrencyMainPeer(String currency, Peer peer);
/**
* Fill all cache need for currencies
* @param context
* @param accountId
* Return a (cached) peer list, by currency id
* @param currencyId
* @return
*/
void loadCache(long accountId);
List<Peer> getPeersByCurrencyId(String currency);
void save(String currencyId, List<Peer> peers);
void updatePeersAsDown(String currency, Collection<String> filterApis);
void updatePeersAsDown(String currency, long minUpTimeInMs, Collection<String> filterApis);
boolean existsByCurrencyAndId(String currency, String id);
}
......@@ -23,77 +23,86 @@ package org.duniter.core.client.service.local;
*/
import org.duniter.core.beans.InitializingBean;
import org.duniter.core.client.dao.PeerDao;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.repositories.PeerRepository;
import org.duniter.core.client.model.local.Currency;
import org.duniter.core.client.model.local.Peer;
import org.duniter.core.client.model.local.Peers;
import org.duniter.core.client.service.ServiceLocator;
import org.duniter.core.exception.TechnicalException;
import org.duniter.core.service.CryptoService;
import org.duniter.core.util.CollectionUtils;
import org.duniter.core.util.ObjectUtils;
import org.duniter.core.util.Preconditions;
import org.duniter.core.util.StringUtils;
import org.duniter.core.util.cache.Cache;
import org.duniter.core.util.cache.SimpleCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
/**
* Created by eis on 07/02/15.
*/
public class PeerServiceImpl implements PeerService, InitializingBean {
private Cache<Long, List<Peer>> peersByCurrencyIdCache;
private Cache<Long, Peer> activePeerByCurrencyIdCache;
private static final Logger log = LoggerFactory.getLogger(PeerServiceImpl.class);
private Cache<String, List<Peer>> peersByCurrencyIdCache;
private Cache<String, Peer> activePeerByCurrencyIdCache;
private CurrencyService currencyService;
private PeerDao peerDao;
private CryptoService cryptoService;
private PeerRepository peerRepository;
private Configuration config;
public PeerServiceImpl() {
super();
}
@Override
public void afterPropertiesSet() throws Exception {
currencyService = ServiceLocator.instance().getCurrencyService();
peerDao = ServiceLocator.instance().getBean(PeerDao.class);
this.currencyService = ServiceLocator.instance().getCurrencyService();
this.peerRepository = ServiceLocator.instance().getBean(PeerRepository.class);
this.config = Configuration.instance();
this.cryptoService = ServiceLocator.instance().getCryptoService();
this.activePeerByCurrencyIdCache = new SimpleCache<String, Peer>() {
@Override
public Peer load(String currencyId) {
return loadDefaultPeer(currencyId);
}
};
}
@Override
public void close() throws IOException {
currencyService = null;
peerDao = null;
peerRepository = null;
peersByCurrencyIdCache = null;
activePeerByCurrencyIdCache = null;
cryptoService = null;
}
public Peer save(final Peer peer) {
ObjectUtils.checkNotNull(peer);
ObjectUtils.checkNotNull(peer.getCurrencyId());
ObjectUtils.checkArgument(StringUtils.isNotBlank(peer.getHost()));
ObjectUtils.checkArgument(peer.getPort() >= 0);
Peer result;
// Create
if (peer.getId() == null) {
result = peerDao.create(peer);
}
Preconditions.checkNotNull(peer);
Preconditions.checkNotNull(peer.getCurrency());
Preconditions.checkArgument(StringUtils.isNotBlank(peer.getHost()));
Preconditions.checkArgument(peer.getPort() >= 0);
// or update
else {
peerDao.update(peer);
result = peer;
}
peer.setHash(Peers.computeHash(peer, cryptoService));
Peer result = peerRepository.save(peer);
// update cache (if already loaded)
if (peersByCurrencyIdCache != null) {
List<Peer> peers = peersByCurrencyIdCache.get(peer.getCurrencyId());
List<Peer> peers = peersByCurrencyIdCache.get(peer.getCurrency());
if (peers == null) {
peers = new ArrayList<Peer>();
peersByCurrencyIdCache.put(peer.getCurrencyId(), peers);
peers = new ArrayList<>();
peersByCurrencyIdCache.put(peer.getCurrency(), peers);
peers.add(peer);
}
else if (!peers.contains(peer)) {
} else if (!peers.contains(peer)) {
peers.add(peer);
}
}
......@@ -101,45 +110,28 @@ public class PeerServiceImpl implements PeerService, InitializingBean {
return result;
}
public Peer getPeerById(long peerId) {
return peerDao.getById(peerId);
}
/**
* Return a (cached) active peer, by currency id
*
* @param currencyId
* @return
*/
public Peer getActivePeerByCurrencyId(long currencyId) {
// Check if cache as been loaded
if (activePeerByCurrencyIdCache == null) {
activePeerByCurrencyIdCache = new SimpleCache<Long, Peer>() {
@Override
public Peer load(Long currencyId) {
List<Peer> peers = peerDao.getPeersByCurrencyId(currencyId);
if (CollectionUtils.isEmpty(peers)) {
String currencyName = currencyService.getCurrencyNameById(currencyId);
throw new TechnicalException(String.format(
"No peers configure for currency [%s]",
currencyName != null ? currencyName : currencyId));
}
return peers.get(0);
}
};
}
public Peer getActivePeerByCurrency(String currencyId) {
return activePeerByCurrencyIdCache.get(currencyId);
}
@Override
public void setCurrencyMainPeer(String currencyId, Peer peer) {
activePeerByCurrencyIdCache.put(currencyId, peer);
}
/**
* Return a (cached) peer list, by currency id
*
* @param currencyId
* @return
*/
public List<Peer> getPeersByCurrencyId(long currencyId) {
public List<Peer> getPeersByCurrencyId(String currencyId) {
// Check if cache as been loaded
if (peersByCurrencyIdCache == null) {
throw new TechnicalException("Cache not initialize. Please call loadCache() before getPeersByCurrencyId().");
......@@ -149,24 +141,22 @@ public class PeerServiceImpl implements PeerService, InitializingBean {
}
/**
* Fill all cache need for currencies
* @param accountId
* Fill allOfToList cache need for currencies
*
*/
public void loadCache(long accountId) {
public void loadCache() {
if (peersByCurrencyIdCache != null) {
return;
}
peersByCurrencyIdCache = new SimpleCache<Long, List<Peer>>() {
peersByCurrencyIdCache = new SimpleCache<String, List<Peer>>() {
@Override
public List<Peer> load(Long currencyId) {
return peerDao.getPeersByCurrencyId(currencyId);
public List<Peer> load(String currencyId) {
return peerRepository.getPeersByCurrencyId(currencyId);
}
};
List<Currency> currencies = ServiceLocator.instance().getCurrencyService().getCurrencies(accountId);
for (Currency currency: currencies) {
for (Currency currency: currencyService.findAll()) {
// Get peers from DB
List<Peer> peers = getPeersByCurrencyId(currency.getId());
......@@ -177,4 +167,72 @@ public class PeerServiceImpl implements PeerService, InitializingBean {
}
}
@Override
public void save(String currencyId, List<Peer> peers) {
if (CollectionUtils.isNotEmpty(peers)) {
if (log.isDebugEnabled()) {
log.debug(String.format("[%s] Updating peers (%s endpoints found)", currencyId, peers.size()));
}
// FIXME: Duniter 1.7 return lastUpTime in ms. Check if this a bug or not
final long upTime = System.currentTimeMillis() / 1000;
peers.forEach(peer -> {
// On each UP peers: set last UP time
if (peer.getStats() != null && peer.getStats().isReacheable()) {
peer.getStats().setLastUpTime(upTime);
peer.getStats().setFirstDownTime(null);
}
// Save
save(peer);
});
}
}
@Override
public boolean existsByCurrencyAndId(String currency, String id) {
return peerRepository.existsByCurrencyAndId(currency, id);
}
@Override
public void updatePeersAsDown(String currencyId, Collection<String> filterApis) {
int peerDownTimeoutMs = config.getPeerUpMaxAge();
// Mark old peers as DOWN
if (peerDownTimeoutMs >0) {
long maxUpTimeInMs = System.currentTimeMillis() - peerDownTimeoutMs;
updatePeersAsDown(currencyId, maxUpTimeInMs, filterApis);
}
}
@Override
public void updatePeersAsDown(String currencyId, long minUpTimeInMs, Collection<String> filterApis) {
if (log.isDebugEnabled()) {
log.debug(String.format("[%s] %s Setting peers as DOWN, if older than [%s]...", currencyId, filterApis, new Date(minUpTimeInMs *1000)));
}
peerRepository.updatePeersAsDown(currencyId, minUpTimeInMs, filterApis);
}
protected Peer loadDefaultPeer(String currencyId) {
List<Peer> peers = peerRepository.getPeersByCurrencyId(currencyId);
if (CollectionUtils.isEmpty(peers)) {
throw new TechnicalException(String.format(
"No peers configure for currency [%s]",
currencyId));
}
Peer defaultPeer = peers.stream()
.filter(peer -> peer.getStats() == null || peer.getStats().getStatus() == null || peer.getStats().getStatus() == Peer.PeerStatus.UP)
.findFirst().orElse(null);
if (defaultPeer != null) {
// Make sure currency is filled
defaultPeer.setCurrency(currencyId);
}
else {
log.warn(String.format("[%s] No default peer found. Unable to send remote request.", currencyId));
}
return defaultPeer;
}
}
package org.duniter.core.client.model.bma.gson;
package org.duniter.core.client.util;
/*
/*-
* #%L
* UCoin Java Client :: Core API
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2015 EIS
* Copyright (C) 2014 - 2021 Duniter Team
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
......@@ -16,48 +16,52 @@ package org.duniter.core.client.model.bma.gson;
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.google.gson.*;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.apache.commons.lang3.StringUtils;
import org.duniter.core.exception.TechnicalException;
import java.lang.reflect.Type;
public class KnownBlocks {
public class RevokedTypeAdapter implements JsonDeserializer<BlockchainBlock.Revoked>, JsonSerializer<BlockchainBlock.Revoked>{
@Override
public BlockchainBlock.Revoked deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
String identityStr = json.getAsString();
if (StringUtils.isBlank(identityStr)) {
return null;
}
String[] identityParts = identityStr.split(":");
if (identityParts.length != 4) {
throw new JsonParseException(String.format("Bad format for BlockchainBlock.Revoked. Should have 4 parts, but found %s.", identityParts.length));
}
private KnownBlocks() {
// helper class
}
BlockchainBlock.Revoked result = new BlockchainBlock.Revoked();
int i = 0;
result.setSignature(identityParts[i++]);
result.setUserId(identityParts[i++]);
public static BlockchainBlock getFirstBlock(String currencyId) {
return result;
}
BlockchainBlock result = new BlockchainBlock();
result.setNumber(0);
@Override
public JsonElement serialize(BlockchainBlock.Revoked input, Type type, JsonSerializationContext context) {
String result = new StringBuilder()
.append(input.getSignature()).append(":")
.append(input.getUserId()).toString();
// G1 currency
switch (currencyId) {
return context.serialize(result.toString(), String.class);
case KnownCurrencies.G1 :
result.setCurrency("g1");
result.setHash("000003D02B95D3296A4F06DBAC51775C4336A4DC09D0E958DC40033BE7E20F3D");
result.setTime(1488987127L);
result.setMedianTime(1488987127L);
result.setIssuer("2ny7YAdmzReQxAayyJZsyVYwYhVyax2thKcGknmQy5nQ");
result.setSignature("49OD/8pj0bU0Lg6HB4p+5TOcRbgtj8Ubxmhen4IbOXM+g33V/I56GfF+QbD9U138Ek04E9o0lSjaDIVI/BrkCw==");
break;
case KnownCurrencies.G1_TEST :
result.setCurrency("g1-test");
result.setHash("0000DEFA598EA82BC8FF19BC56B49A686E63617DCC7304FAF7F0461FA34E0F9C");
result.setTime(1496842431L);
result.setMedianTime(1496842431L);
result.setIssuer("3dnbnYY9i2bHMQUGyFp5GVvJ2wBkVpus31cDJA5cfRpj");
result.setSignature("OQQJ8TVISMgpz8SmdVGHYAUQMDnHpXqeFal4+/q2hV37uyrpC8iF6d50Wgg2TMKhsB/9zelOXZgbuzutAOZ5AA==");
break;
default:
throw new TechnicalException(String.format("First block for currency %s not defined !", currencyId));
}
return result;
}
}
package org.duniter.elasticsearch;
package org.duniter.core.client.util;
/*
/*-
* #%L
* UCoin Java Client :: ElasticSearch Indexer
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2016 EIS
* Copyright (C) 2014 - 2021 Duniter Team
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
......@@ -16,13 +16,16 @@ package org.duniter.elasticsearch;
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
public interface KnownCurrencies {
String G1 = "g1";
public class TestFixtures extends org.duniter.core.test.TestFixtures {
String G1_TEST = "g1-test";
}
package org.duniter.core.client.util.http;
/*-
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2021 Duniter Team
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpRequest;
import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.config.CookieSpecs;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.duniter.core.client.config.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;
public abstract class HttpClients {
private static final Logger log = LoggerFactory.getLogger(HttpClients.class);
private static ThreadLocal<HttpClientConnectionManager> connectionManagerMapper = new ThreadLocal<HttpClientConnectionManager>() {
@Override
public HttpClientConnectionManager initialValue() {
if (log.isDebugEnabled()) log.debug("[HttpClients] Creating new HttpClientConnectionManager, for thread [%s]", Thread.currentThread().getId());
Configuration config = Configuration.instance();
return createConnectionManager(
config.getNetworkMaxTotalConnections(),
config.getNetworkMaxConnectionsPerRoute(),
config.getNetworkTimeout());
}
};
private static ThreadLocal<HttpClient> httpClientsMapper = new ThreadLocal<HttpClient>() {
@Override
public HttpClient initialValue() {
HttpClientConnectionManager connectionManager= connectionManagerMapper.get();
if (log.isDebugEnabled()) log.debug("[HttpClients] Creating new HttpClient, for thread [%s]", Thread.currentThread().getId());
return createHttpClient(connectionManager, 0);
}
@Override
public void remove() {
super.remove();
}
};
public static HttpClient getThreadHttpClient(final Integer timeout) {
if (timeout <= 0) return getThreadHttpClient();
final HttpClientConnectionManager connectionManager = connectionManagerMapper.get();
return createHttpClient(connectionManager, timeout);
}
public static HttpClient getThreadHttpClient() {
return httpClientsMapper.get();
}
/**
* Remove client from the thread
*/
public static void remove() {
connectionManagerMapper.remove();
httpClientsMapper.remove();
}
public static HttpClient createHttpClient(int timeout) {
return createHttpClient(null,timeout);
}
public static HttpClient createHttpClient(HttpClientConnectionManager connectionManager, int timeout) {
if (timeout <= 0) {
Configuration config = Configuration.instance();
timeout = config.getNetworkTimeout();
}
return org.apache.http.impl.client.HttpClients.custom()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(createRequestConfig(timeout))
.setRetryHandler(createRetryHandler(timeout))
.build();
}
public static PoolingHttpClientConnectionManager createConnectionManager(
int maxTotalConnections,
int maxConnectionsPerRoute,
int timeout) {
PoolingHttpClientConnectionManager connectionManager
= new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(maxTotalConnections);
connectionManager.setDefaultMaxPerRoute(maxConnectionsPerRoute);
connectionManager.setDefaultSocketConfig(SocketConfig.custom()
.setSoTimeout(timeout).build());
return connectionManager;
}
public static RequestConfig createRequestConfig(int timeout) {
return RequestConfig.custom()
.setSocketTimeout(timeout).setConnectTimeout(timeout)
.setMaxRedirects(1)
.setCookieSpec(CookieSpecs.IGNORE_COOKIES)
.build();
}
protected static HttpRequestRetryHandler createRetryHandler(int timeout) {
final int maxRetryCount = (timeout < 1000) ? 2 : 3;
return new HttpRequestRetryHandler() {
public boolean retryRequest(IOException exception, int executionCount, HttpContext context) {
boolean retrying = true;
if (exception instanceof NoRouteToHostException) {
// Bad DNS name
retrying =false;
}
else if (exception instanceof InterruptedIOException) {
// Timeout
retrying = false;
}
else if (exception instanceof UnknownHostException) {
// Unknown host
retrying = false;
}
else if (exception instanceof SSLException) {
// SSL handshake exception
retrying = false;
}
else if (exception instanceof HttpHostConnectException) {
// Host connect error
retrying = false;
}
if (retrying && executionCount >= maxRetryCount) {
// Do not retry if over max retry count
return false;
}
if (!retrying) {
if (log.isDebugEnabled()) {
log.debug("Failed request to " + HttpClientContext.adapt(context).getRequest().getRequestLine() + ": " + exception.getMessage());
}
return false;
}
HttpClientContext clientContext = HttpClientContext.adapt(context);
HttpRequest request = clientContext.getRequest();
boolean idempotent = !(request instanceof HttpEntityEnclosingRequest);
if (idempotent) {
// Retry if the request is considered idempotent
if (log.isDebugEnabled()) log.debug("Failed (but will retry) request to " + request.getRequestLine() + ": " + exception.getMessage());
return true;
}
return false;
}
};
}
}
duniter4j.client.authentication=Http request error (unauthorized or forbidden).
duniter4j.client.core.connect=Could not connect to Duniter node [%s]
duniter4j.client.core.emptyResponse=
duniter4j.client.core.invalidResponse=
duniter4j.client.authentication=Http request error\: unauthorized or forbidden
duniter4j.client.core.connect=Http request error {%s}\: could not connect
duniter4j.client.core.emptyResponse=[%s] Empty Response
duniter4j.client.core.invalidResponse=[%s] Invalid response
duniter4j.client.core.invalidResponseContentType=[%s] Invalid response content-type\: expected [%s] but get [%s]
duniter4j.client.core.timeout=
duniter4j.client.notFound=Resource non found [%s]
duniter4j.client.status=Http request error\: %s
......@@ -12,6 +13,9 @@ duniter4j.config.option.data.directory.description=
duniter4j.config.option.i18n.directory.description=
duniter4j.config.option.i18n.locale.description=
duniter4j.config.option.inceptionYear.description=
duniter4j.config.option.network.loadPeers.maxDuration.description=
duniter4j.config.option.network.maxConnections.description=
duniter4j.config.option.network.maxConnectionsPerHost.description=
duniter4j.config.option.network.timeout.description=
duniter4j.config.option.node.currency.description=
duniter4j.config.option.node.elasticsearch.host.description=
......
duniter4j.client.authentication=Echec de la requete (Accès interdit ou non autorisé).
duniter4j.client.core.connect=Echec de la connection au noeud Duniter [%s]
duniter4j.client.core.emptyResponse=
duniter4j.client.core.invalidResponse=
duniter4j.client.core.timeout=
duniter4j.client.authentication=Echec de la requete\: accès interdit ou non autorisé).
duniter4j.client.core.connect=Echec de la requete {%s}\: connection impossible
duniter4j.client.core.emptyResponse=[%s] Réponse vide
duniter4j.client.core.invalidResponse=[%s] Réponse non valide
duniter4j.client.core.invalidResponseContentType=[%s] Type de réponse invalide\: attendu [%s] mais [%s] obtenu
duniter4j.client.core.timeout=Délai d'attente de la requête dépassé
duniter4j.client.notFound=Ressource non trouvée [%s]
duniter4j.client.status=Echec de requete HTTP [%s] \: %s
duniter4j.config=
duniter4j.config.option.basedir.description=
duniter4j.config.option.cache.directory.description=
duniter4j.config.option.data.directory.description=
duniter4j.config.option.i18n.directory.description=
duniter4j.config.option.i18n.locale.description=
duniter4j.config.option.inceptionYear.description=
duniter4j.config.option.network.timeout.description=
duniter4j.config.option.node.currency.description=
duniter4j.config.option.node.elasticsearch.host.description=
duniter4j.config.option.node.elasticsearch.port.description=
duniter4j.config.option.node.elasticsearch.protocol.description=
duniter4j.config.option.node.elasticsearch.url.description=
duniter4j.config.option.node.host.description=
duniter4j.config.option.node.port.description=
duniter4j.config.option.node.protocol.description=
duniter4j.config.option.organizationName.description=
duniter4j.config.option.passwd.description=
duniter4j.config.option.salt.description=
duniter4j.config.option.site.url.description=
duniter4j.config.option.tmp.directory.description=
duniter4j.config.option.version.description=
duniter4j.config.parse.error=
duniter4j.config=Options de configuration Duniter4j \:\: client
duniter4j.config.option.basedir.description=Répertoire de travail de l'application
duniter4j.config.option.cache.directory.description=Répertoire pour le cache applicatif
duniter4j.config.option.data.directory.description=Répertoire de stockage des données
duniter4j.config.option.i18n.directory.description=Répertoire de stockage des traductions
duniter4j.config.option.i18n.locale.description=Locale (langue) à utiliser
duniter4j.config.option.inceptionYear.description=Anée de réalisation
duniter4j.config.option.network.loadPeers.maxDuration.description=Délai maximum de chargement des pairs
duniter4j.config.option.network.maxConnections.description=
duniter4j.config.option.network.maxConnectionsPerHost.description=
duniter4j.config.option.network.timeout.description=Délai d'attente maximal pour joindre un pair
duniter4j.config.option.node.currency.description=Symbole de la monnaie
duniter4j.config.option.node.elasticsearch.host.description=Hôte du noeud ES
duniter4j.config.option.node.elasticsearch.port.description=Port du noeud ES
duniter4j.config.option.node.elasticsearch.protocol.description=Protocole du noeud ES
duniter4j.config.option.node.elasticsearch.url.description=URL du noeud ES
duniter4j.config.option.node.host.description=Hôte du noeud Duniter
duniter4j.config.option.node.port.description=Port du noeud Duniter
duniter4j.config.option.node.protocol.description=Protocole du noeud Duniter
duniter4j.config.option.organizationName.description=Organisme développant le projet
duniter4j.config.option.passwd.description=Mot de passe du trousseau
duniter4j.config.option.salt.description=Sel (salt) du trousseau
duniter4j.config.option.site.url.description=URL du site du projet
duniter4j.config.option.tmp.directory.description=Répertoire temporaire
duniter4j.config.option.version.description=version de l'application
duniter4j.config.parse.error=Erreur lors de la lecture du fichier de configuration
......@@ -7,12 +7,15 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %5p (%c:%L) - %m%n
# ucoin levels
# Duniter4j levels
log4j.logger.org.duniter=DEBUG
log4j.logger.org.duniter.core.client.service.bma.AbstractNetworkService=WARN
# Http client connection debug
#log4j.logger.org.apache.http.impl.conn=DEBUG
log4j.appender.file=org.apache.log4j.RollingFileAppender
log4j.appender.file.file=ucoin-client.log
log4j.appender.file.file=duniter4j-core-client.log
log4j.appender.file.MaxFileSize=10MB
log4j.appender.file.MaxBackupIndex=4
......
......@@ -25,8 +25,7 @@ package org.duniter.core.client;
public class TestFixtures extends org.duniter.core.test.TestFixtures {
public long getDefaultCurrencyId() {
return -1;
public String getDefaultCurrency() {
return "g1-test";
}
}
......@@ -2,7 +2,7 @@ package org.duniter.core.client;
/*
* #%L
* UCoin Java Client :: Core API
* Duniter4j :: Core API
* %%
* Copyright (C) 2014 - 2015 EIS
* %%
......@@ -24,6 +24,7 @@ package org.duniter.core.client;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.duniter.core.client.config.Configuration;
import org.duniter.core.client.config.ConfigurationOption;
import org.duniter.core.client.model.local.Peer;
......@@ -41,9 +42,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Locale;
@Slf4j
public class TestResource extends org.duniter.core.test.TestResource {
private static final Logger log = LoggerFactory.getLogger(TestResource.class);
public static TestResource create() {
return new TestResource(null);
......@@ -102,10 +103,9 @@ public class TestResource extends org.duniter.core.test.TestResource {
/* -- -- */
/**
* Convenience methods that could be override to initialize other configuration
* Convenience methods that could be overridden to initialize other configuration
*
* @param configFilename
* @param configArgs
*/
protected void initConfiguration(String configFilename) {
String[] configArgs = getConfigArgs();
......@@ -133,8 +133,8 @@ public class TestResource extends org.duniter.core.test.TestResource {
Locale i18nLocale = config.getI18nLocale();
if (log.isInfoEnabled()) {
log.info(String.format("Starts i18n with locale [%s] at [%s]",
if (log.isDebugEnabled()) {
log.debug(String.format("Starts i18n with locale [%s] at [%s]",
i18nLocale, i18nDirectory));
}
I18n.init(new UserI18nInitializer(
......@@ -152,11 +152,11 @@ public class TestResource extends org.duniter.core.test.TestResource {
protected void initMockData() {
Configuration config = Configuration.instance();
// Set a default account id, then load cache
ServiceLocator.instance().getDataContext().setAccountId(0);
Peer peer = new Peer(config.getNodeHost(), config.getNodePort());
peer.setCurrencyId(fixtures.getDefaultCurrencyId());
Peer peer = Peer.builder()
.host(config.getNodeHost())
.port(config.getNodePort())
.build();
peer.setCurrency(fixtures.getDefaultCurrency());
ServiceLocator.instance().getPeerService().save(peer);
......
package org.duniter.core.client.model;
/*-
* #%L
* Duniter4j :: Core Client API
* %%
* Copyright (C) 2014 - 2017 EIS
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public
* License along with this program. If not, see
* <http://www.gnu.org/licenses/gpl-3.0.html>.
* #L%
*/
import com.fasterxml.jackson.databind.ObjectMapper;
import org.duniter.core.client.model.bma.BlockchainBlock;
import org.duniter.core.client.model.bma.jackson.JacksonUtils;
import org.junit.Assume;
import java.io.File;
import java.nio.file.Files;
/**
* Created by blavenie on 24/07/17.
*/
public class BlockFileUtils {
public static BlockchainBlock readBlockFile(String jsonFileName) {
try {
ObjectMapper om = JacksonUtils.getThreadObjectMapper();
BlockchainBlock block = om.readValue(Files.readAllBytes(new File("src/test/resources" , jsonFileName).toPath()), BlockchainBlock.class);
Assume.assumeNotNull(block);
return block;
}
catch(Exception e) {
Assume.assumeNoException(e);
return null;
}
}
}