From 1156b2313d9b0ac814f4102cecf697108397a742 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Thu, 19 Dec 2024 20:53:11 -0500 Subject: [PATCH] testing snapshot --- blink.py | 42 + circuitmatter/__init__.py | 8 +- circuitmatter/certificates.py | 6 +- circuitmatter/crypto.py | 27 +- .../device_types/utility/root_node.py | 2 +- circuitmatter/hm_aesccm.py | 14 + circuitmatter/hm_aesccm_aesio.py | 669 ++++++++++++++ circuitmatter/interaction_model.py | 2 + circuitmatter/pase.py | 11 +- circuitmatter/protocol.py | 7 +- circuitmatter/session.py | 13 +- circuitmatter/tlv.py | 133 ++- cm_ecdsa/__init__.py | 20 +- cm_ecdsa/_compat.py | 12 +- cm_ecdsa/_sha3.py | 1 - cm_ecdsa/curves.py | 54 +- cm_ecdsa/der.py | 34 +- cm_ecdsa/ecdh.py | 50 +- cm_ecdsa/ecdsa.py | 837 +----------------- cm_ecdsa/ellipticcurve.py | 101 +-- cm_ecdsa/errors.py | 1 + cm_ecdsa/keys.py | 137 +-- cm_ecdsa/numbertheory.py | 3 +- cm_ecdsa/rfc6979.py | 3 +- cm_ecdsa/util.py | 42 +- cm_fake_random.py | 14 + cm_hmac.py | 22 +- cm_sha.py | 20 + tests/test_chunked_message.py | 30 +- 29 files changed, 1049 insertions(+), 1266 deletions(-) create mode 100644 blink.py create mode 100644 circuitmatter/hm_aesccm.py create mode 100644 circuitmatter/hm_aesccm_aesio.py create mode 100644 cm_fake_random.py create mode 100644 cm_sha.py diff --git a/blink.py b/blink.py new file mode 100644 index 0000000..39048f0 --- /dev/null +++ b/blink.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 Dan Halbert for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +"""Simple LED on and off as a light.""" + +import os +import cm_fake_random + +import board +import digitalio + +import circuitmatter as cm +from circuitmatter.device_types.lighting import on_off + +try: + print("removing matter-device-state.json if present") + os.remove("matter-device-state.json") +except FileNotFoundError: + pass + +class LED(on_off.OnOffLight): + def __init__(self, name, led): + super().__init__(name) + # self._led = led + # self._led.direction = digitalio.Direction.OUTPUT + + def on(self): + print("_led set to on") + # self._led.value = True + + def off(self): + print("_led set to off") + # self._led.value = False + + +matter = cm.CircuitMatter(random_source=cm_fake_random) +# led = LED("led1", digitalio.DigitalInOut(board.D13)) +led = LED("led1", None) +matter.add_device(led) +while True: + matter.process_packets() diff --git a/circuitmatter/__init__.py b/circuitmatter/__init__.py index be91e3a..5229c82 100644 --- a/circuitmatter/__init__.py +++ b/circuitmatter/__init__.py @@ -5,11 +5,12 @@ """Pure Python implementation of the Matter IOT protocol.""" import binascii -import hashlib import json import os import time +from cm_sha import sha256 + from . import case, interaction_model, nonvolatile, session from .device_types.utility.root_node import RootNode from .message import Message @@ -113,7 +114,8 @@ def start_commissioning(self): } from . import pase - pase.show_qr_code(self.vendor_id, self.product_id, discriminator, passcode) + if not pase.show_qr_code(self.vendor_id, self.product_id, discriminator, passcode): + print("QR code not available") print("Manual code:", self.nonvolatile["manual_code"]) instance_name = self.random.urandom(8).hex().upper() self.mdns_server.advertise_service( @@ -297,7 +299,7 @@ def process_packet(self, address, data): # noqa: PLR0912, PLR0914, PLR0915 Too # This is Section 4.14.1.2 request = pase.PBKDFParamRequest.decode(message.application_payload) - exchange.commissioning_hash = hashlib.sha256(b"CHIP PAKE V1 Commissioning") + exchange.commissioning_hash = sha256(b"CHIP PAKE V1 Commissioning") exchange.commissioning_hash.update(message.application_payload) if request.passcodeId == 0: pass diff --git a/circuitmatter/certificates.py b/circuitmatter/certificates.py index 232db2b..f55c3b1 100644 --- a/circuitmatter/certificates.py +++ b/circuitmatter/certificates.py @@ -5,11 +5,11 @@ # This file should only be needed when generating certificates. import binascii -import hashlib +from cm_ecdsa import der from cm_ecdsa.curves import NIST256p +from cm_sha import sha1 -from cm_ecdsa import der from . import crypto, pase, tlv from .data_model import Enum8 @@ -165,7 +165,7 @@ def generate_dac(vendor_id, product_id, product_name, random_source) -> tuple[by key_usage = b"\x30\x0e\x06\x03\x55\x1d\x0f\x01\x01\xff\x04\x04\x03\x02\x07\x80" key_id = der.encode_sequence( der.encode_oid(2, 5, 29, 14), - der.encode_octet_string(der.encode_octet_string(hashlib.sha1(public_key).digest())), + der.encode_octet_string(der.encode_octet_string(sha1(public_key).digest())), ) # ID of the CircuitMatter 0xFFF4 PAI authority_key_id = b"\x30\x1f\x06\x03\x55\x1d\x23\x04\x18\x30\x16\x80\x14\x07\xf8\x38\x0a\x5f\x01\x36\xfc\xe2\x36\xbd\x45\xf2\x88\xff\x22\xdc\xa6\xf4\xa7" # noqa: E501 Line too long diff --git a/circuitmatter/crypto.py b/circuitmatter/crypto.py index 14c1208..d0c339f 100644 --- a/circuitmatter/crypto.py +++ b/circuitmatter/crypto.py @@ -5,17 +5,16 @@ try: import enum except ImportError: - - class Enum: + class enum: class IntEnum: pass - import hashlib -import cm_hmac as hmac import struct +from cm_sha import sha256 import cm_ecdsa as ecdsa +import cm_hmac as hmac from . import tlv @@ -81,7 +80,7 @@ class DNAttribute(tlv.List): pseudonym_ps = tlv.OctetStringMember(143, 100) -class BasicContraints(tlv.Structure): +class BasicConstraints(tlv.Structure): # Section 6.5.11.1 is_ca = tlv.BoolMember(1) path_len_constraint = tlv.IntMember(2, signed=False, octets=1, optional=True) @@ -89,7 +88,7 @@ class BasicContraints(tlv.Structure): class Extensions(tlv.List): # Section 6.5.11 - basic_cnstr = tlv.StructMember(1, BasicContraints) + basic_cnstr = tlv.StructMember(1, BasicConstraints) key_usage = tlv.IntMember(2, signed=False, octets=2) extended_key_usage = tlv.ArrayMember( 3, tlv.IntMember(None, signed=False, octets=1), max_length=100 @@ -150,33 +149,33 @@ def TRNG_bytes(byte_len: int) -> bytes: def Hash(*message) -> bytes: - h = hashlib.sha256() + h = sha256() for m in message: h.update(m) return h.digest() def HMAC(key, message) -> bytes: - m = hmac.new(key, digestmod=hashlib.sha256) + m = hmac.new(key, digestmod=sha256) m.update(message) return m.digest() def GenerateKeyPair(random_source): return ecdsa.keys.SigningKey.generate( - curve=ecdsa.NIST256p, hashfunc=hashlib.sha256, entropy=random_source + curve=ecdsa.NIST256p, hashfunc=sha256, entropy=random_source ) def Sign_as_der(private_key, message): return private_key.sign_deterministic( - message, hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_der_canonize + message, hashfunc=sha256, sigencode=ecdsa.util.sigencode_der_canonize ) def Sign_as_string(private_key, message): return private_key.sign_deterministic( - message, hashfunc=hashlib.sha256, sigencode=ecdsa.util.sigencode_string + message, hashfunc=sha256, sigencode=ecdsa.util.sigencode_string ) @@ -194,7 +193,7 @@ def HKDF_Expand(prk, info, length) -> bytes: while num_bytes_generated < length: num_bytes_generated += HASH_LEN_BYTES # Do the hmac directly so we don't need to allocate a buffer for last_hash + info + i. - m = hmac.new(prk, digestmod=hashlib.sha256) + m = hmac.new(prk, digestmod=sha256) m.update(last_hash) m.update(info) m.update(struct.pack("b", i)) @@ -219,9 +218,9 @@ def ECDH(private_key: ecdsa.keys.SigningKey, public_key: bytes) -> bytes: def key_from_bytes(key_bytes: bytes): return ecdsa.keys.SigningKey.from_string( - key_bytes, curve=ecdsa.curves.NIST256p, hashfunc=hashlib.sha256 + key_bytes, curve=ecdsa.curves.NIST256p, hashfunc=sha256 ) def key_from_der(der): - return ecdsa.keys.SigningKey.from_der(der, hashfunc=hashlib.sha256) + return ecdsa.keys.SigningKey.from_der(der, hashfunc=sha256) diff --git a/circuitmatter/device_types/utility/root_node.py b/circuitmatter/device_types/utility/root_node.py index 3001ed4..1d4d5d8 100644 --- a/circuitmatter/device_types/utility/root_node.py +++ b/circuitmatter/device_types/utility/root_node.py @@ -6,7 +6,6 @@ import struct import time -from cm_ecdsa import der from circuitmatter import crypto, interaction_model, tlv from circuitmatter.clusters.device_management.basic_information import ( BasicInformationCluster, @@ -33,6 +32,7 @@ ) from circuitmatter.clusters.system_model import user_label from circuitmatter.clusters.system_model.access_control import AccessControlCluster +from cm_ecdsa import der from .. import simple_device diff --git a/circuitmatter/hm_aesccm.py b/circuitmatter/hm_aesccm.py new file mode 100644 index 0000000..89ae66a --- /dev/null +++ b/circuitmatter/hm_aesccm.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 Dan Halbert for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +try: + from cryptography.hazmat.primitives.ciphers.aead import AESCCM + from cryptography.exceptions import InvalidTag +except ImportError: + # Provide an exception raised by cryptography. + class InvalidTag(Exception): + pass + + # Use a CircuitPython implementation. + from hm_aesccm_aesio import * diff --git a/circuitmatter/hm_aesccm_aesio.py b/circuitmatter/hm_aesccm_aesio.py new file mode 100644 index 0000000..3a90316 --- /dev/null +++ b/circuitmatter/hm_aesccm_aesio.py @@ -0,0 +1,669 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 Dan Halbert for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +# Adapted from https://github.com/pevandenburie/micropython-aes-ccm/ +# Licenses follow: +# +# =================================================================== +# +# Copyright (c) 2014, Legrandin +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. +# =================================================================== + +# Written by Andrew M. Kuchling, Barry A. Warsaw, and others +# +# =================================================================== +# The contents of this file are dedicated to the public domain. To +# the extent that dedication to the public domain is not available, +# everyone is granted a worldwide, perpetual, royalty-free, +# non-exclusive license to exercise all rights associated with the +# contents of this file for any purpose whatsoever. +# No rights are reserved. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# =================================================================== +# +import struct +from binascii import unhexlify + + +def bytes_xor(var1, var2): + return bytes([ a ^ b for (a,b) in zip(var1, var2)]) + + +class MacStatus: + NOT_STARTED = 0 + PROCESSING_AUTH_DATA = 1 + PROCESSING_PLAINTEXT = 2 + + +from aesio import AES +import binascii +import array + + +class AesFactory(AES): + # Size of a data block (in bytes) + block_size = 16 + # Size of a key (in bytes) + key_size = (16, 24, 32) + + def new(key, mode, nonce, **cipher_params): + if (mode == AES.MODE_CBC): + return AES(key, mode, IV=nonce) + elif (mode == AES.MODE_CTR): + # If nonce is smaller than block size, complete with zeros + counter = bytearray(nonce) + counter.extend( bytes(b'\x00' *(16-len(nonce))) ) + return AES(key, mode, counter=counter) + else: + return AES(key, mode, IV=nonce) + + + +def new(key, *args, **kwargs): + return _create_cipher(factory=AesFactory, key=key, *args, **kwargs) + + + +def _create_cipher(factory, key, *args, **kwargs): + + kwargs["key"] = key + + if args: + if len(args) > 1: + raise TypeError("Too many arguments for this mode") + kwargs["nonce"] = args[0] + + return _create_ccm_cipher(factory, **kwargs) + + +# Utils +def _copy_bytes(start, end, seq): + return seq[start:end] + + +class CcmMode(object): + """Counter with CBC-MAC (CCM). + This is an Authenticated Encryption with Associated Data (`AEAD`_) mode. + It provides both confidentiality and authenticity. + The header of the message may be left in the clear, if needed, and it will + still be subject to authentication. The decryption step tells the receiver + if the message comes from a source that really knowns the secret key. + Additionally, decryption detects if any part of the message - including the + header - has been modified or corrupted. + This mode requires a nonce. The nonce shall never repeat for two + different messages encrypted with the same key, but it does not need + to be random. + Note that there is a trade-off between the size of the nonce and the + maximum size of a single message you can encrypt. + It is important to use a large nonce if the key is reused across several + messages and the nonce is chosen randomly. + It is acceptable to us a short nonce if the key is only used a few times or + if the nonce is taken from a counter. + The following table shows the trade-off when the nonce is chosen at + random. The column on the left shows how many messages it takes + for the keystream to repeat **on average**. In practice, you will want to + stop using the key way before that. + +--------------------+---------------+-------------------+ + | Avg. # of messages | nonce | Max. message | + | before keystream | size | size | + | repeats | (bytes) | (bytes) | + +====================+===============+===================+ + | 2^52 | 13 | 64K | + +--------------------+---------------+-------------------+ + | 2^48 | 12 | 16M | + +--------------------+---------------+-------------------+ + | 2^44 | 11 | 4G | + +--------------------+---------------+-------------------+ + | 2^40 | 10 | 1T | + +--------------------+---------------+-------------------+ + | 2^36 | 9 | 64P | + +--------------------+---------------+-------------------+ + | 2^32 | 8 | 16E | + +--------------------+---------------+-------------------+ + This mode is only available for ciphers that operate on 128 bits blocks + (e.g. AES but not TDES). + See `NIST SP800-38C`_ or RFC3610_. + .. _`NIST SP800-38C`: http://csrc.nist.gov/publications/nistpubs/800-38C/SP800-38C.pdf + .. _RFC3610: https://tools.ietf.org/html/rfc3610 + .. _AEAD: http://blog.cryptographyengineering.com/2012/05/how-to-choose-authenticated-encryption.html + :undocumented: __init__ + """ + + def __init__(self, factory, key, nonce, mac_len, msg_len, assoc_len, + cipher_params): + + self.block_size = factory.block_size + """The block size of the underlying cipher, in bytes.""" + + self.nonce = _copy_bytes(None, None, nonce) + """The nonce used for this cipher instance""" + + self._factory = factory + self._key = _copy_bytes(None, None, key) + self._mac_len = mac_len + self._msg_len = msg_len + self._assoc_len = assoc_len + self._cipher_params = cipher_params + + self._mac_tag = None # Cache for MAC tag + + if self.block_size != 16: + raise ValueError("CCM mode is only available for ciphers" + " that operate on 128 bits blocks") + + # MAC tag length (Tlen) + if mac_len not in (4, 6, 8, 10, 12, 14, 16): + raise ValueError("Parameter 'mac_len' must be even" + " and in the range 4..16 (not %d)" % mac_len) + + # Nonce value + if not (nonce and 7 <= len(nonce) <= 13): + raise ValueError("Length of parameter 'nonce' must be" + " in the range 7..13 bytes") + + # Create MAC object (the tag will be the last block + # bytes worth of ciphertext) + self._mac = self._factory.new(key=key, + mode=factory.MODE_CBC, + #iv=b'\x00' * 16, + nonce=b'\x00' * 16, + **cipher_params) + self._mac_status = MacStatus.NOT_STARTED + self._t = None + + # Allowed transitions after initialization + self._next = ["self.update", "self.encrypt", "self.decrypt", + "self.digest", "self.verify"] + + # Cumulative lengths + self._cumul_assoc_len = 0 + self._cumul_msg_len = 0 + + # Cache for unaligned associated data/plaintext. + # This is a list with byte strings, but when the MAC starts, + # it will become a binary string no longer than the block size. + self._cache = [] + + # Start CTR cipher, by formatting the counter (A.3) + q = 15 - len(nonce) # length of Q, the encoded message length + self._cipher = self._factory.new(key, + self._factory.MODE_CTR, + nonce=struct.pack("B", q - 1) + self.nonce, + **cipher_params) + + # S_0, step 6 in 6.1 for j=0 + dest = bytearray(16) + self._cipher.encrypt_into(b'\x00' * 16, dest) + self._s_0 = dest + + # Try to start the MAC + if None not in (assoc_len, msg_len): + self._start_mac() + + def _start_mac(self): + + assert(self._mac_status == MacStatus.NOT_STARTED) + assert(None not in (self._assoc_len, self._msg_len)) + assert(isinstance(self._cache, list)) + + # Formatting control information and nonce (A.2.1) + q = 15 - len(self.nonce) # length of Q, the encoded message length + flags = (64 * (self._assoc_len > 0) + 8 * ((self._mac_len - 2) // 2) + + (q - 1)) + b_0 = struct.pack("B", flags) + self.nonce + self._msg_len.to_bytes(q, "big") + + # Formatting associated data (A.2.2) + # Encoded 'a' is concatenated with the associated data 'A' + assoc_len_encoded = b'' + if self._assoc_len > 0: + if self._assoc_len < (2 ** 16 - 2 ** 8): + enc_size = 2 + elif self._assoc_len < (2 ** 32): + assoc_len_encoded = b'\xFF\xFE' + enc_size = 4 + else: + assoc_len_encoded = b'\xFF\xFF' + enc_size = 8 + assoc_len_encoded += self._assoc_len.to_bytes(enc_size, "big") + + # b_0 and assoc_len_encoded must be processed first + self._cache.insert(0, b_0) + self._cache.insert(1, assoc_len_encoded) + + # Process all the data cached so far + first_data_to_mac = b"".join(self._cache) + self._cache = b"" + self._mac_status = MacStatus.PROCESSING_AUTH_DATA + self._update(first_data_to_mac) + + def _pad_cache_and_update(self): + + assert(self._mac_status != MacStatus.NOT_STARTED) + assert(len(self._cache) < self.block_size) + + # Associated data is concatenated with the least number + # of zero bytes (possibly none) to reach alignment to + # the 16 byte boundary (A.2.3) + len_cache = len(self._cache) + if len_cache > 0: + self._update(b'\x00' * (self.block_size - len_cache)) + + def update(self, assoc_data): + """Protect associated data + If there is any associated data, the caller has to invoke + this function one or more times, before using + ``decrypt`` or ``encrypt``. + By *associated data* it is meant any data (e.g. packet headers) that + will not be encrypted and will be transmitted in the clear. + However, the receiver is still able to detect any modification to it. + In CCM, the *associated data* is also called + *additional authenticated data* (AAD). + If there is no associated data, this method must not be called. + The caller may split associated data in segments of any size, and + invoke this method multiple times, each time with the next segment. + :Parameters: + assoc_data : bytes/bytearray/memoryview + A piece of associated data. There are no restrictions on its size. + """ + + if "self.update" not in self._next: + raise TypeError("update() can only be called" + " immediately after initialization") + + self._next = ["self.update", "self.encrypt", "self.decrypt", + "self.digest", "self.verify"] + + self._cumul_assoc_len += len(assoc_data) + if self._assoc_len is not None and \ + self._cumul_assoc_len > self._assoc_len: + raise ValueError("Associated data is too long") + + self._update(assoc_data) + return self + + def _update(self, assoc_data_pt=b""): + """Update the MAC with associated data or plaintext + (without FSM checks)""" + + # If MAC has not started yet, we just park the data into a list. + # If the data is mutable, we create a copy and store that instead. + if self._mac_status == MacStatus.NOT_STARTED: + assoc_data_pt = _copy_bytes(None, None, assoc_data_pt) + self._cache.append(assoc_data_pt) + return + + assert(len(self._cache) < self.block_size) + + if len(self._cache) > 0: + filler = min(self.block_size - len(self._cache), + len(assoc_data_pt)) + self._cache += _copy_bytes(None, filler, assoc_data_pt) + assoc_data_pt = _copy_bytes(filler, None, assoc_data_pt) + + if len(self._cache) < self.block_size: + return + + # The cache is exactly one block + dest + dest = bytearray(len(self._cache)) + self._mac.encrypt_into(self._cache) + self._t = dest + self._cache = b"" + + update_len = len(assoc_data_pt) // self.block_size * self.block_size + self._cache = _copy_bytes(update_len, None, assoc_data_pt) + if update_len > 0: + dest = bytearray(update_len) + self._mac.encrypt_into(assoc_data_pt[:update_len], dest) + self._t = dest[-16:] + + def encrypt(self, plaintext, output=None): + """Encrypt data with the key set at initialization. + A cipher object is stateful: once you have encrypted a message + you cannot encrypt (or decrypt) another message using the same + object. + This method can be called only **once** if ``msg_len`` was + not passed at initialization. + If ``msg_len`` was given, the data to encrypt can be broken + up in two or more pieces and `encrypt` can be called + multiple times. + That is, the statement: + >>> c.encrypt(a) + c.encrypt(b) + is equivalent to: + >>> c.encrypt(a+b) + This function does not add any padding to the plaintext. + :Parameters: + plaintext : bytes/bytearray/memoryview + The piece of data to encrypt. + It can be of any length. + :Keywords: + output : bytearray/memoryview + The location where the ciphertext must be written to. + If ``None``, the ciphertext is returned. + :Return: + If ``output`` is ``None``, the ciphertext as ``bytes``. + Otherwise, ``None``. + """ + + if "self.encrypt" not in self._next: + raise TypeError("encrypt() can only be called after" + " initialization or an update()") + self._next = ["self.encrypt", "self.digest"] + + # No more associated data allowed from now + if self._assoc_len is None: + assert(isinstance(self._cache, list)) + self._assoc_len = sum([len(x) for x in self._cache]) + if self._msg_len is not None: + self._start_mac() + else: + if self._cumul_assoc_len < self._assoc_len: + raise ValueError("Associated data is too short") + + # Only once piece of plaintext accepted if message length was + # not declared in advance + if self._msg_len is None: + self._msg_len = len(plaintext) + self._start_mac() + self._next = ["self.digest"] + + self._cumul_msg_len += len(plaintext) + if self._cumul_msg_len > self._msg_len: + raise ValueError("Message is too long") + + if self._mac_status == MacStatus.PROCESSING_AUTH_DATA: + # Associated data is concatenated with the least number + # of zero bytes (possibly none) to reach alignment to + # the 16 byte boundary (A.2.3) + self._pad_cache_and_update() + self._mac_status = MacStatus.PROCESSING_PLAINTEXT + + self._update(plaintext) + dest = bytearray(len(plaintext)) + self._cipher.encrypt_into(plaintext, dest) + return dest + + def decrypt(self, ciphertext, output=None): + """Decrypt data with the key set at initialization. + A cipher object is stateful: once you have decrypted a message + you cannot decrypt (or encrypt) another message with the same + object. + This method can be called only **once** if ``msg_len`` was + not passed at initialization. + If ``msg_len`` was given, the data to decrypt can be + broken up in two or more pieces and `decrypt` can be + called multiple times. + That is, the statement: + >>> c.decrypt(a) + c.decrypt(b) + is equivalent to: + >>> c.decrypt(a+b) + This function does not remove any padding from the plaintext. + :Parameters: + ciphertext : bytes/bytearray/memoryview + The piece of data to decrypt. + It can be of any length. + :Keywords: + output : bytearray/memoryview + The location where the plaintext must be written to. + If ``None``, the plaintext is returned. + :Return: + If ``output`` is ``None``, the plaintext as ``bytes``. + Otherwise, ``None``. + """ + + if "self.decrypt" not in self._next: + raise TypeError("decrypt() can only be called" + " after initialization or an update()") + self._next = ["self.decrypt", "self.verify"] + + # No more associated data allowed from now + if self._assoc_len is None: + assert(isinstance(self._cache, list)) + self._assoc_len = sum([len(x) for x in self._cache]) + if self._msg_len is not None: + self._start_mac() + else: + if self._cumul_assoc_len < self._assoc_len: + raise ValueError("Associated data is too short") + + # Only once piece of ciphertext accepted if message length was + # not declared in advance + if self._msg_len is None: + self._msg_len = len(ciphertext) + self._start_mac() + self._next = ["self.verify"] + + self._cumul_msg_len += len(ciphertext) + if self._cumul_msg_len > self._msg_len: + raise ValueError("Message is too long") + + if self._mac_status == MacStatus.PROCESSING_AUTH_DATA: + # Associated data is concatenated with the least number + # of zero bytes (possibly none) to reach alignment to + # the 16 byte boundary (A.2.3) + self._pad_cache_and_update() + self._mac_status = MacStatus.PROCESSING_PLAINTEXT + + # Encrypt is equivalent to decrypt with the CTR mode + plaintext = bytearray(len(ciphertext)) + self._cipher.encrypt_into(ciphertext, plaintext) + if output is None: + self._update(plaintext) + else: + self._update(output) + return plaintext + + def digest(self): + """Compute the *binary* MAC tag. + The caller invokes this function at the very end. + This method returns the MAC that shall be sent to the receiver, + together with the ciphertext. + :Return: the MAC, as a byte string. + """ + + if "self.digest" not in self._next: + raise TypeError("digest() cannot be called when decrypting" + " or validating a message") + self._next = ["self.digest"] + + return self._digest() + + def _digest(self): + if self._mac_tag: + return self._mac_tag + + if self._assoc_len is None: + assert(isinstance(self._cache, list)) + self._assoc_len = sum([len(x) for x in self._cache]) + if self._msg_len is not None: + self._start_mac() + else: + if self._cumul_assoc_len < self._assoc_len: + raise ValueError("Associated data is too short") + + if self._msg_len is None: + self._msg_len = 0 + self._start_mac() + + if self._cumul_msg_len != self._msg_len: + raise ValueError("Message is too short") + + # Both associated data and payload are concatenated with the least + # number of zero bytes (possibly none) that align it to the + # 16 byte boundary (A.2.2 and A.2.3) + self._pad_cache_and_update() + + # Step 8 in 6.1 (T xor MSB_Tlen(S_0)) + self._mac_tag = bytes_xor(self._t, self._s_0)[:self._mac_len] + + return self._mac_tag + + def hexdigest(self): + """Compute the *printable* MAC tag. + This method is like `digest`. + :Return: the MAC, as a hexadecimal string. + """ + return "".join(["%02x" % bord(x) for x in self.digest()]) + + def verify(self, received_mac_tag): + """Validate the *binary* MAC tag. + The caller invokes this function at the very end. + This method checks if the decrypted message is indeed valid + (that is, if the key is correct) and it has not been + tampered with while in transit. + :Parameters: + received_mac_tag : bytes/bytearray/memoryview + This is the *binary* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + if "self.verify" not in self._next: + raise TypeError("verify() cannot be called" + " when encrypting a message") + self._next = ["self.verify"] + + self._digest() + secret = random_source.urandom(16) + + if (self._mac_tag != received_mac_tag): + raise ValueError("MAC check failed") + + # mac1 = BLAKE2s.new(digest_bits=160, key=secret, data=self._mac_tag) + # mac2 = BLAKE2s.new(digest_bits=160, key=secret, data=received_mac_tag) + # + # if mac1.digest() != mac2.digest(): + # raise ValueError("MAC check failed") + + def hexverify(self, hex_mac_tag): + """Validate the *printable* MAC tag. + This method is like `verify`. + :Parameters: + hex_mac_tag : string + This is the *printable* MAC, as received from the sender. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + self.verify(unhexlify(hex_mac_tag)) + + def encrypt_and_digest(self, plaintext, output=None): + """Perform encrypt() and digest() in one step. + :Parameters: + plaintext : bytes/bytearray/memoryview + The piece of data to encrypt. + :Keywords: + output : bytearray/memoryview + The location where the ciphertext must be written to. + If ``None``, the ciphertext is returned. + :Return: + a tuple with two items: + - the ciphertext, as ``bytes`` + - the MAC tag, as ``bytes`` + The first item becomes ``None`` when the ``output`` parameter + specified a location for the result. + """ + + return self.encrypt(plaintext, output=output), self.digest() + + def decrypt_and_verify(self, ciphertext, received_mac_tag, output=None): + """Perform decrypt() and verify() in one step. + :Parameters: + ciphertext : bytes/bytearray/memoryview + The piece of data to decrypt. + received_mac_tag : bytes/bytearray/memoryview + This is the *binary* MAC, as received from the sender. + :Keywords: + output : bytearray/memoryview + The location where the plaintext must be written to. + If ``None``, the plaintext is returned. + :Return: the plaintext as ``bytes`` or ``None`` when the ``output`` + parameter specified a location for the result. + :Raises ValueError: + if the MAC does not match. The message has been tampered with + or the key is incorrect. + """ + + # plaintext = self.decrypt(ciphertext, output=output) + plaintext = self.decrypt(ciphertext) + self.verify(received_mac_tag) + return plaintext + + +def _create_ccm_cipher(factory, **kwargs): + """Create a new block cipher, configured in CCM mode. + :Parameters: + factory : module + A symmetric cipher module from `Crypto.Cipher` (like + `Crypto.Cipher.AES`). + :Keywords: + key : bytes/bytearray/memoryview + The secret key to use in the symmetric cipher. + nonce : bytes/bytearray/memoryview + A value that must never be reused for any other encryption. + Its length must be in the range ``[7..13]``. + 11 or 12 bytes are reasonable values in general. Bear in + mind that with CCM there is a trade-off between nonce length and + maximum message size. + If not specified, a 11 byte long random string is used. + mac_len : integer + Length of the MAC, in bytes. It must be even and in + the range ``[4..16]``. The default is 16. + msg_len : integer + Length of the message to (de)cipher. + If not specified, ``encrypt`` or ``decrypt`` may only be called once. + assoc_len : integer + Length of the associated data. + If not specified, all data is internally buffered. + """ + + try: + key = key = kwargs.pop("key") + except KeyError as e: + raise TypeError("Missing parameter: " + str(e)) + + nonce = kwargs.pop("nonce", None) # N + if nonce is None: + nonce = random_source.urandom(11) + mac_len = kwargs.pop("mac_len", factory.block_size) + msg_len = kwargs.pop("msg_len", None) # p + assoc_len = kwargs.pop("assoc_len", None) # a + cipher_params = dict(kwargs) + + return CcmMode(factory, key, nonce, mac_len, msg_len, assoc_len, cipher_params) diff --git a/circuitmatter/interaction_model.py b/circuitmatter/interaction_model.py index 75a1dcc..f694ca8 100644 --- a/circuitmatter/interaction_model.py +++ b/circuitmatter/interaction_model.py @@ -149,6 +149,7 @@ class ChunkedMessage(InteractionModelMessage): """Chunked messages take multiple encodes or decodes before they are complete.""" def encode_into(self, buffer: memoryview, offset: int = 0) -> int: + print("ChunkedMessage encode_into: ", end="") ######### # Leave room for MoreChunkedMessages, SupressResponse, and InteractionModelRevision. buffer[0] = tlv.ElementType.STRUCTURE offset += 1 @@ -167,6 +168,7 @@ def encode_into(self, buffer: memoryview, offset: int = 0) -> int: else: offset = descriptor_class.encode_into(self, buffer, offset) buffer[offset] = tlv.ElementType.END_OF_CONTAINER + print() return offset + 1 diff --git a/circuitmatter/pase.py b/circuitmatter/pase.py index bd7c5c1..66fefaa 100644 --- a/circuitmatter/pase.py +++ b/circuitmatter/pase.py @@ -1,4 +1,3 @@ - # SPDX-FileCopyrightText: Copyright (c) 2024 Scott Shawcroft for Adafruit Industries # # SPDX-License-Identifier: MIT @@ -7,6 +6,7 @@ import struct from cryptography.hazmat.primitives.ciphers.aead import AESCCM + from cm_ecdsa.curves import NIST256p from cm_ecdsa.ellipticcurve import AbstractPoint, Point, PointJacobi @@ -274,9 +274,13 @@ def compute_qr_code(vendor_id, product_id, discriminator, passcode) -> str: return _base38_encode(buf) -def show_qr_code(vendor_id, product_id, discriminator, passcode): +def show_qr_code(vendor_id, product_id, discriminator, passcode) -> bool: + try: + import qrcode + except ImportError: + return False + encoded = compute_qr_code(vendor_id, product_id, discriminator, passcode) - import qrcode qr = qrcode.QRCode( version=1, @@ -288,3 +292,4 @@ def show_qr_code(vendor_id, product_id, discriminator, passcode): qr.add_data(encoded) print("QR code data: MT:" + encoded) qr.print_ascii() + return True diff --git a/circuitmatter/protocol.py b/circuitmatter/protocol.py index a5afa9e..0b635e8 100644 --- a/circuitmatter/protocol.py +++ b/circuitmatter/protocol.py @@ -2,7 +2,12 @@ # # SPDX-License-Identifier: MIT -import enum +try: + import enum +except ImportError: + class enum: + class IntEnum: + pass class SecureProtocolOpcode(enum.IntEnum): diff --git a/circuitmatter/session.py b/circuitmatter/session.py index c8eca6a..30c3942 100644 --- a/circuitmatter/session.py +++ b/circuitmatter/session.py @@ -2,14 +2,21 @@ # # SPDX-License-Identifier: MIT -import enum -import hashlib import struct import time +try: + import enum +except ImportError: + class enum: + class IntEnum: + pass + import cryptography from cryptography.hazmat.primitives.ciphers.aead import AESCCM +from cm_sha import sha256 + from . import case, crypto, protocol, tlv from .exchange import Exchange from .message import ExchangeFlags, SecurityFlags @@ -635,7 +642,7 @@ def reply_to_sigma1(self, exchange, sigma1): # noqa: PLR0915, PLR0914 Too many tbedata.resumptionID = session_context.resumption_id random = self.random.urandom(32) - exchange.transcript_hash = hashlib.sha256(sigma1.encode()) + exchange.transcript_hash = sha256(sigma1.encode()) salt = ( identity_protection_key + random diff --git a/circuitmatter/tlv.py b/circuitmatter/tlv.py index bfa8d30..27199a0 100644 --- a/circuitmatter/tlv.py +++ b/circuitmatter/tlv.py @@ -4,24 +4,42 @@ from __future__ import annotations -import abc -import enum +try: + import enum +except ImportError: + class enum: + class IntEnum: + pass + pass + import math import struct -from collections.abc import Iterable -from typing import ( - AnyStr, - Generic, - Literal, - TypeVar, - overload, -) + +try: + from typing import ( + AnyStr, + Generic, + Iterable, + Literal, + TypeVar, + overload, + ) +except ImportError: + pass # As a byte string to save space. TAG_LENGTH = b"\x00\x01\x02\x04\x02\x04\x06\x08" INT_SIZE = "BHIQ" +def class_hierarchy(cls): + classes = set() + classes.add(cls) + for base in cls.__bases__: + classes.update(class_hierarchy(base)) + return classes + + class ElementType(enum.IntEnum): SIGNED_INT = 0b00000 UNSIGNED_INT = 0b00100 @@ -111,7 +129,7 @@ def max_length(cls): @classmethod def _members(cls) -> Iterable[tuple[str, Member]]: - for superclass in cls.__mro__: + for superclass in class_hierarchy(cls): for field_name, descriptor in vars(superclass).items(): if not field_name.startswith("_") and isinstance(descriptor, Member): yield field_name, descriptor @@ -121,7 +139,8 @@ def _members_by_tag(cls) -> dict[int, tuple[str, Member]]: if hasattr(cls, "_members_by_tag_cache"): return cls._members_by_tag_cache members = {} - for field_name, descriptor in vars(cls).items(): + for field_name, descriptor in cls.__dict__.items(): +# for field_name, descriptor in vars(cls).items(): if not field_name.startswith("_") and isinstance(descriptor, Member): members[descriptor.tag] = (field_name, descriptor) cls._members_by_tag_cache = members @@ -158,8 +177,10 @@ def encode(self) -> memoryview: return memoryview(buffer)[:end] def encode_into(self, buffer: bytearray, offset: int = 0) -> int: + print("Structure encode_into:", end="") ######### for _, descriptor_class in self._members(): offset = descriptor_class.encode_into(self, buffer, offset) + print() buffer[offset] = ElementType.END_OF_CONTAINER return offset + 1 @@ -203,12 +224,7 @@ def from_value(cls, value): return instance -_T = TypeVar("_T") -_NULLABLE = TypeVar("_NULLABLE", Literal[True], Literal[False]) -_OPT = TypeVar("_OPT", Literal[True], Literal[False]) - - -class Member(abc.ABC, Generic[_T, _OPT, _NULLABLE]): +class Member: max_value_length: int = 0 def __init__( @@ -243,20 +259,6 @@ def __set_name__(self, owner, name): def max_length(self): return 1 + self.tag_length + self.max_value_length - @overload - def __get__( - self: Member[_T, Literal[True], _NULLABLE] | Member[_T, _OPT, Literal[True]], - obj: Structure, - objtype: type[Structure] | None = None, - ) -> _T | None: ... - - @overload - def __get__( - self: Member[_T, Literal[False], Literal[False]], - obj: Structure, - objtype: type[Structure] | None = None, - ) -> _T: ... - def __get__(self, obj, objtype=None): if obj is None: return self.tag @@ -264,16 +266,6 @@ def __get__(self, obj, objtype=None): return obj.values[self.tag] return self._default - @overload - def __set__( - self: Member[_T, Literal[True], _NULLABLE] | Member[_T, _OPT, Literal[True]], - obj: Structure, - value: _T | None, - ) -> None: ... - @overload - def __set__( - self: Member[_T, Literal[False], Literal[False]], obj: Structure, value: _T - ) -> None: ... def __set__(self, obj, value): if value is None and not self.nullable: raise ValueError("Not nullable") @@ -297,6 +289,7 @@ def encode_into( anonymous_ok=False, ) -> int: value = self.__get__(obj) # type: ignore # self inference issues + print("Member encode_into:", "tag:", f"0x{self.tag:x}" if type(self.tag) is int else self.tag, "value:", f"0x{value:x}" if type(value) is int else value, type(value)) ######## return self._encode_value_into(value, buffer, offset, anonymous_ok) def _encode_value_into( # noqa: PLR0912 Too many branches @@ -356,39 +349,33 @@ def decode(self, buffer: memoryview, offset: int = 0) -> _T: """Return the decoded value at ``offset`` in ``buffer``""" return self.decode_member(buffer[offset], buffer, offset + 1)[0] - @abc.abstractmethod def decode_member(self, control_octet: int, buffer: memoryview, offset: int = 0) -> (_T, int): """Return the decoded value at ``offset`` in ``buffer``. ``offset`` is after the tag (but before any length)""" ... - @abc.abstractmethod def encode_element_type(self, value: _T) -> int: """Return Element Type Field as defined in Appendix A in the spec""" ... - @overload - @abc.abstractmethod def encode_value_into( self: Member[_T, Literal[True], _NULLABLE] | Member[_T, _OPT, Literal[True]], value: _T | None, buffer: bytearray, offset: int, ) -> int: ... - @overload - @abc.abstractmethod + def encode_value_into( self: Member[_T, Literal[False], Literal[False]], value: _T, buffer: bytearray, offset: int, ) -> int: ... - @abc.abstractmethod + def encode_value_into(self, value: _T | None, buffer: bytearray, offset: int) -> int: """Encode ``value`` into ``buffer`` and return the new offset""" ... - @abc.abstractmethod def print(self, value: _T) -> str: """Return string representation of ``value``""" ... @@ -404,11 +391,7 @@ def _from_value(self, value): return value -# number type -_NT = TypeVar("_NT", float, int) - - -class NumberMember(Member[_NT, _OPT, _NULLABLE], Generic[_NT, _OPT, _NULLABLE]): +class NumberMember(Member): def __init__( self, tag, @@ -427,7 +410,7 @@ def __init__( self._maximum = maximum if self.integer: self._element_type = ElementType.SIGNED_INT if self.signed else ElementType.UNSIGNED_INT - self._element_type |= int(math.log(self.max_value_length, 2)) + self._element_type |= self.max_value_length.bit_length() - 1 else: self._element_type = ElementType.FLOAT if self.max_value_length == 8: @@ -458,7 +441,7 @@ def decode_member(control_octet, buffer, offset=0, depth=0) -> tuple[_NT, int]: element_category = element_type >> 2 if element_category in {0, 1}: length = 1 << (control_octet & 0x3) - encoded_format = INT_SIZE[int(math.log(length, 2))] + encoded_format = INT_SIZE[length.bit_length() - 1] if element_category == 0: encoded_format = encoded_format.lower() else: @@ -518,7 +501,7 @@ def encode_value_into(self, value, buffer, offset) -> int: return offset + self.max_value_length -class IntMember(NumberMember[int, _OPT, _NULLABLE]): +class IntMember(NumberMember): def __init__( self, tag, @@ -537,7 +520,7 @@ def __init__( :param nullable: Indicates whether a TLV Null MAY be encoded in place of a value. """ # TODO 7.18.1 mentions other bit lengths (that are not a power of 2) than the TLV Appendix - uformat = INT_SIZE[int(math.log2(octets))] + uformat = INT_SIZE[octets.bit_length() - 1] # < = little-endian self.format = f"<{uformat.lower() if signed else uformat}" super().__init__(tag, _format=self.format, optional=optional, nullable=nullable, **kwargs) @@ -568,7 +551,7 @@ def print(self, value): return repr(self.enum_class(value)) -class FloatMember(NumberMember[float, _OPT, _NULLABLE]): +class FloatMember(NumberMember): def __init__( self, tag, @@ -590,7 +573,7 @@ def __init__( super().__init__(tag, _format=self.format, optional=optional, nullable=nullable, **kwargs) -class BoolMember(Member[bool, _OPT, _NULLABLE]): +class BoolMember(Member): max_value_length = 0 @staticmethod @@ -609,7 +592,7 @@ def encode_value_into(self, value, buffer, offset) -> int: return offset -class StringMember(Member[AnyStr, _OPT, _NULLABLE], Generic[AnyStr, _OPT, _NULLABLE]): +class StringMember(Member): _base_element_type: ElementType def __init__( @@ -624,7 +607,7 @@ def __init__( ): self._element_type = self._base_element_type - max_length_encoding = int(math.log(max_length, 256)) + max_length_encoding = (max_length.bit_length() - 1) // 8 max_length_format = INT_SIZE[max_length_encoding] self.length_length = struct.calcsize(max_length_format) self.min_length = min_length @@ -646,19 +629,19 @@ def __set__(self, obj, value): super().__set__(obj, value) # type: ignore # self inference issues def encode_element_type(self, value): - # Log only works for 1+ so make 0 1 for length encoding. + # bit_length() only works for 1+ so make 0 1 for length encoding. value_length = len(value) if value_length <= 0: value_length = 1 - length_encoding = int(math.log(value_length, 256)) + length_encoding = (value_length.bit_length() - 1) // 8 return self._element_type | length_encoding def encode_value_into(self, value, buffer: bytearray, offset: int) -> int: - # Log only works for 1+ so make 0 1 for length encoding. + # bit_length() only works for 1+ so make 0 be 1 for length encoding. value_length = len(value) if value_length <= 0: value_length = 1 - length_encoding = int(math.log(value_length, 256)) + length_encoding = (value_length.bit_length() - 1) // 8 length_format = INT_SIZE[length_encoding] length_length = struct.calcsize(length_format) struct.pack_into(length_format, buffer, offset, len(value)) @@ -675,7 +658,7 @@ def parse_length(control_octet, buffer, offset=0): return value_length, offset + length_length -class OctetStringMember(StringMember[bytes, _OPT, _NULLABLE]): +class OctetStringMember(StringMember): _base_element_type: ElementType = ElementType.OCTET_STRING @staticmethod @@ -684,7 +667,7 @@ def decode_member(control_octet, buffer, offset=0, depth=0): return (buffer[offset : offset + length].tobytes(), offset + length) -class UTF8StringMember(StringMember[str, _OPT, _NULLABLE]): +class UTF8StringMember(StringMember): _base_element_type = ElementType.UTF8_STRING @staticmethod @@ -702,10 +685,7 @@ def print(self, value): return f'"{value}"' -_TLVStruct = TypeVar("_TLVStruct", bound=Structure) - - -class StructMember(Member[_TLVStruct, _OPT, _NULLABLE]): +class StructMember(Member): def __init__( self, tag, @@ -745,7 +725,7 @@ def __init__(self, index, offset): self.offset = offset -class ArrayMember(Member[_TLVStruct, _OPT, _NULLABLE]): +class ArrayMember(Member): def __init__( self, tag, @@ -858,6 +838,7 @@ def encode(self) -> memoryview: def encode_into(self, buffer: bytearray, offset: int = 0) -> int: member_by_tag = self._members_by_tag() for item in self.items: + print("List encode_into: ", end="") ######### if isinstance(item, tuple): tag, _ = item if tag in member_by_tag: @@ -867,6 +848,7 @@ def encode_into(self, buffer: bytearray, offset: int = 0) -> int: offset = member.encode_into(self, buffer, offset, anonymous_ok=True) else: raise NotImplementedError("Anonymous list member") + print() ############# buffer[offset] = ElementType.END_OF_CONTAINER return offset + 1 @@ -908,9 +890,6 @@ def copy(self): return new -_TLVList = TypeVar("_TLVList", bound=List) - - class ListMember(Member): def __init__( self, diff --git a/cm_ecdsa/__init__.py b/cm_ecdsa/__init__.py index 5b7e515..bb32a0c 100644 --- a/cm_ecdsa/__init__.py +++ b/cm_ecdsa/__init__.py @@ -5,21 +5,21 @@ # # Derived from https://github.com/tlsfuzzer/python-ecdsa -from .keys import ( - SigningKey, - VerifyingKey, - BadSignatureError, - BadDigestError, - MalformedPointError, -) from .curves import ( NIST256p, ) +from .der import UnexpectedDER from .ecdh import ( ECDH, - NoKeyError, - NoCurveError, InvalidCurveError, InvalidSharedSecretError, + NoCurveError, + NoKeyError, +) +from .keys import ( + BadDigestError, + BadSignatureError, + MalformedPointError, + SigningKey, + VerifyingKey, ) -from .der import UnexpectedDER diff --git a/cm_ecdsa/_compat.py b/cm_ecdsa/_compat.py index 7108313..56868bf 100644 --- a/cm_ecdsa/_compat.py +++ b/cm_ecdsa/_compat.py @@ -5,18 +5,15 @@ # # Derived from https://github.com/tlsfuzzer/python-ecdsa -import sys -import re import binascii +import re +import sys def normalise_bytes(buffer_object): """Cast the input into array of bytes.""" return memoryview(buffer_object).cast("B") -def remove_whitespace(text): - """Removes all whitespace from passed in string""" - return re.sub(r"\s+", "", text, flags=re.UNICODE) def a2b_hex(val): try: @@ -24,12 +21,14 @@ def a2b_hex(val): except Exception as e: raise ValueError("base16 error: %s" % e) + # pylint: disable=invalid-name # pylint is stupid here and doesn't notice it's a function, not # constant bytes_to_int = int.from_bytes # pylint: enable=invalid-name + def int_to_bytes(val, length=None, byteorder="big"): """Convert integer to bytes.""" if length is None: @@ -42,5 +41,6 @@ def byte_length(val): length = val.bit_length() return (length + 7) // 8 + def int2byte(i): - return i.to_bytes(1, 'big') + return i.to_bytes(1, "big") diff --git a/cm_ecdsa/_sha3.py b/cm_ecdsa/_sha3.py index d12fc8e..397a2dd 100644 --- a/cm_ecdsa/_sha3.py +++ b/cm_ecdsa/_sha3.py @@ -17,7 +17,6 @@ def shake_256(msg, outlen): return hashlib.new("shake256", msg).digest(outlen) except (TypeError, ValueError): - from ._compat import bytes_to_int, int_to_bytes # From little endian. diff --git a/cm_ecdsa/curves.py b/cm_ecdsa/curves.py index 5997bbd..afae9cc 100644 --- a/cm_ecdsa/curves.py +++ b/cm_ecdsa/curves.py @@ -6,9 +6,8 @@ # Derived from https://github.com/tlsfuzzer/python-ecdsa from . import der, ecdsa, ellipticcurve -from .util import orderlen, number_to_string, string_to_number from ._compat import normalise_bytes - +from .util import number_to_string, orderlen, string_to_number PRIME_FIELD_OID = (1, 2, 840, 10045, 1, 1) CHARACTERISTIC_TWO_FIELD_OID = (1, 2, 840, 10045, 1, 2) @@ -34,9 +33,7 @@ def __init__(self, name, curve, generator, oid, openssl_name=None): def __eq__(self, other): if isinstance(other, Curve): - return ( - self.curve == other.curve and self.generator == other.generator - ) + return self.curve == other.curve and self.generator == other.generator return NotImplemented def __ne__(self, other): @@ -65,15 +62,12 @@ def to_der(self, encoding=None, point_encoding="uncompressed"): encoding = "explicit" if encoding not in ("named_curve", "explicit"): - raise ValueError( - "Only 'named_curve' and 'explicit' encodings supported" - ) + raise ValueError("Only 'named_curve' and 'explicit' encodings supported") if encoding == "named_curve": if not self.oid: raise UnknownCurveError( - "Can't encode curve using named_curve encoding without " - "associated curve OID" + "Can't encode curve using named_curve encoding without associated curve OID" ) return der.encode_oid(*self.oid) @@ -84,12 +78,8 @@ def to_der(self, encoding=None, point_encoding="uncompressed"): der.encode_oid(*PRIME_FIELD_OID), der.encode_integer(curve_p) ) curve = der.encode_sequence( - der.encode_octet_string( - number_to_string(self.curve.a() % curve_p, curve_p) - ), - der.encode_octet_string( - number_to_string(self.curve.b() % curve_p, curve_p) - ), + der.encode_octet_string(number_to_string(self.curve.a() % curve_p, curve_p)), + der.encode_octet_string(number_to_string(self.curve.b() % curve_p, curve_p)), ) base = der.encode_octet_string(self.generator.to_bytes(point_encoding)) order = der.encode_integer(self.generator.order()) @@ -114,9 +104,7 @@ def to_pem(self, encoding=None, point_encoding="uncompressed"): :return: PEM encoded ECParameters structure :rtype: str """ - return der.topem( - self.to_der(encoding, point_encoding), "EC PARAMETERS" - ) + return der.topem(self.to_der(encoding, point_encoding), "EC PARAMETERS") @staticmethod def from_der(data, valid_encodings=None): @@ -132,15 +120,11 @@ def from_der(data, valid_encodings=None): if not valid_encodings: valid_encodings = set(("named_curve", "explicit")) if not all(i in ["named_curve", "explicit"] for i in valid_encodings): - raise ValueError( - "Only named_curve and explicit encodings supported" - ) + raise ValueError("Only named_curve and explicit encodings supported") data = normalise_bytes(data) if not der.is_sequence(data): if "named_curve" not in valid_encodings: - raise der.UnexpectedDER( - "named_curve curve parameters not allowed" - ) + raise der.UnexpectedDER("named_curve curve parameters not allowed") oid, empty = der.remove_object(data) if empty: raise der.UnexpectedDER("Unexpected data after OID") @@ -151,9 +135,7 @@ def from_der(data, valid_encodings=None): seq, empty = der.remove_sequence(data) if empty: - raise der.UnexpectedDER( - "Unexpected data after ECParameters structure" - ) + raise der.UnexpectedDER("Unexpected data after ECParameters structure") # decode the ECParameters sequence version, rest = der.remove_integer(seq) if version != 1: @@ -173,14 +155,10 @@ def from_der(data, valid_encodings=None): if field_type == CHARACTERISTIC_TWO_FIELD_OID: raise UnknownCurveError("Characteristic 2 curves unsupported") if field_type != PRIME_FIELD_OID: - raise UnknownCurveError( - "Unknown field type: {0}".format(field_type) - ) + raise UnknownCurveError(f"Unknown field type: {field_type}") prime, empty = der.remove_integer(rest) if empty: - raise der.UnexpectedDER( - "Unexpected data after ECParameters.fieldID.Prime-p element" - ) + raise der.UnexpectedDER("Unexpected data after ECParameters.fieldID.Prime-p element") # decode the ECParameters.curve sequence curve_a_bytes, rest = der.remove_octet_string(curve) @@ -227,9 +205,7 @@ def from_pem(cls, string, valid_encodings=None): if ec_param_index == -1: raise der.UnexpectedDER("EC PARAMETERS PEM header not found") - return cls.from_der( - der.unpem(string[ec_param_index:]), valid_encodings - ) + return cls.from_der(der.unpem(string[ec_param_index:]), valid_encodings) NIST256p = Curve( @@ -284,7 +260,5 @@ def curve_by_name(name): if name == c.name or (c.openssl_name and name == c.openssl_name): return c raise UnknownCurveError( - "Curve with name {0!r} unknown, only curves supported: {1}".format( - name, [c.name for c in curves] - ) + f"Curve with name {name!r} unknown, only curves supported: {[c.name for c in curves]}" ) diff --git a/cm_ecdsa/der.py b/cm_ecdsa/der.py index 0f83996..40bb50e 100644 --- a/cm_ecdsa/der.py +++ b/cm_ecdsa/der.py @@ -6,6 +6,7 @@ # Derived from https://github.com/tlsfuzzer/python-ecdsa import binascii + from ._compat import int2byte @@ -126,9 +127,7 @@ def is_sequence(string): def remove_constructed(string): s0 = string[0] if (s0 & 0xE0) != 0xA0: - raise UnexpectedDER( - "wanted type 'constructed tag' (0xa0-0xbf), got 0x%02x" % s0 - ) + raise UnexpectedDER("wanted type 'constructed tag' (0xa0-0xbf), got 0x%02x" % s0) tag = s0 & 0x1F length, llen = read_length(string[1:]) body = string[1 + llen : 1 + llen + length] @@ -161,9 +160,7 @@ def remove_octet_string(string): def remove_object(string): if not string: - raise UnexpectedDER( - "Empty string does not encode an object identifier" - ) + raise UnexpectedDER("Empty string does not encode an object identifier") if string[:1] != b"\x06": n = string[0] raise UnexpectedDER("wanted type 'object' (0x06), got 0x%02x" % n) @@ -173,9 +170,7 @@ def remove_object(string): if not body: raise UnexpectedDER("Empty object identifier") if len(body) != length: - raise UnexpectedDER( - "Length of object identifier longer than the provided buffer" - ) + raise UnexpectedDER("Length of object identifier longer than the provided buffer") numbers = [] while body: n, ll = read_number(body) @@ -194,9 +189,7 @@ def remove_object(string): def remove_integer(string): if not string: - raise UnexpectedDER( - "Empty string is an invalid encoding of an integer" - ) + raise UnexpectedDER("Empty string is an invalid encoding of an integer") if string[:1] != b"\x02": n = string[0] raise UnexpectedDER("wanted type 'integer' (0x02), got 0x%02x" % n) @@ -216,10 +209,7 @@ def remove_integer(string): # considered a negative number otherwise smsb = numberbytes[1] if smsb < 0x80: - raise UnexpectedDER( - "Invalid encoding of integer, unnecessary " - "zero padding bytes" - ) + raise UnexpectedDER("Invalid encoding of integer, unnecessary zero padding bytes") return int(binascii.hexlify(numberbytes), 16), rest @@ -378,21 +368,13 @@ def unpem(pem): if isinstance(pem, str): # pragma: no branch pem = pem.encode() - d = b"".join( - [ - l.strip() - for l in pem.split(b"\n") - if l and not l.startswith(b"-----") - ] - ) + d = b"".join([l.strip() for l in pem.split(b"\n") if l and not l.startswith(b"-----")]) return binascii.a2b_base64(d) def topem(der, name): b64 = binascii.b2a_base64(der, newline=False) lines = [("-----BEGIN %s-----\n" % name).encode()] - lines.extend( - [b64[start : start + 76] + b"\n" for start in range(0, len(b64), 76)] - ) + lines.extend([b64[start : start + 76] + b"\n" for start in range(0, len(b64), 76)]) lines.append(("-----END %s-----\n" % name).encode()) return b"".join(lines) diff --git a/cm_ecdsa/ecdh.py b/cm_ecdsa/ecdh.py index 7f697d9..fd9285e 100644 --- a/cm_ecdsa/ecdh.py +++ b/cm_ecdsa/ecdh.py @@ -2,10 +2,9 @@ Class for performing Elliptic-curve Diffie-Hellman (ECDH) operations. """ -from .util import number_to_string from .ellipticcurve import INFINITY from .keys import SigningKey, VerifyingKey - +from .util import number_to_string __all__ = [ "ECDH", @@ -42,7 +41,7 @@ class InvalidSharedSecretError(Exception): pass -class ECDH(object): +class ECDH: """ Elliptic-curve Diffie-Hellman (ECDH). A key agreement protocol. @@ -76,25 +75,14 @@ def __init__(self, curve=None, private_key=None, public_key=None): def _get_shared_secret(self, remote_public_key): if not self.private_key: - raise NoKeyError( - "Private key needs to be set to create shared secret" - ) + raise NoKeyError("Private key needs to be set to create shared secret") if not self.public_key: - raise NoKeyError( - "Public key needs to be set to create shared secret" - ) - if not ( - self.private_key.curve == self.curve == remote_public_key.curve - ): - raise InvalidCurveError( - "Curves for public key and private key is not equal." - ) + raise NoKeyError("Public key needs to be set to create shared secret") + if not (self.private_key.curve == self.curve == remote_public_key.curve): + raise InvalidCurveError("Curves for public key and private key is not equal.") # shared secret = PUBKEYtheirs * PRIVATEKEYours - result = ( - remote_public_key.pubkey.point - * self.private_key.privkey.secret_multiplier - ) + result = remote_public_key.pubkey.point * self.private_key.privkey.secret_multiplier if result == INFINITY: raise InvalidSharedSecretError("Invalid shared secret (INFINITY).") @@ -162,9 +150,7 @@ def load_private_key_bytes(self, private_key): """ if not self.curve: raise NoCurveError("Curve must be set prior to key load.") - return self.load_private_key( - SigningKey.from_string(private_key, curve=self.curve) - ) + return self.load_private_key(SigningKey.from_string(private_key, curve=self.curve)) def load_private_key_der(self, private_key_der): """ @@ -237,9 +223,7 @@ def load_received_public_key(self, public_key): raise InvalidCurveError("Curve mismatch.") self.public_key = public_key - def load_received_public_key_bytes( - self, public_key_str, valid_encodings=None - ): + def load_received_public_key_bytes(self, public_key_str, valid_encodings=None): """ Load public key from byte string. @@ -256,9 +240,7 @@ def load_received_public_key_bytes( :type valid_encodings: :term:`set-like object` """ return self.load_received_public_key( - VerifyingKey.from_string( - public_key_str, self.curve, valid_encodings - ) + VerifyingKey.from_string(public_key_str, self.curve, valid_encodings) ) def load_received_public_key_der(self, public_key_der): @@ -276,9 +258,7 @@ def load_received_public_key_der(self, public_key_der): :raises InvalidCurveError: public_key curve not the same as self.curve """ - return self.load_received_public_key( - VerifyingKey.from_der(public_key_der) - ) + return self.load_received_public_key(VerifyingKey.from_der(public_key_der)) def load_received_public_key_pem(self, public_key_pem): """ @@ -295,9 +275,7 @@ def load_received_public_key_pem(self, public_key_pem): :raises InvalidCurveError: public_key curve not the same as self.curve """ - return self.load_received_public_key( - VerifyingKey.from_pem(public_key_pem) - ) + return self.load_received_public_key(VerifyingKey.from_pem(public_key_pem)) def generate_sharedsecret_bytes(self): """ @@ -312,9 +290,7 @@ def generate_sharedsecret_bytes(self): :return: shared secret :rtype: bytes """ - return number_to_string( - self.generate_sharedsecret(), self.private_key.curve.curve.p() - ) + return number_to_string(self.generate_sharedsecret(), self.private_key.curve.curve.p()) def generate_sharedsecret(self): """ diff --git a/cm_ecdsa/ecdsa.py b/cm_ecdsa/ecdsa.py index f0166c1..8ef8494 100644 --- a/cm_ecdsa/ecdsa.py +++ b/cm_ecdsa/ecdsa.py @@ -71,10 +71,8 @@ modified as part of the python-ecdsa package. """ -import warnings -from . import ellipticcurve -from . import numbertheory -from ._compat import int2byte, remove_whitespace +from . import ellipticcurve, numbertheory +from ._compat import int2byte class RSZeroError(RuntimeError): @@ -85,7 +83,7 @@ class InvalidPointError(RuntimeError): pass -class Signature(object): +class Signature: """ ECDSA signature. @@ -115,9 +113,7 @@ def recover_public_keys(self, hash, generator): x = r # Compute the curve point with x as x-coordinate - alpha = ( - pow(x, 3, curve.p()) + (curve.a() * x) + curve.b() - ) % curve.p() + alpha = (pow(x, 3, curve.p()) + (curve.a() * x) + curve.b()) % curve.p() beta = numbertheory.square_root_mod_prime(alpha, curve.p()) y = beta if beta % 2 == 0 else curve.p() - beta @@ -134,7 +130,7 @@ def recover_public_keys(self, hash, generator): return [Pk1, Pk2] -class Public_key(object): +class Public_key: """Public key for ECDSA.""" def __init__(self, generator, point, verify=True): @@ -154,9 +150,7 @@ def __init__(self, generator, point, verify=True): n = generator.order() p = self.curve.p() if not (0 <= point.x() < p) or not (0 <= point.y() < p): - raise InvalidPointError( - "The public point has x or y out of range." - ) + raise InvalidPointError("The public point has x or y out of range.") if verify and not self.curve.contains_point(point.x(), point.y()): raise InvalidPointError("Point does not lay on the curve") if not n: @@ -164,11 +158,7 @@ def __init__(self, generator, point, verify=True): # for curve parameters with base point with cofactor 1, all points # that are on the curve are scalar multiples of the base point, so # verifying that is not necessary. See Section 3.2.2.1 of SEC 1 v2 - if ( - verify - and self.curve.cofactor() != 1 - and not n * point == ellipticcurve.INFINITY - ): + if verify and self.curve.cofactor() != 1 and not n * point == ellipticcurve.INFINITY: raise InvalidPointError("Generator point order is bad.") def __eq__(self, other): @@ -212,7 +202,7 @@ def verifies(self, hash, signature): return v == r -class Private_key(object): +class Private_key: """Private key for ECDSA.""" def __init__(self, public_key, secret_multiplier): @@ -266,72 +256,12 @@ def sign(self, hash, random_k): r = p1.x() % n if r == 0: raise RSZeroError("amazingly unlucky random number r") - s = ( - numbertheory.inverse_mod(k, n) - * (hash + (self.secret_multiplier * r) % n) - ) % n + s = (numbertheory.inverse_mod(k, n) * (hash + (self.secret_multiplier * r) % n)) % n if s == 0: raise RSZeroError("amazingly unlucky random number s") return Signature(r, s) -def int_to_string(x): # pragma: no cover - """Convert integer x into a string of bytes, as per X9.62.""" - # deprecated in 0.19 - warnings.warn( - "Function is unused in library code. If you use this code, " - "change to util.number_to_string.", - DeprecationWarning, - ) - assert x >= 0 - if x == 0: - return b"\0" - result = [] - while x: - ordinal = x & 0xFF - result.append(int2byte(ordinal)) - x >>= 8 - - result.reverse() - return b"".join(result) - - -def string_to_int(s): # pragma: no cover - """Convert a string of bytes into an integer, as per X9.62.""" - # deprecated in 0.19 - warnings.warn( - "Function is unused in library code. If you use this code, " - "change to util.string_to_number.", - DeprecationWarning, - ) - result = 0 - for c in s: - if not isinstance(c, int): - c = ord(c) - result = 256 * result + c - return result - - -def digest_integer(m): # pragma: no cover - """Convert an integer into a string of bytes, compute - its SHA-1 hash, and convert the result to an integer.""" - # deprecated in 0.19 - warnings.warn( - "Function is unused in library code. If you use this code, " - "change to a one-liner with util.number_to_string and " - "util.string_to_number methods.", - DeprecationWarning, - ) - # - # I don't expect this function to be used much. I wrote - # it in order to be able to duplicate the examples - # in ECDSAVS. - # - from hashlib import sha1 - - return string_to_int(sha1(int_to_string(m)).digest()) - - def point_is_valid(generator, x, y): """Is (x,y) a valid public key based on the specified generator?""" @@ -346,754 +276,19 @@ def point_is_valid(generator, x, y): return False if ( curve.cofactor() != 1 - and not n * ellipticcurve.PointJacobi(curve, x, y, 1) - == ellipticcurve.INFINITY + and not n * ellipticcurve.PointJacobi(curve, x, y, 1) == ellipticcurve.INFINITY ): return False return True - -# secp112r1 curve -_p = int(remove_whitespace("DB7C 2ABF62E3 5E668076 BEAD208B"), 16) -# s = 00F50B02 8E4D696E 67687561 51752904 72783FB1 -_a = int(remove_whitespace("DB7C 2ABF62E3 5E668076 BEAD2088"), 16) -_b = int(remove_whitespace("659E F8BA0439 16EEDE89 11702B22"), 16) -_Gx = int(remove_whitespace("09487239 995A5EE7 6B55F9C2 F098"), 16) -_Gy = int(remove_whitespace("A89C E5AF8724 C0A23E0E 0FF77500"), 16) -_r = int(remove_whitespace("DB7C 2ABF62E3 5E7628DF AC6561C5"), 16) -_h = 1 -curve_112r1 = ellipticcurve.CurveFp(_p, _a, _b, _h) -generator_112r1 = ellipticcurve.PointJacobi( - curve_112r1, _Gx, _Gy, 1, _r, generator=True -) - - -# secp112r2 curve -_p = int(remove_whitespace("DB7C 2ABF62E3 5E668076 BEAD208B"), 16) -# s = 022757A1 114D69E 67687561 51755316 C05E0BD4 -_a = int(remove_whitespace("6127 C24C05F3 8A0AAAF6 5C0EF02C"), 16) -_b = int(remove_whitespace("51DE F1815DB5 ED74FCC3 4C85D709"), 16) -_Gx = int(remove_whitespace("4BA30AB5 E892B4E1 649DD092 8643"), 16) -_Gy = int(remove_whitespace("ADCD 46F5882E 3747DEF3 6E956E97"), 16) -_r = int(remove_whitespace("36DF 0AAFD8B8 D7597CA1 0520D04B"), 16) -_h = 4 -curve_112r2 = ellipticcurve.CurveFp(_p, _a, _b, _h) -generator_112r2 = ellipticcurve.PointJacobi( - curve_112r2, _Gx, _Gy, 1, _r, generator=True -) - - -# secp128r1 curve -_p = int(remove_whitespace("FFFFFFFD FFFFFFFF FFFFFFFF FFFFFFFF"), 16) -# S = 000E0D4D 69E6768 75615175 0CC03A44 73D03679 -# a and b are mod p, so a is equal to p-3, or simply -3 -# _a = -3 -_b = int(remove_whitespace("E87579C1 1079F43D D824993C 2CEE5ED3"), 16) -_Gx = int(remove_whitespace("161FF752 8B899B2D 0C28607C A52C5B86"), 16) -_Gy = int(remove_whitespace("CF5AC839 5BAFEB13 C02DA292 DDED7A83"), 16) -_r = int(remove_whitespace("FFFFFFFE 00000000 75A30D1B 9038A115"), 16) -_h = 1 -curve_128r1 = ellipticcurve.CurveFp(_p, -3, _b, _h) -generator_128r1 = ellipticcurve.PointJacobi( - curve_128r1, _Gx, _Gy, 1, _r, generator=True -) - - -# secp160r1 -_p = int(remove_whitespace("FFFFFFFF FFFFFFFF FFFFFFFF FFFFFFFF 7FFFFFFF"), 16) -# S = 1053CDE4 2C14D696 E6768756 1517533B F3F83345 -# a and b are mod p, so a is equal to p-3, or simply -3 -# _a = -3 -_b = int(remove_whitespace("1C97BEFC 54BD7A8B 65ACF89F 81D4D4AD C565FA45"), 16) -_Gx = int( - remove_whitespace("4A96B568 8EF57328 46646989 68C38BB9 13CBFC82"), - 16, -) -_Gy = int( - remove_whitespace("23A62855 3168947D 59DCC912 04235137 7AC5FB32"), - 16, -) -_r = int( - remove_whitespace("01 00000000 00000000 0001F4C8 F927AED3 CA752257"), - 16, -) -_h = 1 -curve_160r1 = ellipticcurve.CurveFp(_p, -3, _b, _h) -generator_160r1 = ellipticcurve.PointJacobi( - curve_160r1, _Gx, _Gy, 1, _r, generator=True -) - - -# NIST Curve P-192: -_p = 6277101735386680763835789423207666416083908700390324961279 -_r = 6277101735386680763835789423176059013767194773182842284081 -# s = 0x3045ae6fc8422f64ed579528d38120eae12196d5L -# c = 0x3099d2bbbfcb2538542dcd5fb078b6ef5f3d6fe2c745de65L -_b = int( - remove_whitespace( - """ - 64210519 E59C80E7 0FA7E9AB 72243049 FEB8DEEC C146B9B1""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - 188DA80E B03090F6 7CBF20EB 43A18800 F4FF0AFD 82FF1012""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 07192B95 FFC8DA78 631011ED 6B24CDD5 73F977A1 1E794811""" - ), - 16, -) - -curve_192 = ellipticcurve.CurveFp(_p, -3, _b, 1) -generator_192 = ellipticcurve.PointJacobi( - curve_192, _Gx, _Gy, 1, _r, generator=True -) - - -# NIST Curve P-224: -_p = int( - remove_whitespace( - """ - 2695994666715063979466701508701963067355791626002630814351 - 0066298881""" - ) -) -_r = int( - remove_whitespace( - """ - 2695994666715063979466701508701962594045780771442439172168 - 2722368061""" - ) -) -# s = 0xbd71344799d5c7fcdc45b59fa3b9ab8f6a948bc5L -# c = 0x5b056c7e11dd68f40469ee7f3c7a7d74f7d121116506d031218291fbL -_b = int( - remove_whitespace( - """ - B4050A85 0C04B3AB F5413256 5044B0B7 D7BFD8BA 270B3943 - 2355FFB4""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - B70E0CBD 6BB4BF7F 321390B9 4A03C1D3 56C21122 343280D6 - 115C1D21""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - BD376388 B5F723FB 4C22DFE6 CD4375A0 5A074764 44D58199 - 85007E34""" - ), - 16, -) - -curve_224 = ellipticcurve.CurveFp(_p, -3, _b, 1) -generator_224 = ellipticcurve.PointJacobi( - curve_224, _Gx, _Gy, 1, _r, generator=True -) - # NIST Curve P-256: -_p = int( - remove_whitespace( - """ - 1157920892103562487626974469494075735300861434152903141955 - 33631308867097853951""" - ) -) -_r = int( - remove_whitespace( - """ - 115792089210356248762697446949407573529996955224135760342 - 422259061068512044369""" - ) -) +_p = 1157920892103562487626974469494075735300861434152903141955_33631308867097853951 +_r = 115792089210356248762697446949407573529996955224135760342_422259061068512044369 # s = 0xc49d360886e704936a6678e1139d26b7819f7e90L # c = 0x7efba1662985be9403cb055c75d4f7e0ce8d84a9c5114abcaf3177680104fa0dL -_b = int( - remove_whitespace( - """ - 5AC635D8 AA3A93E7 B3EBBD55 769886BC 651D06B0 CC53B0F6 - 3BCE3C3E 27D2604B""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - 6B17D1F2 E12C4247 F8BCE6E5 63A440F2 77037D81 2DEB33A0 - F4A13945 D898C296""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 4FE342E2 FE1A7F9B 8EE7EB4A 7C0F9E16 2BCE3357 6B315ECE - CBB64068 37BF51F5""" - ), - 16, -) +_b = 0x5AC635D8_AA3A93E7_B3EBBD55_769886BC_651D06B0_CC53B0F6_3BCE3C3E_27D2604B +_Gx = 0x6B17D1F2_E12C4247_F8BCE6E5_63A440F2_77037D81_2DEB33A0_F4A13945_D898C296 +_Gy = 0x4FE342E2_FE1A7F9B_8EE7EB4A_7C0F9E16_2BCE3357_6B315ECE_CBB64068_37BF51F5 curve_256 = ellipticcurve.CurveFp(_p, -3, _b, 1) -generator_256 = ellipticcurve.PointJacobi( - curve_256, _Gx, _Gy, 1, _r, generator=True -) - -# NIST Curve P-384: -_p = int( - remove_whitespace( - """ - 3940200619639447921227904010014361380507973927046544666794 - 8293404245721771496870329047266088258938001861606973112319""" - ) -) -_r = int( - remove_whitespace( - """ - 3940200619639447921227904010014361380507973927046544666794 - 6905279627659399113263569398956308152294913554433653942643""" - ) -) -# s = 0xa335926aa319a27a1d00896a6773a4827acdac73L -# c = int(remove_whitespace( -# """ -# 79d1e655 f868f02f ff48dcde e14151dd b80643c1 406d0ca1 -# 0dfe6fc5 2009540a 495e8042 ea5f744f 6e184667 cc722483""" -# ), 16) -_b = int( - remove_whitespace( - """ - B3312FA7 E23EE7E4 988E056B E3F82D19 181D9C6E FE814112 - 0314088F 5013875A C656398D 8A2ED19D 2A85C8ED D3EC2AEF""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - AA87CA22 BE8B0537 8EB1C71E F320AD74 6E1D3B62 8BA79B98 - 59F741E0 82542A38 5502F25D BF55296C 3A545E38 72760AB7""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 3617DE4A 96262C6F 5D9E98BF 9292DC29 F8F41DBD 289A147C - E9DA3113 B5F0B8C0 0A60B1CE 1D7E819D 7A431D7C 90EA0E5F""" - ), - 16, -) - -curve_384 = ellipticcurve.CurveFp(_p, -3, _b, 1) -generator_384 = ellipticcurve.PointJacobi( - curve_384, _Gx, _Gy, 1, _r, generator=True -) - -# NIST Curve P-521: -_p = int( - "686479766013060971498190079908139321726943530014330540939" - "446345918554318339765605212255964066145455497729631139148" - "0858037121987999716643812574028291115057151" -) -_r = int( - "686479766013060971498190079908139321726943530014330540939" - "446345918554318339765539424505774633321719753296399637136" - "3321113864768612440380340372808892707005449" -) -# s = 0xd09e8800291cb85396cc6717393284aaa0da64baL -# c = int(remove_whitespace( -# """ -# 0b4 8bfa5f42 0a349495 39d2bdfc 264eeeeb 077688e4 -# 4fbf0ad8 f6d0edb3 7bd6b533 28100051 8e19f1b9 ffbe0fe9 -# ed8a3c22 00b8f875 e523868c 70c1e5bf 55bad637""" -# ), 16) -_b = int( - remove_whitespace( - """ - 051 953EB961 8E1C9A1F 929A21A0 B68540EE A2DA725B - 99B315F3 B8B48991 8EF109E1 56193951 EC7E937B 1652C0BD - 3BB1BF07 3573DF88 3D2C34F1 EF451FD4 6B503F00""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - C6 858E06B7 0404E9CD 9E3ECB66 2395B442 9C648139 - 053FB521 F828AF60 6B4D3DBA A14B5E77 EFE75928 FE1DC127 - A2FFA8DE 3348B3C1 856A429B F97E7E31 C2E5BD66""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 118 39296A78 9A3BC004 5C8A5FB4 2C7D1BD9 98F54449 - 579B4468 17AFBD17 273E662C 97EE7299 5EF42640 C550B901 - 3FAD0761 353C7086 A272C240 88BE9476 9FD16650""" - ), - 16, -) - -curve_521 = ellipticcurve.CurveFp(_p, -3, _b, 1) -generator_521 = ellipticcurve.PointJacobi( - curve_521, _Gx, _Gy, 1, _r, generator=True -) - -# Certicom secp256-k1 -_a = 0x0000000000000000000000000000000000000000000000000000000000000000 -_b = 0x0000000000000000000000000000000000000000000000000000000000000007 -_p = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEFFFFFC2F -_Gx = 0x79BE667EF9DCBBAC55A06295CE870B07029BFCDB2DCE28D959F2815B16F81798 -_Gy = 0x483ADA7726A3C4655DA4FBFC0E1108A8FD17B448A68554199C47D08FFB10D4B8 -_r = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 - -curve_secp256k1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_secp256k1 = ellipticcurve.PointJacobi( - curve_secp256k1, _Gx, _Gy, 1, _r, generator=True -) - -# Brainpool P-160-r1 -_a = 0x340E7BE2A280EB74E2BE61BADA745D97E8F7C300 -_b = 0x1E589A8595423412134FAA2DBDEC95C8D8675E58 -_p = 0xE95E4A5F737059DC60DFC7AD95B3D8139515620F -_Gx = 0xBED5AF16EA3F6A4F62938C4631EB5AF7BDBCDBC3 -_Gy = 0x1667CB477A1A8EC338F94741669C976316DA6321 -_q = 0xE95E4A5F737059DC60DF5991D45029409E60FC09 - -curve_brainpoolp160r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp160r1 = ellipticcurve.PointJacobi( - curve_brainpoolp160r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-160-t1 -_a = 0xE95E4A5F737059DC60DFC7AD95B3D8139515620C -_b = 0x7A556B6DAE535B7B51ED2C4D7DAA7A0B5C55F380 -# _z = 0x24DBFF5DEC9B986BBFE5295A29BFBAE45E0F5D0B -_Gx = 0xB199B13B9B34EFC1397E64BAEB05ACC265FF2378 -_Gy = 0xADD6718B7C7C1961F0991B842443772152C9E0AD -_q = 0xE95E4A5F737059DC60DF5991D45029409E60FC09 -curve_brainpoolp160t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp160t1 = ellipticcurve.PointJacobi( - curve_brainpoolp160t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-192-r1 -_a = 0x6A91174076B1E0E19C39C031FE8685C1CAE040E5C69A28EF -_b = 0x469A28EF7C28CCA3DC721D044F4496BCCA7EF4146FBF25C9 -_p = 0xC302F41D932A36CDA7A3463093D18DB78FCE476DE1A86297 -_Gx = 0xC0A0647EAAB6A48753B033C56CB0F0900A2F5C4853375FD6 -_Gy = 0x14B690866ABD5BB88B5F4828C1490002E6773FA2FA299B8F -_q = 0xC302F41D932A36CDA7A3462F9E9E916B5BE8F1029AC4ACC1 - -curve_brainpoolp192r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp192r1 = ellipticcurve.PointJacobi( - curve_brainpoolp192r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-192-t1 -_a = 0xC302F41D932A36CDA7A3463093D18DB78FCE476DE1A86294 -_b = 0x13D56FFAEC78681E68F9DEB43B35BEC2FB68542E27897B79 -# _z = 0x1B6F5CC8DB4DC7AF19458A9CB80DC2295E5EB9C3732104CB -_Gx = 0x3AE9E58C82F63C30282E1FE7BBF43FA72C446AF6F4618129 -_Gy = 0x097E2C5667C2223A902AB5CA449D0084B7E5B3DE7CCC01C9 -_q = 0xC302F41D932A36CDA7A3462F9E9E916B5BE8F1029AC4ACC1 - -curve_brainpoolp192t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp192t1 = ellipticcurve.PointJacobi( - curve_brainpoolp192t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-224-r1 -_a = 0x68A5E62CA9CE6C1C299803A6C1530B514E182AD8B0042A59CAD29F43 -_b = 0x2580F63CCFE44138870713B1A92369E33E2135D266DBB372386C400B -_p = 0xD7C134AA264366862A18302575D1D787B09F075797DA89F57EC8C0FF -_Gx = 0x0D9029AD2C7E5CF4340823B2A87DC68C9E4CE3174C1E6EFDEE12C07D -_Gy = 0x58AA56F772C0726F24C6B89E4ECDAC24354B9E99CAA3F6D3761402CD -_q = 0xD7C134AA264366862A18302575D0FB98D116BC4B6DDEBCA3A5A7939F - -curve_brainpoolp224r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp224r1 = ellipticcurve.PointJacobi( - curve_brainpoolp224r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-224-t1 -_a = 0xD7C134AA264366862A18302575D1D787B09F075797DA89F57EC8C0FC -_b = 0x4B337D934104CD7BEF271BF60CED1ED20DA14C08B3BB64F18A60888D -# _z = 0x2DF271E14427A346910CF7A2E6CFA7B3F484E5C2CCE1C8B730E28B3F -_Gx = 0x6AB1E344CE25FF3896424E7FFE14762ECB49F8928AC0C76029B4D580 -_Gy = 0x0374E9F5143E568CD23F3F4D7C0D4B1E41C8CC0D1C6ABD5F1A46DB4C -_q = 0xD7C134AA264366862A18302575D0FB98D116BC4B6DDEBCA3A5A7939F - -curve_brainpoolp224t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp224t1 = ellipticcurve.PointJacobi( - curve_brainpoolp224t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-256-r1 -_a = 0x7D5A0975FC2C3057EEF67530417AFFE7FB8055C126DC5C6CE94A4B44F330B5D9 -_b = 0x26DC5C6CE94A4B44F330B5D9BBD77CBF958416295CF7E1CE6BCCDC18FF8C07B6 -_p = 0xA9FB57DBA1EEA9BC3E660A909D838D726E3BF623D52620282013481D1F6E5377 -_Gx = 0x8BD2AEB9CB7E57CB2C4B482FFC81B7AFB9DE27E1E3BD23C23A4453BD9ACE3262 -_Gy = 0x547EF835C3DAC4FD97F8461A14611DC9C27745132DED8E545C1D54C72F046997 -_q = 0xA9FB57DBA1EEA9BC3E660A909D838D718C397AA3B561A6F7901E0E82974856A7 - -curve_brainpoolp256r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp256r1 = ellipticcurve.PointJacobi( - curve_brainpoolp256r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-256-t1 -_a = 0xA9FB57DBA1EEA9BC3E660A909D838D726E3BF623D52620282013481D1F6E5374 -_b = 0x662C61C430D84EA4FE66A7733D0B76B7BF93EBC4AF2F49256AE58101FEE92B04 -# _z = 0x3E2D4BD9597B58639AE7AA669CAB9837CF5CF20A2C852D10F655668DFC150EF0 -_Gx = 0xA3E8EB3CC1CFE7B7732213B23A656149AFA142C47AAFBC2B79A191562E1305F4 -_Gy = 0x2D996C823439C56D7F7B22E14644417E69BCB6DE39D027001DABE8F35B25C9BE -_q = 0xA9FB57DBA1EEA9BC3E660A909D838D718C397AA3B561A6F7901E0E82974856A7 - -curve_brainpoolp256t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp256t1 = ellipticcurve.PointJacobi( - curve_brainpoolp256t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-320-r1 -_a = int( - remove_whitespace( - """ - 3EE30B568FBAB0F883CCEBD46D3F3BB8A2A73513F5EB79DA66190EB085FFA9 - F492F375A97D860EB4""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - 520883949DFDBC42D3AD198640688A6FE13F41349554B49ACC31DCCD884539 - 816F5EB4AC8FB1F1A6""" - ), - 16, -) -_p = int( - remove_whitespace( - """ - D35E472036BC4FB7E13C785ED201E065F98FCFA6F6F40DEF4F92B9EC7893EC - 28FCD412B1F1B32E27""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - 43BD7E9AFB53D8B85289BCC48EE5BFE6F20137D10A087EB6E7871E2A10A599 - C710AF8D0D39E20611""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 14FDD05545EC1CC8AB4093247F77275E0743FFED117182EAA9C77877AAAC6A - C7D35245D1692E8EE1""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - D35E472036BC4FB7E13C785ED201E065F98FCFA5B68F12A32D482EC7EE8658 - E98691555B44C59311""" - ), - 16, -) - -curve_brainpoolp320r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp320r1 = ellipticcurve.PointJacobi( - curve_brainpoolp320r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-320-t1 -_a = int( - remove_whitespace( - """ - D35E472036BC4FB7E13C785ED201E065F98FCFA6F6F40DEF4F92B9EC7893EC - 28FCD412B1F1B32E24""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - A7F561E038EB1ED560B3D147DB782013064C19F27ED27C6780AAF77FB8A547 - CEB5B4FEF422340353""" - ), - 16, -) -# _z = int( -# remove_whitespace( -# """ -# 15F75CAF668077F7E85B42EB01F0A81FF56ECD6191D55CB82B7D861458A18F -# EFC3E5AB7496F3C7B1""" -# ), -# 16, -# ) -_Gx = int( - remove_whitespace( - """ - 925BE9FB01AFC6FB4D3E7D4990010F813408AB106C4F09CB7EE07868CC136F - FF3357F624A21BED52""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 63BA3A7A27483EBF6671DBEF7ABB30EBEE084E58A0B077AD42A5A0989D1EE7 - 1B1B9BC0455FB0D2C3""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - D35E472036BC4FB7E13C785ED201E065F98FCFA5B68F12A32D482EC7EE8658 - E98691555B44C59311""" - ), - 16, -) - -curve_brainpoolp320t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp320t1 = ellipticcurve.PointJacobi( - curve_brainpoolp320t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-384-r1 -_a = int( - remove_whitespace( - """ - 7BC382C63D8C150C3C72080ACE05AFA0C2BEA28E4FB22787139165EFBA91F9 - 0F8AA5814A503AD4EB04A8C7DD22CE2826""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - 04A8C7DD22CE28268B39B55416F0447C2FB77DE107DCD2A62E880EA53EEB62 - D57CB4390295DBC9943AB78696FA504C11""" - ), - 16, -) -_p = int( - remove_whitespace( - """ - 8CB91E82A3386D280F5D6F7E50E641DF152F7109ED5456B412B1DA197FB711 - 23ACD3A729901D1A71874700133107EC53""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - 1D1C64F068CF45FFA2A63A81B7C13F6B8847A3E77EF14FE3DB7FCAFE0CBD10 - E8E826E03436D646AAEF87B2E247D4AF1E""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 8ABE1D7520F9C2A45CB1EB8E95CFD55262B70B29FEEC5864E19C054FF991292 - 80E4646217791811142820341263C5315""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - 8CB91E82A3386D280F5D6F7E50E641DF152F7109ED5456B31F166E6CAC0425 - A7CF3AB6AF6B7FC3103B883202E9046565""" - ), - 16, -) - -curve_brainpoolp384r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp384r1 = ellipticcurve.PointJacobi( - curve_brainpoolp384r1, _Gx, _Gy, 1, _q, generator=True -) - -_a = int( - remove_whitespace( - """ - 8CB91E82A3386D280F5D6F7E50E641DF152F7109ED5456B412B1DA197FB711 - 23ACD3A729901D1A71874700133107EC50""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - 7F519EADA7BDA81BD826DBA647910F8C4B9346ED8CCDC64E4B1ABD11756DCE - 1D2074AA263B88805CED70355A33B471EE""" - ), - 16, -) -# _z = int( -# remove_whitespace( -# """ -# 41DFE8DD399331F7166A66076734A89CD0D2BCDB7D068E44E1F378F41ECBAE -# 97D2D63DBC87BCCDDCCC5DA39E8589291C""" -# ), -# 16, -# ) -_Gx = int( - remove_whitespace( - """ - 18DE98B02DB9A306F2AFCD7235F72A819B80AB12EBD653172476FECD462AAB - FFC4FF191B946A5F54D8D0AA2F418808CC""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 25AB056962D30651A114AFD2755AD336747F93475B7A1FCA3B88F2B6A208CC - FE469408584DC2B2912675BF5B9E582928""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - 8CB91E82A3386D280F5D6F7E50E641DF152F7109ED5456B31F166E6CAC0425 - A7CF3AB6AF6B7FC3103B883202E9046565""" - ), - 16, -) - -curve_brainpoolp384t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp384t1 = ellipticcurve.PointJacobi( - curve_brainpoolp384t1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-512-r1 -_a = int( - remove_whitespace( - """ - 7830A3318B603B89E2327145AC234CC594CBDD8D3DF91610A83441CAEA9863 - BC2DED5D5AA8253AA10A2EF1C98B9AC8B57F1117A72BF2C7B9E7C1AC4D77FC94CA""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - 3DF91610A83441CAEA9863BC2DED5D5AA8253AA10A2EF1C98B9AC8B57F1117 - A72BF2C7B9E7C1AC4D77FC94CADC083E67984050B75EBAE5DD2809BD638016F723""" - ), - 16, -) -_p = int( - remove_whitespace( - """ - AADD9DB8DBE9C48B3FD4E6AE33C9FC07CB308DB3B3C9D20ED6639CCA703308 - 717D4D9B009BC66842AECDA12AE6A380E62881FF2F2D82C68528AA6056583A48F3""" - ), - 16, -) -_Gx = int( - remove_whitespace( - """ - 81AEE4BDD82ED9645A21322E9C4C6A9385ED9F70B5D916C1B43B62EEF4D009 - 8EFF3B1F78E2D0D48D50D1687B93B97D5F7C6D5047406A5E688B352209BCB9F822""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 7DDE385D566332ECC0EABFA9CF7822FDF209F70024A57B1AA000C55B881F81 - 11B2DCDE494A5F485E5BCA4BD88A2763AED1CA2B2FA8F0540678CD1E0F3AD80892""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - AADD9DB8DBE9C48B3FD4E6AE33C9FC07CB308DB3B3C9D20ED6639CCA703308 - 70553E5C414CA92619418661197FAC10471DB1D381085DDADDB58796829CA90069""" - ), - 16, -) - -curve_brainpoolp512r1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp512r1 = ellipticcurve.PointJacobi( - curve_brainpoolp512r1, _Gx, _Gy, 1, _q, generator=True -) - -# Brainpool P-512-t1 -_a = int( - remove_whitespace( - """ - AADD9DB8DBE9C48B3FD4E6AE33C9FC07CB308DB3B3C9D20ED6639CCA703308 - 717D4D9B009BC66842AECDA12AE6A380E62881FF2F2D82C68528AA6056583A48F0""" - ), - 16, -) -_b = int( - remove_whitespace( - """ - 7CBBBCF9441CFAB76E1890E46884EAE321F70C0BCB4981527897504BEC3E36 - A62BCDFA2304976540F6450085F2DAE145C22553B465763689180EA2571867423E""" - ), - 16, -) -# _z = int( -# remove_whitespace( -# """ -# 12EE58E6764838B69782136F0F2D3BA06E27695716054092E60A80BEDB212B -# 64E585D90BCE13761F85C3F1D2A64E3BE8FEA2220F01EBA5EEB0F35DBD29D922AB""" -# ), -# 16, -# ) -_Gx = int( - remove_whitespace( - """ - 640ECE5C12788717B9C1BA06CBC2A6FEBA85842458C56DDE9DB1758D39C031 - 3D82BA51735CDB3EA499AA77A7D6943A64F7A3F25FE26F06B51BAA2696FA9035DA""" - ), - 16, -) -_Gy = int( - remove_whitespace( - """ - 5B534BD595F5AF0FA2C892376C84ACE1BB4E3019B71634C01131159CAE03CE - E9D9932184BEEF216BD71DF2DADF86A627306ECFF96DBB8BACE198B61E00F8B332""" - ), - 16, -) -_q = int( - remove_whitespace( - """ - AADD9DB8DBE9C48B3FD4E6AE33C9FC07CB308DB3B3C9D20ED6639CCA703308 - 70553E5C414CA92619418661197FAC10471DB1D381085DDADDB58796829CA90069""" - ), - 16, -) - -curve_brainpoolp512t1 = ellipticcurve.CurveFp(_p, _a, _b, 1) -generator_brainpoolp512t1 = ellipticcurve.PointJacobi( - curve_brainpoolp512t1, _Gx, _Gy, 1, _q, generator=True -) +generator_256 = ellipticcurve.PointJacobi(curve_256, _Gx, _Gy, 1, _r, generator=True) diff --git a/cm_ecdsa/ellipticcurve.py b/cm_ecdsa/ellipticcurve.py index 0d237c1..0177301 100644 --- a/cm_ecdsa/ellipticcurve.py +++ b/cm_ecdsa/ellipticcurve.py @@ -41,12 +41,12 @@ from . import numbertheory -from ._compat import normalise_bytes, int_to_bytes, bytes_to_int +from ._compat import bytes_to_int, int_to_bytes, normalise_bytes from .errors import MalformedPointError -from .util import orderlen, string_to_number, number_to_string +from .util import number_to_string, orderlen, string_to_number -class CurveFp(object): +class CurveFp: """ :term:`Short Weierstrass Elliptic Curve ` over a prime field. @@ -108,20 +108,11 @@ def contains_point(self, x, y): def __str__(self): if self.__h is not None: - return "CurveFp(p={0}, a={1}, b={2}, h={3})".format( - self.__p, - self.__a, - self.__b, - self.__h, - ) - return "CurveFp(p={0}, a={1}, b={2})".format( - self.__p, - self.__a, - self.__b, - ) + return f"CurveFp(p={self.__p}, a={self.__a}, b={self.__b}, h={self.__h})" + return f"CurveFp(p={self.__p}, a={self.__a}, b={self.__b})" -class AbstractPoint(object): +class AbstractPoint: """Class for common methods of elliptic curve points.""" @staticmethod @@ -158,9 +149,7 @@ def _from_compressed(data, curve): try: beta = numbertheory.square_root_mod_prime(alpha, p) except numbertheory.Error as e: - raise MalformedPointError( - "Encoding does not correspond to a point on curve", e - ) + raise MalformedPointError("Encoding does not correspond to a point on curve", e) if is_even == bool(beta & 1): y = p - beta else: @@ -178,10 +167,7 @@ def _from_hybrid(cls, data, raw_encoding_length, validate_encoding): # but validate if it's self-consistent if we're asked to do that if validate_encoding and ( - y & 1 - and data[:1] != b"\x07" - or (not y & 1) - and data[:1] != b"\x06" + y & 1 and data[:1] != b"\x07" or (not y & 1) and data[:1] != b"\x06" ): raise MalformedPointError("Inconsistent hybrid point encoding") @@ -202,18 +188,12 @@ def _from_edwards(cls, curve, data): y = bytes_to_int(data, "little") - x2 = ( - (y * y - 1) - * numbertheory.inverse_mod(curve.d() * y * y - curve.a(), p) - % p - ) + x2 = (y * y - 1) * numbertheory.inverse_mod(curve.d() * y * y - curve.a(), p) % p try: x = numbertheory.square_root_mod_prime(x2, p) except numbertheory.Error as e: - raise MalformedPointError( - "Encoding does not correspond to a point on curve", e - ) + raise MalformedPointError("Encoding does not correspond to a point on curve", e) if x % 2 != x_0: x = -x % p @@ -221,9 +201,7 @@ def _from_edwards(cls, curve, data): return x, y @classmethod - def from_bytes( - cls, curve, data, validate_encoding=True, valid_encodings=None - ): + def from_bytes(cls, curve, data, validate_encoding=True, valid_encodings=None): """ Initialise the object from byte encoding of a point. @@ -255,44 +233,27 @@ def from_bytes( :rtype: tuple(int, int) """ if not valid_encodings: - valid_encodings = set( - ["uncompressed", "compressed", "hybrid", "raw"] - ) + valid_encodings = set(["uncompressed", "compressed", "hybrid", "raw"]) if not all( - i in set(("uncompressed", "compressed", "hybrid", "raw")) - for i in valid_encodings + i in set(("uncompressed", "compressed", "hybrid", "raw")) for i in valid_encodings ): - raise ValueError( - "Only uncompressed, compressed, hybrid or raw encoding " - "supported." - ) + raise ValueError("Only uncompressed, compressed, hybrid or raw encoding supported.") data = normalise_bytes(data) key_len = len(data) raw_encoding_length = 2 * orderlen(curve.p()) if key_len == raw_encoding_length and "raw" in valid_encodings: - coord_x, coord_y = cls._from_raw_encoding( - data, raw_encoding_length - ) + coord_x, coord_y = cls._from_raw_encoding(data, raw_encoding_length) elif key_len == raw_encoding_length + 1 and ( "hybrid" in valid_encodings or "uncompressed" in valid_encodings ): if data[:1] in (b"\x06", b"\x07") and "hybrid" in valid_encodings: - coord_x, coord_y = cls._from_hybrid( - data, raw_encoding_length, validate_encoding - ) + coord_x, coord_y = cls._from_hybrid(data, raw_encoding_length, validate_encoding) elif data[:1] == b"\x04" and "uncompressed" in valid_encodings: - coord_x, coord_y = cls._from_raw_encoding( - data[1:], raw_encoding_length - ) + coord_x, coord_y = cls._from_raw_encoding(data[1:], raw_encoding_length) else: - raise MalformedPointError( - "Invalid X9.62 encoding of the public point" - ) - elif ( - key_len == raw_encoding_length // 2 + 1 - and "compressed" in valid_encodings - ): + raise MalformedPointError("Invalid X9.62 encoding of the public point") + elif key_len == raw_encoding_length // 2 + 1 and "compressed" in valid_encodings: coord_x, coord_y = cls._from_compressed(data, curve) else: raise MalformedPointError( @@ -521,9 +482,7 @@ def __eq__(self, other): # compare the fractions by bringing them to the same denominator # depend on short-circuit to save 4 multiplications in case of # inequality - return (x1 * zz2 - x2 * zz1) % p == 0 and ( - y1 * zz2 * z2 - y2 * zz1 * z1 - ) % p == 0 + return (x1 * zz2 - x2 * zz1) % p == 0 and (y1 * zz2 * z2 - y2 * zz1 * z1) % p == 0 def __ne__(self, other): """Compare for inequality two points with each-other.""" @@ -611,9 +570,7 @@ def from_affine(point, generator=False): multiplication table - useful for public point when verifying many signatures (around 100 or so) or for generator points of a curve. """ - return PointJacobi( - point.curve(), point.x(), point.y(), 1, point.order(), generator - ) + return PointJacobi(point.curve(), point.x(), point.y(), 1, point.order(), generator) # please note that all the methods that use the equations from # hyperelliptic @@ -1025,11 +982,7 @@ def __eq__(self, other): if other is INFINITY: return self.__x is None or self.__y is None if isinstance(other, Point): - return ( - self.__curve == other.__curve - and self.__x == other.__x - and self.__y == other.__y - ) + return self.__curve == other.__curve and self.__x == other.__x and self.__y == other.__y return NotImplemented def __ne__(self, other): @@ -1059,10 +1012,7 @@ def __add__(self, other): p = self.__curve.p() - l = ( - (other.__y - self.__y) - * numbertheory.inverse_mod(other.__x - self.__x, p) - ) % p + l = ((other.__y - self.__y) * numbertheory.inverse_mod(other.__x - self.__x, p)) % p x3 = (l * l - self.__x - other.__x) % p y3 = (l * (self.__x - x3) - self.__y) % p @@ -1130,10 +1080,7 @@ def double(self): p = self.__curve.p() a = self.__curve.a() - l = ( - (3 * self.__x * self.__x + a) - * numbertheory.inverse_mod(2 * self.__y, p) - ) % p + l = ((3 * self.__x * self.__x + a) * numbertheory.inverse_mod(2 * self.__y, p)) % p if not l: return INFINITY diff --git a/cm_ecdsa/errors.py b/cm_ecdsa/errors.py index 2e97fdc..6a55913 100644 --- a/cm_ecdsa/errors.py +++ b/cm_ecdsa/errors.py @@ -5,6 +5,7 @@ # # Derived from https://github.com/tlsfuzzer/python-ecdsa + class MalformedPointError(AssertionError): """Raised in case the encoding of private or public key is malformed.""" diff --git a/cm_ecdsa/keys.py b/cm_ecdsa/keys.py index 3088c26..a9fe82b 100644 --- a/cm_ecdsa/keys.py +++ b/cm_ecdsa/keys.py @@ -9,27 +9,28 @@ """ import binascii -from hashlib import sha1 import os -from . import ecdsa -from . import der -from . import rfc6979 -from . import ellipticcurve + +from cm_sha import sha1 + +from . import der, ecdsa, ellipticcurve, rfc6979 +from ._compat import normalise_bytes from .curves import Curve from .ecdsa import RSZeroError -from .util import string_to_number, number_to_string, randrange -from .util import sigencode_string, sigdecode_string +from .ellipticcurve import PointJacobi +from .errors import MalformedPointError from .util import ( - oid_ecPublicKey, + MalformedSignature, encoded_oid_ecPublicKey, + number_to_string, oid_ecDH, oid_ecMQV, - MalformedSignature, + oid_ecPublicKey, + randrange, + sigdecode_string, + sigencode_string, + string_to_number, ) -from ._compat import normalise_bytes -from .errors import MalformedPointError -from .ellipticcurve import PointJacobi - __all__ = [ "BadSignatureError", @@ -66,10 +67,7 @@ def _truncate_and_convert_digest(digest, curve, allow_truncate): if not allow_truncate: if len(digest) > curve.baselen: raise BadDigestError( - "this curve ({0}) is too short " - "for the length of your digest ({1})".format( - curve.name, 8 * len(digest) - ) + f"this curve ({curve.name}) is too short for the length of your digest ({8 * len(digest)})" ) else: digest = digest[: curve.baselen] @@ -93,7 +91,7 @@ def _truncate_and_convert_digest(digest, curve, allow_truncate): return number -class VerifyingKey(object): +class VerifyingKey: """ Class for handling keys that can verify signatures (public keys). @@ -109,9 +107,7 @@ class VerifyingKey(object): def __init__(self, _error__please_use_generate=None): """Unsupported, please use one of the classmethods to initialise.""" if not _error__please_use_generate: - raise TypeError( - "Please use VerifyingKey.generate() to construct me" - ) + raise TypeError("Please use VerifyingKey.generate() to construct me") self.curve = None self.default_hashfunc = None self.pubkey = None @@ -122,9 +118,7 @@ def __repr__(self): hash_name = self.default_hashfunc().name else: hash_name = "None" - return "VerifyingKey.from_string({0!r}, {1!r}, {2})".format( - pub_key, self.curve, hash_name - ) + return f"VerifyingKey.from_string({pub_key!r}, {self.curve!r}, {hash_name})" def __eq__(self, other): """Return True if the points are identical, False otherwise.""" @@ -137,9 +131,7 @@ def __ne__(self, other): return not self == other @classmethod - def from_public_point( - cls, point, curve, hashfunc, validate_point=True - ): + def from_public_point(cls, point, curve, hashfunc, validate_point=True): """ Initialise the object from a Point object. @@ -169,9 +161,7 @@ def from_public_point( self.curve = curve self.default_hashfunc = hashfunc try: - self.pubkey = ecdsa.Public_key( - curve.generator, point, validate_point - ) + self.pubkey = ecdsa.Public_key(curve.generator, point, validate_point) except ecdsa.InvalidPointError: raise MalformedPointError("Point does not lay on the curve") self.pubkey.order = curve.order @@ -196,9 +186,7 @@ def precompute(self, lazy=False): (if set to False) or if it should be delayed to the time of first use (when set to True) """ - self.pubkey.point = ellipticcurve.PointJacobi.from_affine( - self.pubkey.point, True - ) + self.pubkey.point = ellipticcurve.PointJacobi.from_affine(self.pubkey.point, True) # as precomputation in now delayed to the time of first use of the # point and we were asked specifically to precompute now, make # sure the precomputation is performed now to preserve the behaviour @@ -207,12 +195,12 @@ def precompute(self, lazy=False): @classmethod def from_string( - cls, - string, - curve, - hashfunc, - validate_point=True, - valid_encodings=None, + cls, + string, + curve, + hashfunc, + validate_point=True, + valid_encodings=None, ): """ Initialise the object from byte encoding of public key. @@ -352,23 +340,19 @@ def from_der( # [[oid_ecPublicKey,oid_curve], point_str_bitstring] s1, empty = der.remove_sequence(string) if empty != b"": - raise der.UnexpectedDER( - "trailing junk after DER pubkey: %s" % binascii.hexlify(empty) - ) + raise der.UnexpectedDER("trailing junk after DER pubkey: %s" % binascii.hexlify(empty)) s2, point_str_bitstring = der.remove_sequence(s1) # s2 = oid_ecPublicKey,oid_curve oid_pk, rest = der.remove_object(s2) if not oid_pk == oid_ecPublicKey: raise der.UnexpectedDER( - "Unexpected object identifier in DER " - "encoding: {0!r}".format(oid_pk) + f"Unexpected object identifier in DER encoding: {oid_pk!r}" ) curve = Curve.from_der(rest, valid_curve_encodings) point_str, empty = der.remove_bitstring(point_str_bitstring, 0) if empty != b"": raise der.UnexpectedDER( - "trailing junk after pubkey pointstring: %s" - % binascii.hexlify(empty) + "trailing junk after pubkey pointstring: %s" % binascii.hexlify(empty) ) # raw encoding of point is invalid in DER files if len(point_str) == curve.verifying_key_length: @@ -475,15 +459,11 @@ def from_public_key_recovery_with_digest( sig = ecdsa.Signature(r, s) digest = normalise_bytes(digest) - digest_as_number = _truncate_and_convert_digest( - digest, curve, allow_truncate - ) + digest_as_number = _truncate_and_convert_digest(digest, curve, allow_truncate) pks = sig.recover_public_keys(digest_as_number, generator) # Transforms the ecdsa.Public_key object into a VerifyingKey - verifying_keys = [ - cls.from_public_point(pk.point, curve, hashfunc) for pk in pks - ] + verifying_keys = [cls.from_public_point(pk.point, curve, hashfunc) for pk in pks] return verifying_keys def to_string(self, encoding="raw"): @@ -509,9 +489,7 @@ def to_string(self, encoding="raw"): assert encoding in ("raw", "uncompressed", "compressed", "hybrid") return self.pubkey.point.to_bytes(encoding) - def to_pem( - self, point_encoding="uncompressed", curve_parameters_encoding=None - ): + def to_pem(self, point_encoding="uncompressed", curve_parameters_encoding=None): """ Convert the public key to the :term:`PEM` format. @@ -540,9 +518,7 @@ def to_pem( "PUBLIC KEY", ) - def to_der( - self, point_encoding="uncompressed", curve_parameters_encoding=None - ): + def to_der(self, point_encoding="uncompressed", curve_parameters_encoding=None): """ Convert the public key to the :term:`DER` format. @@ -683,7 +659,7 @@ def verify_digest( raise BadSignatureError("Signature verification failed") -class SigningKey(object): +class SigningKey: """ Class for handling keys that can create signatures (private keys). @@ -780,15 +756,12 @@ def from_secret_exponent(cls, secexp, curve, hashfunc=sha1): n = curve.order if not 1 <= secexp < n: raise MalformedPointError( - "Invalid value for secexp, expected integer " - "between 1 and {0}".format(n) + f"Invalid value for secexp, expected integer between 1 and {n}" ) pubkey_point = curve.generator * secexp if hasattr(pubkey_point, "scale"): pubkey_point = pubkey_point.scale() - self.verifying_key = VerifyingKey.from_public_point( - pubkey_point, curve, hashfunc, False - ) + self.verifying_key = VerifyingKey.from_public_point(pubkey_point, curve, hashfunc, False) pubkey = self.verifying_key.pubkey self.privkey = ecdsa.Private_key(pubkey, secexp) self.privkey.order = n @@ -824,8 +797,7 @@ def from_string(cls, string, curve, hashfunc=sha1): if len(string) != curve.baselen: raise MalformedPointError( - "Invalid length of private key, received {0}, " - "expected {1}".format(len(string), curve.baselen) + f"Invalid length of private key, received {len(string)}, expected {curve.baselen}" ) secexp = string_to_number(string) return cls.from_secret_exponent(secexp, curve, hashfunc) @@ -947,9 +919,7 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): s, empty = der.remove_sequence(s) if empty != b"": - raise der.UnexpectedDER( - "trailing junk after DER privkey: %s" % binascii.hexlify(empty) - ) + raise der.UnexpectedDER("trailing junk after DER privkey: %s" % binascii.hexlify(empty)) version, s = der.remove_integer(s) @@ -960,17 +930,14 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): if der.is_sequence(s): if version not in (0, 1): raise der.UnexpectedDER( - "expected version '0' or '1' at start of privkey, got %d" - % version + "expected version '0' or '1' at start of privkey, got %d" % version ) sequence, s = der.remove_sequence(s) algorithm_oid, algorithm_identifier = der.remove_object(sequence) if algorithm_oid not in (oid_ecPublicKey, oid_ecDH, oid_ecMQV): - raise der.UnexpectedDER( - "unexpected algorithm identifier '%s'" % (algorithm_oid,) - ) + raise der.UnexpectedDER("unexpected algorithm identifier '%s'" % (algorithm_oid,)) curve = Curve.from_der(algorithm_identifier, valid_curve_encodings) @@ -983,8 +950,7 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): s, empty = der.remove_sequence(s) if empty != b"": raise der.UnexpectedDER( - "trailing junk after DER privkey: %s" - % binascii.hexlify(empty) + "trailing junk after DER privkey: %s" % binascii.hexlify(empty) ) version, s = der.remove_integer(s) @@ -992,8 +958,7 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): # The version of the ECPrivateKey must be 1. if version != 1: raise der.UnexpectedDER( - "expected version '1' at start of DER privkey, got %d" - % version + "expected version '1' at start of DER privkey, got %d" % version ) privkey_str, s = der.remove_octet_string(s) @@ -1001,9 +966,7 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): if not curve: tag, curve_oid_str, s = der.remove_constructed(s) if tag != 0: - raise der.UnexpectedDER( - "expected tag 0 in DER privkey, got %d" % tag - ) + raise der.UnexpectedDER("expected tag 0 in DER privkey, got %d" % tag) curve = Curve.from_der(curve_oid_str, valid_curve_encodings) # we don't actually care about the following fields @@ -1020,9 +983,7 @@ def from_der(cls, string, hashfunc=sha1, valid_curve_encodings=None): # our from_string method likes fixed-length privkey strings if len(privkey_str) < curve.baselen: - privkey_str = ( - b"\x00" * (curve.baselen - len(privkey_str)) + privkey_str - ) + privkey_str = b"\x00" * (curve.baselen - len(privkey_str)) + privkey_str return cls.from_string(privkey_str, curve, hashfunc) def to_string(self): @@ -1116,15 +1077,11 @@ def to_der( ] if format == "ssleay": priv_key_elems.append( - der.encode_constructed( - 0, self.curve.to_der(curve_parameters_encoding) - ) + der.encode_constructed(0, self.curve.to_der(curve_parameters_encoding)) ) # the 0 in encode_bitstring specifies the number of unused bits # in the `encoded_vk` string - priv_key_elems.append( - der.encode_constructed(1, der.encode_bitstring(encoded_vk, 0)) - ) + priv_key_elems.append(der.encode_constructed(1, der.encode_bitstring(encoded_vk, 0))) ec_private_key = der.encode_sequence(*priv_key_elems) if format == "ssleay": diff --git a/cm_ecdsa/numbertheory.py b/cm_ecdsa/numbertheory.py index d3f1ac5..76f08fd 100644 --- a/cm_ecdsa/numbertheory.py +++ b/cm_ecdsa/numbertheory.py @@ -15,10 +15,9 @@ # 2008.11.14: Use pow(base, exponent, modulus) for modular_exp. # Make gcd and lcm accept arbitrarily many arguments. -import sys - import math import random +import sys class Error(Exception): diff --git a/cm_ecdsa/rfc6979.py b/cm_ecdsa/rfc6979.py index 559dc81..a7695fc 100644 --- a/cm_ecdsa/rfc6979.py +++ b/cm_ecdsa/rfc6979.py @@ -15,8 +15,9 @@ https://github.com/codahale/rfc6979 """ -import hmac +import cm_hmac as hmac from binascii import hexlify + from .util import number_to_string, number_to_string_crop diff --git a/cm_ecdsa/util.py b/cm_ecdsa/util.py index 5a34ee3..123d24c 100644 --- a/cm_ecdsa/util.py +++ b/cm_ecdsa/util.py @@ -17,14 +17,15 @@ :func:`sigdecode_der` functions. """ -import os -import math import binascii +import math +import os import sys -from hashlib import sha256 -from . import der -from ._compat import normalise_bytes, int2byte +from cm_sha import sha256 + +from . import der +from ._compat import int2byte, normalise_bytes # RFC5480: # The "unrestricted" algorithm identifier is: @@ -50,6 +51,7 @@ oid_ecMQV = (1, 3, 132, 1, 13) + def entropy_to_bits(ent_256): """Convert a bytestring to string of 0's and 1's""" return bin(int.from_bytes(ent_256, "big"))[2:].zfill(len(ent_256) * 8) @@ -99,9 +101,7 @@ def __call__(self, numbytes): def block_generator(self, seed): counter = 0 while True: - for byte in sha256( - ("prng-%d-%s" % (counter, seed)).encode() - ).digest(): + for byte in sha256(("prng-%d-%s" % (counter, seed)).encode()).digest(): yield byte counter += 1 @@ -404,8 +404,7 @@ def sigdecode_string(signature, order): l = orderlen(order) if not len(signature) == 2 * l: raise MalformedSignature( - "Invalid length of signature, expected {0} bytes long, " - "provided string is {1} bytes long".format(2 * l, len(signature)) + f"Invalid length of signature, expected {2 * l} bytes long, provided string is {len(signature)} bytes long" ) r = string_to_number_fixedlen(signature[:l], order) s = string_to_number_fixedlen(signature[l:], order) @@ -433,9 +432,7 @@ def sigdecode_strings(rs_strings, order): """ if not len(rs_strings) == 2: raise MalformedSignature( - "Invalid number of strings provided: {0}, expected 2".format( - len(rs_strings) - ) + f"Invalid number of strings provided: {len(rs_strings)}, expected 2" ) (r_str, s_str) = rs_strings r_str = normalise_bytes(r_str) @@ -444,14 +441,14 @@ def sigdecode_strings(rs_strings, order): if not len(r_str) == l: raise MalformedSignature( "Invalid length of first string ('r' parameter), " - "expected {0} bytes long, provided string is {1} " - "bytes long".format(l, len(r_str)) + f"expected {l} bytes long, provided string is {len(r_str)} " + "bytes long" ) if not len(s_str) == l: raise MalformedSignature( "Invalid length of second string ('s' parameter), " - "expected {0} bytes long, provided string is {1} " - "bytes long".format(l, len(s_str)) + f"expected {l} bytes long, provided string is {len(s_str)} " + "bytes long" ) r = string_to_number_fixedlen(r_str, order) s = string_to_number_fixedlen(s_str, order) @@ -487,16 +484,13 @@ def sigdecode_der(sig_der, order): # return der.encode_sequence(der.encode_integer(r), der.encode_integer(s)) rs_strings, empty = der.remove_sequence(sig_der) if empty != b"": - raise der.UnexpectedDER( - "trailing junk after DER sig: %s" % binascii.hexlify(empty) - ) + raise der.UnexpectedDER("trailing junk after DER sig: %s" % binascii.hexlify(empty)) r, rest = der.remove_integer(rs_strings) s, empty = der.remove_integer(rest) if empty != b"": - raise der.UnexpectedDER( - "trailing junk after DER numbers: %s" % binascii.hexlify(empty) - ) + raise der.UnexpectedDER("trailing junk after DER numbers: %s" % binascii.hexlify(empty)) return r, s + def int2byte(i): - return i.to_bytes(1, 'big') + return i.to_bytes(1, "big") diff --git a/cm_fake_random.py b/cm_fake_random.py new file mode 100644 index 0000000..635534f --- /dev/null +++ b/cm_fake_random.py @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 Dan Halbert for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +import random + +# Always start at the same place +random.seed(0) + +def urandom(nbytes): + return random.randbytes(nbytes) + +def randbelow(n): + return random.randrange(n) diff --git a/cm_hmac.py b/cm_hmac.py index f327d55..5805baf 100644 --- a/cm_hmac.py +++ b/cm_hmac.py @@ -44,11 +44,7 @@ __version__ = "0.0.0-auto.0" __repo__ = "https://github.com/jimbobbennett/CircuitPython_HMAC.git" -try: - import hashlib as _hashlib -except ImportError: - import adafruit_hashlib as _hashlib - +from cm_sha import sha256 TRANS_5C = bytes((x ^ 0x5C) for x in range(256)) TRANS_36 = bytes((x ^ 0x36) for x in range(256)) @@ -78,12 +74,10 @@ def __init__(self, key, msg=None, digestmod=None): """ if not isinstance(key, (bytes, bytearray)): - raise TypeError( - "key: expected bytes or bytearray, but got %r" % type(key).__name__ - ) + raise TypeError("key: expected bytes or bytearray, but got %r" % type(key).__name__) if digestmod is None: - digestmod = _hashlib.sha256 + digestmod = sha256 if callable(digestmod): self.digest_cons = digestmod @@ -120,16 +114,13 @@ def __init__(self, key, msg=None, digestmod=None): def _translate(key, translation): return bytes(translation[x] for x in key) - @property def name(self): - """Return the name of this object - """ + """Return the name of this object""" return "hmac-" + self.inner.name def update(self, msg): - """Update this hashing object with the string msg. - """ + """Update this hashing object with the string msg.""" self.inner.update(msg) def copy(self): @@ -165,8 +156,7 @@ def digest(self): return hmac.digest() def hexdigest(self): - """Like digest(), but returns a string of hexadecimal digits instead. - """ + """Like digest(), but returns a string of hexadecimal digits instead.""" hmac = self._current() return hmac.hexdigest() diff --git a/cm_sha.py b/cm_sha.py new file mode 100644 index 0000000..e4a753b --- /dev/null +++ b/cm_sha.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 Dan Halbert for Adafruit Industries +# +# SPDX-License-Identifier: MIT + +import hashlib + +try: + sha1 = hashlib.sha1 +except AttributeError: + # CircuitPython hashlib does not have shortcut constructors. + def sha1(data=b""): + return hashlib.new("sha1", data) + +try: + sha256 = hashlib.sha256 +except AttributeError: + # CircuitPython hashlib does not support sha256. Use the Python equivalent. + import adafruit_hashlib + def sha256(data=b""): + return adafruit_hashlib.new("sha256", data) diff --git a/tests/test_chunked_message.py b/tests/test_chunked_message.py index b4a24df..4bf2e57 100644 --- a/tests/test_chunked_message.py +++ b/tests/test_chunked_message.py @@ -22,34 +22,44 @@ def test_only_one(): buf = bytearray(20) t = fill_array(4) end = t.encode_into(buf) - print(buf[:end].hex("x")) - assert buf[:end] == b"\x15\x36\x00\x04\x00\x04\x01\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + correct = b"\x15\x36\x00\x04\x00\x04\x01\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + print("result ", buf[:end].hex("-")) + print("correct", correct.hex("-")) + assert buf[:end] == correct def test_two_chunks(): buf = bytearray(16) t = fill_array(4) end = t.encode_into(buf) - print(buf[:end].hex("x")) - assert buf[:end] == b"\x15\x36\x00\x04\x00\x04\x01\x18\x29\x01\x28\x02\x24\xff\x0b\x18" + correct = b"\x15\x36\x00\x04\x00\x04\x01\x18\x29\x01\x28\x02\x24\xff\x0b\x18" + print("result ", buf[:end].hex("-")) + print("correct", correct.hex("-")) + assert buf[:end] == correct buf = bytearray(16) end = t.encode_into(buf) - print(buf[:end].hex("x")) - assert buf[:end] == b"\x15\x36\x00\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + correct = b"\x15\x36\x00\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + print("result ", buf[:end].hex("-")) + print("correct", correct.hex("-")) + assert buf[:end] == correct def test_two_chunks_odd(): buf = bytearray(17) t = fill_array(4) end = t.encode_into(buf) - print(buf[:end].hex("x")) - assert buf[:end] == b"\x15\x36\x00\x04\x00\x04\x01\x18\x29\x01\x28\x02\x24\xff\x0b\x18" + correct = b"\x15\x36\x00\x04\x00\x04\x01\x18\x29\x01\x28\x02\x24\xff\x0b\x18" + print("result ", buf[:end].hex("-")) + print("correct", correct.hex("-")) + assert buf[:end] == correct buf = bytearray(17) end = t.encode_into(buf) - print(buf[:end].hex("x")) - assert buf[:end] == b"\x15\x36\x00\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + correct = b"\x15\x36\x00\x04\x02\x04\x03\x18\x28\x02\x24\xff\x0b\x18" + print("result ", buf[:end].hex("-")) + print("correct", correct.hex("-")) + assert buf[:end] == correct def test_three_chunks():