"""Gradio front end for managing documents in a Pinecone Assistant.

The app offers three tabs (upload, delete, view) behind a simple password
prompt. Uploaded files are copied into ``UPLOAD_FOLDER`` and then sent to the
assistant named by ``PINECONE_ASSISTANT_NAME`` together with per-file metadata
(sections, keywords, description).
"""
import hmac
import json
import logging
import os
import re
import shutil
import tempfile
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Tuple

import gradio as gr
from dotenv import load_dotenv
from pinecone import Pinecone

logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Authentication configuration.
# SECURITY(review): the literal below is a credential committed to source. It
# is kept only as a backward-compatible fallback; set APP_AUTH_PASSWORD in the
# environment (or .env) and rotate/remove this default.
AUTH_PASSWORD = os.getenv("APP_AUTH_PASSWORD") or "gst_magic@##56$$"


def authenticate(password):
    """Return True when *password* matches the configured application password.

    The comparison is constant-time so response timing does not leak how much
    of the password matched. Non-string input is rejected.
    """
    if not isinstance(password, str):
        return False
    return hmac.compare_digest(
        password.encode("utf-8"), AUTH_PASSWORD.encode("utf-8")
    )


# Validate required environment variables
required_env_vars = ["PINECONE_API_KEY"]
missing_vars = [var for var in required_env_vars if not os.getenv(var)]
if missing_vars:
    raise ValueError(
        f"Missing required environment variables: {', '.join(missing_vars)}"
    )

# Initialize Pinecone
pinecone_api_key = os.getenv("PINECONE_API_KEY")
pc = Pinecone(api_key=pinecone_api_key)

# Create uploads directory
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

DEFAULT_ASSISTANT_NAME = "gstminutes"
MAX_FILES_PER_BATCH = 10
FIELDS_PER_FILE = 3  # sections, keywords, description
METADATA_FIELD_COUNT = MAX_FILES_PER_BATCH * FIELDS_PER_FILE
FILES_PER_PAGE = 100
READY_STATUS = "🟢 **Ready to process documents**"

# Timezone-aware "oldest possible" value used when a file has no usable
# timestamp. It must be aware: Python refuses to order naive against aware
# datetimes, and real Pinecone timestamps carry an offset.
_OLDEST_TIMESTAMP = datetime.min.replace(tzinfo=timezone.utc)
_FRACTION_RE = re.compile(r"\.(\d+)")
_PAGE_RE = re.compile(r"Page (\d+) of")

MAIN_CSS = """
.gradio-container {
    max-width: 1400px !important;
    margin: auto;
}
.upload-container {
    border: 2px dashed #4CAF50;
    border-radius: 10px;
    padding: 20px;
    text-align: center;
    background-color: #f8f9fa;
}
.delete-container {
    border: 2px dashed #f44336;
    border-radius: 10px;
    padding: 20px;
    background-color: #ffebee;
}
.tab-nav {
    margin-bottom: 20px;
}
"""

LOGIN_CSS = """
.gradio-container {
    max-width: 500px !important;
    margin: auto;
    padding-top: 100px;
}
.login-container {
    border: 2px solid #2196F3;
    border-radius: 15px;
    padding: 30px;
    text-align: center;
    background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
    box-shadow: 0 10px 20px rgba(0,0,0,0.1);
}
.password-input {
    margin: 20px 0;
}
.login-button {
    margin-top: 15px;
}
.error-message {
    color: #f44336;
    font-weight: bold;
    margin-top: 10px;
}
"""

COMBINED_APP_CSS = """
.gradio-container {
    max-width: 1400px !important;
    margin: auto;
}
.upload-container {
    border: 2px dashed #4CAF50;
    border-radius: 10px;
    padding: 20px;
    text-align: center;
    background-color: #f8f9fa;
}
.delete-container {
    border: 2px dashed #f44336;
    border-radius: 10px;
    padding: 20px;
    background-color: #ffebee;
}
.login-container {
    border: 2px solid #2196F3;
    border-radius: 15px;
    padding: 30px;
    text-align: center;
    background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
    box-shadow: 0 10px 20px rgba(0,0,0,0.1);
}
"""

MAIN_HEADER_MD = """
# 📄 Tax Document Ingestion System

Upload, manage, and delete documents in the Pinecone Assistant for GST Minutes processing.

## 🚀 Features:
- ✅ **Multiple file upload** - Select and upload multiple documents at once
- 🏷️ **Metadata tagging** - Add sections, keywords, and descriptions
- 🔄 **Batch processing** - All files processed with individual metadata
- 🗑️ **File deletion** - Delete multiple files by selecting from dropdown
- 📊 **File management** - View uploaded files with timestamps and metadata
- 📋 **Detailed reporting** - See success/failure status for each operation

---
"""

MAIN_FOOTER_MD = """
---
### 💡 **Usage Tips:**

**Upload Documents:**
- Select up to 10 PDF, DOC, DOCX, or TXT files at once
- Upload files first, then fill individual metadata for each document
- Each file gets its own sections, keywords, and description
- Check the results section for upload status

**Delete Documents:**
- Click 'Refresh File List' to load current files in dropdown
- Select multiple files using the dropdown (supports multi-select)
- Click 'Delete Selected Files' to remove them permanently
- View deletion results for success/failure status

**View Uploaded Files:**
- Click 'Fetch Files' to see all uploaded files
- View file details including upload timestamps and metadata
- Files are sorted by most recent first
- Use pagination to navigate through large file lists

### ⚠️ **Important Notes:**
- File deletion is **permanent** and cannot be undone
- Always verify your selection before deleting files
- The system maps file titles to IDs internally for deletion

### 📞 **Support:**
For issues or questions, contact the development team.
"""

LOGIN_HEADER_MD = """
# 🔐 **Tax Document System**
## **Authentication Required**

Please enter the password to access the application.

---
"""

LOGIN_FOOTER_MD = """
---
### 🛡️ **Security Notice:**
- This system contains sensitive tax documentation
- Authorized access only
- All activities are logged

### 📞 **Need Access?**
Contact your system administrator for credentials.
"""


def _get_assistant():
    """Return the Pinecone Assistant handle named by PINECONE_ASSISTANT_NAME."""
    assistant_name = os.getenv("PINECONE_ASSISTANT_NAME", DEFAULT_ASSISTANT_NAME)
    return pc.assistant.Assistant(assistant_name=assistant_name)


def _normalise_fraction(match):
    """Pad or truncate a fractional-seconds match to exactly six digits."""
    return "." + match.group(1)[:6].ljust(6, "0")


def parse_pinecone_timestamp(iso_string):
    """Parse an ISO 8601 timestamp from Pinecone into an aware datetime.

    Handles a trailing ``Z`` and fractional seconds of any length (nanosecond
    values are truncated to microseconds, short fractions are zero-padded so
    older ``datetime.fromisoformat`` implementations accept them).

    Args:
        iso_string: The ISO-formatted timestamp string. A ``datetime`` is
            accepted and passed through.

    Returns:
        A timezone-aware ``datetime``. Missing or non-string input yields the
        oldest representable UTC datetime so such entries sort last when
        ordering newest-first.

    Raises:
        ValueError: If a non-empty string is not a valid ISO 8601 timestamp.
    """
    if isinstance(iso_string, datetime):
        parsed = iso_string
    elif not isinstance(iso_string, str) or not iso_string:
        return _OLDEST_TIMESTAMP
    else:
        if iso_string.endswith("Z"):
            iso_string = iso_string[:-1] + "+00:00"
        iso_string = _FRACTION_RE.sub(_normalise_fraction, iso_string, count=1)
        parsed = datetime.fromisoformat(iso_string)

    if parsed.tzinfo is None:
        # NOTE(review): offset-less timestamps are assumed to be UTC so that
        # every returned value is mutually comparable -- confirm with the API.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def _creation_sort_key(file_obj):
    """Sort key for a file object; malformed timestamps sort as oldest."""
    try:
        return parse_pinecone_timestamp(getattr(file_obj, "created_on", ""))
    except ValueError:
        return _OLDEST_TIMESTAMP


def _format_created_on(created_on):
    """Return ``created_on`` as 'YYYY-MM-DD HH:MM', or None if unavailable."""
    if not created_on:
        return None
    try:
        return parse_pinecone_timestamp(created_on).strftime("%Y-%m-%d %H:%M")
    except (ValueError, TypeError):
        return None


def get_all_files():
    """Return all files in the Pinecone Assistant, newest first.

    Any failure talking to Pinecone is logged and reported as an empty list,
    so callers cannot distinguish "no files" from "lookup failed".
    """
    try:
        files_response = _get_assistant().list_files()

        # The response is either a plain list or an object with a `files` attribute.
        if hasattr(files_response, "files"):
            files_list = files_response.files
        else:
            files_list = files_response

        if not files_list:
            return []

        return sorted(files_list, key=_creation_sort_key, reverse=True)
    except Exception:
        logger.exception("Failed to list files from Pinecone Assistant")
        return []


def get_file_choices():
    """Return dropdown choices as a list of ``(display_name, file_id)`` tuples."""
    try:
        choices = []
        for file_obj in get_all_files():
            file_name = getattr(file_obj, "name", "Unknown File")
            file_id = getattr(file_obj, "id", "unknown")
            uploaded = _format_created_on(getattr(file_obj, "created_on", ""))

            if uploaded:
                display_name = f"{file_name} (uploaded: {uploaded})"
            else:
                display_name = file_name
            choices.append((display_name, file_id))
        return choices
    except Exception:
        logger.exception("Failed to build file choices")
        return []


def refresh_delete_dropdown():
    """Refresh the delete dropdown; it is disabled when there are no files."""
    choices = get_file_choices()
    if not choices:
        return gr.update(choices=[], value=None, interactive=False)
    return gr.update(choices=choices, value=None, interactive=True)


def delete_selected_files(selected_file_ids, progress=gr.Progress()):
    """Delete the selected files by ID.

    Returns:
        ``(status_message, detailed_results)`` as Markdown strings. A failure
        on one file is recorded and the remaining files are still attempted.
    """
    if not selected_file_ids:
        return "❌ **Error:** No files selected for deletion", ""

    try:
        progress(0.1, desc="🔧 Initializing Pinecone Assistant...")
        assistant = _get_assistant()

        # Map IDs to names so the report can show human-readable titles.
        file_id_to_name = {
            getattr(f, "id", ""): getattr(f, "name", "Unknown")
            for f in get_all_files()
        }

        total_files = len(selected_file_ids)
        deleted_files = []  # (name, id)
        failed_files = []  # (name, id, error text)

        progress(0.2, desc=f"🗑️ Starting deletion of {total_files} files...")

        for i, file_id in enumerate(selected_file_ids):
            file_name = file_id_to_name.get(file_id, f"File ID: {file_id}")
            try:
                progress(
                    0.2 + (i / total_files) * 0.7,
                    desc=f"🗑️ Deleting: {file_name}...",
                )
                assistant.delete_file(file_id=file_id)
                deleted_files.append((file_name, file_id))
                time.sleep(0.2)  # Small delay between deletions
            except Exception as delete_error:
                failed_files.append((file_name, file_id, str(delete_error)))

        progress(1.0, desc="✅ Deletion process completed!")

        status_message = (
            "📊 **Deletion Complete**\n\n"
            f"✅ **Successfully deleted:** {len(deleted_files)} files\n"
            f"❌ **Failed to delete:** {len(failed_files)} files\n"
            f"📁 **Total processed:** {total_files} files\n\n"
        )

        parts = ["## 🗑️ **Deletion Results**\n\n"]
        if deleted_files:
            parts.append("### ✅ **Successfully Deleted Files:**\n")
            parts.extend(f"- **{name}** (`{fid}`)\n" for name, fid in deleted_files)
            parts.append("\n")
        if failed_files:
            parts.append("### ❌ **Failed Deletions:**\n")
            for name, fid, error in failed_files:
                parts.append(f"- **{name}** (`{fid}`)\n")
                parts.append(f" - Error: {error}\n")
            parts.append("\n")

        return status_message, "".join(parts)

    except Exception as e:
        logger.exception("Critical error during deletion")
        return f"❌ **Critical Error during deletion:** {str(e)}", ""


def list_uploaded_files_paginated(page_num=0, progress=gr.Progress()):
    """List uploaded files one page (``FILES_PER_PAGE`` entries) at a time.

    Args:
        page_num: Zero-based page index. Out-of-range values are clamped to
            the first/last page.

    Returns:
        ``(summary, details, pagination_info, prev_update, next_update)`` where
        the last two toggle visibility of the previous/next buttons.
    """
    try:
        progress(0.1, desc="🔍 Getting files...")
        all_files = get_all_files()

        if not all_files:
            progress(1.0, desc="✅ Complete - No files found")
            return (
                "📋 **No files found in Pinecone Assistant**",
                "",
                "No files available",
                gr.update(visible=False),
                gr.update(visible=False),
            )

        progress(0.5, desc="📊 Processing page...")

        total_files = len(all_files)
        total_pages = (total_files + FILES_PER_PAGE - 1) // FILES_PER_PAGE
        # Clamp so a stale or out-of-range page never renders an empty page
        # with a nonsensical "Showing 201-100" range.
        page_num = max(0, min(page_num, total_pages - 1))

        start_idx = page_num * FILES_PER_PAGE
        end_idx = start_idx + FILES_PER_PAGE
        page_files = all_files[start_idx:end_idx]

        summary = (
            f"📊 **Files Summary (Page {page_num + 1} of {total_pages})**\n\n"
            f"📁 **Total files:** {total_files}\n"
            f"📋 **Showing:** {start_idx + 1}-{min(end_idx, total_files)} "
            f"of {total_files}\n\n"
        )

        parts = [f"## 📋 **Latest Uploaded Files - Page {page_num + 1}**\n\n"]
        progress(0.8, desc="📝 Formatting file titles...")

        for global_index, file_obj in enumerate(page_files, start_idx + 1):
            try:
                file_name = getattr(file_obj, "name", "Unknown File")
                file_id = getattr(file_obj, "id", "Unknown ID")
                created_on = getattr(file_obj, "created_on", "")
                created_formatted = _format_created_on(created_on) or "Unknown"

                entry = (
                    f"{global_index}. **{file_name}**\n"
                    f" 📅 Uploaded: {created_formatted} | 🆔 ID: `{file_id}`\n\n"
                )
            except Exception:
                # Best effort: one unreadable entry must not hide the page.
                logger.exception("Failed to format file entry %s", global_index)
                entry = f"{global_index}. ❌ **Error loading file**\n\n"
            parts.append(entry)

        pagination_info = (
            f"📄 Page {page_num + 1} of {total_pages} | Total: {total_files} files"
        )
        show_prev = page_num > 0
        show_next = page_num < total_pages - 1

        progress(1.0, desc="✅ Page loaded successfully!")
        return (
            summary,
            "".join(parts),
            pagination_info,
            gr.update(visible=show_prev),
            gr.update(visible=show_next),
        )

    except Exception as e:
        logger.exception("Error retrieving file list")
        return (
            f"❌ **Error retrieving file list:** {str(e)}",
            "",
            "Error",
            gr.update(visible=False),
            gr.update(visible=False),
        )


def _current_page_index(pagination_text):
    """Extract the zero-based page index from pagination text, or None."""
    if not isinstance(pagination_text, str):
        return None
    match = _PAGE_RE.search(pagination_text)
    return int(match.group(1)) - 1 if match else None


def load_next_page(current_page_info):
    """Load the page after the one described by *current_page_info*.

    Falls back to the first page when the text cannot be parsed.
    """
    current_page = _current_page_index(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(current_page + 1)


def load_prev_page(current_page_info):
    """Load the page before the one described by *current_page_info*.

    Falls back to the first page when the text cannot be parsed.
    """
    current_page = _current_page_index(current_page_info)
    if current_page is None:
        return list_uploaded_files_paginated(0)
    return list_uploaded_files_paginated(max(0, current_page - 1))


def refresh_file_list():
    """Return the status text shown while the file list is being fetched."""
    return "🔄 **Refreshing file list... Please wait**"


def process_files_with_progress(files, *metadata_inputs, progress=gr.Progress()):
    """Upload each file to the Pinecone Assistant with its own metadata.

    Args:
        files: Paths of the selected files (at most ``MAX_FILES_PER_BATCH``).
        *metadata_inputs: Flat sequence of text values, three per file in the
            order sections, keywords, description.

    Returns:
        ``(status_message, detailed_results, processing_status)``. Every code
        path returns all three values because the handler is wired to three
        output components.
    """
    if not files:
        return "❌ Error: No files selected", "", READY_STATUS

    if len(files) > MAX_FILES_PER_BATCH:
        return (
            f"❌ Error: Maximum {MAX_FILES_PER_BATCH} files allowed at a time",
            "",
            READY_STATUS,
        )

    try:
        results = []
        errors = []
        total_files = len(files)

        progress(0, desc="🔧 Initializing Pinecone Assistant...")
        time.sleep(0.5)  # Small delay to show the progress
        assistant = _get_assistant()

        for i, file_path in enumerate(files):
            filename = os.path.basename(file_path)
            fraction_done = i / total_files
            try:
                progress(
                    fraction_done,
                    desc=f"📄 Processing {filename}... ({i+1}/{total_files})",
                )

                # Three consecutive inputs belong to this file; an incomplete
                # group is treated as "no metadata".
                start = i * FIELDS_PER_FILE
                group = metadata_inputs[start:start + FIELDS_PER_FILE]
                if len(group) == FIELDS_PER_FILE:
                    sections, keywords, description = (v or "" for v in group)
                else:
                    sections = keywords = description = ""

                if not (sections.strip() or keywords.strip() or description.strip()):
                    errors.append({
                        "filename": filename,
                        "error": "❌ Error: No metadata provided",
                    })
                    continue

                progress(
                    fraction_done,
                    desc=f"🏷️ Preparing metadata for {filename}...",
                )
                metadata = {
                    "sections": [s.strip() for s in sections.split(",") if s.strip()],
                    "keywords": [k.strip() for k in keywords.split(",") if k.strip()],
                    "description": description.strip(),
                }

                progress(
                    fraction_done,
                    desc=f"📁 Copying {filename} to uploads...",
                )
                destination_path = os.path.join(UPLOAD_FOLDER, filename)
                shutil.copy2(file_path, destination_path)

                progress(
                    fraction_done,
                    desc=f"☁️ Uploading {filename} to Pinecone...",
                )
                response = assistant.upload_file(
                    file_path=destination_path,
                    metadata=metadata,
                    timeout=None,
                )
                results.append({
                    "filename": filename,
                    "status": "✅ Success",
                    "metadata": metadata,
                    "response": str(response),
                })

            except Exception as file_error:
                logger.exception("Upload failed for %s", filename)
                errors.append({
                    "filename": filename,
                    "error": f"❌ Error: {str(file_error)}",
                })

        progress(1.0, desc="✅ Processing complete!")
        time.sleep(0.5)

        success_count = len(results)
        error_count = len(errors)

        status_message = (
            "📊 **Processing Complete**\n\n"
            f"✅ **Successful uploads:** {success_count}\n"
            f"❌ **Failed uploads:** {error_count}\n"
            f"📁 **Total files processed:** {len(files)}\n\n"
        )

        parts = ["## 📋 **Detailed Results**\n\n"]
        if results:
            parts.append("### ✅ **Successful Uploads:**\n")
            for result in results:
                meta = result["metadata"]
                parts.append(f"- **{result['filename']}**\n")
                parts.append(f" - Sections: {', '.join(meta['sections'])}\n")
                parts.append(f" - Keywords: {', '.join(meta['keywords'])}\n")
                parts.append(f" - Description: {meta['description']}\n\n")
        if errors:
            parts.append("### ❌ **Failed Uploads:**\n")
            parts.extend(
                f"- **{error['filename']}** - {error['error']}\n" for error in errors
            )

        # Do not claim success when some (or all) files failed.
        if error_count:
            final_status = (
                f"⚠️ **Processing completed with {error_count} failed file(s)**"
            )
        else:
            final_status = "✅ **Processing completed successfully!**"

        return status_message, "".join(parts), final_status

    except Exception as e:
        logger.exception("Critical error while processing uploads")
        return (
            f"❌ **Critical Error:** {str(e)}",
            "",
            "❌ **Processing failed with error**",
        )


def update_metadata_fields(files):
    """Show three labelled metadata fields per selected file, hide the rest.

    Returns a list of exactly ``METADATA_FIELD_COUNT`` component updates. All
    fields are hidden when nothing is selected or too many files are selected.
    """
    if not files or len(files) > MAX_FILES_PER_BATCH:
        return [gr.update(visible=False) for _ in range(METADATA_FIELD_COUNT)]

    updates = []
    for file_path in files:
        filename = os.path.basename(file_path)
        updates.extend([
            gr.update(
                visible=True,
                label=f"📑 Sections for {filename}",
                placeholder="e.g., Introduction, Financial Data, Compliance",
            ),
            gr.update(
                visible=True,
                label=f"🔍 Keywords for {filename}",
                placeholder="e.g., GST, tax, compliance, revenue",
            ),
            gr.update(
                visible=True,
                label=f"📝 Description for {filename}",
                placeholder="Brief description of this document",
            ),
        ])

    # Hide the fields that belong to unused file slots.
    updates.extend(
        gr.update(visible=False)
        for _ in range(METADATA_FIELD_COUNT - len(updates))
    )
    return updates[:METADATA_FIELD_COUNT]


def clear_form():
    """Return reset values for the file input, metadata fields and outputs."""
    return [None] + [""] * METADATA_FIELD_COUNT + ["", "", READY_STATUS]


def clear_delete_form():
    """Return reset values for the delete dropdown and its two outputs."""
    return gr.update(value=[]), "", ""


def start_processing():
    """Return the status text shown when processing starts."""
    return "🔄 **Processing documents... Please wait**"


def finish_processing():
    """Return the status text shown when processing finishes."""
    return "✅ **Processing completed successfully!**"


def create_main_interface():
    """Build and return the standalone main application ``gr.Blocks``."""
    with gr.Blocks(
        title="📄 Tax Document Ingestion System",
        theme=gr.themes.Soft(),
        css=MAIN_CSS,
    ) as app:

        gr.Markdown(MAIN_HEADER_MD)

        # Create tabs for different functionalities
        with gr.Tabs() as tabs:
            # Tab 1: Upload Documents
            with gr.TabItem("📤 Upload Documents", id="upload_tab"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### 📁 **File Upload**")
                        files_input = gr.File(
                            label="Select Documents (Max 10 files)",
                            file_count="multiple",
                            file_types=[".pdf", ".doc", ".docx", ".txt"],
                            elem_classes=["upload-container"],
                        )

                    with gr.Column(scale=1):
                        gr.Markdown("### 🏷️ **Document Metadata (Individual for Each File)**")
                        gr.Markdown("*Upload files first, then metadata fields will appear for each document*")

                        # Hidden pool of text fields (three per file slot)
                        with gr.Column() as metadata_container:
                            metadata_fields = []
                            for i in range(METADATA_FIELD_COUNT):
                                field = gr.Textbox(
                                    label=f"Field {i}",
                                    placeholder="",
                                    visible=False,
                                    lines=2,
                                )
                                metadata_fields.append(field)

                with gr.Row():
                    with gr.Column(scale=1):
                        upload_btn = gr.Button(
                            "🚀 Upload Documents to Pinecone Assistant",
                            variant="primary",
                            size="lg",
                        )
                    with gr.Column(scale=1):
                        clear_btn = gr.Button(
                            "🗑️ Clear Form",
                            variant="secondary",
                            size="lg",
                        )

                # Processing status indicator
                with gr.Row():
                    processing_status = gr.Markdown(
                        value="🟢 **Ready to process documents**",
                        visible=True,
                    )

                gr.Markdown("---")

                # Results section
                with gr.Row():
                    with gr.Column():
                        status_output = gr.Markdown(
                            label="📊 Upload Status",
                            value="*Ready to upload documents...*",
                        )

                with gr.Row():
                    with gr.Column():
                        results_output = gr.Markdown(
                            label="📋 Detailed Results",
                            value="",
                            max_height=400,  # Add height limit for scrolling
                        )

            # Tab 2: Delete Documents
            with gr.TabItem("🗑️ Delete Documents", id="delete_tab"):
                gr.Markdown("### 🗑️ **Delete Multiple Documents**")
                gr.Markdown("Select multiple files from the dropdown to delete them from the Pinecone Assistant.")

                with gr.Row():
                    with gr.Column(scale=2):
                        file_dropdown = gr.Dropdown(
                            label="📋 Select Files to Delete (Multiple Selection)",
                            choices=[],
                            multiselect=True,
                            interactive=False,
                            elem_classes=["delete-container"],
                        )
                    with gr.Column(scale=1):
                        refresh_dropdown_btn = gr.Button(
                            "🔄 Refresh File List",
                            variant="secondary",
                            size="lg",
                        )

                with gr.Row():
                    with gr.Column(scale=1):
                        delete_btn = gr.Button(
                            "🗑️ Delete Selected Files",
                            variant="stop",
                            size="lg",
                        )
                    with gr.Column(scale=1):
                        clear_delete_btn = gr.Button(
                            "↺ Clear Selection",
                            variant="secondary",
                            size="lg",
                        )

                gr.Markdown("---")

                # Delete results section
                with gr.Row():
                    with gr.Column():
                        delete_status_output = gr.Markdown(
                            label="📊 Deletion Status",
                            value="*Select files to delete...*",
                        )

                with gr.Row():
                    with gr.Column():
                        delete_results_output = gr.Markdown(
                            label="🗑️ Deletion Results",
                            value="",
                            max_height=400,
                        )

            # Tab 3: View Uploaded Files
            with gr.TabItem("📋 View Uploaded Files", id="view_tab"):
                gr.Markdown("### 📋 **Uploaded Files Management**")
                gr.Markdown("View all files currently uploaded to the Pinecone Assistant with their metadata and timestamps.")

                with gr.Row():
                    refresh_btn = gr.Button(
                        "🔄 Fetch Files",
                        variant="primary",
                        size="lg",
                    )

                # File list status
                with gr.Row():
                    file_list_status = gr.Markdown(
                        value="🟡 **Click 'Fetch Files' to load uploaded files**",
                        visible=True,
                    )

                gr.Markdown("---")

                # Pagination controls
                with gr.Row():
                    prev_btn = gr.Button(
                        "⬅️ Previous 100",
                        variant="secondary",
                        visible=False,
                    )
                    pagination_info = gr.Markdown(
                        value="📄 Page 1 of 1 | Total: 0 files",
                        elem_classes=["pagination-info"],
                    )
                    next_btn = gr.Button(
                        "Next 100 ➡️",
                        variant="secondary",
                        visible=False,
                    )

                # File list results
                with gr.Row():
                    with gr.Column(scale=1):
                        file_summary = gr.Markdown(
                            label="📊 Files Summary",
                            value="*Click refresh to load file summary...*",
                        )

                with gr.Row():
                    with gr.Column():
                        file_details = gr.Markdown(
                            label="📋 File Details",
                            value="*Click refresh to load file details...*",
                            max_height=600,  # Add height limit for scrolling
                        )

        page_outputs = [file_summary, file_details, pagination_info, prev_btn, next_btn]

        # Event handlers for Upload tab
        files_input.change(
            fn=update_metadata_fields,
            inputs=[files_input],
            outputs=metadata_fields,
        )

        # Show processing status first, then run the upload
        upload_btn.click(
            fn=start_processing,
            outputs=[processing_status],
        ).then(
            fn=process_files_with_progress,
            inputs=[files_input] + metadata_fields,
            outputs=[status_output, results_output, processing_status],
        )

        clear_btn.click(
            fn=clear_form,
            outputs=[files_input] + metadata_fields
            + [status_output, results_output, processing_status],
        )

        # Event handlers for Delete tab
        refresh_dropdown_btn.click(
            fn=refresh_delete_dropdown,
            outputs=[file_dropdown],
        )

        delete_btn.click(
            fn=delete_selected_files,
            inputs=[file_dropdown],
            outputs=[delete_status_output, delete_results_output],
        )

        clear_delete_btn.click(
            fn=clear_delete_form,
            outputs=[file_dropdown, delete_status_output, delete_results_output],
        )

        # Event handlers for View Files tab: fetch loads page 1
        refresh_btn.click(
            fn=refresh_file_list,
            outputs=[file_list_status],
        ).then(
            fn=list_uploaded_files_paginated,
            inputs=[],
            outputs=page_outputs,
        )

        next_btn.click(
            fn=load_next_page,
            inputs=[pagination_info],
            outputs=page_outputs,
        )

        prev_btn.click(
            fn=load_prev_page,
            inputs=[pagination_info],
            outputs=page_outputs,
        )

        # Footer
        gr.Markdown(MAIN_FOOTER_MD)

    return app


def create_login_interface():
    """Build the standalone login ``gr.Blocks``.

    Returns:
        ``(login_app, password_input, login_btn, error_message)``. No event
        handlers are attached here; the caller wires them.
    """
    with gr.Blocks(
        title="🔐 Authentication Required",
        theme=gr.themes.Soft(),
        css=LOGIN_CSS,
    ) as login_app:

        with gr.Column(elem_classes=["login-container"]):
            gr.Markdown(LOGIN_HEADER_MD)

            password_input = gr.Textbox(
                label="🔑 Password",
                type="password",
                placeholder="Enter password to access the system",
                elem_classes=["password-input"],
            )

            login_btn = gr.Button(
                "🚀 Login",
                variant="primary",
                size="lg",
                elem_classes=["login-button"],
            )

            error_message = gr.Markdown(
                value="",
                visible=False,
                elem_classes=["error-message"],
            )

            gr.Markdown(LOGIN_FOOTER_MD)

    return login_app, password_input, login_btn, error_message


def main():
    """Main application with authentication — single Gradio Blocks app.

    This builds both the login UI and the main application UI inside one
    `gr.Blocks` so we only call `.launch()` once. The main UI is hidden until
    authentication succeeds. This avoids launching two separate Gradio servers
    which was causing the server to close unexpectedly.
    """
    # NOTE(review): the login only toggles component visibility; it does not
    # appear to protect the underlying event endpoints. Consider the `auth=`
    # argument of `launch()` for real access control -- confirm before relying
    # on this gate.
    with gr.Blocks(
        title="Tax Document Ingestion System",
        theme=gr.themes.Soft(),
        css=COMBINED_APP_CSS,
    ) as app:
        # --- LOGIN CONTAINER (visible initially) ---
        with gr.Column(elem_classes=["login-container"]) as login_container:
            gr.Markdown(
                """
# Tax Document System

Authentication required
"""
            )
            password_input = gr.Textbox(
                label="Password", type="password", placeholder="Enter password"
            )
            login_btn = gr.Button("Login", variant="primary", size="lg")
            error_message = gr.Markdown(value="", visible=False)

        # --- MAIN APP CONTAINER (hidden until auth) ---
        with gr.Column(visible=False) as main_container:
            gr.Markdown(
                """
# Tax Document Ingestion System

Upload, manage, and delete documents in the Pinecone Assistant.
"""
            )

            with gr.Tabs() as tabs:
                # Upload Tab
                with gr.TabItem("Upload Documents", id="upload_tab"):
                    with gr.Row():
                        with gr.Column(scale=1):
                            gr.Markdown("### File Upload")
                            files_input = gr.File(
                                label="Select Documents (Max 10 files)",
                                file_count="multiple",
                                file_types=[".pdf", ".doc", ".docx", ".txt"],
                                elem_classes=["upload-container"],
                            )

                        with gr.Column(scale=1):
                            gr.Markdown("### Document Metadata")
                            with gr.Column() as metadata_container:
                                metadata_fields = []
                                for i in range(METADATA_FIELD_COUNT):
                                    field = gr.Textbox(
                                        label=f"Field {i}",
                                        placeholder="",
                                        visible=False,
                                        lines=2,
                                    )
                                    metadata_fields.append(field)

                    with gr.Row():
                        with gr.Column(scale=1):
                            upload_btn = gr.Button(
                                "Upload Documents to Pinecone Assistant",
                                variant="primary",
                            )
                        with gr.Column(scale=1):
                            clear_btn = gr.Button("Clear Form", variant="secondary")

                    processing_status = gr.Markdown(
                        value="Ready to process documents", visible=True
                    )
                    status_output = gr.Markdown(
                        label="Upload Status", value="Ready to upload documents..."
                    )
                    results_output = gr.Markdown(
                        label="Detailed Results", value="", max_height=400
                    )

                # Delete Tab
                with gr.TabItem("Delete Documents", id="delete_tab"):
                    gr.Markdown("### Delete Documents")
                    with gr.Row():
                        with gr.Column(scale=2):
                            file_dropdown = gr.Dropdown(
                                label="Select Files to Delete",
                                choices=[],
                                multiselect=True,
                                interactive=False,
                                elem_classes=["delete-container"],
                            )
                        with gr.Column(scale=1):
                            refresh_dropdown_btn = gr.Button(
                                "Refresh File List", variant="secondary"
                            )

                    with gr.Row():
                        with gr.Column(scale=1):
                            delete_btn = gr.Button(
                                "Delete Selected Files", variant="stop"
                            )
                        with gr.Column(scale=1):
                            clear_delete_btn = gr.Button(
                                "Clear Selection", variant="secondary"
                            )

                    delete_status_output = gr.Markdown(
                        label="Deletion Status", value="Select files to delete..."
                    )
                    delete_results_output = gr.Markdown(
                        label="Deletion Results", value="", max_height=400
                    )

                # View Tab
                with gr.TabItem("View Uploaded Files", id="view_tab"):
                    gr.Markdown("### Uploaded Files Management")
                    with gr.Row():
                        refresh_btn = gr.Button("Fetch Files", variant="primary")
                    file_list_status = gr.Markdown(
                        value="Click 'Fetch Files' to load uploaded files",
                        visible=True,
                    )

                    with gr.Row():
                        prev_btn = gr.Button(
                            "Previous 100", variant="secondary", visible=False
                        )
                        pagination_info = gr.Markdown(
                            value="Page 1 of 1 | Total: 0 files"
                        )
                        next_btn = gr.Button(
                            "Next 100", variant="secondary", visible=False
                        )

                    file_summary = gr.Markdown(
                        label="Files Summary",
                        value="Click refresh to load file summary...",
                    )
                    file_details = gr.Markdown(
                        label="File Details",
                        value="Click refresh to load file details...",
                        max_height=600,
                    )

        # --- EVENT HANDLERS ---
        # Login: reveal main_container on success, show an error on failure.
        def handle_login(password):
            if authenticate(password):
                return (
                    gr.update(visible=False),
                    gr.update(value="", visible=False),
                    gr.update(visible=True),
                )
            return (
                gr.update(visible=True),
                gr.update(value="Invalid password. Please try again.", visible=True),
                gr.update(visible=False),
            )

        login_outputs = [login_container, error_message, main_container]
        login_btn.click(
            fn=handle_login, inputs=[password_input], outputs=login_outputs
        )
        password_input.submit(
            fn=handle_login, inputs=[password_input], outputs=login_outputs
        )

        # Wiring existing handlers from earlier in the file
        files_input.change(
            fn=update_metadata_fields,
            inputs=[files_input],
            outputs=metadata_fields,
        )

        upload_btn.click(fn=start_processing, outputs=[processing_status]).then(
            fn=process_files_with_progress,
            inputs=[files_input] + metadata_fields,
            outputs=[status_output, results_output, processing_status],
        )

        clear_btn.click(
            fn=clear_form,
            outputs=[files_input] + metadata_fields
            + [status_output, results_output, processing_status],
        )

        refresh_dropdown_btn.click(
            fn=refresh_delete_dropdown, outputs=[file_dropdown]
        )
        delete_btn.click(
            fn=delete_selected_files,
            inputs=[file_dropdown],
            outputs=[delete_status_output, delete_results_output],
        )
        clear_delete_btn.click(
            fn=clear_delete_form,
            outputs=[file_dropdown, delete_status_output, delete_results_output],
        )

        page_outputs = [file_summary, file_details, pagination_info, prev_btn, next_btn]
        refresh_btn.click(fn=refresh_file_list, outputs=[file_list_status]).then(
            fn=list_uploaded_files_paginated,
            inputs=[],
            outputs=page_outputs,
        )

        next_btn.click(
            fn=load_next_page, inputs=[pagination_info], outputs=page_outputs
        )
        prev_btn.click(
            fn=load_prev_page, inputs=[pagination_info], outputs=page_outputs
        )

    # Launch single app
    app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True,
        show_error=True,
    )


if __name__ == "__main__":
    main()