Spaces:
Sleeping
Sleeping
| # ============================================================================= | |
| # COPYRIGHT NOTICE | |
| # ----------------------------------------------------------------------------- | |
| # This source code is the intellectual property of Aditya Pandey. | |
| # Any unauthorized reproduction, distribution, or modification of this code | |
| # is strictly prohibited. | |
| # If you wish to use or modify this code for your project, please ensure | |
| # to give full credit to Aditya Pandey. | |
| # | |
| # PROJECT DESCRIPTION | |
| # ----------------------------------------------------------------------------- | |
| # This code is for a chatbot crafted with powerful prompts, designed to | |
| # utilize the Gemini API. It is tailored to assist cybersecurity researchers. | |
| # | |
| # Author: Aditya Pandey | |
| # ============================================================================= | |
| # Import library | |
| import os | |
| import faiss | |
| import numpy as np | |
| import pandas as pd | |
| import requests | |
| from PIL import Image | |
| from PyPDF2 import PdfReader | |
| import streamlit as st | |
| from gtts import gTTS | |
| from io import BytesIO | |
| import google.generativeai as genai | |
| from constants import gemini_key | |
| from bs4 import BeautifulSoup | |
| import urllib.request | |
| import re | |
| import json | |
| from google.api_core.exceptions import GoogleAPIError | |
| import speech_recognition as sr | |
| from collections import defaultdict | |
# Streamlit configuration
# Must run before any other st.* call in the script.
st.set_page_config(
    page_title="OxSecure RAG",  # browser tab title
    page_icon="π€Ώ",
    layout="wide"  # use the full browser width
)
def load_css(file_name):
    """Inject a local CSS file into the Streamlit page.

    Args:
        file_name: Path to the .css file to load.
    """
    # Explicit UTF-8 avoids platform-dependent default encodings
    # (e.g. cp1252 on Windows) mangling non-ASCII characters in the CSS.
    with open(file_name, encoding="utf-8") as f:
        st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True)
# Load the CSS file
load_css("ui/Style.css")
# API configuration
# The key is mirrored into the environment, then read back for genai.
os.environ["GOOGLE_API_KEY"] = gemini_key
genai.configure(api_key=os.environ['GOOGLE_API_KEY'])
# Function to query Gemini model
def query_gemini(context, prompt, image=None):
    """Send a prompt (optionally with an image) to the Gemini model.

    Args:
        context: Text prepended to the prompt (e.g. retrieved document chunks).
        prompt: The user's question/instruction.
        image: Optional PIL image for a multimodal query.

    Returns:
        The concatenated text of the first candidate's parts, or None on
        API error or an unexpected response shape (an error is shown in the UI).
    """
    try:
        # The model construction was duplicated in both branches; build it once.
        model = genai.GenerativeModel('gemini-1.5-pro-latest')
        if image:
            response = model.generate_content([context + prompt, image])
        else:
            response = model.generate_content(context + prompt)
        if hasattr(response, 'candidates') and response.candidates:
            return ' '.join(part.text for part in response.candidates[0].content.parts)
        st.error("Unexpected response format from Gemini API.")
        return None
    except GoogleAPIError as e:
        st.error(f"An error occurred while querying the Gemini API: {e}")
        return None
# Function to extract text from PDF
def extract_text_from_pdf(file):
    """Concatenate the extracted text of every page in a PDF.

    Args:
        file: A file path or binary file-like object accepted by PdfReader.

    Returns:
        The combined page text, or "" on failure (an error is shown in the UI).
    """
    try:
        pdf_reader = PdfReader(file)
        # extract_text() can return None for image-only pages; `or ""`
        # prevents a TypeError during concatenation. join avoids the
        # quadratic cost of repeated string +=.
        return "".join((page.extract_text() or "") for page in pdf_reader.pages)
    except Exception as e:
        st.error(f"An error occurred while extracting text from PDF: {e}")
        return ""
# Function to extract text from URL
def extract_text_from_url(url):
    """Fetch a web page and return the concatenated text of its <p> tags.

    Args:
        url: The article URL to scrape.

    Returns:
        The paragraph text joined by spaces, or "" on any failure
        (an error is shown in the UI).
    """
    try:
        # A browser-like User-Agent avoids trivial bot blocking.
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
        request = urllib.request.Request(url, headers=headers)
        # A timeout keeps a dead host from hanging the Streamlit app forever.
        response = urllib.request.urlopen(request, timeout=30)
        html = response.read()
        soup = BeautifulSoup(html, 'html.parser')
        paragraphs = soup.find_all('p')
        return ' '.join(para.get_text() for para in paragraphs)
    except Exception as e:
        st.error(f"An error occurred while extracting text from URL: {e}")
        return ""
# Function to extract text from CSV
def extract_text_from_csv(file):
    """Read a CSV file-like object and render it as a plain-text table."""
    try:
        frame = pd.read_csv(file)
        rendered = frame.to_string(index=False)
    except Exception as exc:
        st.error(f"An error occurred while extracting text from CSV: {exc}")
        return ""
    return rendered
# Function to extract text from Excel
def extract_text_from_excel(file):
    """Read an Excel workbook and render its first sheet as plain text."""
    try:
        sheet = pd.read_excel(file)
        rendered = sheet.to_string(index=False)
    except Exception as exc:
        st.error(f"An error occurred while extracting text from Excel: {exc}")
        return ""
    return rendered
# Function to extract text from JSON
def extract_text_from_json(file):
    """Load JSON from a file-like object and return it pretty-printed."""
    try:
        parsed = json.load(file)
        pretty = json.dumps(parsed, indent=4)
    except Exception as exc:
        st.error(f"An error occurred while extracting text from JSON: {exc}")
        return ""
    return pretty
# Remove special characters and improve formatting
def clean_text(text):
    """Drop characters outside a printable whitelist and normalise whitespace."""
    # Keep letters, digits, common punctuation, quotes, spaces and newlines.
    kept = re.sub(r'[^a-zA-Z0-9.,!?;:()\'\" \n]', '', text)
    # split()/join collapses every whitespace run to one space and trims ends.
    return ' '.join(kept.split())
# Placeholder function to create embeddings
def embed_text(text):
    """Placeholder embedding: map text to a deterministic 512-dim float32 vector.

    This should still be replaced with a real embedding model. Unlike the
    previous purely random version, the vector is now a deterministic
    function of the input text, so indexing and querying the same text
    produce consistent results across calls and reruns.

    Args:
        text: The text chunk (or query) to embed.

    Returns:
        A numpy float32 vector of shape (512,).
    """
    import hashlib
    # Seed a PRNG from a stable digest of the text. Python's builtin hash()
    # is salted per process, so it is unsuitable as a seed source.
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    rng = np.random.default_rng(seed)
    return rng.random(512, dtype=np.float32)
# Function to create embeddings and store in FAISS
def store_embeddings(text):
    """Chunk text, embed each chunk, and index the vectors in FAISS.

    Args:
        text: Raw document text.

    Returns:
        (index, chunks): a faiss.IndexFlatL2 over the chunk embeddings and
        the list of 512-character chunks. For empty input returns (None, [])
        instead of raising IndexError on vectors[0] as the previous
        version did; callers already treat a falsy index as "no documents".
    """
    chunks = [text[i:i + 512] for i in range(0, len(text), 512)]
    if not chunks:
        return None, []
    vectors = np.stack([embed_text(chunk) for chunk in chunks])
    index = faiss.IndexFlatL2(vectors.shape[1])
    index.add(vectors)
    return index, chunks
# Function to search embeddings and retrieve relevant text
def search_embeddings(index, query, top_k):
    """Return indices of the top_k chunks nearest to the query embedding."""
    query_vector = embed_text(query)  # placeholder embedding, same path as indexing
    distances, neighbour_ids = index.search(np.array([query_vector]), k=top_k)
    # faiss returns one row per query vector; we search a single query.
    return neighbour_ids[0]
# Function to handle Q&A
def handle_qa(query, faiss_index, document_chunks, top_k):
    """Answer a query, grounding it in retrieved chunks when an index exists."""
    if not faiss_index:
        # No documents indexed yet: fall back to the accumulated session context.
        return query_gemini(st.session_state.context, query)
    hits = search_embeddings(faiss_index, query, top_k)
    retrieved_context = " ".join(document_chunks[i] for i in hits)
    return query_gemini(retrieved_context, query)
# Function for speech recognition
def recognize_speech():
    """Capture one utterance from the microphone and transcribe it.

    Uses the Google Web Speech API via the SpeechRecognition package.

    Returns:
        The transcript string, or None on any failure (an error is shown
        in the UI).
    """
    recognizer = sr.Recognizer()
    try:
        with sr.Microphone() as source:
            st.info("Listening...")
            captured = recognizer.listen(source)
            transcript = recognizer.recognize_google(captured)
            st.success(f"You said: {transcript}")
            return transcript
    except sr.UnknownValueError:
        st.error("Could not understand audio")
        return None
    except sr.RequestError as e:
        st.error(f"Could not request results from Google Speech Recognition service; {e}")
        return None
    except Exception as e:
        st.error(f"An error occurred: {e}")
        return None
# Function to analyze log file
def analyze_log_file(file):
    """Summarise ERROR/WARNING/INFO occurrences in a log.

    Args:
        file: Either a path to a log file, or an iterable of log lines.
              (The caller in this app passes file_text.splitlines(); the
              previous open()-only implementation raised TypeError on that.)

    Returns:
        Dict with 'total_lines', per-severity counts, and per-severity
        defaultdicts mapping each stripped matching line to its frequency.
    """
    log_summary = {
        'total_lines': 0,
        'error_count': 0,
        'warning_count': 0,
        'info_count': 0,
        'error_details': defaultdict(int),
        'warning_details': defaultdict(int),
        'info_details': defaultdict(int),
    }
    error_pattern = re.compile(r'\bERROR\b')
    warning_pattern = re.compile(r'\bWARNING\b')
    info_pattern = re.compile(r'\bINFO\b')

    def _scan(lines):
        # First matching severity wins, mirroring the original elif chain.
        for line in lines:
            log_summary['total_lines'] += 1
            if error_pattern.search(line):
                log_summary['error_count'] += 1
                log_summary['error_details'][line.strip()] += 1
            elif warning_pattern.search(line):
                log_summary['warning_count'] += 1
                log_summary['warning_details'][line.strip()] += 1
            elif info_pattern.search(line):
                log_summary['info_count'] += 1
                log_summary['info_details'][line.strip()] += 1

    if isinstance(file, (str, os.PathLike)):
        with open(file, 'r') as handle:
            _scan(handle)
    else:
        _scan(file)
    return log_summary
# Main App Function
def render_main_app():
    """Render the main RAG page: ingestion, Gemini querying and a Q&A section.

    Persists 'context', 'faiss_index' and 'document_chunks' in
    st.session_state so state survives Streamlit reruns.
    """
    st.title('OxSecure RAG β¨οΈ')
    st.divider()
    st.markdown('**By :- Aditya Pandey π§π»βπ»**')
    input_prompt = st.text_input("Input Prompt: ", key="input")
    uploaded_file = st.file_uploader("Choose a file (image, PDF, CSV, Excel, JSON, or LOG)...", type=["jpg", "jpeg", "png", "pdf", "csv", "xlsx", "json", "log"])
    uploaded_url = st.text_input("Or enter an article URL:")
    image = None
    file_text = ""
    # Dispatch on the browser-reported MIME type of the upload.
    if uploaded_file is not None:
        if uploaded_file.type in ["image/jpeg", "image/png", "image/jpg", "image/webp"]:
            image = Image.open(uploaded_file)
            st.image(image, caption="Uploaded Image.", use_column_width=True)
        elif uploaded_file.type == "application/pdf":
            file_text = extract_text_from_pdf(uploaded_file)
            st.text_area("Extracted Text from PDF:", file_text, height=300)
        elif uploaded_file.type == "text/csv":
            df = pd.read_csv(uploaded_file)
            st.dataframe(df)
            file_text = df.to_string(index=False)
        elif uploaded_file.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
            df = pd.read_excel(uploaded_file)
            st.dataframe(df)
            file_text = df.to_string(index=False)
        elif uploaded_file.type == "application/json":
            df = pd.read_json(uploaded_file)
            st.json(df.to_dict())
            file_text = df.to_string(index=False)
        elif uploaded_file.type == "text/plain":
            # .log uploads typically arrive as text/plain; reject other text files.
            if uploaded_file.name.endswith(".log"):
                file_text = uploaded_file.read().decode("utf-8")
                # NOTE(review): a list of lines is passed here, so
                # analyze_log_file must accept an iterable — verify its
                # signature, which appears to expect a file path.
                log_summary = analyze_log_file(file_text.splitlines())
                st.write("Log Summary:")
                st.write(f"Total Lines: {log_summary['total_lines']}")
                st.write(f"Error Count: {log_summary['error_count']}")
                st.write(f"Warning Count: {log_summary['warning_count']}")
                st.write(f"Info Count: {log_summary['info_count']}")
                st.write("\nError Details:")
                for error, count in log_summary['error_details'].items():
                    st.write(f"{count} occurrence(s): {error}")
                st.write("\nWarning Details:")
                for warning, count in log_summary['warning_details'].items():
                    st.write(f"{count} occurrence(s): {warning}")
                st.write("\nInfo Details:")
                for info, count in log_summary['info_details'].items():
                    st.write(f"{count} occurrence(s): {info}")
            else:
                st.error("Please upload a valid log file.")
        else:
            st.error("Unsupported file type.")
    elif uploaded_url:
        file_text = extract_text_from_url(uploaded_url)
        st.text_area("Extracted Text from URL:", file_text, height=300)
    # Initialize or update session state for context
    if "context" not in st.session_state:
        st.session_state.context = ""
    if "faiss_index" not in st.session_state:
        st.session_state.faiss_index = None
    if "document_chunks" not in st.session_state:
        st.session_state.document_chunks = []
    def clear_previous_data():
        # Reset retrieval state so a new upload does not mix with the old one.
        st.session_state.faiss_index = None
        st.session_state.document_chunks = []
        st.session_state.context = ""
    submit = st.button("Start Deep Diving π€Ώ", key="start_button")
    if submit:
        if input_prompt or file_text:
            clear_previous_data()
            prompt = input_prompt if input_prompt else ""
            st.session_state.context += " " + file_text  # Update the context with new extracted text
            if file_text:
                st.session_state.faiss_index, st.session_state.document_chunks = store_embeddings(file_text)
            # Start spinner before processing
            spinner = st.spinner("Processing..... Getting Results β³")
            with spinner:
                response = query_gemini(st.session_state.context, prompt, image)
            # Stop spinner after processing
            if response:
                st.subheader("Extracted Data π‘")
                st.write(response)
                clean_response = clean_text(response)
                # Text-to-Speech conversion
                tts = gTTS(clean_response)
                audio_file = BytesIO()
                tts.write_to_fp(audio_file)
                st.audio(audio_file, format='audio/mp3')
        else:
            st.warning("Please provide an input prompt or upload a file.")
    # Q&A section with slider and radio button
    st.markdown("-----")
    st.markdown("**Q/A Section π€**")
    query = st.text_input("Enter your query:", key="qa_query")
    top_k = st.slider("Select the number of document chunks to retrieve:", min_value=1, max_value=10, value=5, step=1)
    response_mode = st.radio("Select response mode:", ("Text", "Text-to-Speech"))
    qa_button = st.button("Ask", key="qa_button")
    if qa_button:
        if query:
            spinner = st.spinner("Processing your query...")
            with spinner:
                response = handle_qa(query, st.session_state.faiss_index, st.session_state.document_chunks, top_k)
            if response:
                st.divider()
                st.markdown("**Q&A Response π€**")
                clean_response = clean_text(response)
                if response_mode == "Text":
                    st.write(response)
                else:
                    # Same text plus an audio rendition of the cleaned response.
                    st.write(response)
                    tts = gTTS(clean_response)
                    audio_file = BytesIO()
                    tts.write_to_fp(audio_file)
                    st.audio(audio_file, format='audio/mp3')
        else:
            st.warning("Please enter a query to ask.")
    st.markdown("-----")
    # Voice recognition section
    # st.markdown("**Voice Input π£οΈ**")
    # query = recognize_speech()
    # if st.button("Start Voice Recognition") and query:
    #     with st.spinner("Processing your voice query..."):
    #         response = handle_qa(query, st.session_state.faiss_index, st.session_state.document_chunks, top_k)
    #         if response:
    #             st.divider()
    #             st.markdown("**Voice Q&A Response π€**")
    #             clean_response = clean_text(response)
    #             st.write(clean_response)
    #             tts = gTTS(clean_response)
    #             audio_file = BytesIO()
    #             tts.write_to_fp(audio_file)
    #             st.audio(audio_file, format='audio/mp3')
    # st.markdown("---")
    linkedin_url = "https://www.linkedin.com/in/aditya-pandey-896109224"
    st.markdown(f"Created with π€ π By Aditya Pandey [ LinkedIn π ]({linkedin_url})")
# Description and Framework Section
def render_description_and_framework():
    """Render the landing page describing the project, stack, and usage.

    A button flips st.session_state.show_main_app and reruns so the
    top-level router switches to the main app page.
    """
    st.title("OxSecure RAG - Description and Framework")
    st.markdown("----")
    st.markdown("""
## π ***Project Description***
----------------
**OxSecure RAG** is your cybersecurity research companion! Powered by the Gemini API and crafted with smart prompts, it can analyze various documents, extract key insights, create embeddings, and support question-answering (Q&A) like never before. ππ‘οΈ
π οΈ ***Framework Used***
- **Streamlit**: The sleek and interactive interface π¨.
- **FAISS**: Super-efficient similarity search and clustering for dense vectors β‘.
- **Pandas**: Handling and processing data files like a pro (CSV, Excel) π.
- **PyPDF2**: Extracting text from PDFs with ease π.
- **BeautifulSoup**: Scraping web data with precision π.
- **gTTS**: Giving the bot a voice with text-to-speech ποΈ.
- **Google Generative AI (genai)**: Querying the powerful Gemini API π§ .
- **SpeechRecognition**: Turning your voice into input for hands-free interaction π§.
----------------
ποΈ ***Architecture***
1. **Input Handling**:
- Upload various file types (PDF, CSV, Excel, JSON) or provide a URL π.
- Input text prompts directly π.
- Speak your query using voice recognition π€.
2. **Text Extraction**:
- Extract text from uploaded files or URLs using the right tools ππ.
3. **Text Embedding**:
- Split extracted text into chunks and convert them into embeddings π§©.
- Store embeddings in a FAISS index for fast, relevant search results π.
4. **Q&A System**:
- Ask questions based on uploaded or entered context β.
- Retrieve relevant text chunks from the FAISS index and query the Gemini API π.
5. **Response Generation**:
- View the response from the Gemini API π§βπ».
- Convert the response to speech for audio playback π.
----------------
π ***Instructions for Use***
1. **Input**:
- Upload a file (PDF, CSV, Excel, or JSON), provide a URL, or enter a text prompt π».
2. **Processing**:
- Click "Start Deep Diving" to process the input and extract valuable insights π‘.
3. **Q&A**:
- Enter a query, choose how many document chunks to retrieve, and select response mode (Text or Text-to-Speech) π―.
- Click "Ask" to get your answer π§ .
4. **Voice Input**:
- Use "Start Voice Recognition" to ask a question verbally ποΈ.
- The answer will be generated and spoken aloud π£οΈ.
5. **Results**:
- View extracted data and responses in a clear, readable format π.
- If Text-to-Speech is selected, listen to the response π§.
""")
    if st.button("Go to Main App", key="description_go_to_main_app"):
        # Flip the routing flag and rerun so the main page renders immediately.
        st.session_state.show_main_app = True
        st.experimental_rerun()
    st.markdown("---")
    linkedin_url = "https://www.linkedin.com/in/aditya-pandey-896109224"
    st.markdown(f"Created with π€ π By Aditya Pandey [ LinkedIn π ]({linkedin_url})")
# Initialize the app with the description and framework
# Top-level page router: description page first, main app after the button flip.
if "show_main_app" not in st.session_state:
    # First visit in this session: start on the description page.
    st.session_state.show_main_app = False
if st.session_state.show_main_app:
    render_main_app()
else:
    render_description_and_framework()