Files
hw-sign/hw-sign-test/hw_sign_mock.py

543 lines
21 KiB
Python

#!/usr/bin/env python3
import base64
import json
import os
import time
import hmac
import hashlib
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Any, Tuple
import secrets
import uuid
# For cryptographic operations
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
@dataclass
class KeyConfig:
"""Configuration for key storage."""
file_path: str = "hw_sign_keys.json"
hardware_key_type: str = "ecdsa-p256" # Options: ecdsa-p256, ed25519, rsa-2048
accel_key_type: str = "ecdh-p256" # Options: ecdh-p256, ecdsa-p256, rsa-2048
@dataclass
class KeyPair:
"""Represents a key pair with additional metadata."""
id: str
key_type: str
created_at: int
private_key: Optional[str] = None # Base64 encoded private key
public_key: Optional[str] = None # Base64 encoded public key
@dataclass
class SharedSecret:
"""Represents a shared secret derived from ECDH."""
id: str
secret: str # Base64 encoded shared secret
client_pub_key: str # Client's public key used in derivation
server_pub_key: str # Server's public key used in derivation
created_at: int
expires_at: Optional[int] = None
@dataclass
class KeyStorage:
"""Storage for all keys and shared secrets."""
hardware_keys: List[KeyPair] = field(default_factory=list)
accel_keys: List[KeyPair] = field(default_factory=list)
shared_secrets: List[SharedSecret] = field(default_factory=list)
class HwSignMock:
"""Mock implementation of hardware-bound signing."""
def __init__(self, config: KeyConfig = None):
"""Initialize with optional configuration."""
self.config = config or KeyConfig()
self.storage = self._load_storage()
self.current_hw_key: Optional[KeyPair] = None
self.current_accel_key: Optional[KeyPair] = None
self.current_shared_secret: Optional[SharedSecret] = None
# Initialize keys if none exist
if not self.storage.hardware_keys:
self._generate_hardware_key()
# Set current hardware key
self.current_hw_key = self.storage.hardware_keys[-1]
print(f"Initialized HW Sign Mock with {len(self.storage.hardware_keys)} hardware keys")
def _load_storage(self) -> KeyStorage:
"""Load key storage from file or create new storage."""
try:
if os.path.exists(self.config.file_path):
with open(self.config.file_path, 'r') as f:
data = json.load(f)
storage = KeyStorage(
hardware_keys=[KeyPair(**k) for k in data.get('hardware_keys', [])],
accel_keys=[KeyPair(**k) for k in data.get('accel_keys', [])],
shared_secrets=[SharedSecret(**s) for s in data.get('shared_secrets', [])]
)
print(f"Loaded {len(storage.hardware_keys)} hardware keys from {self.config.file_path}")
return storage
except Exception as e:
print(f"Error loading key storage: {e}")
# Return empty storage if file doesn't exist or has errors
return KeyStorage()
def _save_storage(self):
"""Save key storage to file."""
# Convert to dictionary
data = {
"hardware_keys": [asdict(k) for k in self.storage.hardware_keys],
"accel_keys": [asdict(k) for k in self.storage.accel_keys],
"shared_secrets": [asdict(s) for s in self.storage.shared_secrets]
}
# Save to file
with open(self.config.file_path, 'w') as f:
json.dump(data, f, indent=2)
print(f"Saved key storage to {self.config.file_path}")
def _generate_hardware_key(self) -> KeyPair:
"""Generate a new hardware key pair."""
key_id = str(uuid.uuid4())
created_at = int(time.time())
# Generate EC key pair
if self.config.hardware_key_type == "ecdsa-p256":
private_key = ec.generate_private_key(
ec.SECP256R1(), # P-256 curve
default_backend()
)
# Serialize keys
private_bytes = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_bytes = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
else:
raise ValueError(f"Unsupported hardware key type: {self.config.hardware_key_type}")
# Create key pair
key_pair = KeyPair(
id=key_id,
key_type=self.config.hardware_key_type,
created_at=created_at,
private_key=base64.b64encode(private_bytes).decode('utf-8'),
public_key=base64.b64encode(public_bytes).decode('utf-8')
)
# Add to storage
self.storage.hardware_keys.append(key_pair)
self._save_storage()
print(f"Generated new hardware key of type {self.config.hardware_key_type} with ID {key_id}")
return key_pair
def _generate_accel_key(self) -> KeyPair:
"""Generate a new acceleration key pair."""
key_id = str(uuid.uuid4())
created_at = int(time.time())
# Generate EC key pair for ECDH
if self.config.accel_key_type == "ecdh-p256":
private_key = ec.generate_private_key(
ec.SECP256R1(), # P-256 curve
default_backend()
)
# Serialize keys
private_bytes = private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
public_bytes = private_key.public_key().public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
else:
raise ValueError(f"Unsupported acceleration key type: {self.config.accel_key_type}")
# Create key pair
key_pair = KeyPair(
id=key_id,
key_type=self.config.accel_key_type,
created_at=created_at,
private_key=base64.b64encode(private_bytes).decode('utf-8'),
public_key=base64.b64encode(public_bytes).decode('utf-8')
)
# Add to storage
self.storage.accel_keys.append(key_pair)
self._save_storage()
print(f"Generated new acceleration key of type {self.config.accel_key_type} with ID {key_id}")
return key_pair
def _load_private_key(self, key_pair: KeyPair):
"""Load private key from key pair."""
if not key_pair or not key_pair.private_key:
raise ValueError("Invalid key pair")
private_bytes = base64.b64decode(key_pair.private_key)
if key_pair.key_type == "ecdsa-p256" or key_pair.key_type == "ecdh-p256":
return serialization.load_der_private_key(
private_bytes,
password=None,
backend=default_backend()
)
else:
raise ValueError(f"Unsupported key type: {key_pair.key_type}")
def _load_public_key(self, base64_key: str, key_type: str = "ecdsa-p256"):
"""Load public key from base64 encoded string."""
public_bytes = base64.b64decode(base64_key)
if key_type == "ecdsa-p256" or key_type == "ecdh-p256":
return serialization.load_der_public_key(
public_bytes,
backend=default_backend()
)
else:
raise ValueError(f"Unsupported key type: {key_type}")
def sign_with_hardware_key(self, data: str) -> str:
"""Sign data with hardware key using ECDSA."""
if not self.current_hw_key:
raise ValueError("No hardware key available")
private_key = self._load_private_key(self.current_hw_key)
if self.current_hw_key.key_type == "ecdsa-p256":
# Sign using ECDSA
signature = private_key.sign(
data.encode('utf-8'),
ec.ECDSA(hashes.SHA256())
)
return base64.b64encode(signature).decode('utf-8')
else:
raise ValueError(f"Unsupported key type for signing: {self.current_hw_key.key_type}")
def sign_with_accel_key(self, data: str) -> str:
"""
Sign data with acceleration key.
If a shared secret exists, use HMAC-SHA256, otherwise use ECDSA.
"""
if self.current_shared_secret:
# Use HMAC with shared secret
return self._sign_hmac(data, self.current_shared_secret.secret)
# No shared secret, generate a new acceleration key if needed
if not self.current_accel_key:
self.current_accel_key = self._generate_accel_key()
private_key = self._load_private_key(self.current_accel_key)
if self.current_accel_key.key_type == "ecdh-p256":
# Sign using ECDSA (same operation as P-256 ECDSA)
signature = private_key.sign(
data.encode('utf-8'),
ec.ECDSA(hashes.SHA256())
)
return base64.b64encode(signature).decode('utf-8')
else:
raise ValueError(f"Unsupported key type for signing: {self.current_accel_key.key_type}")
def _sign_hmac(self, data: str, secret_base64: str) -> str:
"""Sign data using HMAC-SHA256 with the shared secret."""
secret_bytes = base64.b64decode(secret_base64)
# Create HMAC
h = hmac.new(secret_bytes, data.encode('utf-8'), hashlib.sha256)
return base64.b64encode(h.digest()).decode('utf-8')
def get_hardware_public_key(self) -> Tuple[str, str]:
"""Get the current hardware public key and its type."""
if not self.current_hw_key:
self.current_hw_key = self._generate_hardware_key()
return self.current_hw_key.public_key, self.current_hw_key.key_type
def get_accel_public_key(self) -> Tuple[str, str]:
"""Get the current acceleration public key and its type."""
if not self.current_accel_key:
self.current_accel_key = self._generate_accel_key()
return self.current_accel_key.public_key, self.current_accel_key.key_type
def get_signed_accel_key(self) -> Tuple[str, str, str]:
"""
Get the acceleration public key, its type, and a signature of the public key
using the hardware key.
"""
if not self.current_accel_key:
self.current_accel_key = self._generate_accel_key()
# Sign the acceleration public key with the hardware key
signature = self.sign_with_hardware_key(self.current_accel_key.public_key)
return (
self.current_accel_key.public_key,
self.current_accel_key.key_type,
signature
)
def set_server_public_key(self, server_pub_key: str, key_id: str = None) -> str:
"""
Use the server's public key to establish a shared secret via ECDH.
Returns the ID of the established shared secret.
"""
if not self.current_accel_key:
self.current_accel_key = self._generate_accel_key()
# Load the client's private key
client_private_key = self._load_private_key(self.current_accel_key)
# Load the server's public key
try:
server_public_key = self._load_public_key(server_pub_key, "ecdh-p256")
# Perform key exchange
shared_key = client_private_key.exchange(ec.ECDH(), server_public_key)
# Derive final shared secret using HKDF
shared_secret = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'hw-sign-ecdh',
backend=default_backend()
).derive(shared_key)
# Create shared secret entry
secret_id = key_id or str(uuid.uuid4())
shared_secret_entry = SharedSecret(
id=secret_id,
secret=base64.b64encode(shared_secret).decode('utf-8'),
client_pub_key=self.current_accel_key.public_key,
server_pub_key=server_pub_key,
created_at=int(time.time()),
expires_at=int(time.time()) + 3600 # 1 hour expiry
)
# Add to storage
self.storage.shared_secrets.append(shared_secret_entry)
self._save_storage()
# Set as current shared secret
self.current_shared_secret = shared_secret_entry
print(f"ECDH key exchange completed successfully. Secret ID: {secret_id}")
return secret_id
except Exception as e:
print(f"Error during ECDH key exchange: {e}")
raise
def generate_request_data(self) -> str:
"""Generate request data in the format: Timestamp-RandomHex."""
timestamp = str(int(time.time()))
random_hex = secrets.token_hex(16) # 32 bytes of hex
return f"{timestamp}-{random_hex}"
class TestClient:
"""Client for testing the hardware-bound authentication."""
def __init__(self, base_url: str = "http://localhost:28280"):
self.base_url = base_url
self.hw_sign = HwSignMock()
self.auth_token = None
self.accel_key_id = None
print(f"Initialized test client with base URL: {base_url}")
def test_register(self, username: str, password: str) -> bool:
"""Test user registration."""
print("\n=== Testing Registration (Simulated) ===")
print(f"Username: {username}")
print(f"Password: {password}")
# Simulate successful registration
print("✓ Registration successful!")
return True
def test_login(self, username: str, password: str) -> bool:
"""Test login with hardware key binding."""
print("\n=== Testing Login (Simulated) ===")
print(f"Username: {username}")
print(f"Password: {password}")
# Get hardware public key
hw_pub_key, hw_pub_type = self.hw_sign.get_hardware_public_key()
print(f"Hardware public key (first 50 chars): {hw_pub_key[:50]}...")
print(f"Hardware key type: {hw_pub_type}")
# Simulate successful login
self.auth_token = f"mock_token_{secrets.token_hex(8)}"
print(f"✓ Login successful! Token: {self.auth_token[:20]}...")
return True
def test_authenticated(self) -> bool:
"""
Test authenticated request.
First request registers a new ECDH acceleration key.
Subsequent requests use the established shared secret.
"""
print("\n=== Testing Authenticated Request ===")
if not self.auth_token:
print("✗ No auth token available!")
return False
# Generate request timestamp
timestamp = self.hw_sign.generate_request_data()
print(f"Request timestamp: {timestamp}")
# Build request headers
headers = {
"Authorization": f"Bearer {self.auth_token}",
"x-rpc-sec-bound-token-data": timestamp
}
if not self.accel_key_id:
# First authenticated request - register ECDH acceleration key
print("Registering new ECDH acceleration key...")
# Get acceleration key and sign it with hardware key
accel_pub, accel_pub_type, accel_pub_sig = self.hw_sign.get_signed_accel_key()
# Sign the request data with acceleration key
data_sig = self.hw_sign.sign_with_accel_key(timestamp)
# Add headers
headers.update({
"x-rpc-sec-bound-token-accel-pub": accel_pub,
"x-rpc-sec-bound-token-accel-pub-type": accel_pub_type,
"x-rpc-sec-bound-token-accel-pub-sig": accel_pub_sig,
"x-rpc-sec-bound-token-data-sig": data_sig
})
print(f"Acceleration public key (first 50 chars): {accel_pub[:50]}...")
print(f"Acceleration key type: {accel_pub_type}")
print(f"Accel pub signature (first 20 chars): {accel_pub_sig[:20]}...")
print(f"Data signature (first 20 chars): {data_sig[:20]}...")
# Simulate server response
self.accel_key_id = f"accel_{secrets.token_hex(8)}"
server_pub_key = self._simulate_server_response()
# Establish shared secret
if server_pub_key:
self.hw_sign.set_server_public_key(server_pub_key, self.accel_key_id)
print(f"Received acceleration key ID: {self.accel_key_id}")
print(f"Received server ECDH public key: {server_pub_key[:30]}...")
print("Shared secret established for HMAC authentication")
else:
# Subsequent requests - use HMAC with shared secret
print(f"Using existing acceleration key ID with HMAC: {self.accel_key_id}")
# Sign data using HMAC with shared secret
data_sig = self.hw_sign.sign_with_accel_key(timestamp)
# Add headers
headers.update({
"x-rpc-sec-bound-token-accel-pub-id": self.accel_key_id,
"x-rpc-sec-bound-token-data-sig": data_sig
})
print(f"HMAC signature (first 20 chars): {data_sig[:20]}...")
# Simulate successful request
print("✓ Authenticated request successful!")
return True
def _simulate_server_response(self) -> str:
"""Simulate server response with a new ECDH public key."""
# Generate a mock server key
mock_server = HwSignMock()
server_pub_key, _ = mock_server.get_accel_public_key()
return server_pub_key
def run_full_test(self):
"""Run a full test sequence."""
print("=====================================")
print("Hardware-Bound Authentication Test")
print("ECDSA Hardware Key + ECDH Accel Key")
print("=====================================")
username = f"testuser_{int(time.time())}"
password = "testpass123"
# Test 1: Register
register_success = self.test_register(username, password)
# Test 2: Login with ECDSA hardware key
login_success = False
if register_success:
login_success = self.test_login(username, password)
# Test 3: Authenticated request (first time - register ECDH accel key)
auth_success1 = False
if login_success:
auth_success1 = self.test_authenticated()
# Test 4: Authenticated request (second time - use existing ECDH key)
auth_success2 = False
if auth_success1:
print("\n=== Testing Second Authenticated Request ===")
auth_success2 = self.test_authenticated()
# Test 5: Third authenticated request to verify ECDH key persistence
auth_success3 = False
if auth_success2:
print("\n=== Testing Third Authenticated Request ===")
auth_success3 = self.test_authenticated()
# Summary
print("\n=====================================")
print("Test Results Summary:")
print("=====================================")
print(f"Registration: {'✓ PASS' if register_success else '✗ FAIL'}")
print(f"Login (ECDSA HW key): {'✓ PASS' if login_success else '✗ FAIL'}")
print(f"Auth (new ECDH key): {'✓ PASS' if auth_success1 else '✗ FAIL'}")
print(f"Auth (existing ECDH): {'✓ PASS' if auth_success2 else '✗ FAIL'}")
print(f"Auth (ECDH persistent): {'✓ PASS' if auth_success3 else '✗ FAIL'}")
all_passed = register_success and login_success and auth_success1 and auth_success2 and auth_success3
print(f"\nOverall Result: {'✓ ALL TESTS PASSED' if all_passed else '✗ SOME TESTS FAILED'}")
if all_passed:
print("\n🎉 Congratulations! All hardware-bound authentication tests passed!")
print("✓ ECDSA hardware key authentication works")
print("✓ ECDH acceleration key exchange works")
print("✓ Key persistence and reuse works")
print("=====================================")
if __name__ == "__main__":
client = TestClient()
client.run_full_test()