"""
ClarityOps Audit Logging - PHIPA/SOC 2 Compliant

This module provides encrypted, PHI-free audit logging for compliance with:
- PHIPA (Ontario)
- PIPEDA (Federal)
- SOC 2 Type II

Key Principles:
1. Audit logs contain ONLY metadata (timestamps, session IDs, operation
   types, counts)
2. NEVER store actual data values, query text, or any PHI
3. All logs encrypted at rest with Fernet (AES-128-CBC + HMAC-SHA256)
4. Logs are append-only and tamper-evident (SHA-256 chain)
"""

import base64
import functools
import hashlib
import json
import os
import re
import time
import warnings
from collections import deque
from typing import Any

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# ---------------------- Configuration ----------------------

LOG_DIR = os.getenv("CLARITYOPS_LOG_DIR", "./logs")
LOG_FILE = os.path.join(LOG_DIR, "audit.log.enc")
ENCRYPTION_KEY_ENV = "CLARITYOPS_AUDIT_KEY"

# Fields that are ALLOWED in audit logs (whitelist approach)
ALLOWED_FIELDS = frozenset({
    # Identifiers (no PHI)
    "session_id", "user_id", "request_id",
    # Timestamps
    "timestamp", "ts", "duration_ms", "start_time", "end_time",
    # Operation metadata
    "kind", "event_type", "operation", "success", "status",
    # Counts and sizes (no actual values)
    "row_count", "rows", "column_count", "columns", "file_count",
    "output_size_bytes", "report_length", "code_lines", "key_count",
    "total_suppressions",
    # Hashes (not reversible)
    "prompt_hash", "code_hash", "content_hash",
    "prompt_length", "code_length",
    # Schema metadata (structure, not values)
    "column_names", "dtypes", "output_keys", "operations_detected",
    # File names (callers are expected to pass bare names, not paths;
    # nothing in this module enforces that)
    "filename", "filenames",
    # Error info (sanitized)
    "error_type", "error_message",
    # Suppression metadata
    "min_cell_size_threshold", "offset_days", "dataframes_processed",
    # Warnings (structural only)
    "warnings",
})

# Fields that are BLOCKED (PHI indicators)
BLOCKED_FIELDS = frozenset({
    "raw", "content", "data", "values", "text", "query",
    "prompt",  # actual prompt text
    "response", "result", "output", "payload_raw",
    "ssn", "mrn", "phone", "email", "address", "name", "dob", "patient",
})

# JSON scalar types that may be stored as a field value.
_SCALAR_TYPES = (str, int, float, bool, type(None))

# List-valued fields whose scalar elements are stored as-is.
_SAFE_LIST_FIELDS = frozenset({
    "column_names", "filenames", "output_keys", "operations_detected",
    "warnings",
})

# Patterns used by _truncate_error, compiled once at import time.
_PATH_PATTERN = re.compile(r'/[^\s]+')
_DOUBLE_QUOTED_PATTERN = re.compile(r'"[^"]{20,}"')
_SINGLE_QUOTED_PATTERN = re.compile(r"'[^']{20,}'")


# ---------------------- Encryption ----------------------

@functools.lru_cache(maxsize=4)
def _derive_fernet_key(key_material: str) -> bytes:
    """
    Derives a urlsafe-base64 Fernet key from secret key material via PBKDF2.

    Cached per key material so the 100,000 PBKDF2 iterations are paid once
    per distinct secret instead of on every log write and read.
    """
    # Fixed salt is OK since key material should be unique
    salt = b"clarityops_audit_v1"
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=100_000,
    )
    return base64.urlsafe_b64encode(kdf.derive(key_material.encode()))


def _get_encryption_key() -> bytes:
    """
    Derives the encryption key from the CLARITYOPS_AUDIT_KEY environment
    variable, falling back to a fixed development secret when it is unset.

    In production, CLARITYOPS_AUDIT_KEY should be set to a secure 32+ char
    secret.

    Returns:
        A urlsafe-base64-encoded 32-byte key suitable for Fernet.
    """
    key_material = os.getenv(ENCRYPTION_KEY_ENV)

    if not key_material:
        # Development fallback - a hard-coded, publicly known secret.
        # SECURITY: logs written with this key are effectively unencrypted.
        # In production, this should NEVER be used.
        warnings.warn(
            f"{ENCRYPTION_KEY_ENV} not set. Using development key. "
            "Set this environment variable in production!",
            RuntimeWarning
        )
        key_material = "DEVELOPMENT_KEY_DO_NOT_USE_IN_PRODUCTION"

    return _derive_fernet_key(key_material)


def _get_fernet() -> Fernet:
    """Returns Fernet instance for encryption/decryption."""
    return Fernet(_get_encryption_key())


# ---------------------- Integrity Chain ----------------------

# Hash of the most recently written record (None before the first record).
_last_hash: str | None = None

# Whether this process has already tried to resume the chain from LOG_FILE.
_chain_initialized: bool = False


def _compute_chain_hash(record: dict, previous_hash: str | None) -> str:
    """
    Computes SHA-256 hash linking this record to the previous one.
    Creates a tamper-evident chain.

    Args:
        record: The record to hash; it must not contain its own "hash" key.
        previous_hash: Hash of the preceding record, or None for the first
            record in the chain (hashed as the literal "GENESIS").

    Returns:
        Hex-encoded SHA-256 digest.
    """
    chain_input = (
        json.dumps(record, sort_keys=True, default=str)
        + (previous_hash or "GENESIS")
    )
    return hashlib.sha256(chain_input.encode()).hexdigest()


def _recover_last_hash() -> str | None:
    """
    Returns the "hash" of the last readable record in the existing log.

    Used once per process so a restarted process continues the chain
    instead of starting a new one with prev_hash=None, which
    verify_log_integrity would report as a chain break.
    """
    records, _ = _read_entries(limit=1)
    if not records:
        return None
    recovered = records[-1].get("hash")
    return recovered if isinstance(recovered, str) else None


# ---------------------- PHI Filtering ----------------------

def _sanitize_payload(payload: dict | None, path: str = "") -> dict:
    """
    Recursively filters payload to remove any potential PHI.
    Uses whitelist approach - only allowed fields pass through.

    Args:
        payload: Raw event metadata. None or a non-dict yields {}.
        path: Dotted path of the current nesting level; only threaded
            through the recursion, it does not affect the result.

    Returns:
        A new dict holding only the entries that passed filtering.
    """
    if not isinstance(payload, dict):
        return {}

    sanitized: dict = {}

    for key, value in payload.items():
        # Field names are matched as strings; a non-string key cannot be
        # whitelisted (and would crash on .lower()), so drop it.
        if not isinstance(key, str):
            continue

        key_lower = key.lower()

        # Block known PHI fields
        if key_lower in BLOCKED_FIELDS:
            continue

        # Handle nested dicts
        if isinstance(value, dict):
            nested = _sanitize_payload(value, f"{path}.{key}")
            if nested:  # Only include non-empty
                sanitized[key] = nested
            continue

        # Handle lists
        if isinstance(value, list):
            if all(isinstance(v, _SCALAR_TYPES) for v in value):
                # Scalar lists are kept only for structural fields
                # (column names, etc.); any other scalar list is dropped.
                if key_lower in _SAFE_LIST_FIELDS:
                    sanitized[key] = list(value)
            elif all(isinstance(v, dict) for v in value):
                # Recursively sanitize list of dicts, dropping empty results
                sanitized_list = [
                    _sanitize_payload(v, f"{path}.{key}[]") for v in value
                ]
                sanitized[key] = [v for v in sanitized_list if v]
            # Mixed lists are dropped entirely.
            continue

        # Allow whitelisted fields with simple values
        if key_lower in ALLOWED_FIELDS and isinstance(value, _SCALAR_TYPES):
            sanitized[key] = value

    return sanitized


def _truncate_error(error_msg: str | None, max_length: int = 200) -> str | None:
    """
    Truncates error messages and removes potential PHI patterns.

    Args:
        error_msg: Raw error text; non-strings are converted with str().
        max_length: Maximum length of the cleaned text before "..." is
            appended.

    Returns:
        The cleaned message, or None when error_msg is empty or None.
    """
    if not error_msg:
        return None

    # Remove potential file paths
    cleaned = _PATH_PATTERN.sub('[PATH]', str(error_msg))

    # Remove potential data values in quotes
    cleaned = _DOUBLE_QUOTED_PATTERN.sub('"[REDACTED]"', cleaned)
    cleaned = _SINGLE_QUOTED_PATTERN.sub("'[REDACTED]'", cleaned)

    if len(cleaned) > max_length:
        return cleaned[:max_length] + "..."
    return cleaned


# ---------------------- Public API ----------------------

def log_event(kind: str, user_id: str | None, payload: dict | None) -> None:
    """
    Logs an audit event with encryption and PHI filtering.

    Args:
        kind: Event type (e.g., "analysis_session_start", "code_execution")
        user_id: User identifier (None if no auth)
        payload: Event metadata (will be filtered to remove PHI)

    Raises:
        OSError: If the log directory or file cannot be created or written.
    """
    global _last_hash, _chain_initialized

    # Sanitize payload
    safe_payload = _sanitize_payload(payload)

    # Truncate any error messages
    if "error_message" in safe_payload:
        safe_payload["error_message"] = _truncate_error(
            safe_payload.get("error_message")
        )
    if "error" in safe_payload:
        safe_payload["error"] = _truncate_error(safe_payload.get("error"))

    # Resume the chain from the existing log on the first write of this
    # process; otherwise every restart would write prev_hash=None mid-file.
    if not _chain_initialized:
        if _last_hash is None:
            _last_hash = _recover_last_hash()
        _chain_initialized = True

    # Build record; ts and iso_time come from a single clock reading.
    now = time.time()
    record: dict[str, Any] = {
        "ts": now,
        "iso_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
        "kind": kind,
        "user": user_id,
        "payload": safe_payload,
        "prev_hash": _last_hash,
    }

    # Add integrity chain (the hash covers everything except "hash" itself)
    record_hash = _compute_chain_hash(record, _last_hash)
    record["hash"] = record_hash

    # Serialize
    record_json = json.dumps(record, ensure_ascii=False, default=str)

    # Encrypt
    encrypted = _get_fernet().encrypt(record_json.encode())

    # Write to file
    os.makedirs(LOG_DIR, exist_ok=True)
    with open(LOG_FILE, "ab") as f:
        f.write(encrypted + b"\n")

    # Advance the chain only after the record is on disk; advancing earlier
    # would make the next record point at a hash that was never written.
    _last_hash = record_hash


def hash_summary(label: str, text: str) -> dict:
    """
    Creates a safe summary of text content using only length and hash.
    No actual content is stored.

    Args:
        label: Prefix for the returned keys.
        text: Content to summarize; None or "" is treated as empty.

    Returns:
        {"<label>_len": character count,
         "<label>_hash": first 16 hex chars of the SHA-256 digest}
    """
    # NOTE(review): "<label>_len" is not in ALLOWED_FIELDS (which lists
    # "prompt_length"/"code_length"), so it is dropped if this dict is passed
    # to log_event as a payload - confirm the intended key name with callers.
    return {
        f"{label}_len": len(text or ""),
        f"{label}_hash": hashlib.sha256((text or "").encode()).hexdigest()[:16]
    }


# ---------------------- Log Reading (for compliance review) ----------------------

def _read_entries(limit: int | None) -> tuple[list[dict], int]:
    """
    Decrypts the last `limit` lines of the log (all lines when None).

    Returns:
        Tuple of (records, unreadable_count), where unreadable_count is the
        number of non-blank lines that could not be decrypted and parsed
        into a JSON object.
    """
    if limit is not None and limit <= 0:
        return [], 0

    try:
        with open(LOG_FILE, "rb") as f:
            # A bounded deque keeps only the tail in memory.
            tail = deque(f, maxlen=limit)
    except FileNotFoundError:
        return [], 0

    fernet = _get_fernet()
    records: list[dict] = []
    unreadable = 0

    for raw_line in tail:
        token = raw_line.strip()
        if not token:
            continue
        try:
            record = json.loads(fernet.decrypt(token))
        except (InvalidToken, ValueError):
            # Wrong key, tampered/corrupted token, or invalid JSON/UTF-8
            unreadable += 1
            continue
        if isinstance(record, dict):
            records.append(record)
        else:
            unreadable += 1

    return records, unreadable


def read_audit_log(limit: int | None = 100) -> list[dict]:
    """
    Reads and decrypts audit log entries.
    For compliance review and auditing purposes only.

    Args:
        limit: Maximum number of recent entries to return. None returns
            every entry; zero or a negative value returns an empty list.

    Returns:
        List of decrypted audit records, oldest first. Entries that cannot
        be decrypted or parsed are skipped.
    """
    records, _ = _read_entries(limit)
    return records


def verify_log_integrity() -> tuple[bool, str]:
    """
    Verifies the integrity chain of the audit log.

    The whole log is checked: a record window that starts mid-file would
    always fail, because its first record has a non-None prev_hash.

    Returns:
        Tuple of (is_valid, message)
    """
    records, unreadable = _read_entries(limit=None)

    # An entry that cannot be decrypted cannot be vouched for.
    if unreadable:
        return False, (
            f"{unreadable} unreadable entries in audit log "
            "(corrupted, tampered, or written with a different key)"
        )

    if not records:
        return True, "No records to verify"

    prev_hash = None
    for i, record in enumerate(records):
        expected_hash = record.get("hash")
        stored_prev = record.get("prev_hash")

        # Check chain linkage
        if stored_prev != prev_hash:
            return False, (
                f"Chain break at record {i}: "
                f"expected prev_hash {prev_hash}, got {stored_prev}"
            )

        # Recompute hash over everything except the stored hash itself
        record_copy = {k: v for k, v in record.items() if k != "hash"}
        computed = _compute_chain_hash(record_copy, prev_hash)

        if computed != expected_hash:
            return False, (
                f"Hash mismatch at record {i}: "
                f"computed {computed}, stored {expected_hash}"
            )

        prev_hash = expected_hash

    return True, f"All {len(records)} records verified"


def export_audit_log_for_compliance(output_path: str) -> int:
    """
    Exports decrypted audit log to JSON for compliance review.

    The export is written unencrypted; the caller is responsible for
    protecting output_path.

    Args:
        output_path: Path to write the JSON export

    Returns:
        Number of records exported
    """
    records = read_audit_log(limit=None)

    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(records, f, indent=2, default=str)

    return len(records)