import gradio as gr
import asyncio
import os
import tempfile
import sys
from mcp.client.sse import sse_client
from mcp.client.session import ClientSession
from src.temporal_router import TemporalRouter
from src.voice_agent import VoiceAgent
# Initialize components
# Note: in a real deployment we might want to lazy-load these or manage them globally
router = TemporalRouter()
voice_agent = VoiceAgent()
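
# These module-level instances are shared across all Gradio requests. After new
# documents are ingested, handle_upload (defined in the UI below) calls
# router.reload_data() so subsequent queries see the refreshed knowledge base.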
async def query_remote_mcp(query: str, audio_file: str = None) -> tuple[str, dict]:
    """Query the remote Modal MCP server."""
    modal_url = "https://woransa--temporal-mcp-router-server-fastapi-app.modal.run/sse"
    print(f"Connecting to remote MCP: {modal_url}")
    try:
        async with sse_client(modal_url) as streams:
            async with ClientSession(streams[0], streams[1]) as session:
                await session.initialize()
                # For now, we only support text queries on remote
                # TODO: Upload audio file for voice queries
                result = await session.call_tool(
                    "query_versioned_docs",
                    arguments={"query": query}
                )
                # Extract text content
                content = result.content[0].text
                # We don't get full metadata back in the same structure yet,
                # but we can parse it if needed. For now, return empty metadata.
                return content, {}
    except Exception as e:
        return f"Error connecting to remote server: {str(e)}", {}
async def process_query(audio_file, text_query, year, month, version, provider_choice, mcp_server_choice):
    """
    Process either a voice or text query with temporal filters.

    Returns a tuple of (answer, model_info, audio_path, voice_label, metadata)
    matching the Gradio outputs wired up below.
    """
    # Handle Remote MCP
    if mcp_server_choice == "Remote (Modal)":
        query = text_query
        if audio_file:
            # Transcribe locally first
            if voice_agent:
                result = await voice_agent.process_voice_query(audio_file, router)
                query = result['transcript']
            else:
                return "Voice input requires local Whisper.", "", None, "", {}
        if not query:
            return "", "", None, "", {}
        answer, _ = await query_remote_mcp(query)
        # Format output for remote
        model_info = "Generated by Remote MCP (Modal)"
        metadata = {"source": "Remote MCP", "query": query}
        # We can optionally generate TTS for the remote answer too
        audio_response_bytes = await voice_agent.text_to_speech(answer)
        audio_path = None
        if audio_response_bytes:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
                fp.write(audio_response_bytes)
                audio_path = fp.name
        return answer, model_info, audio_path, "Audio generated by ElevenLabs", metadata
    # --- Local Logic ---
    # Map UI choice to internal provider name
    provider = "openai" if "OpenAI" in provider_choice else "gemini"

    # Determine input type
    if audio_file:
        print(f"Processing voice query from {audio_file}")
        # Process voice input
        result = await voice_agent.process_voice_query(audio_file, router)
        query = result['transcript']
        # Re-run the query so the selected provider is used
        doc_results = await router.query(query, provider=provider)
        answer_data = await router.generate_answer(query, doc_results, provider=provider)
        answer = answer_data["content"]
        model_info = f"Generated by {answer_data['provider']}, model: {answer_data['model']}"
        audio_response_bytes = await voice_agent.text_to_speech(answer)
    else:
        print(f"Processing text query: {text_query}")
        # Process text input
        query = text_query
        # Query with temporal filters, converting inputs to the expected types
        y = int(year) if year else None
        m = int(month) if month else None
        v = str(version) if version else None
        doc_results = await router.query(
            query=query,
            year=y,
            month=m,
            version=v,
            provider=provider
        )
        # Generate answer
        answer_data = await router.generate_answer(query, doc_results, provider=provider)
        answer = answer_data["content"]
        model_info = f"Generated by {answer_data['provider']}, model: {answer_data['model']}"
        # Generate voice response
        audio_response_bytes = await voice_agent.text_to_speech(answer)

    # Save audio bytes to a temporary file for Gradio
    audio_path = None
    if audio_response_bytes:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
            fp.write(audio_response_bytes)
            audio_path = fp.name

    # Format metadata for display
    metadata = {
        "query": query,
        "temporal_filters": {
            "year": year,
            "month": month,
            "version": version
        },
        "intent_analysis": doc_results.get('temporal_context', {}).get('intent_analysis', {}),
        "documents_found": len(doc_results.get('results', [])),
        "citations": doc_results.get('metadata', [])
    }
    return answer, model_info, audio_path, "Audio generated by ElevenLabs", metadata
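
# Note: the two TTS-to-tempfile blocks in process_query share the same pattern;
# a small helper like this could replace both (a sketch, not currently wired in):
def save_tts_to_tempfile(audio_bytes):
    """Write TTS audio bytes to a temporary .mp3 and return its path, or None."""
    if not audio_bytes:
        return None
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as fp:
        fp.write(audio_bytes)
        return fp.name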
# Custom CSS for styling tabs
custom_css = """
/* Target Gradio 5/6 Tab Buttons */
.gradio-container button[role="tab"] {
    font-size: 1.3em !important;
    font-weight: 900 !important;
    background-color: #e0f2fe !important;
    color: #0369a1 !important;
    padding: 12px 24px !important;
    border-radius: 8px 8px 0 0 !important;
}
.gradio-container button[role="tab"][aria-selected="true"] {
    background-color: var(--button-primary-background-fill) !important;
    color: white !important;
    border: none !important;
}
"""
# Create Gradio interface
with gr.Blocks(title="Temporal MCP Router AI Agent", theme=gr.themes.Soft(), css=custom_css) as demo:
    gr.Markdown("""
    # πŸ• Temporal MCP Router β€” Time-Aware Document Intelligence
    Ask any question by **voice or text**, and this agent will retrieve the **correct document version** for the **correct year**, every time.
    ### ⚠️ The Problem
    LLMs mix information from different document versions, leading to **unsafe**, **contradictory**, or **outdated** answers.
    ### πŸ’‘ The Solution
    This agent applies **temporal filtering BEFORE retrieval**, ensuring:
    - No mixed-version hallucinations
    - Deterministic, time-accurate answers
    - Reliable outputs for **time-sensitive knowledge bases** (API docs, compliance, medical/legal rules)
    ### 🧰 Powered By
    - **LLMs**: OpenAI GPT-4o, Google Gemini 2.0 Flash
    - **Voice**: Whisper (ASR), ElevenLabs (TTS)
    - **Vector DB**: ChromaDB
    - **Frameworks**: Gradio, MCP, LlamaIndex
    - **Deployment**: Hugging Face Spaces & Modal
    """)
    # --- Tab 1: Knowledge Base ---
    with gr.Tab("πŸ“š Knowledge Base"):
        gr.Markdown("""
        ### πŸ“₯ Ingest Your Documents
        **What is this?** This is where you feed the AI its "brain". Upload your versioned documentation here.
        **How to use:** Drag & drop PDF, Text, or JSON files. The system will automatically extract temporal metadata (Year, Version).
        **Expectation:** Once processed, these documents become available for the AI to search and reference in its answers.
        """)
        file_upload = gr.File(
            label="Upload PDFs/Text/JSON",
            file_count="multiple",
            file_types=[".pdf", ".txt", ".json"]
        )
        upload_btn = gr.Button("Process & Ingest", variant="primary")
        upload_status = gr.JSON(label="Ingestion Status")
        async def handle_upload(files):
            if not files:
                return {"error": "No files uploaded"}
            # Imported lazily inside the handler
            from src.ingest_pdfs import ingest_pdf
            # Process files in parallel with a concurrency limit
            semaphore = asyncio.Semaphore(3)  # Limit to 3 concurrent uploads to avoid rate limits

            async def process_file(file):
                async with semaphore:
                    try:
                        # file is a temp file path in Gradio 4.x
                        file_path = file.name if hasattr(file, 'name') else file
                        data = await ingest_pdf(file_path)
                        if data:
                            return {
                                "file": os.path.basename(file_path),
                                "status": "success",
                                "extracted_metadata": data["temporal_metadata"]
                            }
                        else:
                            return {
                                "file": os.path.basename(file_path),
                                "status": "failed",
                                "reason": "No content extracted"
                            }
                    except Exception as e:
                        return {
                            "file": os.path.basename(file.name if hasattr(file, 'name') else str(file)),
                            "status": "error",
                            "error": str(e)
                        }

            # Run all ingestion tasks; asyncio.gather returns a list of results
            tasks = [process_file(f) for f in files]
            results = await asyncio.gather(*tasks)
            # Reload router to pick up new files
            try:
                router.reload_data()
                results.append({"info": "Router knowledge base reloaded successfully"})
            except Exception as e:
                results.append({"warning": f"Failed to reload router: {e}"})
            return results
        upload_btn.click(
            fn=handle_upload,
            inputs=[file_upload],
            outputs=[upload_status]
        )
    # --- Tab 2: Temporal MCP AI Agent ---
    with gr.Tab("πŸ€– Temporal MCP AI Agent"):
        gr.Markdown("""
        ### 🧠 Chat with Your Data
        **What is this?** The main interface to ask questions. It uses the "Temporal Router" to find the *right* document version.
        **How to use:** Select your AI model (OpenAI/Gemini) and server (Local/Remote). Then speak or type your question.
        **Expectation:** You'll get a precise answer citing the specific year/version used, plus a voice response.
        """)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown("### Configuration")
                with gr.Row():
                    model_selector = gr.Radio(
                        choices=["OpenAI", "Gemini"],
                        value="OpenAI",
                        label="Select AI Model",
                        info="Choose the LLM provider for generating answers"
                    )
                    mcp_server_selector = gr.Radio(
                        choices=["Local (Default)", "Remote (Modal)"],
                        value="Local (Default)",
                        label="Select MCP Server",
                        info="πŸ‘‰ To query your uploaded documents, please use the Local MCP Server option only.\n\nChoose where to run the MCP server logic."
                    )
gr.Markdown("### Input")
# Voice input
# Handle Gradio version compatibility
audio_input = gr.Audio(
type="filepath",
label="🎀 Voice Query (speak your question)",
sources=["microphone"]
)
gr.Markdown("**OR**")
# Text input
text_input = gr.Textbox(
label="πŸ’¬ Text Query",
placeholder="e.g., How do I authenticate with XYZ API in version 3.0?"
)
# Example Queries
gr.Examples(
examples=[
"What were the breach notification requirements in 2008?",
"Show me the 2013 HITECH Act rules",
"What changed in HITECH Act rules version 3.0?",
"What are the current breach notification deadlines?"
],
inputs=text_input,
label="πŸ“ Try these examples (Click on the example then click Query button to get the response):"
)
gr.Markdown("### Temporal Filters (Optional)")
with gr.Row():
year_input = gr.Number(
label="Year",
precision=0
)
month_input = gr.Number(
label="Month",
precision=0
)
version_input = gr.Textbox(
label="Version",
placeholder="e.g., 3.0 or v3"
)
submit_btn = gr.Button("πŸš€ Query", variant="primary")
            with gr.Column(scale=1):
                gr.Markdown("### Response")
                # Text answer
                answer_output = gr.Textbox(
                    label="πŸ“ Answer",
                    lines=10
                )
                model_label = gr.Markdown("")  # For model attribution
                # Voice response
                audio_output = gr.Audio(
                    label="πŸ”Š Voice Response"
                )
                voice_label = gr.Markdown("")  # For voice attribution
                # Metadata
                metadata_output = gr.JSON(
                    label="πŸ“Š Temporal Context & Sources"
                )
        # Connect button
        submit_btn.click(
            fn=process_query,
            inputs=[audio_input, text_input, year_input, month_input, version_input, model_selector, mcp_server_selector],
            outputs=[answer_output, model_label, audio_output, voice_label, metadata_output]
        )
    # --- Tab 3: Temporal MCP Server ---
    with gr.Tab("πŸ”Œ Temporal MCP Server"):
        gr.Markdown("""
        ### πŸ”— Connect External Tools
        **What is this?** Instructions for connecting this AI logic to external tools like **Claude Desktop**.
        **How to use:** Copy the configuration JSON provided below into your Claude Desktop config file.
        **Expectation:** You will be able to use the "Temporal Router" tools directly inside your Claude Desktop chat interface.
        """)
        gr.Markdown("### πŸš€ MCP Server Status & Connection")
        with gr.Row():
            with gr.Column():
                gr.Markdown("""
                ### Status: βœ… Logic Ready
                The core logic is available. To expose it as an MCP server for Claude Desktop or other clients, run the standalone server script.
                """)
                gr.Info("The MCP server runs as a separate process to handle standard input/output communication.")
            with gr.Column():
                gr.Markdown("### πŸ› οΈ How to Run")
                gr.Code(
                    value="python src/mcp_server.py",
                    language="shell",
                    label="Command to Run Server"
                )
        gr.Markdown("### βš™οΈ Claude Desktop Configuration")
        gr.Markdown("Add this to your `claude_desktop_config.json`:")
        # Get absolute paths for better UX
        abs_path = os.path.abspath("src/mcp_server.py")
        python_path = sys.executable
        config_json = f"""{{
  "mcpServers": {{
    "temporal-router": {{
      "command": "{python_path}",
      "args": [
        "{abs_path}"
      ]
    }}
  }}
}}"""
        gr.Code(
            value=config_json,
            language="json",
            label="claude_desktop_config.json (Local)"
        )
gr.Markdown("---")
gr.Markdown("### ☁️ Cloud Deployment (Modal)")
gr.Markdown("You can also deploy the MCP server to the cloud using Modal.")
with gr.Row():
with gr.Column():
gr.Markdown("#### 1. Deploy")
gr.Code(
value="modal deploy src/mcp_server_modal.py",
language="shell",
label="Deploy Command"
)
with gr.Column():
gr.Markdown("#### 2. Connect (SSE)")
modal_config = """{
"mcpServers": {
"temporal-router-cloud": {
"url": "https://<your-modal-username>--temporal-mcp-router-server-fastapi-app.modal.run/sse"
}
}
}"""
gr.Code(
value=modal_config,
language="json",
label="claude_desktop_config.json (Remote)"
)
if __name__ == "__main__":
    # theme and css are passed to gr.Blocks above; launch() does not accept them
    demo.launch()