From 2a9849f5f88924f0888bac294e92bec2c2baacbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Eng=C3=A9libert?= <tuxmain@zettascript.org> Date: Sat, 1 Jun 2019 15:01:12 +0200 Subject: [PATCH] Change peer system --- README.md | 2 + client.py | 36 ++--- server.py | 457 +++++++++++++++++++++++++++++++++++++----------------- utils.py | 104 ++++++++----- 4 files changed, 391 insertions(+), 208 deletions(-) diff --git a/README.md b/README.md index a882c5a..2c7b872 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,8 @@ run in an onion network, guaranteeing a strong anonymity. ## How to install it +**Warning**: This version works with a dev version of Silkaj. If the latest release of Silkaj is still `0.7.1`, please use [this branch](https://git.duniter.org/clients/python/silkaj/tree/223_endpoint_click_decoupled) (you can also copy `network_tools.py` from this branch). + ### ➤ Debian (Ubuntu, Mint, etc.) sh install_gmixer.sh diff --git a/client.py b/client.py index 3d696cd..702503a 100644 --- a/client.py +++ b/client.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """ + CopyLeft 2019 Pascal Engélibert <tuxmain@zettascript.org> ĞMixer-py Test Client (only for testing server) This file is part of ĞMixer-py. @@ -21,16 +22,13 @@ Sources: https://www.ietf.org/rfc/rfc3092.txt <- Very important RFC, please read it """ -import sys, os, asyncio, getpass, random, time, secrets +import sys, os, asyncio, getpass, random, time, secrets, socket import ubjson import plyvel import libnacl.sign from duniterpy.key import SigningKey, PublicKey import utils -VERSION = "0.1.0" -AUTHORS = ["Pascal Engélibert <tuxmain@zettascript.org>"] - DIR = "~/.gmixer" class Confirmation(): @@ -66,27 +64,27 @@ class Confirmation(): } def get_peers(host, proxy=None, proxy_onion_only=False): - header, content = utils.sdata(host, "GET", "/pubkey/list", proxy=proxy, proxy_onion_only=proxy_onion_only) + header, content = utils.sdata(host, "GET", "/peers/info", proxy=proxy, proxy_onion_only=proxy_onion_only) try: data = ubjson.loadb(content) - assert "pubkey" in data and "peers" in data - assert type(data["pubkey"]) == str and type(data["peers"]) == list + assert "info" in data and "peers" in data + assert type(data["peers"]) == list except (ubjson.decoder.DecoderException, AssertionError): print("Error: bad UBJSON") return - peers = [utils.Peer(data["pubkey"], host[0], host[1], True)] - for peer in data["peers"]: - peers.append(utils.Peer(peer["pubkey"], peer["host"], peer["port"], peer["up"])) + peers = [utils.Peer(data["info"]), *[utils.Peer(p) for p in data["peers"]]] + #peers = [utils.Peer(p) for p in data["peers"]] + print([p.pubkey for p in peers]) return peers def build_path(peers, receiver, layers=3): up_peers = [] for peer in peers: - if peer.up: - up_peers.append(peer) + #if peer.up: + up_peers.append(peer) if len(up_peers) < layers: return None, None path = [] @@ -99,11 +97,11 @@ def build_path(peers, receiver, layers=3): break path.append(peer.pubkey) if host == None: - host = (peer.host, peer.port) + host = peer.host path.append(receiver) return host, path -def mix(db_txs, amount, base, sender, path, host, proxy=None, proxy_onion_only=False, send_tx=True): +async def mix(db_txs, amount, base, sender, path, host, proxy=None, proxy_onion_only=False, send_tx=True): start_time = time.time() onetime_keys = [] @@ -254,14 +252,14 @@ def mix(db_txs, amount, base, sender, path, host, proxy=None, proxy_onion_only=F if send_tx: try: - utils.send_transaction(sender, path[0], amount, utils.gen_comment(comment_seeds[0])) + await utils.send_transaction(sender, path[0], amount, utils.gen_comment(comment_seeds[0])) message["sent"] = True db_txs.put(comment_seeds[0][1], PublicKey(sender.pubkey).encrypt_seal(ubjson.dumpb(message))) except socket.timeout: - utils.logprint("Error when sending tx: timeout", LOG_ERROR) + print("Error when sending tx: timeout") except Exception as e: - utils.logprint("Error when sending tx: " + str(e), LOG_ERROR) + print("Error when sending tx: " + str(e)) return async def main(db_txs, host, receiver, amount=1000, layers=3, proxy=None, proxy_onion_only=False, send_tx=True): @@ -292,11 +290,11 @@ async def main(db_txs, host, receiver, amount=1000, layers=3, proxy=None, proxy_ if input("OK? [yn]: ").lower() == "y": break - mix(db_txs, amount, 0, keys, path, host1, proxy, proxy_onion_only, send_tx) + await mix(db_txs, amount, 0, keys, path, host1, proxy, proxy_onion_only, send_tx) if __name__ == "__main__": if "--help" in sys.argv: - print("ĞMixer-py client "+VERSION+""" + print("ĞMixer-py client "+utils.VERSION+""" Options: -r <pubkey> receiver pubkey diff --git a/server.py b/server.py index 36b72fc..53c4f95 100644 --- a/server.py +++ b/server.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 """ + CopyLeft 2019 Pascal Engélibert <tuxmain@zettascript.org> This file is part of ĞMixer-py. ĞMixer-py is free software: you can redistribute it and/or modify @@ -17,7 +18,7 @@ along with ĞMixer-py. If not, see <https://www.gnu.org/licenses/>. """ -import sys, os, json, asyncio, getpass, random, time, socket, secrets +import sys, os, json, asyncio, getpass, random, time, socket, secrets, base64 from threading import Thread import ubjson import plyvel @@ -28,9 +29,6 @@ from duniterpy.api.client import Client from duniterpy.key import SigningKey, PublicKey import utils -VERSION = "0.1.0" -AUTHORS = ["Pascal Engélibert <tuxmain@zettascript.org>"] - """ Used terms: sender_ = node which has sent something to me @@ -54,6 +52,9 @@ BMA_HOSTS = ["g1.duniter.fr 443", "g1.duniter.org 443", "g1.presles.fr 443", "g1 MIX_INTERVAL = 60 MIX_MIN_TXS = 5 # minimum amount of txs to mix MIX_REQ_AGE_MAX = 604800 # maximum mix request age before return to sender +PEER_INFO_INTERVAL = 60 # interval for renewing my peer document +PEER_SIG_AGE_MAX = 600 # max age of a peer document signature +PEER_DETECT_INTERVAL = 120 # interval for fetching peer list def send_response(client, code, resp, dataformat="ubjson"): if dataformat == "ubjson": @@ -88,6 +89,8 @@ class TX: self.can_confirm = False self.need_confirm = True self.confirms = b"" + self.sender_hash = None + self.receiver_hash = None self.tx_sent = False def gen_mix_confirm(self, keys): @@ -128,11 +131,13 @@ class TX: "send_confirm": self.send_confirm, "date": self.date, "expire": self.expire, - "need_send" : self.need_send, - "can_confirm" : self.can_confirm, - "need_confirm" : self.need_confirm, - "confirms" : self.confirms, - "tx_sent" : self.tx_sent, + "need_send": self.need_send, + "can_confirm": self.can_confirm, + "need_confirm": self.need_confirm, + "confirms": self.confirms, + "tx_sent": self.tx_sent, + "sender_hash": self.sender_hash, + "receiver_hash": self.receiver_hash })) def import_ubjson(d): @@ -143,6 +148,8 @@ class TX: tx.need_confirm = d["need_confirm"] tx.confirms = d["confirms"] tx.tx_sent = d["tx_sent"] + tx.sender_hash = d["sender_hash"] + tx.receiver_hash = d["receiver_hash"] return tx def load_txs(db_txs, pool, tx_in_index, tx_out_index): @@ -158,7 +165,7 @@ def save_txs(db_txs, pool): for tx in pool: tx.export_ubjson(db_txs) -# Read ini config file +# Read json config file def read_config(cdir, conf_overwrite={}): if not os.path.isfile(cdir+"/config.json"): configfile = open(cdir+"/config.json", "w") @@ -177,6 +184,9 @@ def read_config(cdir, conf_overwrite={}): conf["server"].setdefault("bind_port", BIND_PORT) conf["server"].setdefault("public_host", PUBLIC_HOST) conf["server"].setdefault("public_port", PUBLIC_PORT) + conf["server"].setdefault("peer_info_interval", PEER_INFO_INTERVAL) + conf["server"].setdefault("peer_sig_age_max", PEER_SIG_AGE_MAX) + conf["server"].setdefault("peer_detect_interval", PEER_DETECT_INTERVAL) conf.setdefault("client", {}) conf["client"].setdefault("bma_hosts", BMA_HOSTS) conf["client"].setdefault("proxy", None) @@ -188,6 +198,9 @@ def read_config(cdir, conf_overwrite={}): conf["mix"].setdefault("mix_interval", MIX_INTERVAL) conf["mix"].setdefault("mix_min_txs", MIX_MIN_TXS) conf["mix"].setdefault("mix_req_age_max", MIX_REQ_AGE_MAX) + conf.setdefault("idty", {}) + conf["idty"].setdefault("sig", "") + conf["idty"].setdefault("pubkey", "") for key in conf_overwrite: c = conf @@ -202,13 +215,13 @@ def read_config(cdir, conf_overwrite={}): return conf class ServerThread(Thread): - def __init__(self, conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs): + def __init__(self, conf, peers, keys, local_peer, pool, tx_in_index, tx_out_index, db_txs): Thread.__init__(self) self.conf = conf self.peers = peers - self.peers_index = peers_index self.keys = keys + self.local_peer = local_peer self.pool = pool self.tx_in_index = tx_in_index self.tx_out_index = tx_out_index @@ -289,23 +302,40 @@ class ServerThread(Thread): resp["pubkey"] = self.keys.pubkey if "version" in url: - resp["version"] = VERSION + resp["version"] = utils.VERSION if "mix" in url: - sender_pubkey = utils.getargv("mix", "", 1, url) + entry_node = "client" in url + + if entry_node: + sender_pubkey = utils.getargv("mix", "", 1, url) + + try: + sender_keys = PublicKey(sender_pubkey) + except ValueError: + send_response(client, "401 Unauthorized", {"error": "bad_sender_pubkey"}, resp_format) + continue + else: + try: + sender_hash = base64.urlsafe_b64decode(utils.getargv("mix", "", 1, url)) + except base64.binascii.Error: + send_response(client, "401 Unauthorized", {"error": "bad_url"}, resp_format) + continue + + if not sender_hash in self.peers: + send_response(client, "401 Unauthorized", {"error": "unknown_peer"}, resp_format) + continue + + peer = self.peers[sender_hash] + sender_pubkey = peer.pubkey + sender_keys = peer.keys + try: in_amount = int(utils.getargv("mix", "", 2, url)) in_base = int(utils.getargv("mix", "", 3, url)) except ValueError: send_response(client, "401 Unauthorized", {"error": "bad_amount_base"}, resp_format) continue - send_confirm = not "client" in url - - try: - sender_keys = PublicKey(sender_pubkey) - except ValueError: - send_response(client, "401 Unauthorized", {"error": "bad_sender_pubkey"}, resp_format) - continue try: raw = libnacl.sign.Verifier(sender_keys.hex_pk()).verify(content) # Verify @@ -355,10 +385,12 @@ class ServerThread(Thread): # Save tx in pool t = time.time() - tx = TX(sender_pubkey, receiver_pubkey, onetime_pubkey, in_amount, in_base, in_amount, in_base, message, in_seeds, out_seeds, send_confirm, t, t+self.conf["mix"]["mix_req_age_max"]) + tx = TX(sender_pubkey, receiver_pubkey, onetime_pubkey, in_amount, in_base, in_amount, in_base, message, in_seeds, out_seeds, not entry_node, t, t+self.conf["mix"]["mix_req_age_max"]) last_node = len(message) == 0 tx.need_send = not last_node tx.can_confirm = last_node + if not entry_node: + tx.sender_hash = peer.hash utils.logprint("TX "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) self.tx_out_index[out_seeds[1]] = tx self.tx_in_index[in_seeds[1]] = tx @@ -368,24 +400,39 @@ class ServerThread(Thread): resp["mix_ok"] = in_seeds[1] elif "confirm" in url: - receiver_pubkey = utils.getargv("confirm", "", 1, url) + try: + receiver_hash = base64.urlsafe_b64decode(utils.getargv("confirm", "", 1, url)) + except base64.binascii.Error: + send_response(client, "401 Unauthorized", {"error": "bad_url"}, resp_format) + continue + try: out_seed1 = bytes.fromhex(utils.getargv("confirm", "", 2, url)) except ValueError: send_response(client, "401 Unauthorized", {"error": "bad_url"}, resp_format) continue + if not receiver_hash in self.peers: + send_response(client, "401 Unauthorized", {"error": "unknown_peer"}, resp_format) + continue + + peer = self.peers[receiver_hash] + if not out_seed1 in self.tx_out_index: send_response(client, "404 Not Found", {"error": "unknown_tx"}, resp_format) continue tx = self.tx_out_index[out_seed1] + if peer.hash != tx.receiver_hash: + send_response(client, "404 Not Found", {"error": "unknown_tx"}, resp_format) + continue + if len(tx.confirms) > 0 or tx.can_confirm or not tx.need_confirm or tx.need_send: send_response(client, "403 Forbidden", {"error": "cannot_confirm"}, resp_format) continue - if receiver_pubkey != tx.receiver_pubkey: + if peer.pubkey != tx.receiver_pubkey: send_response(client, "401 Unauthorized", {"error": "bad_rec_pubkey"}, resp_format) continue @@ -443,44 +490,32 @@ class ServerThread(Thread): tx.export_ubjson(self.db_txs) utils.logprint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) - if "list" in url: - peers_list = [] - for peer in self.peers: - peers_list.append({"pubkey":peer.pubkey, "host":peer.host, "port":peer.port, "up":peer.up}) - resp["peers"] = peers_list + if "peers" in url: + resp["peers"] = [self.peers[peer].raw for peer in self.peers] if "new" in url: - new_pubkey = utils.getargv("new", "", 1, url) try: - new_keys = PublicKey(new_pubkey) - except ValueError: - send_response(client, "401 Unauthorized", {"error": "bad_pubkey"}, resp_format) - continue - try: - message = libnacl.sign.Verifier(new_keys.hex_pk()).verify(content) # Verify - except ValueError: - send_response(client, "401 Unauthorized", {"error": "bad_signature"}, resp_format) + new_peer = utils.Peer(content) + except Exception as e: # TODO more specific exceptions + send_response(client, "400 Bad Request", {"error": "bad_peer"}, resp_format) + utils.logprint("Peer: bad peer: "+str(e), utils.LOG_ERROR) continue - try: - message = self.keys.decrypt_seal(message) # Decrypt - except libnacl.CryptError: - send_response(client, "403 Forbidden", {"error": "bad_encryption"}, resp_format) + + if new_peer.sigtime + self.conf["server"]["peer_sig_age_max"] <= new_peer.rectime: + send_response(client, "400 Bad Request", {"error": "too_old_sig"}, resp_format) + utils.logprint("Peer: too old sig: "+new_peer.to_human_str(), utils.LOG_WARN) continue - try: - message = ubjson.loadb(message) - except ubjson.decoder.DecoderException: - send_response(client, "400 Bad Request", {"error": "bad_ubjson"}, resp_format) + + if new_peer.hash in self.peers and new_peer.sigtime <= self.peers[new_peer.hash].sigtime: + send_response(client, "400 Bad Request", {"error": "more_recent_sig_exists"}, resp_format) + utils.logprint("Peer: have more recent sig: "+new_peer.to_human_str(), utils.LOG_WARN) continue - if new_pubkey == message["pubkey"]: - if not new_pubkey in self.peers_index: - peer = utils.Peer(new_pubkey, message["host"], message["port"], True) - self.peers.append(peer) - self.peers_index[new_pubkey] = peer - utils.logprint("Add "+str(peer), utils.LOG_TRACE) - else: - self.peers_index[new_pubkey].up = True - utils.logprint("Up "+str(peer), utils.LOG_TRACE) + utils.logprint("Peer: "+new_peer.to_human_str(), utils.LOG_TRACE) + self.peers[new_peer.hash] = new_peer + + if "info" in url: + resp["info"] = self.local_peer.raw # Send response send_response(client, "200 OK", resp, resp_format) @@ -491,13 +526,13 @@ class ServerThread(Thread): self.sock.shutdown(socket.SHUT_WR) class ClientThread(Thread): - def __init__(self, conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs): + def __init__(self, conf, peers, keys, local_peer, pool, tx_in_index, tx_out_index, db_txs): Thread.__init__(self) self.conf = conf self.peers = peers - self.peers_index = peers_index self.keys = keys + self.local_peer = local_peer self.pool = pool self.tx_in_index = tx_in_index self.tx_out_index = tx_out_index @@ -508,62 +543,86 @@ class ClientThread(Thread): def detect_peers(self):# Check known peers and ask them for their known peer list utils.logprint("Start peers detection", utils.LOG_TRACE) - modified = True - asked = [] - while modified: - modified = False + """ + Try to interrogate 3 different random peers. When possible, do not interrogate 2 peers pertaining to the same identity. + """ + new_peers = {} + left_peers = [self.peers[p] for p in self.peers] # peers we have not tested and pertaining to not tested identities + random.shuffle(left_peers) + left2_peers = left_peers.copy() # peers we have not tested + answered = 0 # number of up tested peers + while answered < 3 and len(left2_peers) > 0: - for peer in self.peers: - if peer.pubkey in asked: - continue - asked.append(peer.pubkey) - - message = ubjson.dumpb({ - "pubkey": self.keys.pubkey, - "host": self.conf["server"]["public_host"], - "port": self.conf["server"]["public_port"], - "time": time.time() - }) - message = peer.keys.encrypt_seal(message) # Encrypt - message = self.keys.sign(message) # Sign - utils.logprint("Ask "+str(peer), utils.LOG_TRACE) + # Choose a peer + if len(left_peers) > 0: + peer = left_peers.pop() + left2_peers.remove(peer) + to_remove = [] + for i in left_peers: + if i.idty == peer.idty: + to_remove.append(i) + for i in to_remove: + left_peers.remove(i) + else: + peer = left2_peers.pop() + + # Ask the chosen peer + try: + header, content = utils.sdata(peer.host, "GET", "/peers/info", proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) + try: - header, content = utils.sdata((peer.host, peer.port), "POST", "/list/new/"+self.keys.pubkey, message, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) # Send - except (ConnectionRefusedError, socks.GeneralProxyError, socket.gaierror, socket.timeout): - peer.up = False - utils.logprint("Down "+str(peer), utils.LOG_TRACE) + data = ubjson.loadb(content) + assert "peers" in data and type(data["peers"]) == list , "no peer list" + assert "info" in data , "no peer info" + except (ubjson.decoder.DecoderException, AssertionError) as e: + utils.logprint("Peer detection: Encoding error: "+peer.to_human_str()+"\n\t"+str(e), utils.LOG_WARN) continue - peer.up = True - utils.logprint("Up "+str(peer), utils.LOG_TRACE) try: - message = ubjson.loadb(content) - assert "peers" in message - assert type(message["peers"]) == list - except (ubjson.decoder.DecoderException, AssertionError): - utils.logprint("Bad json from "+str(peer), utils.LOG_ERROR) - continue - for i_peer in message["peers"]: - try: - assert "pubkey" in i_peer and "host" in i_peer and "port" in i_peer - int(i_peer["port"]) - except (AssertionError, ValueError): - utils.logprint("Bad json from "+str(peer), utils.LOG_ERROR) - continue - if i_peer["pubkey"] in self.peers_index or i_peer["pubkey"] == self.keys.pubkey: - continue + new_peer = utils.Peer(data["info"]) + except Exception as e: + print("Peer detection: info: "+str(e)) + new_peers.setdefault(new_peer.hash, []) + new_peers[new_peer.hash].append(new_peer) + + for raw in data["peers"]: try: - new_peer = utils.Peer(i_peer["pubkey"], i_peer["host"], i_peer["port"], None) - except ValueError: - utils.logprint("Bad pubkey from "+str(peer), utils.LOG_ERROR) + new_peer = utils.Peer(raw) + except Exception as e: + utils.logprint("Peer detection: "+str(e), utils.LOG_WARN) + if new_peer.hash == self.local_peer.hash: continue - self.peers.append(new_peer) - self.peers_index[new_peer.pubkey] = new_peer - modified = True - utils.logprint("Add "+str(new_peer), utils.LOG_TRACE) + new_peers.setdefault(new_peer.hash, []) + new_peers[new_peer.hash].append(new_peer) + + answered += 1 + + except (ConnectionRefusedError, socks.GeneralProxyError, socket.gaierror, socket.timeout): + utils.logprint("Peer detection: Network error: "+peer.to_human_str(), utils.LOG_WARN) + + # Choose the more recent peer infos + for peer in new_peers: + new_peer = max(new_peers[peer], key=lambda p: p.sigtime) # select the more recent + + if new_peer.sigtime + self.conf["server"]["peer_sig_age_max"] <= new_peer.rectime or \ + (peer in self.peers and new_peer.sigtime <= self.peers[peer].sigtime): + utils.logprint("Peer detection: too old sig", utils.LOG_TRACE) + continue + self.peers[peer] = new_peer + utils.logprint("Peer: "+new_peer.to_human_str(), utils.LOG_TRACE) + utils.logprint("Finished peers detection", utils.LOG_TRACE) + def spread_peer_info(self): + utils.logprint("Start spreading peer info", utils.LOG_TRACE) + for peer in self.peers: + try: + utils.sdata(self.peers[peer].host, "POST", "/new", self.local_peer.raw, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) + except (ConnectionRefusedError, socks.GeneralProxyError, socket.gaierror, socket.timeout): + utils.logprint("Network error: "+self.peers[peer].to_human_str(), utils.LOG_WARN) + utils.logprint("Finished spreading peer info", utils.LOG_TRACE) + async def mix(self): can_mix = False for tx in self.pool: @@ -624,7 +683,7 @@ class ClientThread(Thread): random.shuffle(txs) for tx in txs: try: - utils.send_transaction(self.keys, tx.receiver_pubkey, tx.out_amount, utils.gen_comment(tx.out_seeds)) + await utils.send_transaction(self.keys, tx.receiver_pubkey, tx.out_amount, utils.gen_comment(tx.out_seeds)) tx.tx_sent = True tx.export_ubjson(self.db_txs) except socket.timeout: @@ -642,8 +701,13 @@ class ClientThread(Thread): asyncio.new_event_loop().run_until_complete(self.start_client()) async def start_client(self): - next_peers_detection = 0 - next_mix = time.time() + self.conf["mix"]["mix_interval"] + t = time.time() + next_mix = t + self.conf["mix"]["mix_interval"] + next_peers_detection = t + self.conf["server"]["peer_detect_interval"] + next_peer_info = t + self.conf["server"]["peer_info_interval"] + + self.detect_peers() + self.spread_peer_info() while self.work: t = time.time() @@ -651,7 +715,7 @@ class ClientThread(Thread): # Detect peers if t > next_peers_detection: self.detect_peers() - next_peers_detection = time.time()+120 + next_peers_detection = time.time() + self.conf["server"]["peer_detect_interval"] # Mix if t > next_mix: @@ -662,51 +726,66 @@ class ClientThread(Thread): if tx.need_send: utils.logprint("Send "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) - if tx.receiver_pubkey in self.peers_index: - peer = self.peers_index[tx.receiver_pubkey] + + # Find all the peers with that pubkey + peers = [] + for peer in self.peers: + if self.peers[peer].pubkey == tx.receiver_pubkey: + peers.append(self.peers[peer]) + random.shuffle(peers) + + for peer in peers: message = self.keys.sign(tx.out_seeds[1] + tx.message) # Sign try: - header, content = utils.sdata((peer.host, peer.port), "POST", "/mix/"+self.keys.pubkey+"/"+str(tx.out_amount)+"/"+str(tx.out_base), message, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) + header, content = utils.sdata(peer.host, "POST", "/mix/"+base64.urlsafe_b64encode(self.local_peer.hash).decode()+"/"+str(tx.out_amount)+"/"+str(tx.out_base), message, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) data = ubjson.loadb(content) assert data["mix_ok"] == tx.out_seeds[1] - tx.need_send = False - tx.export_ubjson(self.db_txs) - peer.up = True - utils.logprint("Sent "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) - utils.logprint("Up "+str(peer), utils.LOG_TRACE) except (ConnectionRefusedError, socks.GeneralProxyError, socket.gaierror, socket.timeout): peer.up = False - utils.logprint("Down "+str(peer), utils.LOG_TRACE) + utils.logprint("Network error: "+peer.to_human_str(), utils.LOG_WARN) + continue except (ubjson.decoder.DecoderException, KeyError, AssertionError): - utils.logprint("Error: bad response from "+str(peer), utils.LOG_WARN) + utils.logprint("Bad response: "+peer.to_human_str(), utils.LOG_WARN) + continue + + tx.need_send = False + tx.receiver_hash = peer.hash + tx.export_ubjson(self.db_txs) + utils.logprint("Sent "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) + break - else: - utils.logprint("Unknown peer: "+tx.receiver_pubkey, utils.LOG_WARN) + if len(peers) == 0: + utils.logprint("No peer for: "+tx.receiver_pubkey, utils.LOG_WARN) elif tx.can_confirm and tx.need_confirm and tx.send_confirm: utils.logprint("Confirm "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) - if tx.sender_pubkey in self.peers_index: - peer = self.peers_index[tx.sender_pubkey] + + if tx.sender_hash in self.peers: + peer = self.peers[tx.sender_hash] message = tx.gen_mix_confirm(self.keys) try: - header, content = utils.sdata((peer.host, peer.port), "POST", "/confirm/"+self.keys.pubkey+"/"+tx.in_seeds[1].hex(), message, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) + header, content = utils.sdata(peer.host, "POST", "/confirm/"+base64.urlsafe_b64encode(self.local_peer.hash).decode()+"/"+tx.in_seeds[1].hex(), message, proxy=self.conf["client"]["proxy"], proxy_onion_only=self.conf["client"]["proxy_onion_only"]) data = ubjson.loadb(content) assert data["confirm_ok"] == tx.in_seeds[2] tx.need_confirm = False tx.export_ubjson(self.db_txs) peer.up = True utils.logprint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), utils.LOG_TRACE) - utils.logprint("Up "+str(peer), utils.LOG_TRACE) except (ConnectionRefusedError, socks.GeneralProxyError, socket.gaierror, socket.timeout): peer.up = False - utils.logprint("Down "+str(peer), utils.LOG_TRACE) except (ubjson.decoder.DecoderException, KeyError, AssertionError): - utils.logprint("Error: bad response from "+str(peer), utils.LOG_WARN) - + utils.logprint("Bad response: "+peer.to_human_str(), utils.LOG_WARN) else: - utils.logprint("Unknown peer: "+tx.sender_pubkey, utils.LOG_WARN) + utils.logprint("No peer for: "+tx.receiver_pubkey, utils.LOG_WARN) + + # Generate peer info + if t > next_peer_info: + self.local_peer = utils.Peer.generate(self.conf, self.keys) + utils.logprint("Generated new peer info", utils.LOG_TRACE) + self.spread_peer_info() + next_peer_info = time.time() + self.conf["server"]["peer_info_interval"] # Remove expired requests expire_txs = [] @@ -722,7 +801,7 @@ class ClientThread(Thread): if len(expire_txs) > 0: utils.logprint("Removed "+str(len(expire_txs))+" expired txs", utils.LOG_TRACE) - time.sleep(5) + time.sleep(4) def get_credentials(conf): salt = conf["crypto"]["id_salt"] @@ -733,11 +812,24 @@ def get_credentials(conf): password = getpass.getpass("Enter your password: ") return salt, password +def gen_idty_sig(keys, idty_keys): + doc = { + "doctype": "gmixer/idtysig", + "docver": "1", + "pubkey": keys.pubkey, + "sigtime": time.time() + } + return idty_keys.sign(ubjson.dumpb(doc)) + # Main function def main(): - # Load conf & peers + # Load conf conf = read_config(DIR) - peers, peers_index = utils.read_peers(DIR) + + # Load peers + peers = {} + db_peers = plyvel.DB(DIR+"/db_peers", create_if_missing=True) + utils.load_peers(db_peers, peers) # Load txs pool = [] @@ -751,9 +843,12 @@ def main(): keys = SigningKey.from_credentials(salt, password) utils.logprint("Pubkey: "+keys.pubkey, utils.LOG_INFO) + # Generate peer info + local_peer = utils.Peer.generate(conf, keys) + # Start threads - clientThread = ClientThread(conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs) - serverThread = ServerThread(conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs) + clientThread = ClientThread(conf, peers, keys, local_peer, pool, tx_in_index, tx_out_index, db_txs) + serverThread = ServerThread(conf, peers, keys, local_peer, pool, tx_in_index, tx_out_index, db_txs) clientThread.start() serverThread.start() @@ -772,7 +867,8 @@ def main(): clientThread.join() # Save - utils.write_peers(DIR, peers) + utils.save_peers(db_peers, peers) + db_peers.close() save_txs(db_txs, pool) db_txs.close() @@ -789,23 +885,88 @@ if __name__ == "__main__": conf_overwrite = {} if "-P" in sys.argv: import subprocess - print("Fetching public address...") + utils.logprint("Fetching public address...", LOG_INFO) PUBLIC_HOST = subprocess.run(['curl', '-4', 'https://zettascript.org/tux/ip/'], stdout=subprocess.PIPE).stdout.decode("utf-8") - print("Public host: " + PUBLIC_HOST) + utils.logprint("Public host: " + PUBLIC_HOST, LOG_INFO) conf_overwrite["server.public_host"] = PUBLIC_HOST read_config(DIR, conf_overwrite) if "-s" in sys.argv: main() - elif "-i" in sys.argv: + elif "-i" in sys.argv or "-I" in sys.argv: + conf_overwrite = {} + if "-I" in sys.argv: + conf_overwrite["crypto.id_salt"], conf_overwrite["crypto.id_password"] = utils.gen_keys() + conf = read_config(DIR, conf_overwrite) + + salt, password = get_credentials(conf) + keys = SigningKey.from_credentials(salt, password) + + if conf["crypto"]["id_salt"] == "": + conf_overwrite["crypto.id_salt"] = salt + if conf["crypto"]["id_password"] == "": + conf_overwrite["crypto.id_password"] = password + + if "-g" in sys.argv: + loop = True + while loop: + idty_keys = SigningKey.from_credentials( + getpass.getpass("Identity passphrase (salt):"), + getpass.getpass("Identity password:") + ) + print(idty_keys.pubkey) + loop = input("Is that the right pubkey? [yn]: ").lower() != "y" + conf_overwrite["idty.sig"] = gen_idty_sig(keys, idty_keys).hex() + conf_overwrite["idty.pubkey"] = idty_keys.pubkey + + conf = read_config(DIR, conf_overwrite) + + elif "-p" in sys.argv: + peer_file = open(os.path.expanduser(utils.getargv("-p", "peers.ubjson")), "rb") + new_peers = ubjson.loadb(peer_file.read()) + peer_file.close() + + # Load conf conf = read_config(DIR) - utils.read_peers(DIR) + + # Load peers + peers = {} + db_peers = plyvel.DB(DIR+"/db_peers", create_if_missing=True) + utils.load_peers(db_peers, peers) + + # Import peers + for raw in new_peers: + try: + new_peer = utils.Peer(raw) + except Exception as e: # TODO more specific exceptions + print("Error: invalid peer data: "+str(e)) + continue + if new_peer.hash in peers and peers[new_peer].sigtime > new_peer.sigtime: + print("Too old: "+new_peer.to_human_str()) + continue + peers[new_peer.hash] = new_peer + print("Peer: "+new_peer.to_human_str()) + + # Save + utils.save_peers(db_peers, peers) + db_peers.close() - elif "-I" in sys.argv: - ID_SALT, ID_PASSWORD = gen_keys() + elif "-e" in sys.argv: + # Load conf conf = read_config(DIR) - utils.read_peers(DIR) + + # Get private key + salt, password = get_credentials(conf) + keys = SigningKey.from_credentials(salt, password) + print("Pubkey: "+keys.pubkey) + + # Generate peer info + local_peer = utils.Peer.generate(conf, keys) + + peer_file = open(os.path.expanduser(utils.getargv("-e", "peer_info.ubjson")), "wb") + peer_file.write(ubjson.dumpb([local_peer.raw])) + peer_file.close() elif "-k" in sys.argv: conf = read_config(DIR) @@ -814,22 +975,26 @@ if __name__ == "__main__": print(keys.pubkey) elif "-V" in sys.argv or "--version" in sys.argv: - print(VERSION) + print(utils.VERSION) elif "--help" in sys.argv: print("\ -ĞMixer-py Server "+VERSION+"\n\ +ĞMixer-py Server "+utils.VERSION+"\n\ \n\ Options:\n\ - -c <path> Change config & data dir\n\ - default: ~/.gmixer\n\ -s Start server\n\ -i Init config\n\ -I Init config (generate random keys)\n\ - -P Auto set public address (overwrites config)\n\ + -p <path> Import peers from file\n\ + -e <path> Export peer info to file (output is compatible with -p)\n\ -k Display public key\n\ - -v Verbose\n\ -V Display version\n\ --version\n\ --help Display help\n\ +\n\ + -d <path> Change config & data dir\n\ + default: ~/.gmixer\n\ + -v Verbose\n\ + -P Auto set public address (overwrites config)\n\ + -g (with -i or -I only) Generate identity signature\n\ ") diff --git a/utils.py b/utils.py index 8c8f5fd..a28a6e0 100644 --- a/utils.py +++ b/utils.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 """ + CopyLeft 2019 Pascal Engélibert <tuxmain@zettascript.org> This file is part of ĞMixer-py. ĞMixer-py is free software: you can redistribute it and/or modify @@ -16,11 +17,15 @@ along with ĞMixer-py. If not, see <https://www.gnu.org/licenses/>. """ -import sys, os, re, socket, time, secrets, hashlib +import sys, os, re, socket, time, secrets, hashlib, base64 import socks +import ubjson +import libnacl.sign +import plyvel from duniterpy.key import SigningKey, PublicKey -import silkaj.money, silkaj.tx, silkaj.auth +import silkaj.money, silkaj.tx +VERSION = "0.2.0" #-------- DATA @@ -37,7 +42,7 @@ def gen_keys() -> (str, str): return secrets.token_urlsafe(), secrets.token_urlsafe() def gen_comment(seeds:[bytes, bytes, bytes]) -> str: - return socks.b64encode(hashlib.sha512(b"".join(seeds)).digest()).decode() + return base64.urlsafe_b64encode(hashlib.sha512(b"".join(seeds)).digest()).decode() #-------- NETWORK @@ -120,49 +125,62 @@ def getargv(arg:str, default:str="", n:int=1, args:list=sys.argv) -> str: #-------- ĞMixer class Peer: - def __init__(self, pubkey:str, host:str, port:int, up:bool): - self.pubkey = pubkey - self.host = host - self.port = int(port) - self.up = up + VERSION = "1" + def __init__(self, raw:bytes): + self.rectime = time.time() + self.raw = raw + data = ubjson.loadb(raw) + pubkey = data["pubkey"] + raw = libnacl.sign.Verifier(PublicKey(pubkey).hex_pk()).verify(data["raw"]) + data = ubjson.loadb(raw) + # TODO try except - self.keys = PublicKey(pubkey) - - def __str__(self) -> str: - return self.pubkey[:8] + "@" + self.host + ":" + str(self.port) + # TODO tests + + self.doctype = data["doctype"] + self.docver = data["docver"] + self.pubkey = data["pubkey"] + self.sigtime = data["sigtime"] + self.host = tuple(data["host"]) # socket cannot manage lists + self.idty = data["idty"] + self.idtysig = data["idtysig"] + + self.hash = hashlib.sha512((self.pubkey+"@"+self.host[0]+":"+str(self.host[1])).encode()).digest() + self.keys = PublicKey(self.pubkey) + self.up = None - def export_str(self) -> str: - return self.pubkey + " " + self.host + " " + str(self.port) - -# Read peers list -def read_peers(cdir:str) -> (list, dict): - if not os.path.isfile(cdir+"/peers"): - open(cdir+"/peers", "w").close() + def to_human_str(self, short=True): + return (self.pubkey[:8] if short else self.pubkey)+"@"+self.host[0]+":"+str(self.host[1]) - peers = [] - peers_index = {} - peersfile = open(cdir+"/peers", "r") - while True: - line = peersfile.readline() - if len(line) <= 1: - break - else: - cols = line.replace("\n", "").split(" ") - peer = Peer(cols[0], cols[1], cols[2], None) - peers.append(peer) - peers_index[peer.pubkey] = peer - peersfile.close() - return peers, peers_index - -# Save peers list -def write_peers(cdir:str, peers:list): - peersfile = open(cdir+"/peers", "w") + def generate(conf:dict, keys:SigningKey) -> bytes: + data = { + "doctype": "peer", + "docver": Peer.VERSION, + "pubkey": keys.pubkey, + "sigtime": time.time(), + "host": [conf["server"]["public_host"], conf["server"]["public_port"]], + "idty": conf["idty"]["pubkey"], + "idtysig": bytes.fromhex(conf["idty"]["sig"]) + } + raw = keys.sign(ubjson.dumpb(data)) + data = { + "pubkey": keys.pubkey, + "raw": raw + } + raw = ubjson.dumpb(data) + return Peer(raw) + +def load_peers(db_peers:plyvel.DB, peers:dict): + for _, data in db_peers: + peer = Peer(data) + peers[peer.hash] = peer + +def save_peers(db_peers:plyvel.DB, peers:dict): for peer in peers: - peersfile.write(peer.export_str()+"\n") - peersfile.close() + db_peers.put(peers[peer].hash, peers[peer].raw) -def send_transaction(sender_keys:SigningKey, receiver_pubkey:str, amount:int, comment:str): - sender_amount = silkaj.money.get_amount_from_pubkey(sender_keys.pubkey)[0] - assert sender_amount >= amount, "not enough money" +async def send_transaction(sender_keys:SigningKey, receiver_pubkey:str, amount:int, comment:str): + #sender_amount = silkaj.money.get_amount_from_pubkey(sender_keys.pubkey)[0] + #assert sender_amount >= amount, "not enough money" - silkaj.tx.generate_and_send_transaction(sender_keys.hex_seed().decode(), sender_keys.pubkey, amount, [receiver_pubkey], comment) + await silkaj.tx.handle_intermediaries_transactions(sender_keys, sender_keys.pubkey, amount, [receiver_pubkey], comment) -- GitLab