Skip to content

Commit

Permalink
Generate DAC ourselves from our own (test) PAI
Browse files Browse the repository at this point in the history
Now generate and store keys in the json device state. Our PAI for
vendor ID 0xfff4 is hard coded. Other vendor ids require externally
generated certs.

Fixes #19
  • Loading branch information
tannewt committed Oct 17, 2024
1 parent 64d8fbd commit 818c9e6
Show file tree
Hide file tree
Showing 13 changed files with 404 additions and 420 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,4 @@ cython_debug/
# CircuitMatter replay logs
test_data/recorded_packets-*-*.jsonl
test_data/device_state-*-*.json
certification_declaration.der
*-device-state.json
Binary file added CircuitMatter-PAI-Cert.der
Binary file not shown.
Binary file added CircuitMatter-PAI-Key.der
Binary file not shown.
17 changes: 15 additions & 2 deletions circuitmatter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import binascii
import hashlib
import json
import pathlib
import time

from . import case
Expand All @@ -22,8 +24,9 @@ def __init__(
mdns_server=None,
random_source=None,
state_filename="matter-device-state.json",
vendor_id=0xFFF1,
product_id=0x8000,
vendor_id=0xFFF4,
product_id=0x1234,
product_name="CircuitMatter Device",
):
if socketpool is None:
import socket
Expand All @@ -43,6 +46,16 @@ def __init__(
random_source = random
self.random = random_source

state_file = pathlib.Path(state_filename)
if not state_file.exists():
from circuitmatter import certificates

initial_state = certificates.generate_initial_state(
vendor_id, product_id, product_name, random_source
)
with open(state_filename, "w") as f:
json.dump(initial_state, f, indent=1)

self.nonvolatile = nonvolatile.PersistentDictionary(state_filename)

for key in ["discriminator", "salt", "iteration-count", "verifier"]:
Expand Down
138 changes: 124 additions & 14 deletions circuitmatter/certificates.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# This file should only be needed when generating certificates.

import binascii
import hashlib

from . import tlv
Expand All @@ -8,7 +9,7 @@
import ecdsa
from ecdsa import der

import pathlib
PAI_KEY_DER = b"\x30\x77\x02\x01\x01\x04\x20\xbb\x76\xa5\x80\x5f\x97\x26\x49\xaf\x1e\x8a\x87\xdc\x45\x57\xe6\x2c\x09\x00\xe5\x07\x09\xe8\x5c\x79\xc6\x44\xdf\x78\x90\xe5\x96\xa0\x0a\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07\xa1\x44\x03\x42\x00\x04\x37\x5d\x2b\xc8\xc6\x15\x27\x5b\xfd\x84\x8b\x52\xfe\x21\x96\xe2\xa1\x4e\xf3\xcc\x91\xae\xf0\x5d\xff\x85\x1c\xbc\x19\xb1\xa9\x35\x45\x8c\xfe\x04\xaa\x42\x4e\x01\x6d\xe3\xd6\x74\xdc\x5b\x73\x29\xbd\x77\x57\xfd\xdb\x32\x38\xd6\x26\x73\x62\x9b\x3c\x79\x08\x45"


class CertificationType(Enum8):
Expand Down Expand Up @@ -41,39 +42,39 @@ def encode_set(*encoded_pieces):
return b"\x31" + der.encode_length(total_len) + b"".join(encoded_pieces)


def encode_utf8_string(s):
encoded = s.encode("utf-8")
return b"\x0c" + der.encode_length(len(encoded)) + encoded


def generate_certificates(
vendor_id=0xFFF1, product_id=0x8000, device_type=22, prefix=None
):
declaration = CertificationDeclaration()
declaration.format_version = 1 # Always 1
declaration.vendor_id = vendor_id
declaration.product_id_array = [product_id]
declaration.device_type_id = 0x1234 # device_type
declaration.certificate_id = "ZIG20141ZB330001-24" # "CSA00000SWC00000-00"
declaration.device_type_id = device_type
declaration.certificate_id = "CSA00000SWC00000-00"
declaration.security_level = 0 # Always 0
declaration.security_information = 0 # Always 0
declaration.version_number = 0x2694 # 1 # Always 1
declaration.version_number = 1 # Always 1
declaration.certification_type = CertificationType.DEVELOPMENT_AND_TEST
declaration = declaration.encode()

for i in range(0, len(declaration), 16):
print(f"{i:08x}", declaration[i : i + 16].hex(" "))

# From: https://github.com/project-chip/matter.js/blob/main/packages/protocol/src/certificate/CertificationDeclarationManager.ts
# NIST256p is the same as secp256r1
private_key = ecdsa.keys.SigningKey.from_string(
b"\xae\xf3\x48\x41\x16\xe9\x48\x1e\xc5\x7b\xe0\x47\x2d\xf4\x1b\xf4\x99\x06\x4e\x50\x24\xad\x86\x9e\xca\x5e\x88\x98\x02\xd4\x80\x75",
curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256,
)
print(private_key.to_string().hex().upper())
subject_key_identifier = b"\x62\xfa\x82\x33\x59\xac\xfa\xa9\x96\x3e\x1c\xfa\x14\x0a\xdd\xf5\x04\xf3\x71\x60"
signature = private_key.sign_deterministic(
declaration,
hashfunc=hashlib.sha256,
sigencode=ecdsa.util.sigencode_der_canonize,
)
print("signature", signature.hex(" "))

certification_declaration = []
# version
Expand Down Expand Up @@ -120,9 +121,118 @@ def generate_certificates(
return cms_signed


def generate_dac(
vendor_id, product_id, product_name, random_source
) -> tuple[bytes, bytes]:
dac_key = ecdsa.keys.SigningKey.generate(
curve=ecdsa.NIST256p, hashfunc=hashlib.sha256, entropy=random_source.urandom
)

version = der.encode_constructed(0, der.encode_integer(2))
serial_number = der.encode_integer(1)
signature_algorithm = der.encode_sequence(der.encode_oid(1, 2, 840, 10045, 4, 3, 2))
# CircuitMatter PAI for vendor ID 0xfff4
issuer = b"\x30\x32\x31\x1a\x30\x18\x06\x03\x55\x04\x03\x0c\x11\x43\x69\x72\x63\x75\x69\x74\x4d\x61\x74\x74\x65\x72\x20\x50\x41\x49\x31\x14\x30\x12\x06\x0a\x2b\x06\x01\x04\x01\x82\xa2\x7c\x02\x01\x0c\x04\x46\x46\x46\x34"

# Starting 10/17/2024 and never expiring
validity = b"\x30\x20\x17\x0d\x32\x34\x31\x30\x31\x37\x30\x30\x30\x30\x30\x30\x5a\x18\x0f\x39\x39\x39\x39\x31\x32\x33\x31\x32\x33\x35\x39\x35\x39\x5a"

common_name = encode_set(
der.encode_sequence(
der.encode_oid(2, 5, 4, 3), encode_utf8_string(product_name)
)
)
encoded_vendor_id = encode_set(
der.encode_sequence(
der.encode_oid(1, 3, 6, 1, 4, 1, 37244, 2, 1),
encode_utf8_string(f"{vendor_id:04X}"),
)
)
encoded_product_id = encode_set(
der.encode_sequence(
der.encode_oid(1, 3, 6, 1, 4, 1, 37244, 2, 2),
encode_utf8_string(f"{product_id:04X}"),
)
)
subject = der.encode_sequence(common_name, encoded_vendor_id, encoded_product_id)

algorithm_id = b"\x30\x13\x06\x07\x2a\x86\x48\xce\x3d\x02\x01\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07"

public_key = dac_key.verifying_key.to_string(encoding="uncompressed")
public_key_info = der.encode_sequence(
algorithm_id, der.encode_bitstring(public_key, unused=0)
)

basic_constraints = b"\x30\x0c\x06\x03\x55\x1d\x13\x01\x01\xff\x04\x02\x30\x00"
key_usage = b"\x30\x0e\x06\x03\x55\x1d\x0f\x01\x01\xff\x04\x04\x03\x02\x07\x80"
key_id = der.encode_sequence(
der.encode_oid(2, 5, 29, 14),
der.encode_octet_string(
der.encode_octet_string(hashlib.sha1(public_key).digest())
),
)
authority_key_id = b"\x30\x1f\x06\x03\x55\x1d\x23\x04\x18\x30\x16\x80\x14\x07\xf8\x38\x0a\x5f\x01\x36\xfc\xe2\x36\xbd\x45\xf2\x88\xff\x22\xdc\xa6\xf4\xa7"
extensions = der.encode_constructed(
3, der.encode_sequence(basic_constraints, key_usage, key_id, authority_key_id)
)

certificate = der.encode_sequence(
version,
serial_number,
signature_algorithm,
issuer,
validity,
subject,
public_key_info,
extensions,
)

pai_key = ecdsa.keys.SigningKey.from_der(PAI_KEY_DER, hashfunc=hashlib.sha256)
signature = pai_key.sign_deterministic(
certificate,
hashfunc=hashlib.sha256,
sigencode=ecdsa.util.sigencode_der_canonize,
)

dac_cert = der.encode_sequence(
certificate, signature_algorithm, der.encode_bitstring(signature, unused=0)
)
dac_key = dac_key.to_der()
return dac_cert, dac_key


def generate_initial_state(vendor_id, product_id, product_name, random_source):
if vendor_id != 0xFFF4 or product_id != 0x1234:
raise ValueError("Invalid vendor_id or product_id")

cd = generate_certificates(vendor_id=vendor_id, product_id=product_id)

dac_cert, dac_key = generate_dac(vendor_id, product_id, product_name, random_source)
initial_state = {
"discriminator": 3840,
"passcode": 67202583,
"iteration-count": 10000,
"salt": "5uCP0ITHYzI9qBEe6hfU4HfY3y7VopSk0qNvhvznhiQ=",
"verifier": "0xGqxJFBr/ViQt3lv1Yw5F0GcPBAtFFvXB+EcIIjH5cEsjkPZHDQyFWjA6Ide+2gafYnZgIy6gJBgdJOlD8htAZKe0i6nIhT/ADsBWH4CvZcl37n/ofEEECWSEBV4vy/0A==",
"devices": {
"root": {
"0x3e": {
"cd": binascii.b2a_base64(cd, newline=False).decode("utf-8"),
"dac_cert": binascii.b2a_base64(dac_cert, newline=False).decode(
"utf-8"
),
"dac_key": binascii.b2a_base64(dac_key, newline=False).decode(
"utf-8"
),
}
},
},
}
return initial_state


if __name__ == "__main__":
cd = generate_certificates()
pathlib.Path("certification_declaration.der").write_bytes(cd)
for i in range(0, len(cd), 16):
print(f"{i:08x}", cd[i : i + 16].hex(" "))
print(cd.hex(" "))
from circuitmatter.utility import random

initial_state = generate_initial_state(0xFFF4, 0x1234, "CircuitMatter", random)
print(initial_state)
29 changes: 11 additions & 18 deletions circuitmatter/device_types/utility/root_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import ecdsa
from ecdsa import der
import hashlib
import pathlib
import struct
import time

Expand Down Expand Up @@ -35,17 +34,7 @@

from .. import simple_device

TEST_CERTS = pathlib.Path(
"../esp-matter/connectedhomeip/connectedhomeip/credentials/test/attestation/"
)
TEST_PAI_CERT_DER = TEST_CERTS / "Chip-Test-PAI-FFF1-8000-Cert.der"
TEST_PAI_CERT_PEM = TEST_CERTS / "Chip-Test-PAI-FFF1-8000-Cert.pem"
TEST_DAC_CERT_DER = TEST_CERTS / "Chip-Test-DAC-FFF1-8000-0000-Cert.der"
TEST_DAC_CERT_PEM = TEST_CERTS / "Chip-Test-DAC-FFF1-8000-0000-Cert.pem"
TEST_DAC_KEY_DER = TEST_CERTS / "Chip-Test-DAC-FFF1-8000-0000-Key.der"
TEST_DAC_KEY_PEM = TEST_CERTS / "Chip-Test-DAC-FFF1-8000-0000-Key.pem"

TEST_CD_CERT_DER = pathlib.Path("test_data/certification_declaration.der")
PAI_CERT_DER = b"\x30\x82\x01\xaa\x30\x82\x01\x50\xa0\x03\x02\x01\x02\x02\x08\x5e\xf5\xaf\xd0\x13\x60\xf5\xd4\x30\x0a\x06\x08\x2a\x86\x48\xce\x3d\x04\x03\x02\x30\x1a\x31\x18\x30\x16\x06\x03\x55\x04\x03\x0c\x0f\x4d\x61\x74\x74\x65\x72\x20\x54\x65\x73\x74\x20\x50\x41\x41\x30\x20\x17\x0d\x32\x34\x31\x30\x31\x37\x30\x30\x30\x30\x30\x30\x5a\x18\x0f\x39\x39\x39\x39\x31\x32\x33\x31\x32\x33\x35\x39\x35\x39\x5a\x30\x32\x31\x1a\x30\x18\x06\x03\x55\x04\x03\x0c\x11\x43\x69\x72\x63\x75\x69\x74\x4d\x61\x74\x74\x65\x72\x20\x50\x41\x49\x31\x14\x30\x12\x06\x0a\x2b\x06\x01\x04\x01\x82\xa2\x7c\x02\x01\x0c\x04\x46\x46\x46\x34\x30\x59\x30\x13\x06\x07\x2a\x86\x48\xce\x3d\x02\x01\x06\x08\x2a\x86\x48\xce\x3d\x03\x01\x07\x03\x42\x00\x04\x37\x5d\x2b\xc8\xc6\x15\x27\x5b\xfd\x84\x8b\x52\xfe\x21\x96\xe2\xa1\x4e\xf3\xcc\x91\xae\xf0\x5d\xff\x85\x1c\xbc\x19\xb1\xa9\x35\x45\x8c\xfe\x04\xaa\x42\x4e\x01\x6d\xe3\xd6\x74\xdc\x5b\x73\x29\xbd\x77\x57\xfd\xdb\x32\x38\xd6\x26\x73\x62\x9b\x3c\x79\x08\x45\xa3\x66\x30\x64\x30\x12\x06\x03\x55\x1d\x13\x01\x01\xff\x04\x08\x30\x06\x01\x01\xff\x02\x01\x00\x30\x0e\x06\x03\x55\x1d\x0f\x01\x01\xff\x04\x04\x03\x02\x01\x06\x30\x1d\x06\x03\x55\x1d\x0e\x04\x16\x04\x14\x07\xf8\x38\x0a\x5f\x01\x36\xfc\xe2\x36\xbd\x45\xf2\x88\xff\x22\xdc\xa6\xf4\xa7\x30\x1f\x06\x03\x55\x1d\x23\x04\x18\x30\x16\x80\x14\x78\x5c\xe7\x05\xb8\x6b\x8f\x4e\x6f\xc7\x93\xaa\x60\xcb\x43\xea\x69\x68\x82\xd5\x30\x0a\x06\x08\x2a\x86\x48\xce\x3d\x04\x03\x02\x03\x48\x00\x30\x45\x02\x21\x00\x9c\x5f\x59\x83\x5c\xc6\x51\xc2\x5c\x79\x01\x33\x25\x22\x3d\x25\x6b\xe7\x43\x98\xbc\x03\x83\x89\xf4\x55\x6d\xf7\xf7\x4a\x8a\x34\x02\x20\x5c\x14\x17\x4c\xc3\x23\x07\xff\x42\x1c\x4f\x8b\x0b\x63\xb9\x62\x52\x58\xa2\x96\xe0\x31\xfd\xce\x51\xa2\x7a\x08\x49\x2b\xc0\x38"


class _GeneralCommissioningCluster(GeneralCommissioningCluster):
Expand Down Expand Up @@ -111,9 +100,7 @@ def __init__(self, group_key_manager, random_source, mdns_server, port):

self.group_key_manager = group_key_manager

self.dac_key = ecdsa.keys.SigningKey.from_der(
TEST_DAC_KEY_DER.read_bytes(), hashfunc=hashlib.sha256
)
self.dac_key = None

self.new_key_for_update = False
self.pending_root_cert = None
Expand All @@ -133,6 +120,10 @@ def __init__(self, group_key_manager, random_source, mdns_server, port):
def restore(self, nonvolatile):
super().restore(nonvolatile)

self.dac_key = ecdsa.keys.SigningKey.from_der(
binascii.a2b_base64(nonvolatile["dac_key"]), hashfunc=hashlib.sha256
)

if "pk" not in nonvolatile:
return

Expand Down Expand Up @@ -178,9 +169,9 @@ def certificate_chain_request(
) -> NodeOperationalCredentialsCluster.CertificateChainResponse:
response = NodeOperationalCredentialsCluster.CertificateChainResponse()
if args.CertificateType == CertificateChainTypeEnum.PAI:
response.Certificate = TEST_PAI_CERT_DER.read_bytes()
response.Certificate = PAI_CERT_DER
elif args.CertificateType == CertificateChainTypeEnum.DAC:
response.Certificate = TEST_DAC_CERT_DER.read_bytes()
response.Certificate = binascii.a2b_base64(self._nonvolatile["dac_cert"])
return response

def attestation_request(
Expand All @@ -189,7 +180,9 @@ def attestation_request(
args: NodeOperationalCredentialsCluster.AttestationRequest,
) -> NodeOperationalCredentialsCluster.AttestationResponse:
elements = AttestationElements()
elements.certification_declaration = TEST_CD_CERT_DER.read_bytes()
elements.certification_declaration = binascii.a2b_base64(
self._nonvolatile["cd"]
)
elements.attestation_nonce = args.AttestationNonce
elements.timestamp = int(time.time())
elements = elements.encode()
Expand Down
20 changes: 13 additions & 7 deletions examples/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ class NeoPixel(on_off.OnOffLight):


def run(replay_file=None):
device_state = pathlib.Path("test_data/device_state.json")
replay_device_state = pathlib.Path("test_data/replay_device_state.json")
device_state = pathlib.Path("live-device-state.json")
if replay_file:
replay_lines = []
with open(replay_file, "r") as f:
Expand All @@ -34,18 +33,25 @@ def run(replay_file=None):
mdns_server = DummyMDNS()
random_source = ReplayRandom(replay_lines)
# Reset device state to before the captured run
device_state.write_text(pathlib.Path(device_state_fn).read_text())
if device_state_fn == "none":
device_state.unlink(missing_ok=True)
else:
device_state.write_text(pathlib.Path(device_state_fn).read_text())
else:
timestamp = time.strftime("%Y%m%d-%H%M%S")
record_file = open(f"test_data/recorded_packets-{timestamp}.jsonl", "w")
device_state_fn = f"test_data/device_state-{timestamp}.json"
record_file.write(f"{device_state_fn}\n")
replay_device_state = pathlib.Path(device_state_fn)
if device_state.exists():
record_file.write(f"{device_state_fn}\n")
# Save device state before we run so replays can use it.
replay_device_state.write_text(device_state.read_text())
else:
# No starting state.
record_file.write("none\n")
socketpool = RecordingSocketPool(record_file, socket)
mdns_server = Avahi()
random_source = RecordingRandom(record_file, random)
# Save device state before we run so replays can use it.
replay_device_state = pathlib.Path(device_state_fn)
replay_device_state.write_text(device_state.read_text())

matter = cm.CircuitMatter(socketpool, mdns_server, random_source, device_state)
led = NeoPixel("neopixel1")
Expand Down
Loading

0 comments on commit 818c9e6

Please sign in to comment.