Skip to content
Snippets Groups Projects
Commit 411e09b8 authored by vjrj's avatar vjrj
Browse files

Nodes forced to check when the list of node running is small

parent e17f9d49
No related branches found
No related tags found
No related merge requests found
......@@ -135,6 +135,15 @@ class NodeManager {
void notifyObserver() {
NodeManagerObserver.instance.update(this);
}
int nodesWorking(NodeType type) => nodeList(type)
.where((Node n) => n.errors < NodeManager.maxNodeErrors)
.toList()
.length;
List<Node> nodesWorkingList(NodeType type) => nodeList(type)
.where((Node n) => n.errors < NodeManager.maxNodeErrors)
.toList();
}
class NodeManagerObserver {
......
......@@ -31,7 +31,7 @@ final String currency = currencyDotEnv.isEmpty ? 'g1' : currencyDotEnv;
Future<String> getTxHistory(String publicKey) async {
final Response response =
await requestWithRetry(NodeType.duniter, '/tx/history/$publicKey');
await requestWithRetry(NodeType.duniter, '/tx/history/$publicKey');
if (response.statusCode == 200) {
return response.body;
} else {
......@@ -60,7 +60,7 @@ Future<Response> searchCPlusUser(String initialSearchTerm) async {
'/user/profile/_search?q=title:$searchTermLower OR issuer:$searchTerm OR title:$searchTermCapitalized OR title:$searchTerm';
final Response response =
await requestCPlusWithRetry(query, retryWith404: false);
await requestCPlusWithRetry(query, retryWith404: false);
return response;
}
......@@ -72,12 +72,12 @@ Future<Contact> getProfile(String pubKeyRaw,
'/user/profile/$pubKey',
retryWith404: false);
final Map<String, dynamic> result =
const JsonDecoder().convert(cPlusResponse.body) as Map<String, dynamic>;
const JsonDecoder().convert(cPlusResponse.body) as Map<String, dynamic>;
if (result['found'] == false) {
return Contact(pubKey: pubKey);
}
final Map<String, dynamic> profile =
const JsonDecoder().convert(cPlusResponse.body) as Map<String, dynamic>;
const JsonDecoder().convert(cPlusResponse.body) as Map<String, dynamic>;
final Contact c = await contactFromResultSearch(profile);
if (!onlyCPlusProfile) {
// This penalize the gva rate limit
......@@ -115,7 +115,7 @@ Future<List<Contact>> searchWot(String initialSearchTerm) async {
final List<Contact> contacts = <Contact>[];
if (response.statusCode == HttpStatus.ok) {
final Map<String, dynamic> data =
json.decode(response.body) as Map<String, dynamic>;
json.decode(response.body) as Map<String, dynamic>;
final List<dynamic> results = data['results'] as List<dynamic>;
// logger('Returning wot results ${results.length}');
if (results.isNotEmpty) {
......@@ -142,11 +142,11 @@ Future<Contact> getWot(Contact contact) async {
// Will be better to analyze the 404 response (to detect faulty node)
if (response.statusCode == HttpStatus.ok) {
final Map<String, dynamic> data =
json.decode(response.body) as Map<String, dynamic>;
json.decode(response.body) as Map<String, dynamic>;
final List<dynamic> results = data['results'] as List<dynamic>;
if (results.isNotEmpty) {
final List<dynamic> uids =
(results[0] as Map<String, dynamic>)['uids'] as List<dynamic>;
(results[0] as Map<String, dynamic>)['uids'] as List<dynamic>;
if (uids.isNotEmpty) {
// ignore: avoid_dynamic_calls
return contact.copyWith(nick: uids[0]!['uid'] as String);
......@@ -159,14 +159,14 @@ Future<Contact> getWot(Contact contact) async {
@Deprecated('use getProfile')
Future<String> _getDataImageFromKey(String publicKey) async {
final Response response =
await requestCPlusWithRetry('/user/profile/$publicKey');
await requestCPlusWithRetry('/user/profile/$publicKey');
if (response.statusCode == HttpStatus.ok) {
final Map<String, dynamic> data =
json.decode(response.body) as Map<String, dynamic>;
json.decode(response.body) as Map<String, dynamic>;
final Map<String, dynamic> source = data['_source'] as Map<String, dynamic>;
if (source.containsKey('avatar')) {
final Map<String, dynamic> avatarData =
source['avatar'] as Map<String, dynamic>;
source['avatar'] as Map<String, dynamic>;
if (avatarData.containsKey('_content')) {
final String content = avatarData['_content'] as String;
return 'data:image/png;base64,$content';
......@@ -213,21 +213,16 @@ Future<void> fetchNodes(NodeType type, bool force) async {
Future<void> _fetchDuniterNodes({bool force = false}) async {
const NodeType type = NodeType.duniter;
NodeManager().loading = true;
if (force || nodesWorking(type) < NodeManager.maxNodes) {
if (force) {
NodeManager().updateNodes(type, defaultDuniterNodes);
logger('Fetching nodes forced');
} else {
logger('Fetching ${type.name} nodes, we have ${nodesWorking(type)}');
}
final List<Node> nodes = await _fetchDuniterNodesFromPeers(type);
NodeManager().updateNodes(type, nodes);
} else {
logger('Skipping to fetch nodes as we have ${nodesWorking(type)}');
if (!kReleaseMode) {
// developer.log(StackTrace.current.toString());
}
final bool forceOrFewNodes =
force || NodeManager().nodesWorking(type) < NodeManager.maxNodes;
if (forceOrFewNodes) {
NodeManager().updateNodes(type, defaultDuniterNodes);
}
logger(
'Fetching ${type.name} nodes, we have ${NodeManager().nodesWorking(
type)}');
final List<Node> nodes = await _fetchDuniterNodesFromPeers(type);
NodeManager().updateNodes(type, nodes);
NodeManager().loading = false;
}
......@@ -236,12 +231,14 @@ Future<void> _fetchDuniterNodes({bool force = false}) async {
Future<void> _fetchCesiumPlusNodes({bool force = false}) async {
NodeManager().loading = true;
const NodeType type = NodeType.cesiumPlus;
if (force) {
final bool forceOrFewNodes = force || NodeManager().nodesWorking(type) <= 2;
if (forceOrFewNodes) {
NodeManager().updateNodes(type, defaultCesiumPlusNodes);
logger('Fetching cesium nodes forced');
} else {
logger('Fetching cesium plus nodes, we have ${nodesWorking(type)}');
}
logger(
'Fetching cesium plus nodes, we have ${NodeManager().nodesWorking(
type)}');
final List<Node> nodes = await _fetchNodes(NodeType.cesiumPlus);
NodeManager().updateNodes(type, nodes);
NodeManager().loading = false;
......@@ -254,24 +251,13 @@ Future<void> _fetchGvaNodes({bool force = false}) async {
NodeManager().updateNodes(type, defaultGvaNodes);
logger('Fetching gva nodes forced');
} else {
logger('Fetching gva nodes, we have ${nodesWorking(type)}');
logger('Fetching gva nodes, we have ${NodeManager().nodesWorking(type)}');
}
final List<Node> nodes = await _fetchDuniterNodesFromPeers(type);
NodeManager().updateNodes(type, nodes);
NodeManager().loading = false;
}
int nodesWorking(NodeType type) => NodeManager()
.nodeList(type)
.where((Node n) => n.errors < NodeManager.maxNodeErrors)
.toList()
.length;
List<Node> nodesWorkingList(NodeType type) => NodeManager()
.nodeList(type)
.where((Node n) => n.errors < NodeManager.maxNodeErrors)
.toList();
Future<List<Node>> _fetchDuniterNodesFromPeers(NodeType type) async {
final List<Node> lNodes = <Node>[];
final String apyType = (type == NodeType.duniter) ? 'BMAS' : 'GVA S';
......@@ -282,14 +268,14 @@ Future<List<Node>> _fetchDuniterNodesFromPeers(NodeType type) async {
final Response response = await getPeers();
if (response.statusCode == 200) {
final Map<String, dynamic> peerList =
jsonDecode(response.body) as Map<String, dynamic>;
jsonDecode(response.body) as Map<String, dynamic>;
final List<dynamic> peers = (peerList['peers'] as List<dynamic>)
.where((dynamic peer) =>
(peer as Map<String, dynamic>)['currency'] == currency)
(peer as Map<String, dynamic>)['currency'] == currency)
.where(
(dynamic peer) => (peer as Map<String, dynamic>)['version'] == 10)
.where((dynamic peer) =>
(peer as Map<String, dynamic>)['status'] == 'UP')
(peer as Map<String, dynamic>)['status'] == 'UP')
.toList();
// reorder peer list
peers.shuffle();
......@@ -297,7 +283,7 @@ Future<List<Node>> _fetchDuniterNodesFromPeers(NodeType type) async {
final Map<String, dynamic> peer = peerR as Map<String, dynamic>;
if (peer['endpoints'] != null) {
final List<String> endpoints =
List<String>.from(peer['endpoints'] as List<dynamic>);
List<String>.from(peer['endpoints'] as List<dynamic>);
for (int j = 0; j < endpoints.length; j++) {
if (endpoints[j].startsWith(apyType)) {
final String endpointUnParsed = endpoints[j];
......@@ -309,7 +295,9 @@ Future<List<Node>> _fetchDuniterNodesFromPeers(NodeType type) async {
final NodeCheck nodeCheck = await _pingNode(endpoint, type);
final Duration latency = nodeCheck.latency;
logger(
'Evaluating node: $endpoint, latency ${latency.inMicroseconds} currentBlock: ${nodeCheck.currentBlock}');
'Evaluating node: $endpoint, latency ${latency
.inMicroseconds} currentBlock: ${nodeCheck
.currentBlock}');
final Node node = Node(
url: endpoint,
latency: latency.inMicroseconds,
......@@ -342,7 +330,8 @@ Future<List<Node>> _fetchDuniterNodesFromPeers(NodeType type) async {
}
}
logger(
'Fetched ${lNodes.length} ${type.name} nodes ordered by latency ${lNodes.isNotEmpty ? '(first: ${lNodes.first.url})' : '(zero nodes)'}');
'Fetched ${lNodes.length} ${type.name} nodes ordered by latency ${lNodes
.isNotEmpty ? '(first: ${lNodes.first.url})' : '(zero nodes)'}');
} catch (e, stacktrace) {
await Sentry.captureException(e, stackTrace: stacktrace);
logger('General error in fetch ${type.name} nodes: $e');
......@@ -394,7 +383,8 @@ Future<List<Node>> _fetchNodes(NodeType type) async {
}
logger(
'Fetched ${lNodes.length} ${type.name} nodes ordered by latency (first: ${lNodes.first.url})');
'Fetched ${lNodes.length} ${type
.name} nodes ordered by latency (first: ${lNodes.first.url})');
} catch (e, stacktrace) {
await Sentry.captureException(e, stackTrace: stacktrace);
logger('General error in fetch ${type.name}: $e');
......@@ -411,7 +401,8 @@ Future<NodeCheck> _pingNode(String node, NodeType type) async {
int currentBlock = 0;
Duration latency;
try {
final Stopwatch stopwatch = Stopwatch()..start();
final Stopwatch stopwatch = Stopwatch()
..start();
if (type == NodeType.duniter) {
final Response response = await http
.get(Uri.parse('$node/blockchain/current'))
......@@ -420,7 +411,7 @@ Future<NodeCheck> _pingNode(String node, NodeType type) async {
latency = stopwatch.elapsed;
if (response.statusCode == 200) {
final Map<String, dynamic> json =
jsonDecode(response.body) as Map<String, dynamic>;
jsonDecode(response.body) as Map<String, dynamic>;
currentBlock = json['number'] as int;
} else {
latency = wrongNodeDuration;
......@@ -429,17 +420,17 @@ Future<NodeCheck> _pingNode(String node, NodeType type) async {
// see: http://g1.data.e-is.pro/network/peering
final Response response = await http
.get(Uri.parse('$node/node/stats'))
// Decrease http timeout during ping
// Decrease http timeout during ping
.timeout(timeout);
if (response.statusCode == 200) {
try {
final Map<String, dynamic> json =
jsonDecode(response.body.replaceAll('"cluster"{', '"cluster": {'))
as Map<String, dynamic>;
jsonDecode(response.body.replaceAll('"cluster"{', '"cluster": {'))
as Map<String, dynamic>;
currentBlock = ((((json['stats'] as Map<String, dynamic>)['cluster']
as Map<String, dynamic>)['indices']
as Map<String, dynamic>)['docs']
as Map<String, dynamic>)['count'] as int;
as Map<String, dynamic>)['indices']
as Map<String, dynamic>)['docs']
as Map<String, dynamic>)['count'] as int;
} catch (e) {
loggerDev('Cannot parse node/stats $e');
}
......@@ -460,7 +451,8 @@ Future<NodeCheck> _pingNode(String node, NodeType type) async {
latency = balance >= 0 ? stopwatch.elapsed : wrongNodeDuration;
}
logger(
'Ping tested in node $node ($type), latency ${latency.inMicroseconds}, current block $currentBlock');
'Ping tested in node $node ($type), latency ${latency
.inMicroseconds}, current block $currentBlock');
return NodeCheck(latency: latency, currentBlock: currentBlock);
} catch (e) {
// Handle exception when node is unavailable etc
......@@ -486,32 +478,29 @@ Future<http.Response> requestCPlusWithRetry(String path,
Future<http.Response> requestGvaWithRetry(String path,
{bool retryWith404 = true,
HttpType httpType = HttpType.get,
Map<String, String>? headers,
Object? body,
Encoding? encoding}) async {
HttpType httpType = HttpType.get,
Map<String, String>? headers,
Object? body,
Encoding? encoding}) async {
return _requestWithRetry(NodeType.gva, path, true, retryWith404,
httpType: httpType, headers: headers, body: body, encoding: encoding);
}
enum HttpType { get, post, delete }
Future<http.Response> _requestWithRetry(
NodeType type, String path, bool dontRecord, bool retryWith404,
Future<http.Response> _requestWithRetry(NodeType type, String path,
bool dontRecord, bool retryWith404,
{HttpType httpType = HttpType.get,
Map<String, String>? headers,
Object? body,
Encoding? encoding}) async {
final List<Node> nodes = NodeManager()
.nodeList(type)
.where((Node node) => node.errors <= NodeManager.maxNodeErrors)
.toList();
Map<String, String>? headers,
Object? body,
Encoding? encoding}) async {
final List<Node> nodes = NodeManager().nodesWorkingList(type);
if (nodes.isEmpty) {
nodes.addAll(type == NodeType.duniter
? defaultDuniterNodes
: type == NodeType.cesiumPlus
? defaultCesiumPlusNodes
: defaultGvaNodes);
? defaultCesiumPlusNodes
: defaultGvaNodes);
}
for (final int timeout in <int>[10]) {
// only one timeout for now
......@@ -520,16 +509,20 @@ Future<http.Response> _requestWithRetry(
try {
final Uri url = Uri.parse('${node.url}$path');
logger('Fetching $url (${type.name})');
final int startTime = DateTime.now().millisecondsSinceEpoch;
final int startTime = DateTime
.now()
.millisecondsSinceEpoch;
final Response response = httpType == HttpType.get
? await http.get(url).timeout(Duration(seconds: timeout))
: httpType == HttpType.post
? await http
.post(url, body: body, headers: headers, encoding: encoding)
.timeout(Duration(seconds: timeout))
: await http.delete(url,
body: body, headers: headers, encoding: encoding);
final int endTime = DateTime.now().millisecondsSinceEpoch;
? await http
.post(url, body: body, headers: headers, encoding: encoding)
.timeout(Duration(seconds: timeout))
: await http.delete(url,
body: body, headers: headers, encoding: encoding);
final int endTime = DateTime
.now()
.millisecondsSinceEpoch;
final int newLatency = endTime - startTime;
if (!kReleaseMode) {
logger('response.statusCode: ${response.statusCode}');
......@@ -574,11 +567,10 @@ Future<http.Response> _requestWithRetry(
'Cannot make the request to any of the ${nodes.length} nodes');
}
Future<PayResult> pay(
{required String to,
required double amount,
String? comment,
bool? useMempool}) async {
Future<PayResult> pay({required String to,
required double amount,
String? comment,
bool? useMempool}) async {
try {
final SelectedGvaNode selected = getGvaNode();
......@@ -587,7 +579,8 @@ Future<PayResult> pay(
final Gva gva = Gva(node: nodeUrl);
final CesiumWallet wallet = await SharedPreferencesHelper().getWallet();
logger(
'Trying $nodeUrl to send $amount to $to with comment ${comment ?? ''}');
'Trying $nodeUrl to send $amount to $to with comment ${comment ??
''}');
final String response = await gva.pay(
recipient: extractPublicKey(to),
......@@ -647,7 +640,9 @@ class PayResult {
String proxyfyNode(String nodeUrl) {
final String url = inProduction && kIsWeb
? '${window.location.protocol}//${window.location.hostname}/proxy/${nodeUrl.replaceFirst('https://', '').replaceFirst('http://', '')}/'
? '${window.location.protocol}//${window.location
.hostname}/proxy/${nodeUrl.replaceFirst('https://', '').replaceFirst(
'http://', '')}/'
: nodeUrl;
return url;
}
......@@ -655,7 +650,7 @@ String proxyfyNode(String nodeUrl) {
Future<Tuple2<Map<String, dynamic>?, Node>> gvaHistoryAndBalance(
String pubKeyRaw,
[int? pageSize,
String? cursor]) async {
String? cursor]) async {
logger('Get tx history (page size: $pageSize: cursor $cursor)');
final String pubKey = extractPublicKey(pubKeyRaw);
return gvaFunctionWrapper<Map<String, dynamic>>(
......@@ -672,8 +667,8 @@ Future<Tuple2<String?, Node>> gvaNick(String pubKey) async {
pubKey, (Gva gva) => gva.getUsername(extractPublicKey(pubKey)));
}
Future<Tuple2<T?, Node>> gvaFunctionWrapper<T>(
String pubKey, Future<T?> Function(Gva) specificFunction) async {
Future<Tuple2<T?, Node>> gvaFunctionWrapper<T>(String pubKey,
Future<T?> Function(Gva) specificFunction) async {
final List<Node> nodes = _getBestGvaNodes();
for (int i = 0; i < nodes.length; i++) {
final Node node = nodes[i];
......@@ -698,14 +693,11 @@ Future<Tuple2<T?, Node>> gvaFunctionWrapper<T>(
}
List<Node> _getBestGvaNodes() {
final List<Node> fnodes = NodeManager()
.nodeList(NodeType.gva)
.where((Node node) => node.errors <= NodeManager.maxNodeErrors)
.toList();
final List<Node> fnodes = NodeManager().nodesWorkingList(NodeType.gva);
final int maxCurrentBlock = fnodes.fold(
0,
(int max, Node node) =>
node.currentBlock > max ? node.currentBlock : max);
(int max, Node node) =>
node.currentBlock > max ? node.currentBlock : max);
final List<Node> nodes = fnodes
.where((Node node) => node.currentBlock == maxCurrentBlock)
.toList();
......@@ -750,7 +742,9 @@ Future<void> createOrUpdateCesiumPlusUser(String name) async {
'issuer': pubKey,
'title': name + userNameSuffix,
'geoPoint': null,
'time': DateTime.now().millisecondsSinceEpoch ~/
'time': DateTime
.now()
.millisecondsSinceEpoch ~/
1000, // current time in seconds
'tags': <String>[],
};
......@@ -774,7 +768,8 @@ Future<void> createOrUpdateCesiumPlusUser(String name) async {
logger('User profile updated successfully.');
} else {
logger(
'Failed to update user profile. Status code: ${updateResponse.statusCode}');
'Failed to update user profile. Status code: ${updateResponse
.statusCode}');
logger('Response body: ${updateResponse.body}');
}
} else if (userName == null) {
......@@ -789,7 +784,8 @@ Future<void> createOrUpdateCesiumPlusUser(String name) async {
logger('User profile created successfully.');
} else {
logger(
'Failed to create user profile. Status code: ${createResponse.statusCode}');
'Failed to create user profile. Status code: ${createResponse
.statusCode}');
}
}
}
......@@ -823,7 +819,9 @@ Future<bool> deleteCesiumPlusUser() async {
'issuer': pubKey,
'index': 'user',
'type': 'profile',
'time': DateTime.now().millisecondsSinceEpoch ~/
'time': DateTime
.now()
.millisecondsSinceEpoch ~/
1000, // current time in seconds
};
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment