diff --git a/silkaj/auth.py b/silkaj/auth.py index 8b421e1c4cfec6ee802a0185dab4b20a3d24e4b9..2be8c9ecee76180d465d4c002d7de6a5c1de4494 100644 --- a/silkaj/auth.py +++ b/silkaj/auth.py @@ -38,8 +38,7 @@ def auth_method(ctx: Context) -> SigningKey: return auth_by_auth_file() if ctx.obj["AUTH_WIF"]: return auth_by_wif() - else: - return auth_by_scrypt() + return auth_by_scrypt() @pass_context @@ -81,7 +80,7 @@ def auth_by_auth_file(ctx: Context) -> SigningKey: authfile = Path(file) if not authfile.is_file(): sys.exit(f"Error: `{file}` file does not exist") - filetxt = authfile.open("r").read() + filetxt = authfile.open("r", encoding="utf-8").read() # two regural expressions for the PubSec format regex_pubkey = re.compile(PUBSEC_PUBKEY_PATTERN, re.MULTILINE) @@ -91,16 +90,17 @@ def auth_by_auth_file(ctx: Context) -> SigningKey: if re.search(re.compile(SEED_HEX_PATTERN), filetxt): return SigningKey.from_seedhex_file(file) # PubSec format - elif re.search(regex_pubkey, filetxt) and re.search(regex_signkey, filetxt): + if re.search(regex_pubkey, filetxt) and re.search(regex_signkey, filetxt): return SigningKey.from_pubsec_file(file) - else: - sys.exit("Error: the format of the file is invalid") + sys.exit("Error: the format of the file is invalid") def auth_by_seed() -> SigningKey: seedhex = getpass("Please enter your seed on hex format: ") try: return SigningKey.from_seedhex(seedhex) + # To be fixed upstream in DuniterPy + # pylint: disable=broad-except except Exception as error: sys.exit(error) @@ -114,6 +114,7 @@ def auth_by_scrypt(ctx: Context) -> SigningKey: n, r, p = ctx.obj["AUTH_SCRYPT_PARAMS"].split(",") if n.isnumeric() and r.isnumeric() and p.isnumeric(): + # pylint: disable=too-many-boolean-expressions n, r, p = int(n), int(r), int(p) if n <= 0 or n > 65536 or r <= 0 or r > 512 or p <= 0 or p > 32: sys.exit("Error: the values of Scrypt parameters are not good") @@ -136,5 +137,7 @@ def auth_by_wif() -> SigningKey: ) try: return SigningKey.from_wif_or_ewif_hex(wif_hex, password) + # To be fixed upstream in DuniterPy + # pylint: disable=broad-except except Exception as error: sys.exit(error) diff --git a/silkaj/blocks.py b/silkaj/blocks.py index 5cdb3dd624aa49c6bc977e2d26d21d5a4d9c7ee7..8922e56afc1b26d2fd09f619b11ff8c74031e3ba 100644 --- a/silkaj/blocks.py +++ b/silkaj/blocks.py @@ -38,13 +38,15 @@ If nothing specified, the whole blockchain gets verified.", def verify_blocks_signatures(from_block: int, to_block: int) -> None: client = client_instance() to_block = check_passed_blocks_range(client, from_block, to_block) - invalid_blocks_signatures = list() # type: List[int] + invalid_blocks_signatures = [] # type: List[int] chunks_from = range(from_block, to_block + 1, BMA_MAX_BLOCKS_CHUNK_SIZE) with progressbar(chunks_from, label="Processing blocks verification") as bar: for chunk_from in bar: chunk_size = get_chunk_size(from_block, to_block, chunks_from, chunk_from) logging.info( - f"Processing chunk from block {chunk_from} to {chunk_from + chunk_size}" + "Processing chunk from block %d to %d", + chunk_from, + chunk_from + chunk_size, ) chunk = get_chunk(client, chunk_size, chunk_from) @@ -75,8 +77,7 @@ def get_chunk_size( Otherwise, calculate the size for the last chunk""" if chunk_from != chunks_from[-1]: return BMA_MAX_BLOCKS_CHUNK_SIZE - else: - return (to_block + 1 - from_block) % BMA_MAX_BLOCKS_CHUNK_SIZE + return (to_block + 1 - from_block) % BMA_MAX_BLOCKS_CHUNK_SIZE def get_chunk(client: Client, chunk_size: int, chunk_from: int) -> List: diff --git a/silkaj/cert.py b/silkaj/cert.py index 4aed20f978bc59f836c4f20750aee15b680dca16..88a22faf455b60b8e9f124f6a8af8f5d156ba352 100644 --- a/silkaj/cert.py +++ b/silkaj/cert.py @@ -44,6 +44,7 @@ def send_certification(ctx: click.Context, uid_pubkey_to_certify: str) -> None: if checked_pubkey: uid_pubkey_to_certify = str(checked_pubkey) + # pylint: disable=unused-variable idty_to_certify, pubkey_to_certify, send_certs = wot.choose_identity( uid_pubkey_to_certify ) @@ -118,7 +119,7 @@ def certification_confirmation( pubkey_to_certify: str, idty_to_certify: Dict, ) -> None: - cert = list() + cert = [] cert.append(["Cert", "Issuer", "–>", "Recipient: Published: #block-hash date"]) client = client_instance() idty_timestamp = idty_to_certify["meta"]["timestamp"] diff --git a/silkaj/cli.py b/silkaj/cli.py index 9ec4bac6ee09659f242de0780a8bfdcb91d2cc90..585b86ff25e2b53bc8ed0a40532eaadb7cc3ee07 100644 --- a/silkaj/cli.py +++ b/silkaj/cli.py @@ -16,7 +16,7 @@ import sys from click import Context, group, help_option, option, pass_context, version_option -from duniterpy.api.endpoint import endpoint +from duniterpy.api.endpoint import endpoint as du_endpoint from silkaj import revocation from silkaj.auth import generate_auth_file @@ -44,7 +44,7 @@ from silkaj.wot import id_pubkey_correspondence, received_sent_certifications "--endpoint", "-ep", help=f"Default endpoint to reach Ğ1 currency by its official node\ - {endpoint(G1_DEFAULT_ENDPOINT).host}\ + {du_endpoint(G1_DEFAULT_ENDPOINT).host}\ This option allows to specify a custom endpoint as follow: <host>:<port>.\ In case no port is specified, it defaults to 443.", ) @@ -53,7 +53,7 @@ from silkaj.wot import id_pubkey_correspondence, received_sent_certifications "-gt", is_flag=True, help=f"Default endpoint to reach ĞTest currency by its official node: \ -{endpoint(G1_TEST_DEFAULT_ENDPOINT).host}", +{du_endpoint(G1_TEST_DEFAULT_ENDPOINT).host}", ) @option( "--auth-scrypt", @@ -106,7 +106,7 @@ def cli( if display and dry_run: sys.exit("ERROR: display and dry-run options can not be used together") - ctx.obj = dict() + ctx.obj = {} ctx.ensure_object(dict) ctx.obj["ENDPOINT"] = endpoint ctx.obj["GTEST"] = gtest diff --git a/silkaj/cli_tools.py b/silkaj/cli_tools.py index 498a110a658a15d83b67f3548585ad80568ae299..2268e58d4ed9ffb2b85c7ead40cefd2c54023e05 100644 --- a/silkaj/cli_tools.py +++ b/silkaj/cli_tools.py @@ -18,15 +18,16 @@ from typing import Any from click import Context, Option, UsageError +# pylint: disable=too-few-public-methods class MutuallyExclusiveOption(Option): def __init__(self, *args: Any, **kwargs: Any) -> None: self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) - help = kwargs.get("help", "") + _help = kwargs.get("help", "") if self.mutually_exclusive: ex_str = ", ".join(self.mutually_exclusive) kwargs[ "help" - ] = f"{help} NOTE: This argument is mutually exclusive with arguments: [{ex_str}]." + ] = f"{_help} NOTE: This argument is mutually exclusive with arguments: [{ex_str}]." super().__init__(*args, **kwargs) def handle_parse_result(self, ctx: Context, opts: Any, args: Any) -> Any: diff --git a/silkaj/commands.py b/silkaj/commands.py index adad2ec56e760fc0c3a2338315eb786c691d0927..e29e158df7dbe4dae383773ed97fbd5653cfbde1 100644 --- a/silkaj/commands.py +++ b/silkaj/commands.py @@ -61,24 +61,24 @@ def currency_info() -> None: ) -def match_pattern(pow: int, match: str = "", p: int = 1) -> Tuple[str, int]: - while pow > 0: - if pow >= 16: +def match_pattern(_pow: int, match: str = "", p: int = 1) -> Tuple[str, int]: + while _pow > 0: + if _pow >= 16: match += "0" - pow -= 16 + _pow -= 16 p *= 16 else: - match += f"[0-{hex(15 - pow)[2:].upper()}]" - p *= pow - pow = 0 + match += f"[0-{hex(15 - _pow)[2:].upper()}]" + p *= _pow + _pow = 0 return f"{match}*", p -def power(nbr: float, pow: int = 0) -> str: +def compute_power(nbr: float, power: int = 0) -> str: while nbr >= 10: nbr /= 10 - pow += 1 - return f"{nbr:.1f} × 10^{pow}" + power += 1 + return f"{nbr:.1f} × 10^{power}" @command( @@ -107,7 +107,7 @@ def display_diffi(current: WSConnection, diffi: Dict) -> None: if d["level"] / 2 < current["powMin"]: issuers += 1 d["match"] = match_pattern(d["level"])[0][:20] - d["Πdiffi"] = power(match_pattern(d["level"])[1]) + d["Πdiffi"] = compute_power(match_pattern(d["level"])[1]) d["Σ diffi"] = d.pop("level") system("cls||clear") block_gen = from_timestamp(current["time"], tz="local").format(ALL) @@ -135,8 +135,8 @@ def list_blocks(number: int, detailed: bool) -> None: number = head_block["issuersFrame"] client = client_instance() blocks = client(bma.blockchain.blocks, number, current_nbr - number + 1) - issuers = list() - issuers_dict = dict() + issuers = [] + issuers_dict = {} for block in blocks: issuer = OrderedDict() issuer["pubkey"] = block["issuer"] @@ -164,6 +164,10 @@ def list_blocks(number: int, detailed: bool) -> None: ): issuer2["uid"] = idty["uid"] if idty else None issuer2.pop("pubkey") + print_blocks_views(issuers, current_nbr, number, detailed) + + +def print_blocks_views(issuers, current_nbr, number, detailed): header = ( f"Last {number} blocks from n°{current_nbr - number + 1} to n°{current_nbr}" ) @@ -175,7 +179,7 @@ def list_blocks(number: int, detailed: bool) -> None: ) print(f"\n{table}") else: - list_issued = list() # type: List[OrderedDict] + list_issued = [] # type: List[OrderedDict] for issuer in issuers: found = False for issued in list_issued: diff --git a/silkaj/crypto_tools.py b/silkaj/crypto_tools.py index ba183a84a6ee50cf5ab44ba88f9054b47d358a42..5e39c2f305b1540f954e6df150504de4255d918a 100644 --- a/silkaj/crypto_tools.py +++ b/silkaj/crypto_tools.py @@ -49,9 +49,9 @@ def check_pubkey_format(pubkey: str, display_error: bool = True) -> Optional[boo """ if re.search(re.compile(PUBKEY_DELIMITED_PATTERN), pubkey): return False - elif re.search(re.compile(PUBKEY_CHECKSUM_PATTERN), pubkey): + if re.search(re.compile(PUBKEY_CHECKSUM_PATTERN), pubkey): return True - elif display_error: + if display_error: message_exit(f"Error: bad format for following public key: {pubkey}") return None @@ -77,5 +77,5 @@ def gen_checksum(pubkey: str) -> str: Returns the checksum of the input pubkey (encoded in b58) """ pubkey_byte = base58.b58decode(pubkey) - hash = hashlib.sha256(hashlib.sha256(pubkey_byte).digest()).digest() - return str(base58.b58encode(hash)[:3].decode("utf-8")) + _hash = hashlib.sha256(hashlib.sha256(pubkey_byte).digest()).digest() + return str(base58.b58encode(_hash)[:3].decode("utf-8")) diff --git a/silkaj/g1_monetary_license.py b/silkaj/g1_monetary_license.py index b89ce1e1b23ee0a6d4cb4c8217c19fcec55c190e..4a381880654272dece8cf898e9853ee9ccb96b0f 100644 --- a/silkaj/g1_monetary_license.py +++ b/silkaj/g1_monetary_license.py @@ -51,8 +51,8 @@ class G1MonetaryLicense: """ selected_language_code = self.language_prompt() license_path = self.get_license_path(selected_language_code) - with open(license_path) as license: - click.echo_via_pager(license.read()) + with open(license_path, encoding="utf-8") as _license: + click.echo_via_pager(_license.read()) def language_prompt(self) -> str: return click.prompt( @@ -68,7 +68,7 @@ class G1MonetaryLicense: Handle long language codes ie: 'fr-FR' """ self.languages_codes = [] - licenses_path = sorted(Path(self.licenses_dir_path).glob(self.file_name("*"))) + licenses_path = sorted(Path(self.licenses_dir_path).glob(file_name("*"))) for license_path in licenses_path: language_code = license_path.stem[-2:] if language_code.isupper(): @@ -76,7 +76,8 @@ class G1MonetaryLicense: self.languages_codes.append(language_code) def get_license_path(self, language_code: str) -> Path: - return Path(self.licenses_dir_path, self.file_name(language_code)) + return Path(self.licenses_dir_path, file_name(language_code)) - def file_name(self, language_code: str) -> str: - return f"g1_monetary_license_{language_code}.rst" + +def file_name(language_code: str) -> str: + return f"g1_monetary_license_{language_code}.rst" diff --git a/silkaj/idty_tools.py b/silkaj/idty_tools.py index 2f844aecd127865e59d3f4084ce90b94fafd463e..cc4215bc37757a76a1724654fe3e21fd70c319dc 100644 --- a/silkaj/idty_tools.py +++ b/silkaj/idty_tools.py @@ -35,7 +35,7 @@ def display_identity(idty: Identity) -> Texttable: Creates a table containing the identity infos """ client = client_instance() - id_table = list() + id_table = [] id_table.append(["Public key", gen_pubkey_checksum(idty.pubkey)]) id_table.append(["User ID", idty.uid]) id_table.append(["Blockstamp", str(idty.block_id)]) @@ -81,17 +81,18 @@ def check_many_identities(document: Union[Identity, Revocation]) -> bool: if len(lookup_ids) >= 1: click.echo(f"One matching identity!\nSimilar identities:\n{alternate_ids}") return True - else: - click.echo(f"{error_no_identical_id}\nSimilar identities:\n{alternate_ids}") - return False + click.echo(f"{error_no_identical_id}\nSimilar identities:\n{alternate_ids}") + return False def display_alternate_ids(ids_list: List) -> Texttable: labels = ["uid", "public key", "timestamp"] table = Texttable(max_width=shutil.get_terminal_size().columns) table.header(labels) - for id in ids_list: - table.add_row([id.uid, gen_pubkey_checksum(id.pubkey), str(id.block_id)[:12]]) + for _id in ids_list: + table.add_row( + [_id.uid, gen_pubkey_checksum(_id.pubkey), str(_id.block_id)[:12]] + ) return table @@ -101,30 +102,30 @@ def merge_ids_lists(lookups_pubkey: List, lookups_uid: List, currency: str) -> L """ ids = ids_list_from_lookups(lookups_pubkey, currency) ids_uid = ids_list_from_lookups(lookups_uid, currency) - for id in ids_uid: + for _id in ids_uid: # __equal__ does not work. This is condition "id in ids". for listed_id in ids: - if id.signed_raw() == listed_id.signed_raw(): + if _id.signed_raw() == listed_id.signed_raw(): id_in_ids = True break id_in_ids = False if not id_in_ids: - ids.append(id) + ids.append(_id) return ids def ids_list_from_lookups(lookups: List, currency: str) -> List: - ids = list() + ids = [] for lookup in lookups: pubkey = lookup["pubkey"] lookup_ids = lookup["uids"] - for id in lookup_ids: + for _id in lookup_ids: appended_id = Identity( currency=currency, pubkey=pubkey, - uid=id["uid"], - block_id=BlockID.from_str(id["meta"]["timestamp"]), + uid=_id["uid"], + block_id=BlockID.from_str(_id["meta"]["timestamp"]), ) - appended_id.signature = id["self"] + appended_id.signature = _id["self"] ids.append(appended_id) return ids diff --git a/silkaj/membership.py b/silkaj/membership.py index a26e20a88ccd40c9ae5a87ef506552cfe0a98ac3..6862199660b2e9aaec942b2c2df105a31ff87122 100644 --- a/silkaj/membership.py +++ b/silkaj/membership.py @@ -104,20 +104,25 @@ def display_confirmation_table( pending_memberships = identity_requirements["pendingMemberships"] break - table = list() + table = [] if membership_expires: expires = pendulum.now().add(seconds=membership_expires).diff_for_humans() table.append(["Expiration date of current membership", expires]) if pending_memberships: - line = [ - "Number of pending membership(s) in the mempool", - len(pending_memberships), - ] - table.append(line) - - expiration = pendulum.now().add(seconds=pending_expires).diff_for_humans() - table.append(["Pending membership documents will expire", expiration]) + table.append( + [ + "Number of pending membership(s) in the mempool", + len(pending_memberships), + ] + ) + + table.append( + [ + "Pending membership documents will expire", + pendulum.now().add(seconds=pending_expires).diff_for_humans(), + ] + ) table.append(["User Identifier (UID)", identity_uid]) table.append(["Public Key", tui.gen_pubkey_checksum(pubkey)]) @@ -125,20 +130,26 @@ def display_confirmation_table( table.append(["Block Identity", str(identity_block_id)[:45] + "…"]) block = client(bma.blockchain.block, identity_block_id.number) - date_idty_pub = pendulum.from_timestamp(block["time"], tz="local").format(DATE) - table.append(["Identity published", date_idty_pub]) + table.append( + [ + "Identity published", + pendulum.from_timestamp(block["time"], tz="local").format(DATE), + ] + ) params = get_blockchain_parameters() - membership_validity = ( - pendulum.now().add(seconds=params["msValidity"]).diff_for_humans() + table.append( + [ + "Expiration date of new membership", + pendulum.now().add(seconds=params["msValidity"]).diff_for_humans(), + ] ) - table.append(["Expiration date of new membership", membership_validity]) - membership_mempool = ( - pendulum.now().add(seconds=params["msPeriod"]).diff_for_humans() - ) table.append( - ["Expiration date of new membership from the mempool", membership_mempool] + [ + "Expiration date of new membership from the mempool", + pendulum.now().add(seconds=params["msPeriod"]).diff_for_humans(), + ] ) click.echo(tabulate(table, tablefmt="fancy_grid")) diff --git a/silkaj/money.py b/silkaj/money.py index c8ac2fcc9812b8585d18e2e0651a57f2efa5e345..4eef77e39470e5a6e70efe5732aca3a8c4524076 100644 --- a/silkaj/money.py +++ b/silkaj/money.py @@ -40,7 +40,7 @@ def cmd_amount(ctx: Context, pubkeys: str) -> None: # check input pubkeys if not pubkeys: sys.exit("You should specify one or many pubkeys") - pubkeys_list = list() + pubkeys_list = [] wrong_pubkeys = False for input_pubkey in pubkeys: checked_pubkey = is_pubkey_and_check(input_pubkey) @@ -81,7 +81,7 @@ def show_amount_from_pubkey(label: str, inputs_balance: List[int]) -> None: balance = inputs_balance[1] currency_symbol = get_currency_symbol() ud_value = get_ud_value() - average, monetary_mass = get_average() + average = get_average() member = None # if `pubkey` is a pubkey, get pubkey:checksum and uid @@ -89,7 +89,7 @@ def show_amount_from_pubkey(label: str, inputs_balance: List[int]) -> None: member = wt.is_member(label) label = gen_pubkey_checksum(label) # display balance table - display = list() + display = [] display.append(["Balance of pubkey", label]) if member: @@ -114,20 +114,17 @@ def show_amount_from_pubkey(label: str, inputs_balance: List[int]) -> None: echo(tabulate(display, tablefmt="fancy_grid")) -def get_average() -> Tuple[int, int]: +def get_average() -> int: head = get_head_block() - monetary_mass = head["monetaryMass"] - members_count = head["membersCount"] - average = monetary_mass / members_count - return average, monetary_mass + return head["monetaryMass"] / head["membersCount"] def get_amount_from_pubkey(pubkey: str) -> List[int]: listinput, amount = get_sources(pubkey) totalAmountInput = 0 - for input in listinput: - totalAmountInput += amount_in_current_base(input) + for _input in listinput: + totalAmountInput += amount_in_current_base(_input) return [totalAmountInput, amount] @@ -136,7 +133,7 @@ def get_sources(pubkey: str) -> Tuple[List[InputSource], int]: # Sources written into the blockchain sources = client(tx.sources, pubkey) - listinput = list() + listinput = [] amount = 0 for source in sources["sources"]: if source["conditions"] == f"SIG({pubkey})": @@ -152,35 +149,34 @@ def get_sources(pubkey: str) -> Tuple[List[InputSource], int]: amount += amount_in_current_base(listinput[-1]) # pending source - history = client(tx.pending, pubkey) - history = history["history"] + history = (client(tx.pending, pubkey))["history"] pendings = history["sending"] + history["receiving"] + history["pending"] # add pending output - pending_sources = list() + pending_sources = [] for pending in pendings: - identifier = pending["hash"] for i, output in enumerate(pending["outputs"]): + # duniterpy#80 outputsplited = output.split(":") if outputsplited[2] == f"SIG({pubkey})": inputgenerated = InputSource( amount=int(outputsplited[0]), base=int(outputsplited[1]), source="T", - origin_id=identifier, + origin_id=pending["hash"], index=i, ) if inputgenerated not in listinput: # add pendings before blockchain sources for change txs listinput.insert(0, inputgenerated) - for input in pending["inputs"]: - pending_sources.append(InputSource.from_inline(input)) + for _input in pending["inputs"]: + pending_sources.append(InputSource.from_inline(_input)) # remove input already used - for input in pending_sources: - if input in listinput: - listinput.remove(input) + for _input in pending_sources: + if _input in listinput: + listinput.remove(_input) return listinput, amount diff --git a/silkaj/network_tools.py b/silkaj/network_tools.py index 13c2f074e663d7484468e90e18fcfd13eb10c747..433ec6f271292b29e9759923154fb3cb49381820 100644 --- a/silkaj/network_tools.py +++ b/silkaj/network_tools.py @@ -61,13 +61,11 @@ Expected format: {host|ipv4|[ipv6]}:{port}{/path}" if port == 443: return ep.SecuredBMAEndpoint(host, ipv4, m["ipv6"], port, m["path"]) - else: - return ep.BMAEndpoint(host, ipv4, m["ipv6"], port) + return ep.BMAEndpoint(host, ipv4, m["ipv6"], port) - elif gtest: + if gtest: return ep.endpoint(constants.G1_TEST_DEFAULT_ENDPOINT) - else: - return ep.endpoint(constants.G1_DEFAULT_ENDPOINT) + return ep.endpoint(constants.G1_DEFAULT_ENDPOINT) @functools.lru_cache(maxsize=1) diff --git a/silkaj/revocation.py b/silkaj/revocation.py index 23c81380d6ec2e8d343ef18ba1f2471049a13343..f7cb09691be05c3ba4f8f3e7dc6a55c2e2a0bc09 100644 --- a/silkaj/revocation.py +++ b/silkaj/revocation.py @@ -48,8 +48,8 @@ def save(ctx: click.Context, file: str) -> None: key = auth.auth_method() tui.gen_pubkey_checksum(key.pubkey) - id = (wot.choose_identity(key.pubkey))[0] - rev_doc = create_revocation_doc(id, key.pubkey, currency) + _id = (wot.choose_identity(key.pubkey))[0] + rev_doc = create_revocation_doc(_id, key.pubkey, currency) rev_doc.sign(key) if ctx.obj["DRY_RUN"]: @@ -80,8 +80,8 @@ def revoke_now(ctx: click.Context) -> None: key = auth.auth_method() tui.gen_pubkey_checksum(key.pubkey) - id = (wot.choose_identity(key.pubkey))[0] - rev_doc = create_revocation_doc(id, key.pubkey, currency) + _id = (wot.choose_identity(key.pubkey))[0] + rev_doc = create_revocation_doc(_id, key.pubkey, currency) rev_doc.sign(key) if ctx.obj["DRY_RUN"]: @@ -165,18 +165,18 @@ All currently sent certifications will remain valid until they expire." tui.send_doc_confirmation("revocation document immediately") -def create_revocation_doc(id: Dict, pubkey: str, currency: str) -> Revocation: +def create_revocation_doc(_id: Dict, pubkey: str, currency: str) -> Revocation: """ Creates an unsigned revocation document. - id is the dict object containing id infos from request wot.requirements + _id is the dict object containing id infos from request wot.requirements """ idty = Identity( currency=currency, pubkey=pubkey, - uid=id["uid"], - block_id=BlockID.from_str(id["meta"]["timestamp"]), + uid=_id["uid"], + block_id=BlockID.from_str(_id["meta"]["timestamp"]), ) - idty.signature = id["self"] + idty.signature = _id["self"] return Revocation( currency=currency, identity=idty, @@ -197,7 +197,7 @@ generated revocation document corresponding to {pubkey_cksum} public key?" click.echo("Ok, goodbye!") sys.exit(SUCCESS_EXIT_STATUS) # write doc - with open(rev_path, "w") as file: + with open(rev_path, "w", encoding="utf-8") as file: file.write(content) click.echo( f"Revocation document file stored into `{path}` for following public key: {pubkey_cksum}" @@ -218,7 +218,7 @@ def verify_document(doc: str) -> Revocation: if not Path(doc).is_file(): sys.exit(f"Error: file {doc} does not exist") - with open(doc) as document: + with open(doc, encoding="utf-8") as document: original_doc = document.read() try: diff --git a/silkaj/tools.py b/silkaj/tools.py index 60e30d1e803e524ad693666655c5a992497cbf7b..70e8f85f07d2f8880d4e17bbb198664d2dba8bfc 100644 --- a/silkaj/tools.py +++ b/silkaj/tools.py @@ -14,7 +14,7 @@ # along with Silkaj. If not, see <https://www.gnu.org/licenses/>. import functools -from sys import exit +import sys from silkaj.blockchain_tools import get_blockchain_parameters from silkaj.constants import FAILURE_EXIT_STATUS, G1_SYMBOL, GTEST_SYMBOL @@ -30,4 +30,4 @@ def get_currency_symbol() -> str: def message_exit(message: str) -> None: print(message) - exit(FAILURE_EXIT_STATUS) + sys.exit(FAILURE_EXIT_STATUS) diff --git a/silkaj/tui.py b/silkaj/tui.py index ecc407ac94d95ec245907d38cc2af7f27aa78629..99a3dec81b8303e4d8101d30974b60e20b96df0d 100644 --- a/silkaj/tui.py +++ b/silkaj/tui.py @@ -41,12 +41,12 @@ def display_amount( def display_pubkey(tx: List, message: str, pubkey: str) -> None: """ - Displays a pubkey and the eventually associated id. + Displays a pubkey and the eventually associated identity """ tx.append([f"{message} (pubkey:checksum)", gen_pubkey_checksum(pubkey)]) - id = wot_tools.is_member(pubkey) - if id: - tx.append([f"{message} (id)", id["uid"]]) + idty = wot_tools.is_member(pubkey) + if idty: + tx.append([f"{message} (id)", idty["uid"]]) def gen_pubkey_checksum( diff --git a/silkaj/tx.py b/silkaj/tx.py index 432d672a3558b82496fbb4ecb0b78569a4e41e45..c7c10ff483d09aa76156e84d9dfca1626081a3e1 100644 --- a/silkaj/tx.py +++ b/silkaj/tx.py @@ -15,8 +15,8 @@ import math +import re import shlex -from re import compile, search from typing import List, Tuple import click @@ -211,7 +211,7 @@ def parse_file_containing_amounts_recipients( """ reference = "" amounts, recipients = [], [] - with open(file_path) as file: + with open(file_path, encoding="utf-8") as file: for n, raw_line in enumerate(file): line = shlex.split(raw_line, True) if line: @@ -224,7 +224,7 @@ def parse_file_containing_amounts_recipients( except (ValueError, IndexError): tools.message_exit(f"Syntax error at line {n + 1}") - if not reference or (reference != "ABSOLUTE" and reference != "RELATIVE"): + if not reference or reference not in ("ABSOLUTE", "RELATIVE"): tools.message_exit( f"{file_path} must contain at first line 'ABSOLUTE' or 'RELATIVE' header" ) @@ -273,7 +273,7 @@ def compute_amounts(amounts: List[float], multiplicator: float) -> List[int]: If relative amount, check that amount is superior to minimal amount. """ # Create amounts list - amounts_list = list() + amounts_list = [] for amount in amounts: computed_amount = amount * multiplicator # check if relative amounts are high enough @@ -333,7 +333,7 @@ def gen_confirmation_table( currency_symbol = tools.get_currency_symbol() ud_value = money.get_ud_value() total_tx_amount = sum(tx_amounts) - tx = list() # type: List[List[str]] + tx = [] # type: List[List[str]] # display account situation tui.display_amount( tx, @@ -371,16 +371,16 @@ def gen_confirmation_table( def get_list_input_for_transaction( pubkey: str, TXamount: int, outputs_number: int ) -> Tuple[List[InputSource], int, bool]: - listinput, amount = money.get_sources(pubkey) + listinput = money.get_sources(pubkey)[0] maxInputsNumber = max_inputs_number(outputs_number, NBR_ISSUERS) # generate final list source listinputfinal = [] totalAmountInput = 0 intermediatetransaction = False - for nbr_inputs, input in enumerate(listinput, start=1): - listinputfinal.append(input) - totalAmountInput += money.amount_in_current_base(input) - TXamount -= money.amount_in_current_base(input) + for nbr_inputs, _input in enumerate(listinput, start=1): + listinputfinal.append(_input) + totalAmountInput += money.amount_in_current_base(_input) + TXamount -= money.amount_in_current_base(_input) # if too much sources, it's an intermediate transaction. amount_not_reached_and_max_doc_size_reached = ( TXamount > 0 and MAX_INPUTS_PER_TX <= nbr_inputs @@ -509,9 +509,6 @@ def generate_transaction_document( total_tx_amount = sum(tx_amounts) head_block = bt.get_head_block() - currency_name = head_block["currency"] - current_block_id = BlockID(head_block["number"], head_block["hash"]) - curentUnitBase = head_block["unitbase"] if not OutputbackChange: OutputbackChange = issuers @@ -520,18 +517,18 @@ def generate_transaction_document( # we remove units after two digits after the decimal point if issuers not in outputAddresses: total_tx_amount = ( - total_tx_amount // 10**curentUnitBase - ) * 10**curentUnitBase + total_tx_amount // 10 ** head_block["unitbase"] + ) * 10 ** head_block["unitbase"] # Generate output ################ listoutput = [] # type: List[OutputSource] for tx_amount, outputAddress in zip(tx_amounts, outputAddresses): - generate_output(listoutput, curentUnitBase, tx_amount, outputAddress) + generate_output(listoutput, head_block["unitbase"], tx_amount, outputAddress) # Outputs to himself rest = totalAmountInput - total_tx_amount - generate_output(listoutput, curentUnitBase, rest, OutputbackChange) + generate_output(listoutput, head_block["unitbase"], rest, OutputbackChange) # Unlocks unlocks = generate_unlocks(listinput) @@ -540,19 +537,19 @@ def generate_transaction_document( ############################## return Transaction( - block_id=current_block_id, + block_id=BlockID(head_block["number"], head_block["hash"]), locktime=0, issuers=[issuers], inputs=listinput, unlocks=unlocks, outputs=listoutput, comment=Comment, - currency=currency_name, + currency=head_block["currency"], ) def generate_unlocks(listinput: List[InputSource]) -> List[Unlock]: - unlocks = list() + unlocks = [] for i in range(0, len(listinput)): unlocks.append(Unlock(index=i, parameters=[SIGParameter(0)])) return unlocks @@ -579,16 +576,16 @@ def generate_output( def checkComment(comment: str) -> None: if len(comment) > MAX_COMMENT_LENGTH: tools.message_exit("Error: Comment is too long") - regex = compile( + regex = re.compile( "^[0-9a-zA-Z\\ \\-\\_\\:\\/\\;\\*\\[\\]\\(\\)\\?\ \\!\\^\\+\\=\\@\\&\\~\\#\\{\\}\\|\\\\<\\>\\%\\.]*$" ) - if not search(regex, comment): + if not re.search(regex, comment): tools.message_exit("Error: the format of the comment is invalid") def truncBase(amount: int, base: int) -> int: - pow = int(math.pow(10, base)) - if amount < pow: + _pow = int(math.pow(10, base)) + if amount < _pow: return 0 - return math.trunc(amount / pow) * pow + return math.trunc(amount / _pow) * _pow diff --git a/silkaj/tx_history.py b/silkaj/tx_history.py index f7e630231b292c6fb0f24f4559249ccf6eb86ec6..b331c8ec5c3c2dc46c5603321d67b2c4c9bdea47 100644 --- a/silkaj/tx_history.py +++ b/silkaj/tx_history.py @@ -16,6 +16,7 @@ import shutil from operator import eq, itemgetter, ne, neg from typing import Any, List, Optional, Tuple +from urllib.error import HTTPError from click import argument, command, echo_via_pager, option from duniterpy.api.bma.tx import history @@ -47,10 +48,7 @@ def transaction_history(pubkey: str, uids: bool, full_pubkey: bool) -> None: currency_symbol = get_currency_symbol() header = generate_header(pubkey, currency_symbol, ud_value) - received_txs, sent_txs = ( - list(), - list(), - ) # type: List[Transaction], List[Transaction] + received_txs, sent_txs = [], [] # type: List[Transaction], List[Transaction] get_transactions_history(client, pubkey, received_txs, sent_txs) remove_duplicate_txs(received_txs, sent_txs) txs_list = generate_table( @@ -64,7 +62,7 @@ def transaction_history(pubkey: str, uids: bool, full_pubkey: bool) -> None: def generate_header(pubkey: str, currency_symbol: str, ud_value: int) -> str: try: idty = wt.identity_of(pubkey) - except Exception: + except HTTPError: idty = dict([("uid", "")]) balance = get_amount_from_pubkey(pubkey) balance_ud = round(balance[1] / ud_value, 2) @@ -118,8 +116,8 @@ def generate_table( """ received_txs_table, sent_txs_table = ( - list(), - list(), + [], + [], ) # type: List[Transaction], List[Transaction] parse_received_tx( received_txs_table, received_txs, pubkey, ud_value, uids, full_pubkey @@ -156,13 +154,13 @@ def parse_received_tx( Get amounts and assign amounts and amounts_ud Append comment """ - issuers = list() + issuers = [] for received_tx in received_txs: for issuer in received_tx.issuers: issuers.append(issuer) identities = wt.identities_from_pubkeys(issuers, uids) for received_tx in received_txs: - tx_list = list() + tx_list = [] tx_list.append(from_timestamp(received_tx.time, tz="local").format(ALL_DIGITAL)) tx_list.append("") for i, issuer in enumerate(received_tx.issuers): @@ -184,6 +182,7 @@ def parse_sent_tx( uids: bool, full_pubkey: bool, ) -> None: + # pylint: disable=too-many-locals """ Extract recipients’ pubkeys from outputs Get identities from pubkeys @@ -192,7 +191,7 @@ def parse_sent_tx( If not output back return: Assign amounts, amounts_ud, identities, and comment """ - pubkeys = list() + pubkeys = [] for sent_tx in sent_txs: outputs = tx_amount(sent_tx, pubkey, sent_func)[1] for output in outputs: @@ -201,7 +200,7 @@ def parse_sent_tx( identities = wt.identities_from_pubkeys(pubkeys, uids) for sent_tx in sent_txs: - tx_list = list() + tx_list = [] tx_list.append(from_timestamp(sent_tx.time, tz="local").format(ALL_DIGITAL)) total_amount, outputs = tx_amount(sent_tx, pubkey, sent_func) @@ -238,7 +237,7 @@ def tx_amount( Determine transaction amount from output sources """ amount = 0 - outputs = list() + outputs = [] for output in tx.outputs: # type: ignore if output_available(output.condition, ne, pubkey): outputs.append(output) @@ -266,8 +265,7 @@ def output_available(condition: Condition, comparison: Any, value: str) -> bool: """ if hasattr(condition.left, "pubkey"): return comparison(condition.left.pubkey, value) - else: - return False + return False def assign_idty_from_pubkey(pubkey: str, identities: List, full_pubkey: bool) -> str: diff --git a/silkaj/wot.py b/silkaj/wot.py index ba138f4ede2c9b9d64e6a27edc683cc7f4214101..ea2671e1d7204c7fdfcb87b76fed353210346ecb 100644 --- a/silkaj/wot.py +++ b/silkaj/wot.py @@ -33,8 +33,8 @@ from silkaj.tui import gen_pubkey_checksum def get_sent_certifications( signed: List, time_first_block: int, params: Dict ) -> Tuple[List[str], List[str]]: - sent = list() - expire = list() + sent = [] + expire = [] if signed: for cert in signed: sent.append(cert["uid"]) @@ -69,12 +69,15 @@ def received_sent_certifications(uid_pubkey: str) -> None: identity, pubkey, signed = choose_identity(uid_pubkey) certifications = OrderedDict() # type: OrderedDict params = get_blockchain_parameters() + + req = None requirements = client(wot.requirements, pubkey) for req in requirements["identities"]: if req["pubkey"] == pubkey: break - certifications["received_expire"] = list() - certifications["received"] = list() + + certifications["received_expire"] = [] + certifications["received"] = [] for cert in identity["others"]: certifications["received_expire"].append( expiration_date_from_block_id( @@ -92,13 +95,14 @@ def received_sent_certifications(uid_pubkey: str) -> None: table = tabulate( certifications, headers="keys", tablefmt="orgtbl", stralign="right" ) - txt = f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)}) \ + print( + f'{identity["uid"]} ({gen_pubkey_checksum(pubkey, True)}) \ from block #{identity["meta"]["timestamp"][:15]}…\n\ received {len(certifications["received"])} and \ sent {nbr_sent_certs}/{params["sigStock"]} certifications:\n\ {table}\n\ ✔: Certification available to be written or already written into the blockchain\n' - print(txt) + ) membership_status(certifications, pubkey, req) diff --git a/silkaj/wot_tools.py b/silkaj/wot_tools.py index dc6f286a5df6917c0bc1cdfea348690970f6824b..30249b09f7ee9f6083cb3e0126e23a54f5a36e5c 100644 --- a/silkaj/wot_tools.py +++ b/silkaj/wot_tools.py @@ -58,10 +58,10 @@ def identities_from_pubkeys(pubkeys: List[str], uids: bool) -> List: Request identities """ if not uids: - return list() + return [] uniq_pubkeys = list(filter(None, set(pubkeys))) - identities = list() + identities = [] for pubkey in uniq_pubkeys: try: identities.append(identity_of(pubkey)) diff --git a/tests/helpers.py b/tests/helpers.py index 36f673ff007b69c734ab3d574a25a2c8bf0c86c0..01bf6976e6ae82d8df76eb80328bb4183c0b346e 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -20,7 +20,7 @@ from silkaj import cli def define_click_context(endpoint=None, gtest=False): ctx = click.Context(cli.cli) - ctx.obj = dict() + ctx.obj = {} ctx.obj["ENDPOINT"] = endpoint ctx.obj["GTEST"] = gtest click.globals.push_context(ctx) diff --git a/tests/patched/money.py b/tests/patched/money.py index 067f8151329e7dfbcc3e33623c6145ffd35af27f..f309885814b68e1dc9ef79e80a73e7ada82b55d5 100644 --- a/tests/patched/money.py +++ b/tests/patched/money.py @@ -80,7 +80,7 @@ def patched_get_sources(pubkey): a += 1 return balance - listinput = list() + listinput = [] balance = 0 if pubkey == "CtM5RZHopnSRAAoWNgTWrUhDEmspcCAxn6fuCEWDWudp": max_ud = 0 @@ -144,4 +144,5 @@ def patched_get_sources(pubkey): class Counter: + # pylint: disable=too-few-public-methods counter = 0 diff --git a/tests/patched/tx.py b/tests/patched/tx.py index e123ae53e0c2146f74e8a082039b47357c209102..a1c4460daa94f38fced65bf0fae79599b3e3e8ac 100644 --- a/tests/patched/tx.py +++ b/tests/patched/tx.py @@ -94,4 +94,3 @@ def patched_generate_and_send_transaction( sys.exit( "Test error : patched_generate_and_send_transaction() : Parameters are not coherent" ) - pass diff --git a/tests/patched/wot.py b/tests/patched/wot.py index 5d5330b174a83e2504c1c44922e6abe1c057b849..d1c7823806975f9430237a445a32232ef9740a87 100644 --- a/tests/patched/wot.py +++ b/tests/patched/wot.py @@ -75,12 +75,12 @@ def patched_wot_requirements_no_pending(pubkey, identity_uid): # for history def patched_identities_from_pubkeys(pubkeys, uids): if not uids: - return list() + return [] uniq_pubkeys = list(filter(None, set(pubkeys))) - identities = list() + identities = [] for pubkey in uniq_pubkeys: - for id in pubkey_list: - if id.get("pubkey", False) == pubkey: - identities.append(id) + for idty in pubkey_list: + if idty.get("pubkey", False) == pubkey: + identities.append(idty) return identities diff --git a/tests/test_checksum.py b/tests/test_checksum.py index cf96558a1b7ac7f1c72396af0139428c5da30fa7..951fb33309b4580e873164ddcf8fb5f2778f07c7 100644 --- a/tests/test_checksum.py +++ b/tests/test_checksum.py @@ -41,7 +41,7 @@ pubkey_seedhex_authfile = ( ) def test_checksum_command(command, excepted_output): with CliRunner().isolated_filesystem(): - with open("authfile", "w") as f: + with open("authfile", "w", encoding="utf-8") as f: f.write(pubkey_seedhex_authfile) result = CliRunner().invoke(cli, args=command) assert result.output == f"{excepted_output}\n" diff --git a/tests/test_g1_monetary_license.py b/tests/test_g1_monetary_license.py index 33ebf57ae87dd8ed8ea37c3563cdbb1c7336f433..a4a5f5f69fb023f821ae4a5653771dd703131a65 100644 --- a/tests/test_g1_monetary_license.py +++ b/tests/test_g1_monetary_license.py @@ -28,7 +28,7 @@ def test_license_approval_g1_test(capsys): assert not capsys.readouterr().out -# TODO: approve is not tested +# Approve is not tested @pytest.mark.parametrize( "display, approve", [ @@ -78,7 +78,7 @@ def test_long_language_code_handling(): license_path = g1ml.get_license_path(language_code) runner = CliRunner() with runner.isolated_filesystem(): - with open(license_path, "w") as f: + with open(license_path, "w", encoding="utf-8") as f: f.write(content) result = runner.invoke(cli.cli, args=["license"], input=language_code) assert language_code in result.output diff --git a/tests/test_idty_tools.py b/tests/test_idty_tools.py index 872d8b3a1abd87674da8929745a0ab3369cc5920..0f7e0cd150e622656df560eb136145161e6690a4 100644 --- a/tests/test_idty_tools.py +++ b/tests/test_idty_tools.py @@ -386,10 +386,9 @@ def test_check_many_identities( if not lookup_pk: raise http_error return lookup_pk - else: - if not lookup_uid: - raise http_error - return lookup_uid + if not lookup_uid: + raise http_error + return lookup_uid monkeypatch.setattr(bma.wot, "lookup", patched_bma_lookup) diff --git a/tests/test_membership.py b/tests/test_membership.py index 72b2dfd04ded78f76e40ef4063d20310ba27f723..877f66d102efcfbb66919885c8efee71139d306e 100644 --- a/tests/test_membership.py +++ b/tests/test_membership.py @@ -42,7 +42,7 @@ from silkaj.network_tools import client_instance from silkaj.tui import gen_pubkey_checksum # Values and patches -pubkey = "EA7Dsw39ShZg4SpURsrgMaMqrweJPUFPYHwZA8e92e3D" +PUBKEY = "EA7Dsw39ShZg4SpURsrgMaMqrweJPUFPYHwZA8e92e3D" identity_block_id = get_block_id( "0-E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855" ) @@ -58,7 +58,7 @@ def patched_auth_method(): def patched_choose_identity(pubkey): return ( {"uid": identity_uid, "meta": {"timestamp": identity_block_id}}, - pubkey, + PUBKEY, None, ) @@ -106,7 +106,7 @@ def test_membership_cmd(dry_run, display, confirmation, monkeypatch): # Assert functions are called patched_display_confirmation_table.assert_called_once_with( identity_uid, - pubkey, + PUBKEY, identity_block_id, ) if dry_run or display: @@ -116,7 +116,7 @@ def test_membership_cmd(dry_run, display, confirmation, monkeypatch): patched_generate_membership_document.assert_called_once() # membership_block_id is different # patched_generate_membership_document.assert_called_once_with( - # pubkey, + # PUBKEY, # membership_block_id, # identity_uid, # identity_block_id, @@ -135,7 +135,7 @@ def test_display_confirmation_table(patched_wot_requirements, monkeypatch, capsy monkeypatch.setattr(bma.blockchain, "block", patched_block) client = client_instance() - identities_requirements = client(bma.wot.requirements, pubkey) + identities_requirements = client(bma.wot.requirements, PUBKEY) for identity_requirements in identities_requirements["identities"]: if identity_requirements["uid"] == identity_uid: membership_expires = identity_requirements["membershipExpiresIn"] @@ -143,47 +143,60 @@ def test_display_confirmation_table(patched_wot_requirements, monkeypatch, capsy pending_memberships = identity_requirements["pendingMemberships"] break - table = list() + table = [] if membership_expires: - expires = pendulum.now().add(seconds=membership_expires).diff_for_humans() - table.append(["Expiration date of current membership", expires]) + table.append( + [ + "Expiration date of current membership", + pendulum.now().add(seconds=membership_expires).diff_for_humans(), + ] + ) if pending_memberships: - line = [ - "Number of pending membership(s) in the mempool", - len(pending_memberships), - ] - table.append(line) - expiration = pendulum.now().add(seconds=pending_expires).diff_for_humans() - table.append(["Pending membership documents will expire", expiration]) + table.append( + [ + "Number of pending membership(s) in the mempool", + len(pending_memberships), + ] + ) + table.append( + [ + "Pending membership documents will expire", + pendulum.now().add(seconds=pending_expires).diff_for_humans(), + ] + ) table.append(["User Identifier (UID)", identity_uid]) - table.append(["Public Key", gen_pubkey_checksum(pubkey)]) + table.append(["Public Key", gen_pubkey_checksum(PUBKEY)]) table.append(["Block Identity", str(identity_block_id)[:45] + "…"]) block = client(bma.blockchain.block, identity_block_id.number) - date_idty_pub = pendulum.from_timestamp(block["time"], tz="local").format(DATE) table.append( - ["Identity published", date_idty_pub], + [ + "Identity published", + pendulum.from_timestamp(block["time"], tz="local").format(DATE), + ], ) params = get_blockchain_parameters() - membership_validity = ( - pendulum.now().add(seconds=params["msValidity"]).diff_for_humans() + table.append( + [ + "Expiration date of new membership", + pendulum.now().add(seconds=params["msValidity"]).diff_for_humans(), + ] ) - table.append(["Expiration date of new membership", membership_validity]) - membership_mempool = ( - pendulum.now().add(seconds=params["msPeriod"]).diff_for_humans() - ) table.append( - ["Expiration date of new membership from the mempool", membership_mempool] + [ + "Expiration date of new membership from the mempool", + pendulum.now().add(seconds=params["msPeriod"]).diff_for_humans(), + ] ) expected = tabulate(table, tablefmt="fancy_grid") + "\n" - membership.display_confirmation_table(identity_uid, pubkey, identity_block_id) + membership.display_confirmation_table(identity_uid, PUBKEY, identity_block_id) captured = capsys.readouterr() assert expected == captured.out @@ -191,7 +204,7 @@ def test_display_confirmation_table(patched_wot_requirements, monkeypatch, capsy def test_generate_membership_document(): signing_key = patched_auth_method() generated_membership = membership.generate_membership_document( - pubkey, + PUBKEY, membership_block_id, identity_uid, identity_block_id, @@ -199,7 +212,7 @@ def test_generate_membership_document(): key=signing_key, ) expected = Membership( - issuer=pubkey, + issuer=PUBKEY, membership_block_id=membership_block_id, uid=identity_uid, identity_block_id=identity_block_id, diff --git a/tests/test_network_tools.py b/tests/test_network_tools.py index b09db732c7fbe09b84a163d8357cd2509ae5153b..7f3d9de3239c78ed84124170821b902a4ed6b6aa 100644 --- a/tests/test_network_tools.py +++ b/tests/test_network_tools.py @@ -26,7 +26,7 @@ from silkaj import constants, network_tools # from silkaj.membership import generate_membership_document from tests import helpers -ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334" +IPV6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334" @pytest.mark.parametrize( @@ -39,9 +39,9 @@ ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334" ("127.0.0.1:80/path", "", "127.0.0.1", None, 80, "path"), ("domain.tld:80/path", "domain.tld", None, None, 80, "path"), ("localhost:80/path", "localhost", None, None, 80, "path"), - (f"[{ipv6}]", None, None, ipv6, 443, None), - (f"[{ipv6}]/path", None, None, ipv6, 443, "path"), - (f"[{ipv6}]:80/path", None, None, ipv6, 80, "path"), + (f"[{IPV6}]", None, None, IPV6, 443, None), + (f"[{IPV6}]/path", None, None, IPV6, 443, "path"), + (f"[{IPV6}]:80/path", None, None, IPV6, 80, "path"), ], ) def test_determine_endpoint_custom(endpoint, host, ipv4, ipv6, port, path): diff --git a/tests/test_revocation.py b/tests/test_revocation.py index 956a5d7a700be6887ddadaf6312d5028e2197026..d81085111f7f0e427c4d07a05b2508ebf2c8bfb4 100644 --- a/tests/test_revocation.py +++ b/tests/test_revocation.py @@ -39,10 +39,9 @@ from silkaj.tui import gen_pubkey_checksum def display_dry_options(display, dry_run): if display: return ["--display"] - elif dry_run: + if dry_run: return ["--dry-run"] - else: - return [] + return [] # Values @@ -182,7 +181,7 @@ def test_revocation_cli_dry_run(subcommand, expected_warn, monkeypatch): file = "revocation.txt" runner = CliRunner() with runner.isolated_filesystem(): - with open(file, "w") as f: + with open(file, "w", encoding="utf-8") as f: f.write(REV_DOC.signed_raw()) result = runner.invoke(cli, args=command) assert "6upqFiJ66WV6N3bPc8x8y7rXT3syqKRmwnVyunCtEj7o" in result.output @@ -428,7 +427,7 @@ def test_revocation_cli_verify( # verify file runner = CliRunner() with runner.isolated_filesystem(): - with open(file, "w") as f: + with open(file, "w", encoding="utf-8") as f: f.write(doc.signed_raw()) result = runner.invoke(cli, args=command) for expect in expected: @@ -694,16 +693,17 @@ def test_revocation_cli_publish( file = revocation.REVOCATION_LOCAL_PATH # test publication - client = client_instance() runner = CliRunner() with runner.isolated_filesystem(): - with open(file, "w") as f: + with open(file, "w", encoding="utf-8") as f: f.write(doc.signed_raw()) result = runner.invoke(cli, args=command, input=user_input) if user_input == "yes\n" and not dry_run: - patched_send_bma_revoke.assert_called_once_with(client, doc.signed_raw()) + patched_send_bma_revoke.assert_called_once_with( + client_instance(), doc.signed_raw() + ) elif dry_run or user_input == "no\n": - patched_send_bma_revoke.assert_not_called + patched_send_bma_revoke.assert_not_called() for expect in expected: assert expect in result.output @@ -780,7 +780,7 @@ def test_revocation_cli_publish_send_errors( # test publication runner = CliRunner() with runner.isolated_filesystem(): - with open(file, "w") as f: + with open(file, "w", encoding="utf-8") as f: f.write(REV_DOC.signed_raw()) result = runner.invoke(cli, args=command, input=user_input) for expect in expected: @@ -960,7 +960,7 @@ def test_save_doc(path, rev_1, rev_2, pubkey, capsys, monkeypatch): test_path = Path(path) revocation.save_doc(path, rev_1.signed_raw(), pubkey) assert test_path.is_file() - with open(path) as f: + with open(path, encoding="utf-8") as f: assert f.read() == rev_1.signed_raw() # test file is overwritten if confirm monkeypatch.setattr(click, "confirm", value=conf_true) @@ -968,7 +968,7 @@ def test_save_doc(path, rev_1, rev_2, pubkey, capsys, monkeypatch): expected_confirm = f"Revocation document file stored into `{path}` \ for following public key: {gen_pubkey_checksum(pubkey)}" assert expected_confirm in capsys.readouterr().out - with open(path) as f: + with open(path, encoding="utf-8") as f: assert f.read() == rev_2.signed_raw() # test file is not overwritten if not confirm monkeypatch.setattr(click, "confirm", value=conf_false) @@ -978,7 +978,7 @@ for following public key: {gen_pubkey_checksum(pubkey)}" assert pytest_exit.value.code == SUCCESS_EXIT_STATUS expected_confirm = "Ok, goodbye!" assert expected_confirm in capsys.readouterr().out - with open(path) as f: + with open(path, encoding="utf-8") as f: assert f.read() == rev_2.signed_raw() @@ -1001,7 +1001,7 @@ def test_verify_document(doc, lookup, capsys, monkeypatch): # test runner = CliRunner() with runner.isolated_filesystem(): - with open(path, "w") as f: + with open(path, "w", encoding="utf-8") as f: f.write(doc.signed_raw()) result = revocation.verify_document(path) display = capsys.readouterr().out @@ -1040,7 +1040,7 @@ def test_verify_document_missing_id(doc, lookup, capsys, monkeypatch): # test runner = CliRunner() with runner.isolated_filesystem(): - with open(path, "w") as f: + with open(path, "w", encoding="utf-8") as f: f.write(doc.signed_raw()) with pytest.raises(SystemExit) as pytest_exit: revocation.verify_document(path) @@ -1075,7 +1075,7 @@ def test_verify_document_sign_errors(doc, currency, monkeypatch): # test runner = CliRunner() with runner.isolated_filesystem(): - with open(path, "w") as f: + with open(path, "w", encoding="utf-8") as f: if isinstance(doc, str): f.write(doc) elif isinstance(doc, Revocation): diff --git a/tests/test_tui.py b/tests/test_tui.py index e4592ad448f3660ca18af47370e6e877f60913eb..e488e3090f01af9659158f729b70129228288856 100644 --- a/tests/test_tui.py +++ b/tests/test_tui.py @@ -34,26 +34,26 @@ def test_display_amount(message, amount, currency_symbol): f"{str(amount / 100)} {currency_symbol} | {str(amount_UD)} UD {currency_symbol}", ] ] - tx = list() + tx = [] display_amount(tx, message, amount, mock_ud_value, currency_symbol) assert tx == expected # display_pubkey() @pytest.mark.parametrize( - "message, pubkey, id", + "message, pubkey, uid", [ ("From", "CtM5RZHopnSRAAoWNgTWrUhDEmspcCAxn6fuCEWDWudp", "riri"), ("To", "DBM6F5ChMJzpmkUdL5zD9UXKExmZGfQ1AgPDQy4MxSBw", ""), ], ) -def test_display_pubkey(message, pubkey, id, monkeypatch): +def test_display_pubkey(message, pubkey, uid, monkeypatch): monkeypatch.setattr(wot_tools, "is_member", patched_is_member) expected = [[f"{message} (pubkey:checksum)", gen_pubkey_checksum(pubkey)]] - if id: - expected.append([f"{message} (id)", id]) - tx = list() + if uid: + expected.append([f"{message} (id)", uid]) + tx = [] display_pubkey(tx, message, pubkey) assert tx == expected diff --git a/tests/test_tx.py b/tests/test_tx.py index 0808ec7a2a1ecbc5e1349906622f6a15fb48a118..295b93f57c2d207cd4c8f8c7da51eae66b615af6 100644 --- a/tests/test_tx.py +++ b/tests/test_tx.py @@ -25,7 +25,6 @@ from patched.test_constants import mock_ud_value from silkaj import auth, money, tx from silkaj.cli import cli from silkaj.constants import ( - CENT_MULT_TO_UNIT, FAILURE_EXIT_STATUS, MINIMAL_ABSOLUTE_TX_AMOUNT, MINIMAL_RELATIVE_TX_AMOUNT, @@ -111,12 +110,6 @@ def test_transaction_amount_errors( # patched functions monkeypatch.setattr(money, "get_ud_value", patched_get_ud_value) - def too_little_amount(amounts, multiplicator): - for amount in amounts: - if amount * multiplicator < MINIMAL_ABSOLUTE_TX_AMOUNT * CENT_MULT_TO_UNIT: - return True - return False - # check program exit on error with pytest.raises(SystemExit) as pytest_exit: # read output to check error. @@ -126,7 +119,7 @@ def test_transaction_amount_errors( def test_tx_passed_amount_cli(): - """One option""" + # One option result = CliRunner().invoke(cli, ["tx", "--amount", "1"]) assert "Error: A recipient should be passed\n" in result.output assert result.exit_code == 1 @@ -139,7 +132,7 @@ def test_tx_passed_amount_cli(): assert "Error: A recipient should be passed\n" in result.output assert result.exit_code == 1 - """Multiple options""" + # Multiple options result = CliRunner().invoke(cli, ["tx", "--amount", 1, "--amountUD", 1]) assert "Error: Usage" in result.output assert result.exit_code == 2 @@ -236,4 +229,4 @@ No transaction sent." # test that error don't occur when issuer balance > 0 else: - tx.gen_confirmation_table.assert_called_once() + patched_gen_confirmation_table.assert_called_once() diff --git a/tests/test_tx_file.py b/tests/test_tx_file.py index 1c02a44ee10309c20121c1ce303f1244b2eb82fc..d003228b5fbaf865413a41a66ecda724aaf92570 100644 --- a/tests/test_tx_file.py +++ b/tests/test_tx_file.py @@ -45,7 +45,7 @@ def test_parse_file_containing_amounts_recipients( ): runner = CliRunner() with runner.isolated_filesystem(): - with open(FILE_PATH, "w") as f: + with open(FILE_PATH, "w", encoding="utf-8") as f: f.write(file_content) amounts, recipients = parse_file_containing_amounts_recipients(FILE_PATH) assert amounts == amounts_exp @@ -72,7 +72,7 @@ SPEC_ERR = "No amounts or recipients specified" def test_parse_file_containing_amounts_recipients_errors(file_content, error, capsys): runner = CliRunner() with runner.isolated_filesystem(): - with open(FILE_PATH, "w") as f: + with open(FILE_PATH, "w", encoding="utf-8") as f: f.write(file_content) with pytest.raises(SystemExit): parse_file_containing_amounts_recipients(FILE_PATH) diff --git a/tests/test_tx_history.py b/tests/test_tx_history.py index 496686c16b82e66e3128b1132fdda2240a561d02..7ae7bfc2c2a3f74f2fd4470d5b0788d441700d6b 100644 --- a/tests/test_tx_history.py +++ b/tests/test_tx_history.py @@ -49,7 +49,7 @@ def test_tx_history_generate_table_and_pubkey_uid_display(monkeypatch): table_columns = 5 pubkey = "d88fPFbDdJXJANHH7hedFMaRyGcnVZj9c5cDaE76LRN" - received_txs, sent_txs = list(), list() + received_txs, sent_txs = [], [] patched_get_transactions_history(client, pubkey, received_txs, sent_txs) # simple table diff --git a/tests/test_unit_tx.py b/tests/test_unit_tx.py index 0a5db4545105fcda593b6a44de0a2b2807b466e4..d7d5cfb167c3b00649411aba118d3a50ae107558 100644 --- a/tests/test_unit_tx.py +++ b/tests/test_unit_tx.py @@ -114,7 +114,7 @@ def test_gen_confirmation_table( monkeypatch.setattr(tools, "get_currency_symbol", patched_get_currency_symbol) # creating expected list - expected = list() + expected = [] total_tx_amount = sum(tx_amounts) # display account situation display_amount( @@ -956,6 +956,7 @@ outputbackchange, yes, confirmation_answer", ), ], ) +# pylint: disable=too-many-locals def test_send_transaction( amounts, amountsud, @@ -972,44 +973,6 @@ def test_send_transaction( Errors are tested in test_tx.py. """ - @pass_context - def patched_auth_method_tx(ctx): - return key_fifi - - def compute_test_amounts(amounts, mult): - list_amounts = [] - for amount in amounts: - list_amounts.append(round(amount * mult)) - return list_amounts - - # construct click arguments list - def construct_args( - amounts, amountsud, allsources, recipients, comment, outputbackchange, yes - ): - args_list = ["tx"] - if yes: - args_list.append("--yes") - if amounts: - for amount in amounts: - args_list.append("-a") - args_list.append(str(amount)) - elif amountsud: - for amountud in amountsud: - args_list.append("-d") - args_list.append(str(amountud)) - elif allsources: - args_list.append("--allSources") - for recipient in recipients: - args_list.append("-r") - args_list.append(recipient) - if comment: - args_list.append("--comment") - args_list.append(comment) - if outputbackchange is not None: - args_list.append("--outputBackChange") - args_list.append(outputbackchange) - return args_list - # mocking functions patched_gen_confirmation_table = Mock(return_value=None) patched_handle_intermediaries_transactions = Mock(return_value=None) @@ -1048,8 +1011,7 @@ def test_send_transaction( arguments = construct_args( amounts, amountsud, allsources, recipients, comment, outputbackchange, yes ) - result = CliRunner().invoke(cli, args=arguments, input=confirmation_answer) - print(result.output) + CliRunner().invoke(cli, args=arguments, input=confirmation_answer) if confirmation_answer: patched_gen_confirmation_table.assert_called_once_with( @@ -1073,6 +1035,47 @@ def test_send_transaction( patched_handle_intermediaries_transactions.assert_not_called() +@pass_context +def patched_auth_method_tx(ctx): + return key_fifi + + +def compute_test_amounts(amounts, mult): + list_amounts = [] + for amount in amounts: + list_amounts.append(round(amount * mult)) + return list_amounts + + +# construct click arguments list +def construct_args( + amounts, amountsud, allsources, recipients, comment, outputbackchange, yes +): + args_list = ["tx"] + if yes: + args_list.append("--yes") + if amounts: + for amount in amounts: + args_list.append("-a") + args_list.append(str(amount)) + elif amountsud: + for amountud in amountsud: + args_list.append("-d") + args_list.append(str(amountud)) + elif allsources: + args_list.append("--allSources") + for recipient in recipients: + args_list.append("-r") + args_list.append(recipient) + if comment: + args_list.append("--comment") + args_list.append(comment) + if outputbackchange is not None: + args_list.append("--outputBackChange") + args_list.append(outputbackchange) + return args_list + + # generate_and_send_transaction() diff --git a/tests/test_verify_blocks.py b/tests/test_verify_blocks.py index 8e2a9c84779f89640cc761873b871c3ce74d2b52..4bc455564aa250a44ac198934c9ce2cecf848074 100644 --- a/tests/test_verify_blocks.py +++ b/tests/test_verify_blocks.py @@ -78,7 +78,7 @@ def test_verify_blocks_signatures(from_block, to_block, monkeypatch): if to_block == 0: to_block = HEAD_BLOCK expected = f"Within {from_block}-{to_block} range, " - if from_block == 1 or from_block == HEAD_BLOCK - 1: + if from_block in (1, HEAD_BLOCK - 1): expected += "no blocks with a wrong signature." else: expected += "blocks with a wrong signature: " + str(G1_INVALID_BLOCK_SIG) diff --git a/tests/test_wot.py b/tests/test_wot.py index 6ffbe292d4dc30bea68b5a9297538cecdb42846e..6647379e00753c2c3bd0e1c8d2161c430f888644 100644 --- a/tests/test_wot.py +++ b/tests/test_wot.py @@ -146,10 +146,11 @@ def test_choose_identity( ): monkeypatch.setattr(wot_tools, "wot_lookup", patched_lookup) monkeypatch.setattr(click, "prompt", patched_prompt) - identity_card, get_pubkey, signed = wot.choose_identity(pubkey) + # pylint: disable=unused-variable + idty_card, get_pubkey, signed = wot.choose_identity(pubkey) expected_pubkey = pubkey.split(":")[0] assert expected_pubkey == get_pubkey - assert selected_uid == identity_card["uid"] + assert selected_uid == idty_card["uid"] # Check whether the table is not displayed in case of one identity # Check it is displayed for more than one identity