From 6ddd3ad89136471f609908a1c2d308c253205050 Mon Sep 17 00:00:00 2001 From: Dan Halbert Date: Thu, 21 Nov 2024 19:28:45 -0500 Subject: [PATCH] wip; removing direct use of ecdsa --- blink.py | 30 +++++++++++++++++++++ circuitmatter/__init__.py | 7 ++--- circuitmatter/certificates.py | 18 +++---------- circuitmatter/crypto.py | 49 ++++++++++++++++++++++++++++++++++- circuitmatter/session.py | 10 ++----- 5 files changed, 88 insertions(+), 26 deletions(-) create mode 100644 blink.py diff --git a/blink.py b/blink.py new file mode 100644 index 0000000..6d6f092 --- /dev/null +++ b/blink.py @@ -0,0 +1,30 @@ +"""Simple LED on and off as a light.""" + +import circuitmatter as cm +from circuitmatter.device_types.lighting import on_off + +import digitalio +import board + + +class LED(on_off.OnOffLight): + def __init__(self, name, led): + super().__init__(name) + # self._led = led + # self._led.direction = digitalio.Direction.OUTPUT + + def on(self): + print("_led set to on") + # self._led.value = True + + def off(self): + print("_led set to on") + # self._led.value = False + + +matter = cm.CircuitMatter() +#led = LED("led1", digitalio.DigitalInOut(board.D13)) +led = LED("led1", None) +matter.add_device(led) +while True: + matter.process_packets() diff --git a/circuitmatter/__init__.py b/circuitmatter/__init__.py index 3cd743f..be91e3a 100644 --- a/circuitmatter/__init__.py +++ b/circuitmatter/__init__.py @@ -7,7 +7,7 @@ import binascii import hashlib import json -import pathlib +import os import time from . import case, interaction_model, nonvolatile, session @@ -48,8 +48,9 @@ def __init__( random_source = random self.random = random_source - state_file = pathlib.Path(state_filename) - if not state_file.exists(): + try: + os.stat(state_filename) + except OSError: from circuitmatter import certificates initial_state = certificates.generate_initial_state( diff --git a/circuitmatter/certificates.py b/circuitmatter/certificates.py index abbf02f..2843cf8 100644 --- a/circuitmatter/certificates.py +++ b/circuitmatter/certificates.py @@ -11,7 +11,7 @@ from ecdsa import der from ecdsa.curves import NIST256p -from . import pase, tlv +from . import crypto, pase, tlv from .data_model import Enum8 PAI_KEY_DER = b"\x30\x77\x02\x01\x01\x04\x20\xbb\x76\xa5\x80\x5f\x97\x26\x49\xaf\x1e\x8a\x87\xdc\x45\x57\xe6\x2c\x09\x00\xe5\x07\x09\xe8\x5c\x79\xc6\x44\xdf\x78\x90\xe5\x96\xa0\x0a\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07\xa1\x44\x03\x42\x00\x04\x37\x5d\x2b\xc8\xc6\x15\x27\x5b\xfd\x84\x8b\x52\xfe\x21\x96\xe2\xa1\x4e\xf3\xcc\x91\xae\xf0\x5d\xff\x85\x1c\xbc\x19\xb1\xa9\x35\x45\x8c\xfe\x04\xaa\x42\x4e\x01\x6d\xe3\xd6\x74\xdc\x5b\x73\x29\xbd\x77\x57\xfd\xdb\x32\x38\xd6\x26\x73\x62\x9b\x3c\x79\x08\x45" # noqa: E501 Line too long @@ -87,11 +87,7 @@ def generate_certificates(vendor_id=0xFFF1, product_id=0x8000, device_type=22, p subject_key_identifier = ( b"\x62\xfa\x82\x33\x59\xac\xfa\xa9\x96\x3e\x1c\xfa\x14\x0a\xdd\xf5\x04\xf3\x71\x60" ) - signature = private_key.sign_deterministic( - declaration, - hashfunc=hashlib.sha256, - sigencode=ecdsa.util.sigencode_der_canonize, - ) + signature = crypto.Sign_as_der(private_key, declaration) certification_declaration = [] # version @@ -133,9 +129,7 @@ def generate_certificates(vendor_id=0xFFF1, product_id=0x8000, device_type=22, p def generate_dac(vendor_id, product_id, product_name, random_source) -> tuple[bytes, bytes]: # noqa: PLR0914 Too many locals - dac_key = ecdsa.keys.SigningKey.generate( - curve=ecdsa.NIST256p, hashfunc=hashlib.sha256, entropy=random_source.urandom - ) + dac_key = crypto.GenerateKeyPair(random_source.urandom) version = der.encode_constructed(0, der.encode_integer(2)) serial_number = der.encode_integer(1) @@ -194,11 +188,7 @@ def generate_dac(vendor_id, product_id, product_name, random_source) -> tuple[by ) pai_key = ecdsa.keys.SigningKey.from_der(PAI_KEY_DER, hashfunc=hashlib.sha256) - signature = pai_key.sign_deterministic( - certificate, - hashfunc=hashlib.sha256, - sigencode=ecdsa.util.sigencode_der_canonize, - ) + signature = crypto.Sign_as_der(pai_key, certificate) dac_cert = der.encode_sequence( certificate, signature_algorithm, der.encode_bitstring(signature, unused=0) diff --git a/circuitmatter/crypto.py b/circuitmatter/crypto.py index 04d0069..7d2b1cc 100644 --- a/circuitmatter/crypto.py +++ b/circuitmatter/crypto.py @@ -2,7 +2,13 @@ # # SPDX-License-Identifier: MIT -import enum +try: + import enum +except ImportError: + class Enum: + class IntEnum: + pass + import hashlib import hmac import struct @@ -122,6 +128,25 @@ class MatterCertificate(tlv.Structure): signature = tlv.OctetStringMember(11, GROUP_SIZE_BYTES * 2) +def DRBG_seed(seed): + random.seed(seed) + + +def DRBG_bytes(byte_len: int) -> bytes: + """Matter spec DRBG() specifies length in bits.""" + return random.randbytes(n) + + +def DRBG_32() -> int: + """Return a 32-bit random unsigned int.""" + return random.randrange(0, 0xFFFFFFFF) + + +def TRNG_bytes(byte_len: int) -> bytes: + """Matter spec TRNG() specifies length in bits.""" + return os.urandom(byte_len) + + def Hash(*message) -> bytes: h = hashlib.sha256() for m in message: @@ -135,6 +160,28 @@ def HMAC(key, message) -> bytes: return m.digest() +def GenerateKeyPair(random_source): + return ecdsa.keys.SigningKey.generate( + curve=ecdsa.NIST256p, hashfunc=hashlib.sha256, entropy=random_source + ) + + +def Sign_as_der(private_key, message): + return private_key.sign_deterministic( + message, + hashfunc=hashlib.sha256, + sigencode=ecdsa.util.sigencode_der_canonize + ) + + +def Sign_as_string(private_key, message): + return private_key.sign_deterministic( + message, + hashfunc=hashlib.sha256, + sigencode=ecdsa.util.sigencode_string + ) + + def HKDF_Extract(salt, input_key) -> bytes: return HMAC(salt, input_key) diff --git a/circuitmatter/session.py b/circuitmatter/session.py index e708883..515bfcf 100644 --- a/circuitmatter/session.py +++ b/circuitmatter/session.py @@ -608,9 +608,7 @@ def reply_to_sigma1(self, exchange, sigma1): # noqa: PLR0915, PLR0914 Too many session_context.session_active_threshold = ( sigma1.initiatorSessionParams.session_active_threshold / 1000 ) - ephemeral_key_pair = ecdsa.keys.SigningKey.generate( - curve=ecdsa.NIST256p, hashfunc=hashlib.sha256, entropy=self.random.urandom - ) + ephemeral_key_pair = crypto.GenerateKeyPair(self.random.urandom) ephemeral_public_key = ephemeral_key_pair.verifying_key.to_string(encoding="uncompressed") @@ -632,11 +630,7 @@ def reply_to_sigma1(self, exchange, sigma1): # noqa: PLR0915, PLR0914 Too many tbsdata = tbsdata.encode() - tbedata.signature = self.node_credentials.noc_keys[matching_noc].sign_deterministic( - tbsdata, - hashfunc=hashlib.sha256, - sigencode=ecdsa.util.sigencode_string, - ) + tbedata.signature = crypto.Sign_as_string(self.node_credentials.noc_keys[matching_noc], tbsdata) tbedata.resumptionID = session_context.resumption_id random = self.random.urandom(32)