Spaces:
Sleeping
Sleeping
File size: 3,242 Bytes
ec89b57 f32fc57 ec89b57 f32fc57 88a67b0 f32fc57 88a67b0 f32fc57 88a67b0 f32fc57 88a67b0 f32fc57 ec89b57 88a67b0 ec89b57 f32fc57 ec89b57 88a67b0 ec89b57 f32fc57 ec89b57 88a67b0 ec89b57 f32fc57 ec89b57 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
import asyncio
import uuid
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse

from codex_logic import (
    list_capabilities,
    invoke_capability,
    wake_module,
)
# Application instance that the route decorators in this module register on.
# The metadata below is handed to FastAPI as-is.
app = FastAPI(
    description="Sovereign orchestration gateway for Codex Spaces",
    version="1.0.0",
    title="CodexHF API Gateway",
)
# -------------------------
# Human-facing landing page
# -------------------------
@app.get("/", response_class=HTMLResponse)
def root():
    """Serve the static, human-facing landing page.

    The markup is a fixed string: a status banner, a hint to POST to
    ``/invoke``, and a link to ``/docs``. Nothing is interpolated into it.
    """
    landing_page = """
<!DOCTYPE html>
<html>
<head>
<title>CodexHF Sovereign API Gateway</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="
background-color:#000;
color:#00ff00;
font-family:monospace;
display:flex;
align-items:center;
justify-content:center;
height:100vh;
margin:0;
text-align:center;
">
<div>
<h1>🧠 CodexHF Sovereign API Gateway</h1>
<p>✅ Status: Running</p>
<p>Use <code>/invoke</code> via POST</p>
<p>
<a href="/docs" style="color:#00ff00;">View API Documentation</a>
</p>
</div>
</body>
</html>
"""
    return landing_page
# -------------------------
# Machine-readable status
# -------------------------
@app.get("/status")
def status():
    """Return a machine-readable description of the gateway.

    Returns:
        A dict with the service name, the fixed status ``"running"``, the
        current UTC time as an ISO 8601 string (including the ``+00:00``
        offset), and a mapping of endpoint names to their paths.
    """
    # datetime.utcnow() is deprecated since Python 3.12 and returns a naive
    # value, so its ISO string carried no indication that it was UTC. An
    # aware timestamp makes the offset explicit for consumers.
    now_utc = datetime.now(timezone.utc)
    return {
        "service": "CodexHF API Gateway",
        "status": "running",
        "timestamp": now_utc.isoformat(),
        "endpoints": {
            "health": "/health",
            "status": "/status",
            "capabilities": "/capabilities",
            "invoke": "/invoke",
            "wake": "/wake",
            "docs": "/docs",
        },
    }
# -------------------------
# Health check
# -------------------------
@app.get("/health")
def health():
    """Report liveness.

    Returns:
        A dict with ``status`` fixed to ``"ok"`` and ``timestamp`` set to the
        current UTC time as an ISO 8601 string (including the ``+00:00``
        offset).
    """
    # Timezone-aware replacement for the deprecated, naive datetime.utcnow().
    now_utc = datetime.now(timezone.utc)
    return {
        "status": "ok",
        "timestamp": now_utc.isoformat(),
    }
# -------------------------
# Capability registry
# -------------------------
@app.get("/capabilities")
def capabilities():
    """Return the result of ``list_capabilities()`` under a ``capabilities`` key."""
    available = list_capabilities()
    return {"capabilities": available}
# -------------------------
# Invocation endpoint
# -------------------------
# Strong references to in-flight background invocations. The event loop keeps
# only weak references to tasks, so a task nobody else references could be
# garbage-collected before it completes.
_background_invocations = set()


@app.post("/invoke")
async def invoke(request: dict):
    """Invoke a capability, either inline or in the background.

    Fields read from the request body:
        capability: Required. Passed as the first argument to
            ``invoke_capability``.
        payload: Optional. Defaults to ``{}`` when absent or null.
        trace_id: Optional. A UUID4 string is generated when absent or null.
        execution_mode: Optional, defaults to ``"sync"``. The value
            ``"async"`` schedules the invocation and returns immediately;
            any other value awaits the invocation.

    Returns:
        In async mode, ``{"trace_id", "status": "accepted"}``. Otherwise
        ``{"trace_id", "result", "duration_ms"}``, where the last two are the
        pair returned by ``invoke_capability``.

    Raises:
        HTTPException: 400 when ``capability`` is missing or empty.
    """
    capability = request.get("capability")
    if not capability:
        raise HTTPException(status_code=400, detail="Missing capability")

    # Treat an explicit JSON null like an absent field; dict.get's default
    # only covers the absent case.
    trace_id = request.get("trace_id")
    if trace_id is None:
        trace_id = str(uuid.uuid4())
    payload = request.get("payload")
    if payload is None:
        payload = {}

    if request.get("execution_mode", "sync") == "async":
        # Actually dispatch the work: previously this branch replied
        # "accepted" without ever calling invoke_capability. The result is
        # not stored anywhere; this is fire-and-forget.
        task = asyncio.ensure_future(invoke_capability(capability, payload))
        _background_invocations.add(task)
        task.add_done_callback(_background_invocations.discard)
        return {
            "trace_id": trace_id,
            "status": "accepted",
        }

    result, duration = await invoke_capability(capability, payload)
    return {
        "trace_id": trace_id,
        "result": result,
        "duration_ms": duration,
    }
# -------------------------
# Wake a downstream Space
# -------------------------
@app.post("/wake")
async def wake(request: dict):
    """Wake the module named in the request body.

    Reads ``module`` from the body, awaits ``wake_module`` with it, and
    echoes the module back alongside the status ``"awakened"``.

    Raises:
        HTTPException: 400 when ``module`` is missing or falsy.
    """
    target = request.get("module")
    if target:
        await wake_module(target)
        return {"status": "awakened", "module": target}
    raise HTTPException(status_code=400, detail="Missing module")