# Hugging Face Spaces page residue from scraping ("Spaces: Sleeping / Sleeping") — not part of the program.
| import streamlit as st | |
| import requests | |
| import hashlib | |
| import pandas as pd | |
| import json | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from io import BytesIO | |
| from PIL import Image | |
| from pefile import PE, PEFormatError | |
| import os | |
| import re | |
# VirusTotal API details
# SECURITY(review): the API key is hard-coded in source. Rotate this key and
# load it from an environment variable or Streamlit secrets instead of
# committing it to the repository.
VIRUSTOTAL_API_KEY = 'ed48e6407e0b7975be7d19c797e1217f500183c9ae84d1119af8628ba4c98c3d'
# streamlit framework
# Page-wide Streamlit configuration. NOTE(review): the icon string looks like
# mojibake of an emoji (encoding damage in transit) — confirm intended glyph.
st.set_page_config(
    page_title="OxThreat",
    page_icon="π",
    layout="wide"
)
# Function to calculate the file's SHA-256 hash
def get_file_hash(file):
    """Return the SHA-256 hex digest of a file-like object's contents.

    The stream position is rewound to the start both before reading and
    after, so callers can safely re-read the same object.
    """
    file.seek(0)
    digest = hashlib.sha256()
    digest.update(file.read())
    file.seek(0)
    return digest.hexdigest()
# Function to analyze the file using VirusTotal
def virustotal_analysis(file_hash):
    """Look up a file hash on VirusTotal and return the JSON report.

    Parameters
    ----------
    file_hash : str
        Hex digest (SHA-256) of the file to look up.

    Returns
    -------
    dict or None
        Parsed JSON response on HTTP 200; otherwise None, with the error
        reported in the Streamlit UI instead of raising.
    """
    url = f"https://www.virustotal.com/api/v3/files/{file_hash}"
    headers = {"x-apikey": VIRUSTOTAL_API_KEY}
    try:
        # A timeout keeps the Streamlit app from hanging indefinitely when
        # the VirusTotal API is slow or unreachable.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as exc:
        st.error(f"Network error contacting VirusTotal: {exc}")
        return None
    if response.status_code == 200:
        return response.json()
    st.error("Error with VirusTotal API request. Please check your API key or the file hash.")
    return None
# Function to extract metadata from PE files
def extract_metadata(file):
    """Parse a PE (exe/dll) image and return selected FILE_HEADER fields.

    Returns a dict on success, or None (after showing a Streamlit error)
    when the bytes are not a valid PE image.
    """
    try:
        pe_image = PE(data=file.read())
    except PEFormatError:
        st.error("Uploaded file is not a valid PE format.")
        return None
    return {
        "Number of Sections": pe_image.FILE_HEADER.NumberOfSections,
        "Time Date Stamp": pe_image.FILE_HEADER.TimeDateStamp,
        "Characteristics": pe_image.FILE_HEADER.Characteristics,
    }
# Function to analyze log files
# NOTE(review): a second `analyze_log_file` further down this file re-binds
# the name once the module body reaches it.
def analyze_log_file(log_content):
    """Collect every line fragment beginning with 'ERROR' into a one-column DataFrame."""
    hits = [match.group(0) for match in re.finditer(r'ERROR.*', log_content)]
    return pd.DataFrame(hits, columns=["Errors"])
# Function to create charts from VirusTotal results
def create_virus_total_charts(virus_total_results):
    """Build a bar chart of VirusTotal's last_analysis_stats counts.

    Parameters
    ----------
    virus_total_results : dict or None
        Raw VirusTotal v3 file-report JSON.

    Returns
    -------
    matplotlib Figure, or None when the input is falsy or does not
    contain the expected 'data.attributes.last_analysis_stats' keys.
    """
    if not virus_total_results:
        return None
    # Guard against partial/unexpected API responses instead of raising
    # KeyError on a missing nested key.
    stats = (
        virus_total_results.get('data', {})
        .get('attributes', {})
        .get('last_analysis_stats', {})
    )
    if not stats:
        return None
    labels = list(stats.keys())
    values = list(stats.values())
    fig, ax = plt.subplots(figsize=(10, 5))
    sns.barplot(x=labels, y=values, palette="viridis", ax=ax)
    ax.set_title("VirusTotal Analysis Results", fontsize=16, fontweight='bold')
    ax.set_xlabel("Analysis Types", fontsize=14)
    ax.set_ylabel("Count", fontsize=14)
    return fig
# Function to create detailed tables from JSON data
def create_detailed_table(data, title):
    """Render *data* (JSON-like) as a styled DataFrame under a heading."""
    st.write(f"### {title}")
    # Flatten nested JSON into tabular columns
    frame = pd.json_normalize(data)
    # Debug: echo the raw payload alongside the table
    st.write("Raw Data:", data)
    if frame.empty:
        st.write("No data available.")
        return
    # Minimal styling: gradient background plus N/A / 2-decimal formatting
    styled = frame.style.background_gradient(cmap='viridis').format(na_rep='N/A', precision=2)
    st.dataframe(styled)
# Function to display the analysis results on the dashboard
def display_analysis_results(metadata, virus_total_results, log_analysis=None):
    """Lay out PE metadata, VirusTotal results and log analysis in two columns."""
    st.write("## Analysis Results")
    col1, col2 = st.columns([2, 1])

    with col1:
        # Metadata
        if metadata:
            st.write("### π PE File Metadata")
            create_detailed_table(metadata, "PE File Metadata")
        # VirusTotal Results
        if virus_total_results:
            st.write("### π¦ VirusTotal Results")
            create_detailed_table(virus_total_results['data'], "VirusTotal Results")
            st.write("#### π VirusTotal Analysis Stats")
            fig = create_virus_total_charts(virus_total_results)
            if fig:
                st.pyplot(fig)

    with col2:
        # Log Analysis
        if log_analysis is not None:
            st.write("### π Log Analysis")
            st.table(log_analysis)
# Main page of the Streamlit app
def main_page():
    """Landing page: title, logo, and a button routing to file analysis."""
    st.title("π¦ Malware Analysis Tool")
    st.markdown("---")
    st.image('ui/antivirus.png', width=200, use_column_width='always')
    go_clicked = st.button("Go to File Analysis ποΈ")
    if go_clicked:
        st.session_state.page = "file_analysis"
        st.experimental_rerun()
# File analysis page where the user can upload files for analysis
def file_analysis_page():
    """Upload a file and dispatch to a type-specific analysis path.

    Images are previewed, PDFs offered for download, txt/log files run
    through analyze_log_file, archives only acknowledged, and apk/exe/dll
    binaries are hashed, PE-parsed and looked up on VirusTotal. Whatever
    was gathered is then handed to display_analysis_results.
    """
    st.title("π File Analysis Dashboard")
    st.markdown("---")
    st.image('ui/antivirus.png', width=80, use_column_width='none')
    uploaded_file = st.file_uploader("Upload any file for analysis", type=["exe", "dll", "log", "pdf", "png", "jpg", "jpeg", "gif", "txt", "zip", "rar", "apk"])
    if uploaded_file:
        # Hash the in-memory upload first so the digest is always shown,
        # regardless of which branch below runs.
        file_hash = get_file_hash(uploaded_file)
        st.write(f"SHA-256 Hash: {file_hash}")
        file_extension = uploaded_file.name.split('.')[-1].lower()
        # Handle different file types
        if file_extension in ['png', 'jpg', 'jpeg', 'gif']:
            st.write("### π Image Preview")
            image = Image.open(uploaded_file)
            image.thumbnail((150, 150))  # Resize for preview
            st.image(image, caption='Uploaded Image', use_column_width=True)
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension == 'pdf':
            st.write("### π PDF File")
            st.write("PDF preview is not supported. Please use other tools to view.")
            st.download_button(label="Download PDF", data=uploaded_file, file_name=uploaded_file.name)
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension in ['txt', 'log']:
            st.write("### π Log File Content")
            log_content = uploaded_file.getvalue().decode("utf-8")
            # NOTE(review): two definitions of analyze_log_file exist in this
            # file; depending on which is bound at call time the result is a
            # DataFrame or a dict of DataFrames. display_analysis_results
            # passes it to st.table, which assumes the former — confirm.
            log_analysis = analyze_log_file(log_content)
            metadata = None
            virus_total_results = None
        elif file_extension in ['zip', 'rar']:
            st.write("### π¦ Compressed File")
            st.write("Compressed files require further extraction and analysis.")
            metadata = None
            virus_total_results = None
            log_analysis = None
        elif file_extension in ['apk', 'exe', 'dll']:
            # Save uploaded file temporarily
            file_path = f"./temp/{uploaded_file.name}"
            os.makedirs(os.path.dirname(file_path), exist_ok=True)
            with open(file_path, "wb") as f:
                f.write(uploaded_file.getbuffer())
            try:
                with open(file_path, "rb") as file:
                    # NOTE(review): this re-hashes the on-disk copy,
                    # duplicating the in-memory hash computed above; the
                    # digests should be identical.
                    file_hash = get_file_hash(file)
                    metadata = extract_metadata(file)
                    virus_total_results = virustotal_analysis(file_hash)
            finally:
                # Clean up
                os.remove(file_path)
            log_analysis = None
        else:
            st.error("Unsupported file type.")
            metadata = None
            virus_total_results = None
            log_analysis = None
        display_analysis_results(metadata, virus_total_results, log_analysis)
# Initialize session state for page navigation
if 'page' not in st.session_state:
    st.session_state.page = "main"
# Routing based on page state
# NOTE(review): this dispatch executes mid-module, before the later duplicate
# analyze_log_file definition below has been bound — so the earlier
# definition is the one in effect during this render.
if st.session_state.page == "main":
    main_page()
elif st.session_state.page == "file_analysis":
    file_analysis_page()
def analyze_log_file(log_content):
    """Scan log text for IP addresses, domains, HTTP headers and session IDs.

    Returns a dict of DataFrames under the keys 'log_dataframe',
    'ip_dataframe', 'domain_dataframe', 'header_dataframe' and
    'session_dataframe'.

    NOTE(review): this re-binds the earlier `analyze_log_file` once the
    module body reaches this point.
    """
    # Regular expressions for matching
    ip_regex = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
    domain_regex = re.compile(r'\b[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}\b')
    header_regex = re.compile(r'(User-Agent|Content-Type|Authorization):\s*(.*)', re.IGNORECASE)
    session_regex = re.compile(r'SessionID:\s*([a-zA-Z0-9]+)')

    # Accumulators for each category of finding
    ip_data, domain_data, header_data, session_data = [], [], [], []
    log_entries = []
    for line in log_content.splitlines():
        ip_data.extend(ip_regex.findall(line))
        domain_data.extend(domain_regex.findall(line))
        header_data.extend(header_regex.findall(line))
        session_data.extend(session_regex.findall(line))
        log_entries.append(line)

    # Summary of findings as DataFrames
    return {
        "log_dataframe": pd.DataFrame(log_entries, columns=["Log Entries"]),
        "ip_dataframe": pd.DataFrame(ip_data, columns=["IP Addresses"]),
        "domain_dataframe": pd.DataFrame(domain_data, columns=["Domains"]),
        "header_dataframe": pd.DataFrame(header_data, columns=["Header Name", "Header Value"]),
        "session_dataframe": pd.DataFrame(session_data, columns=["Session IDs"]),
    }
# Log Analysis Section
# NOTE(review): this top-level fragment references `log_analysis`, which is
# not defined at module scope — it appears to be a misplaced copy of code
# intended to live inside a function (e.g. a results-display page). As
# written it would raise NameError during module import; confirm the
# intended enclosing scope before running.
if log_analysis is not None:
    st.write("### π Log Analysis")
    # First row: IP Addresses and Domains
    col1, col2 = st.columns(2)
    with col1:
        st.write("**IP Addresses:**")
        st.dataframe(log_analysis.get("ip_dataframe"))
    with col2:
        st.write("**Domains:**")
        st.dataframe(log_analysis.get("domain_dataframe"))
    # Second row: Log Entries, Session IDs, Headers
    col3, col4, col5 = st.columns([2, 1, 1])
    with col3:
        st.write("**Log Entries:**")
        st.dataframe(log_analysis.get("log_dataframe"))
    with col4:
        st.write("**Session IDs:**")
        if not log_analysis.get("session_dataframe").empty:
            st.dataframe(log_analysis.get("session_dataframe"))
        else:
            st.write("No session IDs found.")
    with col5:
        st.write("**Headers:**")
        if not log_analysis.get("header_dataframe").empty:
            st.dataframe(log_analysis.get("header_dataframe"))
        else:
            st.write("No headers found.")