#!/usr/bin/env python3 import base64 import json import os import time import hmac import hashlib from dataclasses import dataclass, field, asdict from typing import Dict, List, Optional, Any, Tuple import secrets import uuid # For cryptographic operations from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, utils from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.backends import default_backend @dataclass class KeyConfig: """Configuration for key storage.""" file_path: str = "hw_sign_keys.json" hardware_key_type: str = "ecdsa-p256" # Options: ecdsa-p256, ed25519, rsa-2048 accel_key_type: str = "ecdh-p256" # Options: ecdh-p256, ecdsa-p256, rsa-2048 @dataclass class KeyPair: """Represents a key pair with additional metadata.""" id: str key_type: str created_at: int private_key: Optional[str] = None # Base64 encoded private key public_key: Optional[str] = None # Base64 encoded public key @dataclass class SharedSecret: """Represents a shared secret derived from ECDH.""" id: str secret: str # Base64 encoded shared secret client_pub_key: str # Client's public key used in derivation server_pub_key: str # Server's public key used in derivation created_at: int expires_at: Optional[int] = None @dataclass class KeyStorage: """Storage for all keys and shared secrets.""" hardware_keys: List[KeyPair] = field(default_factory=list) accel_keys: List[KeyPair] = field(default_factory=list) shared_secrets: List[SharedSecret] = field(default_factory=list) class HwSignMock: """Mock implementation of hardware-bound signing.""" def __init__(self, config: KeyConfig = None): """Initialize with optional configuration.""" self.config = config or KeyConfig() self.storage = self._load_storage() self.current_hw_key: Optional[KeyPair] = None self.current_accel_key: Optional[KeyPair] = None self.current_shared_secret: Optional[SharedSecret] = None # Initialize keys if none exist if not self.storage.hardware_keys: self._generate_hardware_key() # Set current hardware key self.current_hw_key = self.storage.hardware_keys[-1] print(f"Initialized HW Sign Mock with {len(self.storage.hardware_keys)} hardware keys") def _load_storage(self) -> KeyStorage: """Load key storage from file or create new storage.""" try: if os.path.exists(self.config.file_path): with open(self.config.file_path, 'r') as f: data = json.load(f) storage = KeyStorage( hardware_keys=[KeyPair(**k) for k in data.get('hardware_keys', [])], accel_keys=[KeyPair(**k) for k in data.get('accel_keys', [])], shared_secrets=[SharedSecret(**s) for s in data.get('shared_secrets', [])] ) print(f"Loaded {len(storage.hardware_keys)} hardware keys from {self.config.file_path}") return storage except Exception as e: print(f"Error loading key storage: {e}") # Return empty storage if file doesn't exist or has errors return KeyStorage() def _save_storage(self): """Save key storage to file.""" # Convert to dictionary data = { "hardware_keys": [asdict(k) for k in self.storage.hardware_keys], "accel_keys": [asdict(k) for k in self.storage.accel_keys], "shared_secrets": [asdict(s) for s in self.storage.shared_secrets] } # Save to file with open(self.config.file_path, 'w') as f: json.dump(data, f, indent=2) print(f"Saved key storage to {self.config.file_path}") def _generate_hardware_key(self) -> KeyPair: """Generate a new hardware key pair.""" key_id = str(uuid.uuid4()) created_at = int(time.time()) # Generate EC key pair if self.config.hardware_key_type == "ecdsa-p256": private_key = ec.generate_private_key( ec.SECP256R1(), # P-256 curve default_backend() ) # Serialize keys private_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) public_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) else: raise ValueError(f"Unsupported hardware key type: {self.config.hardware_key_type}") # Create key pair key_pair = KeyPair( id=key_id, key_type=self.config.hardware_key_type, created_at=created_at, private_key=base64.b64encode(private_bytes).decode('utf-8'), public_key=base64.b64encode(public_bytes).decode('utf-8') ) # Add to storage self.storage.hardware_keys.append(key_pair) self._save_storage() print(f"Generated new hardware key of type {self.config.hardware_key_type} with ID {key_id}") return key_pair def _generate_accel_key(self) -> KeyPair: """Generate a new acceleration key pair.""" key_id = str(uuid.uuid4()) created_at = int(time.time()) # Generate EC key pair for ECDH if self.config.accel_key_type == "ecdh-p256": private_key = ec.generate_private_key( ec.SECP256R1(), # P-256 curve default_backend() ) # Serialize keys private_bytes = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) public_bytes = private_key.public_key().public_bytes( encoding=serialization.Encoding.DER, format=serialization.PublicFormat.SubjectPublicKeyInfo ) else: raise ValueError(f"Unsupported acceleration key type: {self.config.accel_key_type}") # Create key pair key_pair = KeyPair( id=key_id, key_type=self.config.accel_key_type, created_at=created_at, private_key=base64.b64encode(private_bytes).decode('utf-8'), public_key=base64.b64encode(public_bytes).decode('utf-8') ) # Add to storage self.storage.accel_keys.append(key_pair) self._save_storage() print(f"Generated new acceleration key of type {self.config.accel_key_type} with ID {key_id}") return key_pair def _load_private_key(self, key_pair: KeyPair): """Load private key from key pair.""" if not key_pair or not key_pair.private_key: raise ValueError("Invalid key pair") private_bytes = base64.b64decode(key_pair.private_key) if key_pair.key_type == "ecdsa-p256" or key_pair.key_type == "ecdh-p256": return serialization.load_der_private_key( private_bytes, password=None, backend=default_backend() ) else: raise ValueError(f"Unsupported key type: {key_pair.key_type}") def _load_public_key(self, base64_key: str, key_type: str = "ecdsa-p256"): """Load public key from base64 encoded string.""" public_bytes = base64.b64decode(base64_key) if key_type == "ecdsa-p256" or key_type == "ecdh-p256": return serialization.load_der_public_key( public_bytes, backend=default_backend() ) else: raise ValueError(f"Unsupported key type: {key_type}") def sign_with_hardware_key(self, data: str) -> str: """Sign data with hardware key using ECDSA.""" if not self.current_hw_key: raise ValueError("No hardware key available") private_key = self._load_private_key(self.current_hw_key) if self.current_hw_key.key_type == "ecdsa-p256": # Sign using ECDSA signature = private_key.sign( data.encode('utf-8'), ec.ECDSA(hashes.SHA256()) ) return base64.b64encode(signature).decode('utf-8') else: raise ValueError(f"Unsupported key type for signing: {self.current_hw_key.key_type}") def sign_with_accel_key(self, data: str) -> str: """ Sign data with acceleration key. If a shared secret exists, use HMAC-SHA256, otherwise use ECDSA. """ if self.current_shared_secret: # Use HMAC with shared secret return self._sign_hmac(data, self.current_shared_secret.secret) # No shared secret, generate a new acceleration key if needed if not self.current_accel_key: self.current_accel_key = self._generate_accel_key() private_key = self._load_private_key(self.current_accel_key) if self.current_accel_key.key_type == "ecdh-p256": # Sign using ECDSA (same operation as P-256 ECDSA) signature = private_key.sign( data.encode('utf-8'), ec.ECDSA(hashes.SHA256()) ) return base64.b64encode(signature).decode('utf-8') else: raise ValueError(f"Unsupported key type for signing: {self.current_accel_key.key_type}") def _sign_hmac(self, data: str, secret_base64: str) -> str: """Sign data using HMAC-SHA256 with the shared secret.""" secret_bytes = base64.b64decode(secret_base64) # Create HMAC h = hmac.new(secret_bytes, data.encode('utf-8'), hashlib.sha256) return base64.b64encode(h.digest()).decode('utf-8') def get_hardware_public_key(self) -> Tuple[str, str]: """Get the current hardware public key and its type.""" if not self.current_hw_key: self.current_hw_key = self._generate_hardware_key() return self.current_hw_key.public_key, self.current_hw_key.key_type def get_accel_public_key(self) -> Tuple[str, str]: """Get the current acceleration public key and its type.""" if not self.current_accel_key: self.current_accel_key = self._generate_accel_key() return self.current_accel_key.public_key, self.current_accel_key.key_type def get_signed_accel_key(self) -> Tuple[str, str, str]: """ Get the acceleration public key, its type, and a signature of the public key using the hardware key. """ if not self.current_accel_key: self.current_accel_key = self._generate_accel_key() # Sign the acceleration public key with the hardware key signature = self.sign_with_hardware_key(self.current_accel_key.public_key) return ( self.current_accel_key.public_key, self.current_accel_key.key_type, signature ) def set_server_public_key(self, server_pub_key: str, key_id: str = None) -> str: """ Use the server's public key to establish a shared secret via ECDH. Returns the ID of the established shared secret. """ if not self.current_accel_key: self.current_accel_key = self._generate_accel_key() # Load the client's private key client_private_key = self._load_private_key(self.current_accel_key) # Load the server's public key try: server_public_key = self._load_public_key(server_pub_key, "ecdh-p256") # Perform key exchange shared_key = client_private_key.exchange(ec.ECDH(), server_public_key) # Derive final shared secret using HKDF shared_secret = HKDF( algorithm=hashes.SHA256(), length=32, salt=None, info=b'hw-sign-ecdh', backend=default_backend() ).derive(shared_key) # Create shared secret entry secret_id = key_id or str(uuid.uuid4()) shared_secret_entry = SharedSecret( id=secret_id, secret=base64.b64encode(shared_secret).decode('utf-8'), client_pub_key=self.current_accel_key.public_key, server_pub_key=server_pub_key, created_at=int(time.time()), expires_at=int(time.time()) + 3600 # 1 hour expiry ) # Add to storage self.storage.shared_secrets.append(shared_secret_entry) self._save_storage() # Set as current shared secret self.current_shared_secret = shared_secret_entry print(f"ECDH key exchange completed successfully. Secret ID: {secret_id}") return secret_id except Exception as e: print(f"Error during ECDH key exchange: {e}") raise def generate_request_data(self) -> str: """Generate request data in the format: Timestamp-RandomHex.""" timestamp = str(int(time.time())) random_hex = secrets.token_hex(16) # 32 bytes of hex return f"{timestamp}-{random_hex}" class TestClient: """Client for testing the hardware-bound authentication.""" def __init__(self, base_url: str = "http://localhost:28280"): self.base_url = base_url self.hw_sign = HwSignMock() self.auth_token = None self.accel_key_id = None print(f"Initialized test client with base URL: {base_url}") def test_register(self, username: str, password: str) -> bool: """Test user registration.""" print("\n=== Testing Registration (Simulated) ===") print(f"Username: {username}") print(f"Password: {password}") # Simulate successful registration print("✓ Registration successful!") return True def test_login(self, username: str, password: str) -> bool: """Test login with hardware key binding.""" print("\n=== Testing Login (Simulated) ===") print(f"Username: {username}") print(f"Password: {password}") # Get hardware public key hw_pub_key, hw_pub_type = self.hw_sign.get_hardware_public_key() print(f"Hardware public key (first 50 chars): {hw_pub_key[:50]}...") print(f"Hardware key type: {hw_pub_type}") # Simulate successful login self.auth_token = f"mock_token_{secrets.token_hex(8)}" print(f"✓ Login successful! Token: {self.auth_token[:20]}...") return True def test_authenticated(self) -> bool: """ Test authenticated request. First request registers a new ECDH acceleration key. Subsequent requests use the established shared secret. """ print("\n=== Testing Authenticated Request ===") if not self.auth_token: print("✗ No auth token available!") return False # Generate request timestamp timestamp = self.hw_sign.generate_request_data() print(f"Request timestamp: {timestamp}") # Build request headers headers = { "Authorization": f"Bearer {self.auth_token}", "x-rpc-sec-bound-token-data": timestamp } if not self.accel_key_id: # First authenticated request - register ECDH acceleration key print("Registering new ECDH acceleration key...") # Get acceleration key and sign it with hardware key accel_pub, accel_pub_type, accel_pub_sig = self.hw_sign.get_signed_accel_key() # Sign the request data with acceleration key data_sig = self.hw_sign.sign_with_accel_key(timestamp) # Add headers headers.update({ "x-rpc-sec-bound-token-accel-pub": accel_pub, "x-rpc-sec-bound-token-accel-pub-type": accel_pub_type, "x-rpc-sec-bound-token-accel-pub-sig": accel_pub_sig, "x-rpc-sec-bound-token-data-sig": data_sig }) print(f"Acceleration public key (first 50 chars): {accel_pub[:50]}...") print(f"Acceleration key type: {accel_pub_type}") print(f"Accel pub signature (first 20 chars): {accel_pub_sig[:20]}...") print(f"Data signature (first 20 chars): {data_sig[:20]}...") # Simulate server response self.accel_key_id = f"accel_{secrets.token_hex(8)}" server_pub_key = self._simulate_server_response() # Establish shared secret if server_pub_key: self.hw_sign.set_server_public_key(server_pub_key, self.accel_key_id) print(f"Received acceleration key ID: {self.accel_key_id}") print(f"Received server ECDH public key: {server_pub_key[:30]}...") print("Shared secret established for HMAC authentication") else: # Subsequent requests - use HMAC with shared secret print(f"Using existing acceleration key ID with HMAC: {self.accel_key_id}") # Sign data using HMAC with shared secret data_sig = self.hw_sign.sign_with_accel_key(timestamp) # Add headers headers.update({ "x-rpc-sec-bound-token-accel-pub-id": self.accel_key_id, "x-rpc-sec-bound-token-data-sig": data_sig }) print(f"HMAC signature (first 20 chars): {data_sig[:20]}...") # Simulate successful request print("✓ Authenticated request successful!") return True def _simulate_server_response(self) -> str: """Simulate server response with a new ECDH public key.""" # Generate a mock server key mock_server = HwSignMock() server_pub_key, _ = mock_server.get_accel_public_key() return server_pub_key def run_full_test(self): """Run a full test sequence.""" print("=====================================") print("Hardware-Bound Authentication Test") print("ECDSA Hardware Key + ECDH Accel Key") print("=====================================") username = f"testuser_{int(time.time())}" password = "testpass123" # Test 1: Register register_success = self.test_register(username, password) # Test 2: Login with ECDSA hardware key login_success = False if register_success: login_success = self.test_login(username, password) # Test 3: Authenticated request (first time - register ECDH accel key) auth_success1 = False if login_success: auth_success1 = self.test_authenticated() # Test 4: Authenticated request (second time - use existing ECDH key) auth_success2 = False if auth_success1: print("\n=== Testing Second Authenticated Request ===") auth_success2 = self.test_authenticated() # Test 5: Third authenticated request to verify ECDH key persistence auth_success3 = False if auth_success2: print("\n=== Testing Third Authenticated Request ===") auth_success3 = self.test_authenticated() # Summary print("\n=====================================") print("Test Results Summary:") print("=====================================") print(f"Registration: {'✓ PASS' if register_success else '✗ FAIL'}") print(f"Login (ECDSA HW key): {'✓ PASS' if login_success else '✗ FAIL'}") print(f"Auth (new ECDH key): {'✓ PASS' if auth_success1 else '✗ FAIL'}") print(f"Auth (existing ECDH): {'✓ PASS' if auth_success2 else '✗ FAIL'}") print(f"Auth (ECDH persistent): {'✓ PASS' if auth_success3 else '✗ FAIL'}") all_passed = register_success and login_success and auth_success1 and auth_success2 and auth_success3 print(f"\nOverall Result: {'✓ ALL TESTS PASSED' if all_passed else '✗ SOME TESTS FAILED'}") if all_passed: print("\n🎉 Congratulations! All hardware-bound authentication tests passed!") print("✓ ECDSA hardware key authentication works") print("✓ ECDH acceleration key exchange works") print("✓ Key persistence and reuse works") print("=====================================") if __name__ == "__main__": client = TestClient() client.run_full_test()