|
|
import gradio as gr |
|
|
import asyncio |
|
|
import os |
|
|
import tempfile |
|
|
import sys |
|
|
from mcp.client.sse import sse_client |
|
|
from mcp.client.session import ClientSession |
|
|
from src.temporal_router import TemporalRouter |
|
|
from src.voice_agent import VoiceAgent |
|
|
|
|
|
|
|
|
|
|
|
# Module-level singletons shared by every Gradio callback below:
# the temporal document router (retrieval + answer generation) and the
# voice agent (Whisper ASR + ElevenLabs TTS).
router = TemporalRouter()


voice_agent = VoiceAgent()
|
|
|
|
|
async def query_remote_mcp(query: str, audio_file: str | None = None) -> tuple[str, dict]:
    """Query the remote Modal MCP server over SSE.

    Args:
        query: Natural-language question forwarded to the remote
            ``query_versioned_docs`` tool.
        audio_file: Unused; retained for backward compatibility with callers
            that pass an (already transcribed) audio path.

    Returns:
        A ``(answer_text, metadata)`` tuple. ``metadata`` is always an empty
        dict on the remote path. On failure the error message is returned in
        ``answer_text`` instead of raising, so the UI can display it.
    """
    modal_url = "https://woransa--temporal-mcp-router-server-fastapi-app.modal.run/sse"

    print(f"Connecting to remote MCP: {modal_url}")
    try:
        async with sse_client(modal_url) as streams:
            async with ClientSession(streams[0], streams[1]) as session:
                await session.initialize()

                result = await session.call_tool(
                    "query_versioned_docs",
                    arguments={"query": query}
                )

                # Guard against an empty tool response rather than raising
                # IndexError on result.content[0].
                if not result.content:
                    return "Remote server returned no content.", {}
                return result.content[0].text, {}
    except Exception as e:
        return f"Error connecting to remote server: {str(e)}", {}
|
|
|
|
|
def _write_tts_to_tempfile(audio_bytes):
    """Persist TTS audio bytes to a temporary .mp3 and return its path.

    Returns None when there is no audio (empty/None bytes), which Gradio's
    Audio component accepts as "no audio output".
    """
    if not audio_bytes:
        return None
    # delete=False: Gradio serves the file after this context closes.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
        fp.write(audio_bytes)
        return fp.name


async def process_query(audio_file, text_query, year, month, version, provider_choice, mcp_server_choice):
    """
    Process either a voice or a text query with optional temporal filters.

    Args:
        audio_file: Path of a recorded voice query, or None for text input.
        text_query: Typed query (used when no audio is provided).
        year/month/version: Optional temporal filters from the UI.
        provider_choice: "OpenAI..." or "Gemini..." radio value.
        mcp_server_choice: "Local (Default)" or "Remote (Modal)".

    Returns:
        ``(answer, model_info, audio_path, voice_label, metadata)`` matching
        the five Gradio output components wired to the Query button.
    """
    # --- Remote (Modal) MCP path --------------------------------------
    if mcp_server_choice == "Remote (Modal)":
        query = text_query
        if audio_file:
            if voice_agent:
                result = await voice_agent.process_voice_query(audio_file, router)
                query = result['transcript']
            else:
                return "Voice input requires local Whisper.", "", None, "", {}

        if not query:
            return "", "", None, "", {}

        # Remote server applies its own retrieval; temporal filters are not
        # forwarded on this path.
        answer, _ = await query_remote_mcp(query)
        model_info = "Generated by Remote MCP (Modal)"
        metadata = {"source": "Remote MCP", "query": query}

        audio_response_bytes = await voice_agent.text_to_speech(answer)
        audio_path = _write_tts_to_tempfile(audio_response_bytes)
        return answer, model_info, audio_path, "Audio generated by ElevenLabs", metadata

    # --- Local path ----------------------------------------------------
    provider = "openai" if "OpenAI" in provider_choice else "gemini"

    if audio_file:
        print(f"Processing voice query from {audio_file}")
        result = await voice_agent.process_voice_query(audio_file, router)
        query = result['transcript']
    else:
        print(f"Processing text query: {text_query}")
        query = text_query

    if not query:
        # Mirror the remote branch: nothing to answer, return empty outputs.
        return "", "", None, "", {}

    # Normalize the optional filters (empty string / 0 / None -> None).
    y = int(year) if year else None
    m = int(month) if month else None
    v = str(version) if version else None

    # FIX: temporal filters now apply to voice queries as well — previously
    # the voice branch queried the router without year/month/version, so the
    # UI filters were silently ignored for spoken questions.
    doc_results = await router.query(
        query=query,
        year=y,
        month=m,
        version=v,
        provider=provider
    )

    answer_data = await router.generate_answer(query, doc_results, provider=provider)
    answer = answer_data["content"]
    model_info = f"Generated by {answer_data['provider']}, model: {answer_data['model']}"

    audio_response_bytes = await voice_agent.text_to_speech(answer)
    audio_path = _write_tts_to_tempfile(audio_response_bytes)

    metadata = {
        "query": query,
        "temporal_filters": {
            "year": year,
            "month": month,
            "version": version
        },
        "intent_analysis": doc_results.get('temporal_context', {}).get('intent_analysis', {}),
        "documents_found": len(doc_results.get('results', [])),
        "citations": doc_results.get('metadata', [])
    }

    return answer, model_info, audio_path, "Audio generated by ElevenLabs", metadata
|
|
|
|
|
|
|
|
|
|
|
# CSS injected into the app (intended for the ``css=`` option of
# ``gr.Blocks``) to enlarge the tab buttons and highlight the selected tab.
custom_css = """


/* Target Gradio 5/6 Tab Buttons */


.gradio-container button[role="tab"] {


    font-size: 1.3em !important;


    font-weight: 900 !important;


    background-color: #e0f2fe !important;


    color: #0369a1 !important;


    padding: 12px 24px !important;


    border-radius: 8px 8px 0 0 !important;


}




.gradio-container button[role="tab"][aria-selected="true"] {


    background-color: var(--button-primary-background-fill) !important;


    color: white !important;


    border: none !important;


}


"""
|
|
|
|
|
|
|
|
with gr.Blocks(title="Temporal MCP Router AI Agent") as demo: |
|
|
|
|
|
gr.Markdown(""" |
|
|
# π Temporal MCP Router β Time-Aware Document Intelligence |
|
|
|
|
|
Ask any question by **voice or text**, and this agent will retrieve the **correct document version** for the **correct year**, every time. |
|
|
|
|
|
### β οΈ The Problem |
|
|
LLMs mix information from different document versions, leading to **unsafe**, **contradictory**, or **outdated** answers. |
|
|
|
|
|
### π‘ The Solution |
|
|
This agent applies **temporal filtering BEFORE retrieval**, ensuring: |
|
|
- No mixed-version hallucinations |
|
|
- Deterministic, time-accurate answers |
|
|
- Reliable outputs for **time-sensitive knowledge bases** (API docs, compliance, medical/legal rules) |
|
|
|
|
|
### π§° Powered By |
|
|
- **LLMs**: OpenAI GPT-4o, Google Gemini 2.0 Flash |
|
|
- **Voice**: Whisper (ASR), ElevenLabs (TTS) |
|
|
- **Vector DB**: ChromaDB |
|
|
- **Frameworks**: Gradio, MCP, LlamaIndex |
|
|
- **Deployment**: Hugging Face Spaces & Modal |
|
|
""") |
|
|
|
|
|
|
|
|
with gr.Tab("π Knowledge Base"): |
|
|
gr.Markdown(""" |
|
|
### π₯ Ingest Your Documents |
|
|
**What is this?** This is where you feed the AI its "brain". Upload your versioned documentation here. |
|
|
**How to use:** Drag & drop PDF, Text, or JSON files. The system will automatically extract temporal metadata (Year, Version). |
|
|
**Expectation:** Once processed, these documents become available for the AI to search and reference in its answers. |
|
|
""") |
|
|
|
|
|
file_upload = gr.File( |
|
|
label="Upload PDFs/Text/JSON", |
|
|
file_count="multiple", |
|
|
file_types=[".pdf", ".txt", ".json"] |
|
|
) |
|
|
|
|
|
upload_btn = gr.Button("Process & Ingest", variant="primary") |
|
|
upload_status = gr.JSON(label="Ingestion Status") |
|
|
|
|
|
async def handle_upload(files): |
|
|
if not files: |
|
|
return {"error": "No files uploaded"} |
|
|
|
|
|
results = [] |
|
|
from src.ingest_pdfs import ingest_pdf |
|
|
|
|
|
|
|
|
semaphore = asyncio.Semaphore(3) |
|
|
|
|
|
async def process_file(file): |
|
|
async with semaphore: |
|
|
try: |
|
|
|
|
|
file_path = file.name if hasattr(file, 'name') else file |
|
|
|
|
|
data = await ingest_pdf(file_path) |
|
|
if data: |
|
|
return { |
|
|
"file": os.path.basename(file_path), |
|
|
"status": "success", |
|
|
"extracted_metadata": data["temporal_metadata"] |
|
|
} |
|
|
else: |
|
|
return { |
|
|
"file": os.path.basename(file_path), |
|
|
"status": "failed", |
|
|
"reason": "No content extracted" |
|
|
} |
|
|
except Exception as e: |
|
|
return { |
|
|
"file": os.path.basename(file.name if hasattr(file, 'name') else str(file)), |
|
|
"status": "error", |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
tasks = [process_file(f) for f in files] |
|
|
results = await asyncio.gather(*tasks) |
|
|
results = list(results) |
|
|
|
|
|
|
|
|
try: |
|
|
router.reload_data() |
|
|
results.append({"info": "Router knowledge base reloaded successfully"}) |
|
|
except Exception as e: |
|
|
results.append({"warning": f"Failed to reload router: {e}"}) |
|
|
|
|
|
return results |
|
|
|
|
|
upload_btn.click( |
|
|
fn=handle_upload, |
|
|
inputs=[file_upload], |
|
|
outputs=[upload_status] |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("π€ Temporal MCP AI Agent"): |
|
|
gr.Markdown(""" |
|
|
### π§ Chat with Your Data |
|
|
**What is this?** The main interface to ask questions. It uses the "Temporal Router" to find the *right* document version. |
|
|
**How to use:** Select your AI model (OpenAI/Gemini) and Server (Local/Remote). Then speak or type your question. |
|
|
**Expectation:** You'll get a precise answer citing the specific year/version used, plus a voice response. |
|
|
""") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Configuration") |
|
|
with gr.Row(): |
|
|
model_selector = gr.Radio( |
|
|
choices=["OpenAI", "Gemini"], |
|
|
value="OpenAI", |
|
|
label="Select AI Model", |
|
|
info="Choose the LLM provider for generating answers" |
|
|
) |
|
|
mcp_server_selector = gr.Radio( |
|
|
choices=["Local (Default)", "Remote (Modal)"], |
|
|
value="Local (Default)", |
|
|
label="Select MCP Server", |
|
|
info="π To query your uploaded documents, please use the Local MCP Server option only.\n\nChoose where to run the MCP server logic" |
|
|
) |
|
|
|
|
|
gr.Markdown("### Input") |
|
|
|
|
|
|
|
|
|
|
|
audio_input = gr.Audio( |
|
|
type="filepath", |
|
|
label="π€ Voice Query (speak your question)", |
|
|
sources=["microphone"] |
|
|
) |
|
|
|
|
|
gr.Markdown("**OR**") |
|
|
|
|
|
|
|
|
text_input = gr.Textbox( |
|
|
label="π¬ Text Query", |
|
|
placeholder="e.g., How do I authenticate with XYZ API in version 3.0?" |
|
|
) |
|
|
|
|
|
|
|
|
gr.Examples( |
|
|
examples=[ |
|
|
"What were the breach notification requirements in 2008?", |
|
|
"Show me the 2013 HITECH Act rules", |
|
|
"What changed in HITECH Act rules version 3.0?", |
|
|
"What are the current breach notification deadlines?" |
|
|
], |
|
|
inputs=text_input, |
|
|
label="π Try these examples (Click on the example then click Query button to get the response):" |
|
|
) |
|
|
|
|
|
gr.Markdown("### Temporal Filters (Optional)") |
|
|
|
|
|
with gr.Row(): |
|
|
year_input = gr.Number( |
|
|
label="Year", |
|
|
precision=0 |
|
|
) |
|
|
month_input = gr.Number( |
|
|
label="Month", |
|
|
precision=0 |
|
|
) |
|
|
|
|
|
version_input = gr.Textbox( |
|
|
label="Version", |
|
|
placeholder="e.g., 3.0 or v3" |
|
|
) |
|
|
|
|
|
submit_btn = gr.Button("π Query", variant="primary") |
|
|
|
|
|
with gr.Column(scale=1): |
|
|
gr.Markdown("### Response") |
|
|
|
|
|
|
|
|
answer_output = gr.Textbox( |
|
|
label="π Answer", |
|
|
lines=10 |
|
|
) |
|
|
model_label = gr.Markdown("") |
|
|
|
|
|
|
|
|
audio_output = gr.Audio( |
|
|
label="π Voice Response" |
|
|
) |
|
|
voice_label = gr.Markdown("") |
|
|
|
|
|
|
|
|
metadata_output = gr.JSON( |
|
|
label="π Temporal Context & Sources" |
|
|
) |
|
|
|
|
|
|
|
|
submit_btn.click( |
|
|
fn=process_query, |
|
|
inputs=[audio_input, text_input, year_input, month_input, version_input, model_selector, mcp_server_selector], |
|
|
outputs=[answer_output, model_label, audio_output, voice_label, metadata_output] |
|
|
) |
|
|
|
|
|
|
|
|
with gr.Tab("π Temporal MCP Server"): |
|
|
gr.Markdown(""" |
|
|
### π Connect External Tools |
|
|
**What is this?** Instructions for connecting this AI logic to external tools like **Claude Desktop**. |
|
|
**How to use:** Copy the configuration JSON provided below into your Claude Desktop config file. |
|
|
**Expectation:** You will be able to use the "Temporal Router" tools directly inside your Claude Desktop chat interface. |
|
|
""") |
|
|
|
|
|
gr.Markdown("### π MCP Server Status & Connection") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
gr.Markdown(""" |
|
|
### Status: β
Logic Ready |
|
|
The core logic is available. To expose it as an MCP server for Claude Desktop or other clients, run the standalone server script. |
|
|
""") |
|
|
|
|
|
gr.Info("The MCP Server runs as a separate process to handle standard input/output communication.") |
|
|
|
|
|
with gr.Column(): |
|
|
gr.Markdown("### π οΈ How to Run") |
|
|
gr.Code( |
|
|
value="python src/mcp_server.py", |
|
|
language="shell", |
|
|
label="Command to Run Server" |
|
|
) |
|
|
|
|
|
gr.Markdown("### βοΈ Claude Desktop Configuration") |
|
|
gr.Markdown("Add this to your `claude_desktop_config.json`:") |
|
|
|
|
|
|
|
|
abs_path = os.path.abspath("src/mcp_server.py") |
|
|
python_path = sys.executable |
|
|
|
|
|
config_json = f"""{{ |
|
|
"mcpServers": {{ |
|
|
"temporal-router": {{ |
|
|
"command": "{python_path}", |
|
|
"args": [ |
|
|
"{abs_path}" |
|
|
] |
|
|
}} |
|
|
}} |
|
|
}}""" |
|
|
|
|
|
gr.Code( |
|
|
value=config_json, |
|
|
language="json", |
|
|
label="claude_desktop_config.json (Local)" |
|
|
) |
|
|
|
|
|
gr.Markdown("---") |
|
|
gr.Markdown("### βοΈ Cloud Deployment (Modal)") |
|
|
gr.Markdown("You can also deploy the MCP server to the cloud using Modal.") |
|
|
|
|
|
with gr.Row(): |
|
|
with gr.Column(): |
|
|
gr.Markdown("#### 1. Deploy") |
|
|
gr.Code( |
|
|
value="modal deploy src/mcp_server_modal.py", |
|
|
language="shell", |
|
|
label="Deploy Command" |
|
|
) |
|
|
|
|
|
with gr.Column(): |
|
|
gr.Markdown("#### 2. Connect (SSE)") |
|
|
modal_config = """{ |
|
|
"mcpServers": { |
|
|
"temporal-router-cloud": { |
|
|
"url": "https://<your-modal-username>--temporal-mcp-router-server-fastapi-app.modal.run/sse" |
|
|
} |
|
|
} |
|
|
}""" |
|
|
gr.Code( |
|
|
value=modal_config, |
|
|
language="json", |
|
|
label="claude_desktop_config.json (Remote)" |
|
|
) |
|
|
|
|
|
if __name__ == "__main__":
    # FIX: ``theme`` and ``css`` are constructor options of ``gr.Blocks``,
    # not of ``launch()``; passing them here raises a TypeError on current
    # Gradio, so the app never started and ``custom_css`` was never applied.
    # To restore the styling, construct the app as
    # ``gr.Blocks(title=..., theme=gr.themes.Soft(), css=custom_css)``.
    demo.launch()
|
|
|