Commit 226921d5 authored by Pascal Engélibert's avatar Pascal Engélibert

Remove expired requests

parent 0b0473f5
...@@ -58,7 +58,7 @@ def sendResponse(client, code, resp, dataformat="ubjson"): ...@@ -58,7 +58,7 @@ def sendResponse(client, code, resp, dataformat="ubjson"):
client.close() client.close()
class TX: class TX:
def __init__(self, sender_pubkey=None, receiver_pubkey=None, onetime_pubkey=None, in_amount=None, in_base=None, out_amount=None, out_base=None, message=None, in_comment=None, out_comment=None, send_confirm=True): def __init__(self, sender_pubkey=None, receiver_pubkey=None, onetime_pubkey=None, in_amount=None, in_base=None, out_amount=None, out_base=None, message=None, in_comment=None, out_comment=None, send_confirm=True, date=None, expire=None):
self.sender_pubkey = sender_pubkey self.sender_pubkey = sender_pubkey
self.receiver_pubkey = receiver_pubkey self.receiver_pubkey = receiver_pubkey
self.onetime_pubkey = onetime_pubkey self.onetime_pubkey = onetime_pubkey
...@@ -70,6 +70,8 @@ class TX: ...@@ -70,6 +70,8 @@ class TX:
self.in_comment = in_comment self.in_comment = in_comment
self.out_comment = out_comment self.out_comment = out_comment
self.send_confirm = send_confirm # True if sender is a server (in which case we send confirmation to it); False else (we wait a confirmation demand from it) self.send_confirm = send_confirm # True if sender is a server (in which case we send confirmation to it); False else (we wait a confirmation demand from it)
self.date = date
self.expire = expire
self.need_send = True self.need_send = True
self.can_confirm = False self.can_confirm = False
...@@ -77,7 +79,7 @@ class TX: ...@@ -77,7 +79,7 @@ class TX:
self.confirms = b"" self.confirms = b""
self.tx_sent = False self.tx_sent = False
def genMixConfirm(self, keys): def genMixConfirm(self, keys, conf):
message = { message = {
"document": "gmixer-mixconfirm1", "document": "gmixer-mixconfirm1",
"sender_pubkey": self.sender_pubkey, "sender_pubkey": self.sender_pubkey,
...@@ -88,7 +90,9 @@ class TX: ...@@ -88,7 +90,9 @@ class TX:
"receiver_pubkey": self.receiver_pubkey, "receiver_pubkey": self.receiver_pubkey,
"out_amount": self.out_amount, "out_amount": self.out_amount,
"out_base": self.out_base, "out_base": self.out_base,
"out_comment": self.out_comment "out_comment": self.out_comment,
"req_date": self.date,
"expire_date": self.expire
} }
message = ubjson.dumpb(message) message = ubjson.dumpb(message)
message = keys.sign(message) message = keys.sign(message)
...@@ -111,6 +115,8 @@ class TX: ...@@ -111,6 +115,8 @@ class TX:
"in_comment": self.in_comment, "in_comment": self.in_comment,
"out_comment": self.out_comment, "out_comment": self.out_comment,
"send_confirm": self.send_confirm, "send_confirm": self.send_confirm,
"date": self.date,
"expire": self.expire,
"need_send" : self.need_send, "need_send" : self.need_send,
"can_confirm" : self.can_confirm, "can_confirm" : self.can_confirm,
"need_confirm" : self.need_confirm, "need_confirm" : self.need_confirm,
...@@ -120,7 +126,7 @@ class TX: ...@@ -120,7 +126,7 @@ class TX:
def import_ubjson(d): def import_ubjson(d):
d = ubjson.loadb(d) d = ubjson.loadb(d)
tx = TX(d["sender_pubkey"], d["receiver_pubkey"], d["onetime_pubkey"], d["in_amount"], d["in_base"], d["out_amount"], d["out_base"], d["message"], d["in_comment"], d["out_comment"], d["send_confirm"]) tx = TX(d["sender_pubkey"], d["receiver_pubkey"], d["onetime_pubkey"], d["in_amount"], d["in_base"], d["out_amount"], d["out_base"], d["message"], d["in_comment"], d["out_comment"], d["send_confirm"], d["date"], d["expire"])
tx.need_send = d["need_send"] tx.need_send = d["need_send"]
tx.can_confirm = d["can_confirm"] tx.can_confirm = d["can_confirm"]
tx.need_confirm = d["need_confirm"] tx.need_confirm = d["need_confirm"]
...@@ -178,7 +184,7 @@ def readConfig(dir): ...@@ -178,7 +184,7 @@ def readConfig(dir):
return conf return conf
class ServerThread(Thread): class ServerThread(Thread):
def __init__(self, conf, peers, peers_index, keys, pool=[], tx_in_index={}, tx_out_index={}): def __init__(self, conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs):
Thread.__init__(self) Thread.__init__(self)
self.conf = conf self.conf = conf
...@@ -188,6 +194,7 @@ class ServerThread(Thread): ...@@ -188,6 +194,7 @@ class ServerThread(Thread):
self.pool = pool self.pool = pool
self.tx_in_index = tx_in_index self.tx_in_index = tx_in_index
self.tx_out_index = tx_out_index self.tx_out_index = tx_out_index
self.db_txs = db_txs
self.sock = None self.sock = None
self.work = True self.work = True
...@@ -318,7 +325,8 @@ class ServerThread(Thread): ...@@ -318,7 +325,8 @@ class ServerThread(Thread):
out_comment = secrets.token_urlsafe(48) out_comment = secrets.token_urlsafe(48)
# Save tx in pool # Save tx in pool
tx = TX(sender_pubkey, receiver_pubkey, onetime_pubkey, in_amount, in_base, in_amount, in_base, message, in_comment, out_comment, send_confirm) t = time.time()
tx = TX(sender_pubkey, receiver_pubkey, onetime_pubkey, in_amount, in_base, in_amount, in_base, message, in_comment, out_comment, send_confirm, t, t+self.conf["Mix"]["MixReqAgeMax"])
last_node = len(message) == 0 last_node = len(message) == 0
tx.need_send = not last_node tx.need_send = not last_node
tx.can_confirm = last_node tx.can_confirm = last_node
...@@ -326,6 +334,7 @@ class ServerThread(Thread): ...@@ -326,6 +334,7 @@ class ServerThread(Thread):
self.tx_out_index[tx.out_comment] = tx self.tx_out_index[tx.out_comment] = tx
self.tx_in_index[tx.in_comment] = tx self.tx_in_index[tx.in_comment] = tx
self.pool.append(tx) self.pool.append(tx)
tx.export_ubjson(self.db_txs)
resp["mix_ok"] = in_comment resp["mix_ok"] = in_comment
...@@ -393,6 +402,7 @@ class ServerThread(Thread): ...@@ -393,6 +402,7 @@ class ServerThread(Thread):
message = tx.genMixConfirm(self.keys) message = tx.genMixConfirm(self.keys)
resp["confirm"] = message resp["confirm"] = message
tx.need_confirm = False tx.need_confirm = False
tx.export_ubjson(self.db_txs)
logPrint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE) logPrint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE)
if "list" in url: if "list" in url:
...@@ -443,7 +453,7 @@ class ServerThread(Thread): ...@@ -443,7 +453,7 @@ class ServerThread(Thread):
self.sock.shutdown(socket.SHUT_WR) self.sock.shutdown(socket.SHUT_WR)
class ClientThread(Thread): class ClientThread(Thread):
def __init__(self, conf, peers, peers_index, keys, pool): def __init__(self, conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs):
Thread.__init__(self) Thread.__init__(self)
self.conf = conf self.conf = conf
...@@ -451,6 +461,9 @@ class ClientThread(Thread): ...@@ -451,6 +461,9 @@ class ClientThread(Thread):
self.peers_index = peers_index self.peers_index = peers_index
self.keys = keys self.keys = keys
self.pool = pool self.pool = pool
self.tx_in_index = tx_in_index
self.tx_out_index = tx_out_index
self.db_txs = db_txs
self.bma_endpoints = ["BMAS "+host for host in conf["Client"]["BMA_Hosts"].split(",")] self.bma_endpoints = ["BMAS "+host for host in conf["Client"]["BMA_Hosts"].split(",")]
self.work = True self.work = True
...@@ -567,6 +580,7 @@ class ClientThread(Thread): ...@@ -567,6 +580,7 @@ class ClientThread(Thread):
try: try:
sendTransaction(self.keys, tx.receiver_pubkey, int(tx.out_amount), tx.out_comment) sendTransaction(self.keys, tx.receiver_pubkey, int(tx.out_amount), tx.out_comment)
tx.tx_sent = True tx.tx_sent = True
tx.export_ubjson(self.db_txs)
except Exception as e: except Exception as e:
logPrint("Error when sending tx:\n" + str(e), LOG_ERROR) logPrint("Error when sending tx:\n" + str(e), LOG_ERROR)
else: else:
...@@ -609,6 +623,7 @@ class ClientThread(Thread): ...@@ -609,6 +623,7 @@ class ClientThread(Thread):
data = ubjson.loadb(content) data = ubjson.loadb(content)
assert data["mix_ok"] == tx.out_comment assert data["mix_ok"] == tx.out_comment
tx.need_send = False tx.need_send = False
tx.export_ubjson(self.db_txs)
peer.up = True peer.up = True
logPrint("Sent "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE) logPrint("Sent "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE)
logPrint("Up "+str(peer), LOG_TRACE) logPrint("Up "+str(peer), LOG_TRACE)
...@@ -632,6 +647,7 @@ class ClientThread(Thread): ...@@ -632,6 +647,7 @@ class ClientThread(Thread):
data = ubjson.loadb(content) data = ubjson.loadb(content)
assert data["confirm_ok"] == tx.in_comment assert data["confirm_ok"] == tx.in_comment
tx.need_confirm = False tx.need_confirm = False
tx.export_ubjson(self.db_txs)
peer.up = True peer.up = True
logPrint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE) logPrint("Confirmed "+tx.sender_pubkey[:8]+" -> "+tx.receiver_pubkey[:8]+" = "+str(tx.in_amount)+":"+str(tx.in_base)+" -> "+str(tx.out_amount)+":"+str(tx.out_base), LOG_TRACE)
logPrint("Up "+str(peer), LOG_TRACE) logPrint("Up "+str(peer), LOG_TRACE)
...@@ -644,7 +660,17 @@ class ClientThread(Thread): ...@@ -644,7 +660,17 @@ class ClientThread(Thread):
else: else:
logPrint("Unknown peer: "+tx.sender_pubkey, LOG_WARN) logPrint("Unknown peer: "+tx.sender_pubkey, LOG_WARN)
# TODO: check too old txs and return them # Remove expired requests
expire_txs = []
expire_t = t - int(self.conf["Mix"]["MixReqAgeMax"])
for tx in self.pool:
if tx.date < expire_t:
expire_txs.append(tx)
for tx in expire_txs:
self.tx_in_index.pop(tx.in_comment)
self.tx_out_index.pop(tx.out_comment)
self.db_txs.delete(tx.in_comment.encode())
self.pool.remove(tx)
time.sleep(5) time.sleep(5)
...@@ -676,8 +702,8 @@ def main(): ...@@ -676,8 +702,8 @@ def main():
logPrint("Pubkey: "+keys.pubkey, LOG_INFO) logPrint("Pubkey: "+keys.pubkey, LOG_INFO)
# Start threads # Start threads
clientThread = ClientThread(conf, peers, peers_index, keys, pool) clientThread = ClientThread(conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs)
serverThread = ServerThread(conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index) serverThread = ServerThread(conf, peers, peers_index, keys, pool, tx_in_index, tx_out_index, db_txs)
clientThread.start() clientThread.start()
serverThread.start() serverThread.start()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment