Skip to content
Snippets Groups Projects
Commit fc8aadf3 authored by Donald Stufft's avatar Donald Stufft
Browse files

Add an Encoding system instead of adhoc uses of binascii

* Adds an Encoding system that supports raw, hex, base16, base32,
  and base 64.
* Adds an encoding=ENCODING parameter to methods to control what
  encoding data is returned/passed in.
* encoding parmater supports passing in a custom encoder object
* Encoding system supports registering of new encoders through
  nacl.encoder.register.
parent 0cccff9b
No related branches found
No related tags found
No related merge requests found
...@@ -4,10 +4,11 @@ from __future__ import division ...@@ -4,10 +4,11 @@ from __future__ import division
from . import __about__ from . import __about__
from . import hash # pylint: disable=W0622 from . import hash # pylint: disable=W0622
from . import signing from . import signing
from .encoding import encoder
from .random import random from .random import random
__all__ = ["hash", "random"] + __about__.__all__ __all__ = ["encoder", "hash", "random"] + __about__.__all__
# - Meta Information - # - Meta Information -
......
import base64
import binascii
from . import six
class Encoder(object):
def __init__(self):
self._registry = {}
def __getitem__(self, name):
if isinstance(name, six.string_types):
return self._registry[name]
return name
def register(self, name, cls=None):
if cls is None:
def inner(cls):
self._registry[name] = cls()
return cls
return inner
else:
self._registry[name] = cls()
# Global encoder
encoder = Encoder()
@encoder.register("raw")
class RawEncoder(object):
def encode(self, data):
return data
def decode(self, data):
return data
@encoder.register("hex")
class HexEncoder(object):
def encode(self, data):
return binascii.hexlify(data)
def decode(self, data):
return binascii.unhexlify(data)
@encoder.register("base16")
class Base16Encoder(object):
def encode(self, data):
return base64.b16encode(data)
def decode(self, data):
return base64.b16decode(data)
@encoder.register("base32")
class Base32Encoder(object):
def encode(self, data):
return base64.b32encode(data)
def decode(self, data):
return base64.b32decode(data)
@encoder.register("base64")
class Base64Encoder(object):
def encode(self, data):
return base64.b64encode(data)
def decode(self, data):
return base64.b64decode(data)
from __future__ import absolute_import from __future__ import absolute_import
from __future__ import division from __future__ import division
import binascii
from . import nacl from . import nacl
from .encoding import encoder
from .exceptions import CryptoError from .exceptions import CryptoError
def sha256(message, binary=False): def sha256(message, encoding="hex"):
digest = nacl.ffi.new("unsigned char[]", nacl.lib.crypto_hash_sha256_BYTES) digest = nacl.ffi.new("unsigned char[]", nacl.lib.crypto_hash_sha256_BYTES)
if not nacl.lib.crypto_hash_sha256(digest, message, len(message)): if not nacl.lib.crypto_hash_sha256(digest, message, len(message)):
raise CryptoError("Hashing failed") raise CryptoError("Hashing failed")
digest = nacl.ffi.buffer(digest, nacl.lib.crypto_hash_sha256_BYTES)[:] digest = nacl.ffi.buffer(digest, nacl.lib.crypto_hash_sha256_BYTES)[:]
if binary: return encoder[encoding].encode(digest)
return digest
return binascii.hexlify(digest)
def sha512(message, binary=False): def sha512(message, encoding="hex"):
digest = nacl.ffi.new("unsigned char[]", nacl.lib.crypto_hash_sha512_BYTES) digest = nacl.ffi.new("unsigned char[]", nacl.lib.crypto_hash_sha512_BYTES)
if not nacl.lib.crypto_hash_sha512(digest, message, len(message)): if not nacl.lib.crypto_hash_sha512(digest, message, len(message)):
raise CryptoError("Hashing failed") raise CryptoError("Hashing failed")
digest = nacl.ffi.buffer(digest, nacl.lib.crypto_hash_sha512_BYTES)[:] digest = nacl.ffi.buffer(digest, nacl.lib.crypto_hash_sha512_BYTES)[:]
if binary: return encoder[encoding].encode(digest)
return digest
return binascii.hexlify(digest)
...@@ -4,6 +4,7 @@ from __future__ import division ...@@ -4,6 +4,7 @@ from __future__ import division
from . import six from . import six
from . import nacl from . import nacl
from .encoding import encoder
from .exceptions import CryptoError from .exceptions import CryptoError
from .random import random from .random import random
...@@ -19,19 +20,26 @@ class SignedMessage(six.binary_type): ...@@ -19,19 +20,26 @@ class SignedMessage(six.binary_type):
A bytes subclass that holds a messaged that has been signed by a :class:`SigningKey`. A bytes subclass that holds a messaged that has been signed by a :class:`SigningKey`.
""" """
@classmethod
def _from_parts(cls, signature, message, combined):
obj = cls(combined)
obj._signature = signature
obj._message = message
return obj
@property @property
def signature(self): def signature(self):
""" """
The signature contained within the :class:`SignedMessage`. The signature contained within the :class:`SignedMessage`.
""" """
return self[:nacl.lib.crypto_sign_BYTES] return self._signature
@property @property
def message(self): def message(self):
""" """
The message contained within the :class:`SignedMessage`. The message contained within the :class:`SignedMessage`.
""" """
return self[nacl.lib.crypto_sign_BYTES:] return self._message
class VerifyKey(object): class VerifyKey(object):
...@@ -42,14 +50,17 @@ class VerifyKey(object): ...@@ -42,14 +50,17 @@ class VerifyKey(object):
:param key: [:class:`bytes`] Serialized Ed25519 public key :param key: [:class:`bytes`] Serialized Ed25519 public key
""" """
def __init__(self, key): def __init__(self, key, encoding="raw"):
# Decode the key
key = encoder[encoding].decode(key)
if len(key) != nacl.lib.crypto_sign_PUBLICKEYBYTES: if len(key) != nacl.lib.crypto_sign_PUBLICKEYBYTES:
raise ValueError("The key must be exactly %s bytes long" % raise ValueError("The key must be exactly %s bytes long" %
nacl.lib.crypto_sign_PUBLICKEYBYTES) nacl.lib.crypto_sign_PUBLICKEYBYTES)
self._key = key self._key = key
def verify(self, smessage, signature=None): def verify(self, smessage, signature=None, encoding="raw"):
""" """
Verifies the signature of a signed message, returning the message Verifies the signature of a signed message, returning the message
if it has not been tampered with else raising if it has not been tampered with else raising
...@@ -66,6 +77,9 @@ class VerifyKey(object): ...@@ -66,6 +77,9 @@ class VerifyKey(object):
# them. # them.
smessage = signature + smessage smessage = signature + smessage
# Decode the signed message
smessage = encoder[encoding].decode(smessage)
message = nacl.ffi.new("unsigned char[]", len(smessage)) message = nacl.ffi.new("unsigned char[]", len(smessage))
message_len = nacl.ffi.new("unsigned long long *") message_len = nacl.ffi.new("unsigned long long *")
...@@ -93,7 +107,10 @@ class SigningKey(object): ...@@ -93,7 +107,10 @@ class SigningKey(object):
(i.e. public) key that corresponds with this signing key. (i.e. public) key that corresponds with this signing key.
""" """
def __init__(self, seed): def __init__(self, seed, encoding="raw"):
# Decode the seed
seed = encoder[encoding].decode(seed)
# Verify that our seed is the proper size # Verify that our seed is the proper size
seed_size = nacl.lib.crypto_sign_SECRETKEYBYTES // 2 seed_size = nacl.lib.crypto_sign_SECRETKEYBYTES // 2
if len(seed) != seed_size: if len(seed) != seed_size:
...@@ -120,9 +137,11 @@ class SigningKey(object): ...@@ -120,9 +137,11 @@ class SigningKey(object):
:rtype: :class:`~nacl.signing.SigningKey` :rtype: :class:`~nacl.signing.SigningKey`
""" """
return cls(random(nacl.lib.crypto_sign_SECRETKEYBYTES // 2)) return cls(random(nacl.lib.crypto_sign_SECRETKEYBYTES // 2),
encoding="raw",
)
def sign(self, message): def sign(self, message, encoding="raw"):
""" """
Sign a message using this key. Sign a message using this key.
...@@ -135,4 +154,10 @@ class SigningKey(object): ...@@ -135,4 +154,10 @@ class SigningKey(object):
if not nacl.lib.crypto_sign(sm, smlen, message, len(message), self._signing_key): if not nacl.lib.crypto_sign(sm, smlen, message, len(message), self._signing_key):
raise CryptoError("Failed to sign the message") raise CryptoError("Failed to sign the message")
return SignedMessage(nacl.ffi.buffer(sm, smlen[0])[:]) raw_signed = nacl.ffi.buffer(sm, smlen[0])[:]
signature = encoder[encoding].encode(raw_signed[:nacl.lib.crypto_sign_BYTES])
message = encoder[encoding].encode(raw_signed[nacl.lib.crypto_sign_BYTES:])
signed = encoder[encoding].encode(raw_signed)
return SignedMessage._from_parts(signature, message, signed)
...@@ -27,7 +27,7 @@ def test_sha256_hex(inp, expected): ...@@ -27,7 +27,7 @@ def test_sha256_hex(inp, expected):
) )
]) ])
def test_sha256_binary(inp, expected): def test_sha256_binary(inp, expected):
assert nacl.hash.sha256(inp, binary=True) == expected assert nacl.hash.sha256(inp, encoding="raw") == expected
@pytest.mark.parametrize(("inp", "expected"), [ @pytest.mark.parametrize(("inp", "expected"), [
...@@ -55,4 +55,4 @@ def test_sha512_hex(inp, expected): ...@@ -55,4 +55,4 @@ def test_sha512_hex(inp, expected):
) )
]) ])
def test_sha512_binary(inp, expected): def test_sha512_binary(inp, expected):
assert nacl.hash.sha512(inp, binary=True) == expected assert nacl.hash.sha512(inp, encoding="raw") == expected
...@@ -45,12 +45,12 @@ class TestSigningKey: ...@@ -45,12 +45,12 @@ class TestSigningKey:
for x in ed25519_known_answers()] for x in ed25519_known_answers()]
) )
def test_message_signing(self, seed, message, signature, expected): def test_message_signing(self, seed, message, signature, expected):
signing_key = nacl.signing.SigningKey(binascii.unhexlify(seed)) signing_key = nacl.signing.SigningKey(seed, encoding="hex")
signed = signing_key.sign(binascii.unhexlify(message)) signed = signing_key.sign(binascii.unhexlify(message), encoding="hex")
assert binascii.hexlify(signed) == expected assert signed == expected
assert binascii.hexlify(signed.message) == message assert signed.message == message
assert binascii.hexlify(signed.signature) == signature assert signed.signature == signature
class TestVerifyKey: class TestVerifyKey:
...@@ -59,13 +59,10 @@ class TestVerifyKey: ...@@ -59,13 +59,10 @@ class TestVerifyKey:
[(x["public_key"], x["signed"], x["message"], x["signature"]) for x in ed25519_known_answers()] [(x["public_key"], x["signed"], x["message"], x["signature"]) for x in ed25519_known_answers()]
) )
def test_valid_signed_message(self, public_key, signed, message, signature): def test_valid_signed_message(self, public_key, signed, message, signature):
key = nacl.signing.VerifyKey(binascii.unhexlify(public_key)) key = nacl.signing.VerifyKey(public_key, encoding="hex")
signedb = binascii.unhexlify(signed)
messageb = binascii.unhexlify(message)
signatureb = binascii.unhexlify(signature)
assert binascii.hexlify(key.verify(signedb)) == message assert binascii.hexlify(key.verify(signed, encoding="hex")) == message
assert binascii.hexlify(key.verify(messageb, signatureb)) == message assert binascii.hexlify(key.verify(message, signature, encoding="hex")) == message
def test_invalid_signed_message(self): def test_invalid_signed_message(self):
skey = nacl.signing.SigningKey.generate() skey = nacl.signing.SigningKey.generate()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment