|
|
import hmac
import json
import logging
import os
import shutil
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from dotenv import load_dotenv
from pinecone import Pinecone
|
|
|
|
|
|
|
|
# Pull variables from a local .env file (if any) into the process environment
# before the os.getenv() lookups below run.
load_dotenv()

# Shared password checked by authenticate().
# SECURITY: supply this through the AUTH_PASSWORD environment variable (or the
# .env file). The literal fallback is kept only so existing deployments keep
# working; it is a secret committed to source control and should be rotated
# and removed. `or` (rather than a getenv default) also covers a variable that
# is set but empty, which would otherwise make the empty password valid.
AUTH_PASSWORD = os.getenv("AUTH_PASSWORD") or "gst_magic@##56$$"
|
|
|
|
|
def authenticate(password):
    """Check a submitted password against the configured application password.

    Args:
        password: The value typed into the login form. Anything that is not a
            string (for example ``None``) is rejected.

    Returns:
        bool: ``True`` only when ``password`` equals ``AUTH_PASSWORD``.
    """
    if not isinstance(password, str):
        return False
    # compare_digest does not short-circuit at the first differing character,
    # so response time does not reveal how much of a guess was correct.
    # Encoding to bytes lets it handle non-ASCII passwords as well.
    return hmac.compare_digest(
        password.encode("utf-8"), AUTH_PASSWORD.encode("utf-8")
    )
|
|
|
|
|
|
|
|
# Fail fast at import time when mandatory configuration is absent.
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [name for name in required_env_vars if not os.environ.get(name)]

if missing_vars:
    missing_listing = ", ".join(missing_vars)
    raise ValueError(f"Missing required environment variables: {missing_listing}")

# Single Pinecone client shared by every handler in this module.
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Local directory that uploaded files are copied into before being sent on.
UPLOAD_FOLDER = "uploads"
Path(UPLOAD_FOLDER).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def parse_pinecone_timestamp(iso_string):
    """Parse an ISO 8601 timestamp string, tolerating nanosecond precision.

    ``datetime.fromisoformat`` is given a normalised copy of the input: a
    trailing ``Z`` becomes ``+00:00`` and the fractional-seconds part is cut or
    zero-padded to exactly six digits, whether or not a UTC offset follows it.

    Args:
        iso_string (str): The ISO-formatted timestamp string.

    Returns:
        datetime: Always timezone-aware, so results can be compared and sorted
        together. A string without an offset is treated as UTC. Missing,
        empty or non-string input yields the earliest representable datetime
        (``datetime.min`` in UTC) as a "sorts first" sentinel.

    Raises:
        ValueError: If a non-empty string is not a valid ISO timestamp.
    """
    if not isinstance(iso_string, str) or not iso_string:
        return datetime.min.replace(tzinfo=timezone.utc)

    text = iso_string.strip()
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"

    head, dot, tail = text.partition(".")
    if dot:
        # Split the tail into its leading digits (the fraction) and whatever
        # follows (an offset such as "+00:00", or nothing at all).
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction = tail[:digit_count][:6].ljust(6, "0")
        text = f"{head}.{fraction}{tail[digit_count:]}"

    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        # Naive and aware datetimes cannot be ordered against each other.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
|
|
|
|
def get_all_files():
    """Fetch every file registered with the Pinecone Assistant, newest first.

    The assistant name is read from ``PINECONE_ASSISTANT_NAME`` and defaults
    to ``"gstminutes"``.

    Returns:
        list: File objects sorted by ``created_on``, most recent first. Files
        whose timestamp is missing or unparseable sort last. An empty list is
        returned when there are no files or when the request fails; failures
        are logged instead of raised so the UI handlers keep working.
    """

    def _created_key(file_obj):
        """Return a naive-UTC datetime usable as a sort key for ``file_obj``."""
        try:
            stamp = parse_pinecone_timestamp(getattr(file_obj, "created_on", ""))
            offset = stamp.utcoffset()
            if offset is not None:
                # Naive and aware datetimes cannot be ordered against each
                # other, so reduce every key to naive UTC.
                stamp = (stamp - offset).replace(tzinfo=None)
            return stamp
        except (TypeError, ValueError, OverflowError):
            # One malformed timestamp must not discard the whole listing.
            return datetime.min

    try:
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        files_response = assistant.list_files()
        # Accept either a wrapper object exposing `.files` or the list itself.
        files_list = getattr(files_response, "files", files_response)
        if not files_list:
            return []

        return sorted(files_list, key=_created_key, reverse=True)
    except Exception:
        # Best-effort by design: callers treat [] as "nothing to show".
        logging.getLogger(__name__).exception(
            "Failed to list files from the Pinecone Assistant"
        )
        return []
|
|
|
|
|
def get_file_choices():
    """Build the ``(label, file_id)`` pairs offered by the delete dropdown.

    Each label is the file name, followed by ``(uploaded: YYYY-MM-DD HH:MM)``
    when the file has a ``created_on`` value that can be parsed.

    Returns:
        list[tuple[str, str]]: One entry per file, in the order produced by
        ``get_all_files()``. Empty when there are no files or on failure.
    """
    try:
        choices = []
        for file_obj in get_all_files():
            file_name = getattr(file_obj, "name", "Unknown File")
            file_id = getattr(file_obj, "id", "unknown")
            created_on = getattr(file_obj, "created_on", "")

            display_name = file_name
            if created_on:
                try:
                    created_formatted = parse_pinecone_timestamp(created_on).strftime(
                        "%Y-%m-%d %H:%M"
                    )
                except (TypeError, ValueError, OverflowError):
                    # Unparseable timestamp: show the bare file name.
                    pass
                else:
                    display_name = f"{file_name} (uploaded: {created_formatted})"

            choices.append((display_name, file_id))
        return choices
    except Exception:
        # Best-effort for the UI: log the problem and offer no choices.
        logging.getLogger(__name__).exception("Failed to build file choices")
        return []
|
|
|
|
|
def refresh_delete_dropdown():
    """Reload the delete dropdown from the current file listing.

    Returns:
        A ``gr.update`` carrying the fresh choices with the selection cleared.
        The dropdown is interactive only when at least one file is available.
    """
    available = get_file_choices()
    has_files = bool(available)
    return gr.update(
        choices=available if has_files else [],
        value=None,
        interactive=has_files,
    )
|
|
|
|
|
def delete_selected_files(selected_file_ids, progress=gr.Progress()):
    """Delete each selected file from the Pinecone Assistant and report the outcome.

    Args:
        selected_file_ids: File IDs chosen in the delete dropdown.
        progress: Gradio progress tracker used for status updates.

    Returns:
        tuple[str, str]: ``(status summary, detailed results)`` as Markdown.
        A failure on one file is recorded and the remaining files are still
        attempted; an unexpected error yields a critical-error summary and an
        empty details string.
    """
    if not selected_file_ids:
        return "❌ **Error:** No files selected for deletion", ""

    try:
        progress(0.1, desc="🔧 Initializing Pinecone Assistant...")
        assistant = pc.assistant.Assistant(
            assistant_name=os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        )

        # id -> name lookup so the report can show readable titles.
        names_by_id = {}
        for listed in get_all_files():
            names_by_id[getattr(listed, "id", "")] = getattr(listed, "name", "Unknown")

        total_files = len(selected_file_ids)
        succeeded = []
        failed = []

        progress(0.2, desc=f"🗑️ Starting deletion of {total_files} files...")

        for position, file_id in enumerate(selected_file_ids):
            label = names_by_id.get(file_id, f"File ID: {file_id}")
            try:
                fraction_done = 0.2 + (position / total_files) * 0.7
                progress(fraction_done, desc=f"🗑️ Deleting: {label}...")

                assistant.delete_file(file_id=file_id)
                succeeded.append({"name": label, "id": file_id, "status": "success"})

                # Short pause between consecutive delete calls.
                time.sleep(0.2)
            except Exception as delete_error:
                failed.append({"name": label, "id": file_id, "error": str(delete_error)})

        progress(1.0, desc="✅ Deletion process completed!")

        status_message = "".join(
            [
                "📊 **Deletion Complete**\n\n",
                f"✅ **Successfully deleted:** {len(succeeded)} files\n",
                f"❌ **Failed to delete:** {len(failed)} files\n",
                f"📁 **Total processed:** {total_files} files\n\n",
            ]
        )

        report = ["## 🗑️ **Deletion Results**\n\n"]
        if succeeded:
            report.append("### ✅ **Successfully Deleted Files:**\n")
            report.extend(f"- **{entry['name']}** (`{entry['id']}`)\n" for entry in succeeded)
            report.append("\n")
        if failed:
            report.append("### ❌ **Failed Deletions:**\n")
            for entry in failed:
                report.append(f"- **{entry['name']}** (`{entry['id']}`)\n")
                report.append(f" - Error: {entry['error']}\n")
            report.append("\n")

        return status_message, "".join(report)

    except Exception as e:
        return f"❌ **Critical Error during deletion:** {str(e)}", ""
|
|
|
|
|
def list_uploaded_files_paginated(page_num=0, progress=gr.Progress()):
    """Render one 100-file page of the assistant's file listing as Markdown.

    Args:
        page_num: Zero-based page index. A value that is not an integer falls
            back to 0, and an out-of-range value is clamped to the nearest
            valid page, so a stale page number (for example after files were
            deleted) never produces an empty slice or a nonsensical
            "Showing" range.
        progress: Gradio progress tracker used for status updates.

    Returns:
        tuple: ``(summary, details, pagination label, previous-button update,
        next-button update)``. On failure the summary carries the error text
        and both buttons are hidden.
    """
    try:
        progress(0.1, desc="🔍 Getting files...")
        all_files = get_all_files()

        if not all_files:
            progress(1.0, desc="✅ Complete - No files found")
            return (
                "📋 **No files found in Pinecone Assistant**",
                "",
                "No files available",
                gr.update(visible=False),
                gr.update(visible=False),
            )

        progress(0.5, desc="📊 Processing page...")

        files_per_page = 100
        total_files = len(all_files)
        total_pages = (total_files + files_per_page - 1) // files_per_page

        # Validate and clamp the requested page; total_pages >= 1 here because
        # the listing is non-empty.
        try:
            page_num = int(page_num)
        except (TypeError, ValueError):
            page_num = 0
        page_num = min(max(page_num, 0), total_pages - 1)

        start_idx = page_num * files_per_page
        end_idx = start_idx + files_per_page
        page_files = all_files[start_idx:end_idx]

        summary = (
            f"📊 **Files Summary (Page {page_num + 1} of {total_pages})**\n\n"
            f"📁 **Total files:** {total_files}\n"
            f"📋 **Showing:** {start_idx + 1}-{min(end_idx, total_files)} of {total_files}\n\n"
        )

        progress(0.8, desc="📝 Formatting file titles...")

        detail_parts = [f"## 📋 **Latest Uploaded Files - Page {page_num + 1}**\n\n"]
        for i, file_obj in enumerate(page_files, 1):
            global_index = start_idx + i  # 1-based position in the full listing
            try:
                file_name = getattr(file_obj, "name", "Unknown File")
                file_id = getattr(file_obj, "id", "Unknown ID")
                created_on = getattr(file_obj, "created_on", "")

                created_formatted = "Unknown"
                if created_on:
                    try:
                        created_formatted = parse_pinecone_timestamp(created_on).strftime(
                            "%Y-%m-%d %H:%M"
                        )
                    except (TypeError, ValueError, OverflowError):
                        pass  # keep "Unknown" for an unparseable timestamp

                detail_parts.append(f"{global_index}. **{file_name}**\n")
                detail_parts.append(
                    f" 📅 Uploaded: {created_formatted} | 🆔 ID: `{file_id}`\n\n"
                )
            except Exception:
                # Best-effort: one bad entry must not hide the rest of the page.
                detail_parts.append(f"{global_index}. ❌ **Error loading file**\n\n")
        detailed_info = "".join(detail_parts)

        # load_next_page / load_prev_page parse the page number back out of
        # this label, so its "Page X of Y" shape must stay stable.
        pagination_info = f"📄 Page {page_num + 1} of {total_pages} | Total: {total_files} files"

        show_prev = page_num > 0
        show_next = page_num < total_pages - 1

        progress(1.0, desc="✅ Page loaded successfully!")

        return (
            summary,
            detailed_info,
            pagination_info,
            gr.update(visible=show_prev),
            gr.update(visible=show_next),
        )

    except Exception as e:
        error_msg = f"❌ **Error retrieving file list:** {str(e)}"
        return error_msg, "", "Error", gr.update(visible=False), gr.update(visible=False)
|
|
|
|
|
def load_next_page(current_page_info):
    """Load the page after the one named in the pagination label.

    Args:
        current_page_info: The pagination label text, expected to contain
            ``"Page X of Y"`` with a 1-based ``X``.

    Returns:
        The tuple produced by ``list_uploaded_files_paginated`` for the next
        page, or for the first page when the label cannot be parsed.
    """
    try:
        current_page = int(current_page_info.split("Page ")[1].split(" of")[0]) - 1
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only label-parsing failures fall back to page 0; errors from the
        # listing itself are no longer masked by a blanket handler.
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(current_page + 1)
|
|
|
|
|
def load_prev_page(current_page_info):
    """Load the page before the one named in the pagination label.

    Args:
        current_page_info: The pagination label text, expected to contain
            ``"Page X of Y"`` with a 1-based ``X``.

    Returns:
        The tuple produced by ``list_uploaded_files_paginated`` for the
        previous page (never below the first), or for the first page when the
        label cannot be parsed.
    """
    try:
        current_page = int(current_page_info.split("Page ")[1].split(" of")[0]) - 1
    except (AttributeError, IndexError, TypeError, ValueError):
        # Only label-parsing failures fall back to page 0; errors from the
        # listing itself are no longer masked by a blanket handler.
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(max(0, current_page - 1))
|
|
|
|
|
def refresh_file_list():
    """Return the interim status line shown while the file list is reloading."""
    refreshing_notice = "🔄 **Refreshing file list... Please wait**"
    return refreshing_notice
|
|
|
|
|
def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """Upload the selected files to the Pinecone Assistant, each with its own metadata.

    Args:
        files: Paths of the files chosen in the upload widget (at most 10).
        *metadata_inputs: Flat sequence of textbox values, three per file in
            the order sections, keywords, description; file ``i`` uses
            positions ``3*i`` to ``3*i + 2``. Sections and keywords are
            comma-separated lists.
        progress: Gradio progress tracker used for status updates.

    Returns:
        tuple[str, str, str]: ``(status summary, detailed results, processing
        status line)`` as Markdown. Every exit path returns all three values
        because the upload handler is wired to three output components.
    """
    failed_status = "❌ **Processing failed with error**"

    # Validation failures must still fill all three outputs.
    if not files:
        return "❌ Error: No files selected", "", failed_status

    if len(files) > 10:
        return "❌ Error: Maximum 10 files allowed at a time", "", failed_status

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="🔧 Initializing Pinecone Assistant...")
        time.sleep(0.5)
        assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", "gstminutes")
        assistant = pc.assistant.Assistant(assistant_name=assistant_name)

        for i, file_path in enumerate(files):
            filename = os.path.basename(file_path)
            try:
                progress(
                    (i / total_files),
                    desc=f"📄 Processing {filename}... ({i+1}/{total_files})",
                )

                # Take this file's triple only when all three slots exist.
                base = i * 3
                if base + 2 < len(metadata_inputs):
                    sections, keywords, description = (
                        value or "" for value in metadata_inputs[base:base + 3]
                    )
                else:
                    sections = keywords = description = ""

                if not (sections.strip() or keywords.strip() or description.strip()):
                    errors.append({
                        "filename": filename,
                        "error": "❌ Error: No metadata provided",
                    })
                    continue

                progress((i / total_files), desc=f"🏷️ Preparing metadata for {filename}...")
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip(),
                }

                progress((i / total_files), desc=f"📁 Copying {filename} to uploads...")
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress((i / total_files), desc=f"☁️ Uploading {filename} to Pinecone...")
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None,
                )

                results.append({
                    "filename": filename,
                    "status": "✅ Success",
                    "metadata": metadata,
                    "response": str(response),
                })

            except Exception as file_error:
                # A failure on one file is recorded; the batch continues.
                errors.append({
                    "filename": filename,
                    "error": f"❌ Error: {str(file_error)}",
                })

        progress(1.0, desc="✅ Processing complete!")
        time.sleep(0.5)

        success_count = len(results)
        error_count = len(errors)

        status_message = (
            "📊 **Processing Complete**\n\n"
            f"✅ **Successful uploads:** {success_count}\n"
            f"❌ **Failed uploads:** {error_count}\n"
            f"📁 **Total files processed:** {len(files)}\n\n"
        )

        detail_parts = ["## 📋 **Detailed Results**\n\n"]

        if results:
            detail_parts.append("### ✅ **Successful Uploads:**\n")
            for result in results:
                detail_parts.append(f"- **{result['filename']}**\n")
                detail_parts.append(f" - Sections: {', '.join(result['metadata']['sections'])}\n")
                detail_parts.append(f" - Keywords: {', '.join(result['metadata']['keywords'])}\n")
                detail_parts.append(f" - Description: {result['metadata']['description']}\n\n")

        if errors:
            detail_parts.append("### ❌ **Failed Uploads:**\n")
            for error in errors:
                detail_parts.append(f"- **{error['filename']}** - {error['error']}\n")

        # Do not claim success when some or all uploads failed.
        if error_count:
            final_status = f"⚠️ **Processing completed with {error_count} failed upload(s)**"
        else:
            final_status = "✅ **Processing completed successfully!**"

        return status_message, "".join(detail_parts), final_status

    except Exception as e:
        error_msg = f"❌ **Critical Error:** {str(e)}"
        return error_msg, "", failed_status
|
|
|
|
|
def update_metadata_fields(files):
    """Show and label one metadata triple per selected file; hide the rest.

    Args:
        files: Paths of the currently selected files, or a falsy value.

    Returns:
        list: Exactly 30 ``gr.update`` objects, one per metadata textbox. For
        each file, three visible updates (sections, keywords, description)
        labelled with the file's base name; all remaining slots are hidden.
        Every slot is hidden when nothing is selected or more than 10 files
        are selected.
    """
    slot_count = 30

    if not files or len(files) > 10:
        return [gr.update(visible=False)] * slot_count

    updates = []
    for path in files:
        filename = os.path.basename(path)
        updates += [
            gr.update(
                visible=True,
                label=f"📑 Sections for {filename}",
                placeholder="e.g., Introduction, Financial Data, Compliance",
            ),
            gr.update(
                visible=True,
                label=f"🔍 Keywords for {filename}",
                placeholder="e.g., GST, tax, compliance, revenue",
            ),
            gr.update(
                visible=True,
                label=f"📝 Description for {filename}",
                placeholder="Brief description of this document",
            ),
        ]

    # Pad with hidden updates so every textbox receives a value.
    unused_slots = slot_count - len(updates)
    updates.extend(gr.update(visible=False) for _ in range(unused_slots))

    return updates[:slot_count]
|
|
|
|
|
def clear_form():
    """Produce reset values for every component of the upload form.

    Returns:
        list: 34 values in output order: ``None`` for the file picker, 30
        empty strings for the metadata textboxes, two empty strings for the
        status and results panes, and the ready-state status line.
    """
    cleared = [None]
    cleared.extend("" for _ in range(30))
    cleared.extend(["", "", "🟢 **Ready to process documents**"])
    return cleared
|
|
|
|
|
def clear_delete_form():
    """Reset the delete tab.

    Returns:
        tuple: An update that empties the dropdown selection, followed by
        empty strings for the status and results panes.
    """
    emptied_selection = gr.update(value=[])
    blank = ""
    return emptied_selection, blank, blank
|
|
|
|
|
def start_processing():
    """Return the status line displayed as soon as an upload run begins."""
    in_progress_notice = "🔄 **Processing documents... Please wait**"
    return in_progress_notice
|
|
|
|
|
def finish_processing():
    """Return the status line displayed once an upload run has finished."""
    done_notice = "✅ **Processing completed successfully!**"
    return done_notice
|
|
|
|
|
def create_main_interface():
    """Assemble the three-tab document management UI as its own Blocks app.

    The app has an upload tab (file picker plus 30 initially hidden metadata
    textboxes), a delete tab (multi-select dropdown) and a paginated file
    listing tab, with every button wired to the handler functions of this
    module. Nothing is launched here.

    Returns:
        gr.Blocks: The assembled, un-launched application.
    """
    page_css = """
    .gradio-container {
        max-width: 1400px !important;
        margin: auto;
    }
    .upload-container {
        border: 2px dashed #4CAF50;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background-color: #f8f9fa;
    }
    .delete-container {
        border: 2px dashed #f44336;
        border-radius: 10px;
        padding: 20px;
        background-color: #ffebee;
    }
    .tab-nav {
        margin-bottom: 20px;
    }
    """

    header_markdown = """
    # 📄 Tax Document Ingestion System

    Upload, manage, and delete documents in the Pinecone Assistant for GST Minutes processing.

    ## 🚀 Features:
    - ✅ **Multiple file upload** - Select and upload multiple documents at once
    - 🏷️ **Metadata tagging** - Add sections, keywords, and descriptions
    - 🔄 **Batch processing** - All files processed with individual metadata
    - 🗑️ **File deletion** - Delete multiple files by selecting from dropdown
    - 📊 **File management** - View uploaded files with timestamps and metadata
    - 📋 **Detailed reporting** - See success/failure status for each operation

    ---
    """

    footer_markdown = """
    ---

    ### 💡 **Usage Tips:**

    **Upload Documents:**
    - Select up to 10 PDF, DOC, DOCX, or TXT files at once
    - Upload files first, then fill individual metadata for each document
    - Each file gets its own sections, keywords, and description
    - Check the results section for upload status

    **Delete Documents:**
    - Click 'Refresh File List' to load current files in dropdown
    - Select multiple files using the dropdown (supports multi-select)
    - Click 'Delete Selected Files' to remove them permanently
    - View deletion results for success/failure status

    **View Uploaded Files:**
    - Click 'Fetch Files' to see all uploaded files
    - View file details including upload timestamps and metadata
    - Files are sorted by most recent first
    - Use pagination to navigate through large file lists

    ### ⚠️ **Important Notes:**
    - File deletion is **permanent** and cannot be undone
    - Always verify your selection before deleting files
    - The system maps file titles to IDs internally for deletion

    ### 📞 **Support:**
    For issues or questions, contact the development team.
    """

    with gr.Blocks(
        title="📄 Tax Document Ingestion System",
        theme=gr.themes.Soft(),
        css=page_css,
    ) as app:
        gr.Markdown(header_markdown)

        with gr.Tabs():
            # ---- Upload tab ------------------------------------------------
            with gr.TabItem("📤 Upload Documents", id="upload_tab"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### 📁 **File Upload**")
                        files_input = gr.File(
                            label="Select Documents (Max 10 files)",
                            file_count="multiple",
                            file_types=[".pdf", ".doc", ".docx", ".txt"],
                            elem_classes=["upload-container"],
                        )

                    with gr.Column(scale=1):
                        gr.Markdown("### 🏷️ **Document Metadata (Individual for Each File)**")
                        gr.Markdown("*Upload files first, then metadata fields will appear for each document*")

                        # 30 pre-built textboxes (3 per file, 10 files max)
                        # that update_metadata_fields reveals and relabels.
                        with gr.Column():
                            metadata_fields = [
                                gr.Textbox(
                                    label=f"Field {slot}",
                                    placeholder="",
                                    visible=False,
                                    lines=2,
                                )
                                for slot in range(30)
                            ]

                with gr.Row():
                    with gr.Column(scale=1):
                        upload_btn = gr.Button(
                            "🚀 Upload Documents to Pinecone Assistant",
                            variant="primary",
                            size="lg",
                        )
                    with gr.Column(scale=1):
                        clear_btn = gr.Button("🗑️ Clear Form", variant="secondary", size="lg")

                with gr.Row():
                    processing_status = gr.Markdown(
                        value="🟢 **Ready to process documents**",
                        visible=True,
                    )

                gr.Markdown("---")

                with gr.Row():
                    with gr.Column():
                        status_output = gr.Markdown(
                            label="📊 Upload Status",
                            value="*Ready to upload documents...*",
                        )

                with gr.Row():
                    with gr.Column():
                        results_output = gr.Markdown(
                            label="📋 Detailed Results",
                            value="",
                            max_height=400,
                        )

            # ---- Delete tab ------------------------------------------------
            with gr.TabItem("🗑️ Delete Documents", id="delete_tab"):
                gr.Markdown("### 🗑️ **Delete Multiple Documents**")
                gr.Markdown("Select multiple files from the dropdown to delete them from the Pinecone Assistant.")

                with gr.Row():
                    with gr.Column(scale=2):
                        file_dropdown = gr.Dropdown(
                            label="📋 Select Files to Delete (Multiple Selection)",
                            choices=[],
                            multiselect=True,
                            interactive=False,
                            elem_classes=["delete-container"],
                        )
                    with gr.Column(scale=1):
                        refresh_dropdown_btn = gr.Button(
                            "🔄 Refresh File List",
                            variant="secondary",
                            size="lg",
                        )

                with gr.Row():
                    with gr.Column(scale=1):
                        delete_btn = gr.Button("🗑️ Delete Selected Files", variant="stop", size="lg")
                    with gr.Column(scale=1):
                        clear_delete_btn = gr.Button("↺ Clear Selection", variant="secondary", size="lg")

                gr.Markdown("---")

                with gr.Row():
                    with gr.Column():
                        delete_status_output = gr.Markdown(
                            label="📊 Deletion Status",
                            value="*Select files to delete...*",
                        )

                with gr.Row():
                    with gr.Column():
                        delete_results_output = gr.Markdown(
                            label="🗑️ Deletion Results",
                            value="",
                            max_height=400,
                        )

            # ---- View tab --------------------------------------------------
            with gr.TabItem("📋 View Uploaded Files", id="view_tab"):
                gr.Markdown("### 📋 **Uploaded Files Management**")
                gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")

                with gr.Row():
                    refresh_btn = gr.Button("🔄 Fetch Files", variant="primary", size="lg")

                with gr.Row():
                    file_list_status = gr.Markdown(
                        value="🟡 **Click 'Fetch Files' to load uploaded files**",
                        visible=True,
                    )

                gr.Markdown("---")

                with gr.Row():
                    prev_btn = gr.Button("⬅️ Previous 100", variant="secondary", visible=False)
                    pagination_info = gr.Markdown(
                        value="📄 Page 1 of 1 | Total: 0 files",
                        elem_classes=["pagination-info"],
                    )
                    next_btn = gr.Button("Next 100 ➡️", variant="secondary", visible=False)

                with gr.Row():
                    with gr.Column(scale=1):
                        file_summary = gr.Markdown(
                            label="📊 Files Summary",
                            value="*Click refresh to load file summary...*",
                        )

                with gr.Row():
                    with gr.Column():
                        file_details = gr.Markdown(
                            label="📋 File Details",
                            value="*Click refresh to load file details...*",
                            max_height=600,
                        )

        # ---- Event wiring: upload tab --------------------------------------
        files_input.change(
            fn=update_metadata_fields,
            inputs=[files_input],
            outputs=metadata_fields,
        )

        upload_started = upload_btn.click(fn=start_processing, outputs=[processing_status])
        upload_started.then(
            fn=process_files_with_progress,
            inputs=[files_input] + metadata_fields,
            outputs=[status_output, results_output, processing_status],
        )

        clear_btn.click(
            fn=clear_form,
            outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status],
        )

        # ---- Event wiring: delete tab --------------------------------------
        refresh_dropdown_btn.click(fn=refresh_delete_dropdown, outputs=[file_dropdown])

        delete_btn.click(
            fn=delete_selected_files,
            inputs=[file_dropdown],
            outputs=[delete_status_output, delete_results_output],
        )

        clear_delete_btn.click(
            fn=clear_delete_form,
            outputs=[file_dropdown, delete_status_output, delete_results_output],
        )

        # ---- Event wiring: view tab ----------------------------------------
        # The three listing handlers all fill the same five components.
        listing_outputs = [file_summary, file_details, pagination_info, prev_btn, next_btn]

        fetch_started = refresh_btn.click(fn=refresh_file_list, outputs=[file_list_status])
        fetch_started.then(
            fn=list_uploaded_files_paginated,
            inputs=[],
            outputs=listing_outputs,
        )

        next_btn.click(fn=load_next_page, inputs=[pagination_info], outputs=listing_outputs)
        prev_btn.click(fn=load_prev_page, inputs=[pagination_info], outputs=listing_outputs)

        gr.Markdown(footer_markdown)

    return app
|
|
|
|
|
def create_login_interface():
    """Assemble a stand-alone login screen as its own Blocks app.

    Only the components are created; no event handlers are attached here, so
    the caller is expected to wire the returned components itself.

    Returns:
        tuple: ``(login_app, password_input, login_btn, error_message)``: the
        Blocks app, the password textbox, the login button and the initially
        hidden error Markdown.
    """
    login_css = """
    .gradio-container {
        max-width: 500px !important;
        margin: auto;
        padding-top: 100px;
    }
    .login-container {
        border: 2px solid #2196F3;
        border-radius: 15px;
        padding: 30px;
        text-align: center;
        background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
        box-shadow: 0 10px 20px rgba(0,0,0,0.1);
    }
    .password-input {
        margin: 20px 0;
    }
    .login-button {
        margin-top: 15px;
    }
    .error-message {
        color: #f44336;
        font-weight: bold;
        margin-top: 10px;
    }
    """

    heading_markdown = """
    # 🔐 **Tax Document System**
    ## **Authentication Required**

    Please enter the password to access the application.

    ---
    """

    notice_markdown = """
    ---

    ### 🛡️ **Security Notice:**
    - This system contains sensitive tax documentation
    - Authorized access only
    - All activities are logged

    ### 📞 **Need Access?**
    Contact your system administrator for credentials.
    """

    with gr.Blocks(
        title="🔐 Authentication Required",
        theme=gr.themes.Soft(),
        css=login_css,
    ) as login_app:
        with gr.Column(elem_classes=["login-container"]):
            gr.Markdown(heading_markdown)

            password_input = gr.Textbox(
                label="🔑 Password",
                type="password",
                placeholder="Enter password to access the system",
                elem_classes=["password-input"],
            )

            login_btn = gr.Button(
                "🚀 Login",
                variant="primary",
                size="lg",
                elem_classes=["login-button"],
            )

            # Hidden until a caller decides to surface a login failure.
            error_message = gr.Markdown(
                value="",
                visible=False,
                elem_classes=["error-message"],
            )

            gr.Markdown(notice_markdown)

    return login_app, password_input, login_btn, error_message
|
|
def main():
    """Build the login gate and the main UI in one Blocks app and launch it.

    Both the login column and the (initially hidden) application column live
    inside a single ``gr.Blocks`` so ``.launch()`` is called exactly once. A
    successful login hides the login column and reveals the main column; a
    failed one shows an error message and leaves the main column hidden.

    The server listens on 0.0.0.0:7860 with debug output and error display
    enabled, and without a public share link.

    NOTE(review): the gate works by toggling component visibility. Whether
    the handlers behind the hidden column are still reachable without logging
    in (for example through Gradio's API endpoints) should be confirmed; if
    they are, a server-side check is needed as well.
    """
    app_css = """
    .gradio-container {
        max-width: 1400px !important;
        margin: auto;
    }
    .upload-container {
        border: 2px dashed #4CAF50;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background-color: #f8f9fa;
    }
    .delete-container {
        border: 2px dashed #f44336;
        border-radius: 10px;
        padding: 20px;
        background-color: #ffebee;
    }
    .login-container {
        border: 2px solid #2196F3;
        border-radius: 15px;
        padding: 30px;
        text-align: center;
        background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
        box-shadow: 0 10px 20px rgba(0,0,0,0.1);
    }
    """

    with gr.Blocks(
        title="Tax Document Ingestion System",
        theme=gr.themes.Soft(),
        css=app_css,
    ) as app:
        # ---- Login gate ----------------------------------------------------
        with gr.Column(elem_classes=["login-container"]) as login_container:
            gr.Markdown("""
            # Tax Document System
            Authentication required
            """)

            password_input = gr.Textbox(
                label="Password",
                type="password",
                placeholder="Enter password",
            )
            login_btn = gr.Button("Login", variant="primary", size="lg")
            error_message = gr.Markdown(value="", visible=False)

        # ---- Main application (hidden until login succeeds) ----------------
        with gr.Column(visible=False) as main_container:
            gr.Markdown(
                """
                # Tax Document Ingestion System

                Upload, manage, and delete documents in the Pinecone Assistant.
                """
            )

            with gr.Tabs():
                with gr.TabItem("Upload Documents", id="upload_tab"):
                    with gr.Row():
                        with gr.Column(scale=1):
                            gr.Markdown("### File Upload")
                            files_input = gr.File(
                                label="Select Documents (Max 10 files)",
                                file_count="multiple",
                                file_types=[".pdf", ".doc", ".docx", ".txt"],
                                elem_classes=["upload-container"],
                            )
                        with gr.Column(scale=1):
                            gr.Markdown("### Document Metadata")

                            # 30 pre-built textboxes (3 per file, 10 files
                            # max) revealed by update_metadata_fields.
                            with gr.Column():
                                metadata_fields = [
                                    gr.Textbox(
                                        label=f"Field {slot}",
                                        placeholder="",
                                        visible=False,
                                        lines=2,
                                    )
                                    for slot in range(30)
                                ]

                    with gr.Row():
                        with gr.Column(scale=1):
                            upload_btn = gr.Button(
                                "Upload Documents to Pinecone Assistant",
                                variant="primary",
                            )
                        with gr.Column(scale=1):
                            clear_btn = gr.Button("Clear Form", variant="secondary")

                    processing_status = gr.Markdown(value="Ready to process documents", visible=True)

                    status_output = gr.Markdown(
                        label="Upload Status",
                        value="Ready to upload documents...",
                    )
                    results_output = gr.Markdown(
                        label="Detailed Results",
                        value="",
                        max_height=400,
                    )

                with gr.TabItem("Delete Documents", id="delete_tab"):
                    gr.Markdown("### Delete Documents")
                    with gr.Row():
                        with gr.Column(scale=2):
                            file_dropdown = gr.Dropdown(
                                label="Select Files to Delete",
                                choices=[],
                                multiselect=True,
                                interactive=False,
                                elem_classes=["delete-container"],
                            )
                        with gr.Column(scale=1):
                            refresh_dropdown_btn = gr.Button("Refresh File List", variant="secondary")

                    with gr.Row():
                        with gr.Column(scale=1):
                            delete_btn = gr.Button("Delete Selected Files", variant="stop")
                        with gr.Column(scale=1):
                            clear_delete_btn = gr.Button("Clear Selection", variant="secondary")

                    delete_status_output = gr.Markdown(
                        label="Deletion Status",
                        value="Select files to delete...",
                    )
                    delete_results_output = gr.Markdown(
                        label="Deletion Results",
                        value="",
                        max_height=400,
                    )

                with gr.TabItem("View Uploaded Files", id="view_tab"):
                    gr.Markdown("### Uploaded Files Management")
                    with gr.Row():
                        refresh_btn = gr.Button("Fetch Files", variant="primary")

                    file_list_status = gr.Markdown(
                        value="Click 'Fetch Files' to load uploaded files",
                        visible=True,
                    )

                    with gr.Row():
                        prev_btn = gr.Button("Previous 100", variant="secondary", visible=False)
                        pagination_info = gr.Markdown(value="Page 1 of 1 | Total: 0 files")
                        next_btn = gr.Button("Next 100", variant="secondary", visible=False)

                    file_summary = gr.Markdown(
                        label="Files Summary",
                        value="Click refresh to load file summary...",
                    )
                    file_details = gr.Markdown(
                        label="File Details",
                        value="Click refresh to load file details...",
                        max_height=600,
                    )

        # ---- Login handling ------------------------------------------------
        def handle_login(password):
            """Return updates for (login column, error text, main column)."""
            if not authenticate(password):
                return (
                    gr.update(visible=True),
                    gr.update(value="Invalid password. Please try again.", visible=True),
                    gr.update(visible=False),
                )
            return (
                gr.update(visible=False),
                gr.update(value="", visible=False),
                gr.update(visible=True),
            )

        # Clicking the button and pressing Enter in the textbox behave alike.
        login_outputs = [login_container, error_message, main_container]
        login_btn.click(fn=handle_login, inputs=[password_input], outputs=login_outputs)
        password_input.submit(fn=handle_login, inputs=[password_input], outputs=login_outputs)

        # ---- Upload tab wiring ---------------------------------------------
        files_input.change(fn=update_metadata_fields, inputs=[files_input], outputs=metadata_fields)

        upload_started = upload_btn.click(fn=start_processing, outputs=[processing_status])
        upload_started.then(
            fn=process_files_with_progress,
            inputs=[files_input] + metadata_fields,
            outputs=[status_output, results_output, processing_status],
        )

        clear_btn.click(
            fn=clear_form,
            outputs=[files_input] + metadata_fields + [status_output, results_output, processing_status],
        )

        # ---- Delete tab wiring ---------------------------------------------
        refresh_dropdown_btn.click(fn=refresh_delete_dropdown, outputs=[file_dropdown])
        delete_btn.click(
            fn=delete_selected_files,
            inputs=[file_dropdown],
            outputs=[delete_status_output, delete_results_output],
        )
        clear_delete_btn.click(
            fn=clear_delete_form,
            outputs=[file_dropdown, delete_status_output, delete_results_output],
        )

        # ---- View tab wiring -----------------------------------------------
        listing_outputs = [file_summary, file_details, pagination_info, prev_btn, next_btn]

        fetch_started = refresh_btn.click(fn=refresh_file_list, outputs=[file_list_status])
        fetch_started.then(
            fn=list_uploaded_files_paginated,
            inputs=[],
            outputs=listing_outputs,
        )

        next_btn.click(fn=load_next_page, inputs=[pagination_info], outputs=listing_outputs)
        prev_btn.click(fn=load_prev_page, inputs=[pagination_info], outputs=listing_outputs)

    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True,
        show_error=True,
    )
|
|
|
|
|
# Script entry point: build and launch the app only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()