import streamlit as st
from typing import List, Dict, Any, TypedDict, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime
import json
import ipaddress
import os

try:
    import requests
except Exception:
    requests = None

# Optional libraries
try:
    from duckduckgo_search import DDGS
except Exception:
    DDGS = None
try:
    from PyPDF2 import PdfReader
except Exception:
    PdfReader = None
try:
    import docx
except Exception:
    docx = None
try:
    import olefile
except Exception:
    olefile = None
try:
    from mutagen import File as MutagenFile
except Exception:
    MutagenFile = None
try:
    from rapidfuzz import fuzz
except Exception:
    fuzz = None
try:
    import exifread
except Exception:
    exifread = None
try:
    import networkx as nx
except Exception:
    nx = None
try:
    from pyvis.network import Network
except Exception:
    Network = None
try:
    from sentence_transformers import SentenceTransformer
except Exception:
    SentenceTransformer = None
try:
    from jinja2 import Template
except Exception:
    Template = None
# ---------------------------
# Config & Styles
# ---------------------------
st.set_page_config(page_title="OSINT Investigator", layout="wide")

HIDE_STREAMLIT_STYLE = """
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
.small {font-size: 0.85rem; color: #666}
code {white-space: pre-wrap;}
/* Floating Chat Styles */
.chat-window {position: fixed; bottom: 20px; right: 20px; width: 360px; max-height: 560px; background:#1c1c1c; border:1px solid #444; border-radius:14px; z-index:1000; display:flex; flex-direction:column; box-shadow:0 8px 24px rgba(0,0,0,.55);}
.chat-header {padding:8px 12px; display:flex; align-items:center; gap:8px; border-bottom:1px solid #333; background:#222; border-top-left-radius:14px; border-top-right-radius:14px;}
.chat-header .title {font-weight:600; color:#ffcc66;}
.chat-close {margin-left:auto; cursor:pointer; font-weight:700; color:#bbb;}
.chat-close:hover {color:#fff;}
.chat-messages {padding:10px 12px; overflow-y:auto; flex:1; font-size:0.8rem;}
.chat-messages p {margin:0 0 10px;}
.msg-user {color:#fff;}
.msg-bot {color:#ffcc66; font-style:italic;}
.chat-input {padding:8px 10px; border-top:1px solid #333; background:#181818; border-bottom-left-radius:14px; border-bottom-right-radius:14px;}
.chat-input textarea {font-size:0.75rem !important;}
.badge-action {display:inline-block; background:#333; color:#ffcc66; padding:2px 6px; margin:2px 4px 6px 0; border-radius:6px; font-size:0.6rem; cursor:pointer;}
.badge-action:hover {background:#444;}
.chat-mini-btn {position:fixed; bottom:20px; right:20px; width:62px; height:62px; border-radius:50%; background:#222; border:2px solid #ffcc66; display:flex; align-items:center; justify-content:center; font-size:30px; cursor:pointer; z-index:999; box-shadow:0 0 8px rgba(0,0,0,.6);}
.chat-mini-btn:hover {background:#333;}
/* App Enhancements */
.app-brand-bar {display:flex; align-items:center; gap:14px; padding:8px 18px 4px 8px; border-bottom:1px solid #262626; margin:-1rem -1rem 1.2rem -1rem; background:linear-gradient(90deg,#141414,#181818);}
.app-brand-title {font-size:1.35rem; font-weight:600; letter-spacing:.5px; color:#ffcc66;}
.app-badge {display:inline-block; padding:2px 8px; border-radius:12px; font-size:0.65rem; font-weight:600; text-transform:uppercase; letter-spacing:.5px; margin-right:6px; background:#222; border:1px solid #333; color:#bbb;}
.level-high {background:#11391f; border-color:#1f6d3b; color:#3ddc84;}
.level-medium {background:#3a2e12; border-color:#72581a; color:#ffcf66;}
.level-low {background:#3a1616; border-color:#7a2727; color:#ff6b6b;}
.metric-row {margin-top:.4rem;}
.stDataFrame {border:1px solid #262626; border-radius:10px; overflow:hidden;}
.styled-section {background:#141414; border:1px solid #2a2a2a; padding:1rem 1.2rem; border-radius:14px; box-shadow:0 0 0 1px #111 inset, 0 4px 18px -8px #000;}
.kpi-grid div[data-testid='metric-container'] {background:#181818; border:1px solid #262626; border-radius:12px; padding:.75rem;}
.kpi-grid div[data-testid='stMetric'] {padding:.25rem .5rem .35rem .5rem;}
.plan-expander summary {font-weight:600; letter-spacing:.5px;}
.report-btn button {background:#ffcc66 !important; color:#111 !important; font-weight:600;}
.stDownloadButton button {border-radius:10px;}
.stTextInput input, .stTextArea textarea {border-radius:10px !important;}
.stTabs [data-baseweb='tab-list'] {gap:4px;}
.stTabs [data-baseweb='tab'] {background:#161616; padding:.5rem .9rem; border-radius:10px; border:1px solid #262626;}
.stTabs [data-baseweb='tab']:hover {background:#1d1d1d;}
.stTabs [aria-selected='true'] {background:#222 !important; border-color:#444 !important;}
.section-title {font-size:1.05rem; font-weight:600; letter-spacing:.5px; margin-bottom:.35rem;}
.sticky-toolbar {position:sticky; top:0; z-index:50; background:linear-gradient(90deg,#181818,#141414); padding:.4rem .6rem; border:1px solid #262626; border-radius:10px; margin-bottom:.6rem; box-shadow:0 6px 12px -8px rgba(0,0,0,.6);}
.sticky-toolbar button {margin-right:.35rem;}
.score-table {width:100%; border-collapse:collapse; font-size:0.75rem;}
.score-table th {text-align:left; padding:6px 8px; background:#202020; position:sticky; top:0; z-index:2;}
.score-table td {padding:6px 8px; border-top:1px solid #262626; vertical-align:top;}
.badge {display:inline-block; padding:2px 7px; border-radius:10px; font-size:0.6rem; font-weight:600; letter-spacing:.5px;}
.badge.high {background:#11391f; color:#3ddc84;}
.badge.medium {background:#3a2e12; color:#ffcf66;}
.badge.low {background:#3a1616; color:#ff6b6b;}
.methodology-box {background:#141414; border:1px solid #262626; padding:.8rem 1rem; border-radius:12px; font-size:0.8rem; line-height:1.25rem;}
body.light-mode, .light-mode [data-testid='stAppViewContainer'] {background:#f6f7f9; color:#222;}
.light-mode .app-brand-bar {background:linear-gradient(90deg,#fafafa,#eceff1); border-color:#d8dadd;}
.light-mode .app-brand-title {color:#7a4d00;}
.light-mode .app-badge {background:#fff; border-color:#d1d4d8; color:#555;}
.light-mode .sticky-toolbar {background:linear-gradient(90deg,#fff,#f3f5f7); border-color:#d8dade;}
.light-mode .score-table th {background:#eceff1;}
.light-mode .score-table td {border-color:#d9dde1;}
.light-mode .badge.high {background:#d8f5e6; color:#0d7a3d;}
.light-mode .badge.medium {background:#fbeccb; color:#8a6500;}
.light-mode .badge.low {background:#fbd5d5; color:#b80000;}
.light-mode .stTabs [data-baseweb='tab'] {background:#f5f6f7; border-color:#d9dde1;}
.light-mode .stTabs [aria-selected='true'] {background:#ffffff !important; border-color:#b9bdc1 !important;}
/* Skeleton / Shimmer */
@keyframes shimmer {0% {transform:translateX(-60%);} 100% {transform:translateX(120%);} }
.skeleton-block {position:relative; overflow:hidden; background:#1e1e1e; border-radius:6px; margin:4px 0;}
.skeleton-block.light-mode {background:#e2e5e9;}
.skeleton-block::after {content:""; position:absolute; top:0; left:0; height:100%; width:50%; background:linear-gradient(90deg, rgba(255,255,255,0), rgba(255,255,255,.15), rgba(255,255,255,0)); animation:shimmer 1.25s infinite;}
.sk-line-sm {height:10px;}
.sk-line-md {height:14px;}
.sk-line-lg {height:22px;}
.sk-fade {animation:fadeIn .3s ease-in;}
@keyframes fadeIn {from {opacity:0;} to {opacity:1;}}
</style>
"""
st.markdown(HIDE_STREAMLIT_STYLE, unsafe_allow_html=True)
st.markdown("""
<head>
<link rel='icon' type='image/svg+xml' href="data:image/svg+xml;utf8,<svg xmlns='http://www.w3.org/2000/svg' viewBox='0 0 64 64'><circle fill='%23111' cx='32' cy='32' r='32'/><path fill='%23ffcc66' d='M12 38l4-14h32l4 14H12zm8 4h24c0 6-6 10-12 10s-12-4-12-10zM24 18c0-4 4-8 8-8s8 4 8 8v4H24v-4z'/></svg>">
<meta name='description' content='OSINT Investigator Suite - AI-augmented open source intelligence enumeration & scoring platform.'>
<meta name='viewport' content='width=device-width, initial-scale=1'>
</head>
""", unsafe_allow_html=True)
if st.session_state.get("settings", {}).get("light_mode"):
    st.markdown("""<script>const b=window.parent.document.querySelector('body'); if(b&&!b.classList.contains('light-mode')) b.classList.add('light-mode');</script>""", unsafe_allow_html=True)
else:
    st.markdown("""<script>const b=window.parent.document.querySelector('body'); if(b) b.classList.remove('light-mode');</script>""", unsafe_allow_html=True)
# ---------------------------
# Sidebar: Settings
# ---------------------------
def _get_settings() -> Dict[str, Any]:
    with st.sidebar:
        st.header("Settings")
        model = st.selectbox(
            "Advisor model (CPU-friendly)",
            [
                "qwen2.5-1.5b-instruct",
                "phi-3-mini-4k-instruct",
                "gemma-2-2b-it",
            ],
            index=0,
            key="advisor_model_select",
            help="Choose which free local LLM to use for advisor suggestions."
        )
        max_per = st.slider(
            "Default max results per dork",
            min_value=3,
            max_value=50,
            value=10,
            step=1,
            key="default_max_results",
            help="Used as the default when executing dorks in Step 4."
        )
        logging = st.checkbox(
            "Enable audit logging",
            value=True,
            key="enable_audit_logging",
            help="If off, actions won't be written to the audit trail."
        )
        use_embeddings = st.checkbox(
            "Enable semantic similarity (embeddings)",
            value=False,
            key="enable_embeddings",
            help="Loads a small sentence-transformer to boost scoring by context relevance."
        )
        light_mode = st.checkbox(
            "Light mode UI override",
            value=False,
            key="light_mode_toggle",
            help="Apply a lighter palette without reloading base theme"
        )
    # Include the embeddings toggle so score_finding() can read settings["enable_embeddings"].
    return {"model": model, "max_per": max_per, "logging": logging, "light_mode": light_mode, "enable_embeddings": use_embeddings}

SETTINGS = _get_settings()
st.session_state["settings"] = SETTINGS
st.session_state.setdefault("_embed_model", None)

# ---------------------------
# Google Dorks (typed catalog for many entities)
# ---------------------------
class TypedDork(TypedDict):
    q: str
    type: str
    why: str
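# Illustrative shape (example only, not executed): each builder below returns a list of
# these dicts, e.g.
#   {"q": 'site:example.com intitle:"index of"', "type": "Directory/Index",
#    "why": "Open directory listings on that domain."}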
# Dork category glossary (shown in explainer)
DORK_TYPES: Dict[str, str] = {
    "Footprinting": "Map surface area: sites/subdomains, logins, admin panels, basic presence.",
    "Directory/Index": "Hunt for open listings or auto-generated indexes exposing files.",
    "Docs/Collab": "Live docs/boards accidentally exposed (docs.google, Trello, etc.).",
    "Code/Repo": "Public repos that may contain references, issues, or credentials.",
    "Credentials/Secrets": "Clues that hint at passwords/keys or places leaks may exist.",
    "Exposure/Leak": "Mentions of breaches, leaks, or dumps involving the entity.",
    "People/Profiles": "Official bios, resumes/CVs, speaker pages, researcher profiles.",
    "Social Activity": "Usernames/handles across social and developer communities.",
    "Regulatory/Legal": "Filings and official records (e.g., SEC/EDGAR).",
    "Incidents/Risk": "Incident reports, outages, protests, negative events.",
    "Academic/Research": "Scholarly/technical works tied to a name or org.",
}

# ---- Typed dork builders ----
def typed_dorks_for_email(email: str) -> List[TypedDork]:
    user, dom = (email.split("@", 1) + [""])[:2]
    return [
        {"q": f'"{email}"', "type": "Footprinting", "why": "Exact email mentions across the web."},
        {"q": f'intext:"{email}"', "type": "Footprinting", "why": "Mentions inside page bodies."},
        {"q": f'intext:"{user}" intext:"{dom}"', "type": "Footprinting", "why": "Mentions with split user/domain."},
        {"q": f'site:{dom} intext:"@{dom}"', "type": "Footprinting", "why": "Emails published on the same domain."},
        {"q": f'"{email}" filetype:pdf OR filetype:doc OR filetype:xls OR filetype:csv', "type": "Docs/Collab", "why": "Docs that may expose PII/roles."},
        {"q": f'"{email}" site:github.com OR site:gitlab.com', "type": "Code/Repo", "why": "Commits/issues referencing the email."},
        {"q": f'"{email}" site:gravatar.com', "type": "People/Profiles", "why": "Avatar/profile tied to the email hash."},
        {"q": f'"{email}" site:pastebin.com OR site:ghostbin.com OR site:hastebin.com', "type": "Exposure/Leak", "why": "Common paste sites for leaks."},
        {"q": f'"{email}" inurl:wp- OR inurl:wp-content OR inurl:wp-config', "type": "Directory/Index", "why": "WordPress artifacts sometimes leak emails."},
        {"q": f'"{email}" AROUND(3) "password"', "type": "Credentials/Secrets", "why": "Heuristic for password-adjacent mentions."},
    ]
def typed_dorks_for_domain(d: str) -> List[TypedDork]:
    return [
        {"q": f"site:{d} -www", "type": "Footprinting", "why": "Apex domain excluding www."},
        {"q": f"site:*.{d} -www", "type": "Footprinting", "why": "Enumerate subdomains exposed to crawlers."},
        {"q": f'"@{d}"', "type": "Footprinting", "why": "Emails belonging to the domain across the web."},
        {"q": f'site:linkedin.com "{d}"', "type": "People/Profiles", "why": "Employees listing org domain."},
        {"q": f'site:github.com "{d}"', "type": "Code/Repo", "why": "Repositories/issues referencing the domain."},
        {"q": f'site:gitlab.com "{d}"', "type": "Code/Repo", "why": "Alternate forge often used by teams."},
        {"q": f'site:docs.google.com "{d}"', "type": "Docs/Collab", "why": "Potentially exposed Google Docs/Sheets/Slides."},
        {"q": f'site:trello.com "{d}"', "type": "Docs/Collab", "why": "Public Trello boards occasionally misconfigured."},
        {"q": f'"{d}" filetype:pdf OR filetype:doc OR filetype:xls OR filetype:ppt OR filetype:csv', "type": "Docs/Collab", "why": "Documents with the org name/domain."},
        {"q": f"site:{d} inurl:login OR inurl:admin OR inurl:signup", "type": "Footprinting", "why": "Auth surfaces (discovery only)."},
        {"q": f'site:{d} intitle:"index of"', "type": "Directory/Index", "why": "Open directory listings on that domain."},
        {"q": f"site:{d} ext:env OR ext:.git OR ext:git-credentials OR ext:sql OR ext:log", "type": "Credentials/Secrets", "why": "Common secret-bearing file extensions."},
        {"q": f'"{d}" breach OR leak OR "data exposure"', "type": "Exposure/Leak", "why": "Press and trackers mentioning exposures."},
    ]

def typed_dorks_for_ip(ip: str) -> List[TypedDork]:
    return [
        {"q": f'"{ip}"', "type": "Footprinting", "why": "Places where the raw IP is printed or logged."},
        {"q": f'intext:"{ip}"', "type": "Footprinting", "why": "Body text mentions (forums, logs)."},
        {"q": f'"{ip}" filetype:log OR filetype:txt', "type": "Directory/Index", "why": "Exposed logs referencing the IP."},
        {"q": f'"{ip}" blacklist OR abuse', "type": "Incidents/Risk", "why": "Blacklist/abuse mentions and reports."},
        {"q": f'"{ip}" intitle:"index of"', "type": "Directory/Index", "why": "Open indexes listing files with that IP."},
    ]

def typed_dorks_for_username(u: str) -> List[TypedDork]:
    return [
        {"q": f'"{u}"', "type": "Footprinting", "why": "Exact handle mentions across the web."},
        {"q": f'"{u}" site:twitter.com OR site:x.com OR site:reddit.com OR site:github.com OR site:stackexchange.com', "type": "Social Activity", "why": "Find consistent identity across major platforms."},
        {"q": f'"{u}" site:medium.com OR site:substack.com', "type": "People/Profiles", "why": "Author pages tied to the handle."},
        {"q": f'"{u}" site:keybase.io', "type": "People/Profiles", "why": "Cryptographic identity/proofs."},
        {"q": f'"{u}" inurl:users OR inurl:profile', "type": "Footprinting", "why": "Generic user profile URLs."},
        {"q": f'"{u}" filetype:pdf resume OR "curriculum vitae"', "type": "People/Profiles", "why": "CVs/resumes listing the handle."},
        {"q": f'"{u}" AROUND(3) email', "type": "People/Profiles", "why": "Correlate handle to emails in bios/posts."},
        {"q": f'"{u}" avatar OR "profile photo"', "type": "People/Profiles", "why": "Images tied to the identity."},
    ]

def typed_dorks_for_person(name: str) -> List[TypedDork]:
    return [
        {"q": f'"{name}"', "type": "Footprinting", "why": "Exact full-name mentions."},
        {"q": f'"{name}" site:linkedin.com', "type": "People/Profiles", "why": "Primary professional profile."},
        {"q": f'"{name}" filetype:pdf resume OR "curriculum vitae"', "type": "People/Profiles", "why": "Resume/CV documents."},
        {"q": f'"{name}" conference OR talk OR keynote', "type": "People/Profiles", "why": "Speaker bios and conference pages."},
        {"q": f'"{name}" site:github.com OR site:gitlab.com', "type": "Code/Repo", "why": "Developer activity tied to the name."},
        {"q": f'"{name}" site:researchgate.net OR site:scholar.google.com', "type": "Academic/Research", "why": "Scholarly output."},
        {"q": f'"{name}" site:medium.com OR site:substack.com', "type": "People/Profiles", "why": "Editorial/social writing."},
        {"q": f'"{name}" "email" OR "contact"', "type": "People/Profiles", "why": "Pages listing contact info."},
    ]

def typed_dorks_for_org(org: str) -> List[TypedDork]:
    return [
        {"q": f'"{org}" site:sec.gov OR site:edgar', "type": "Regulatory/Legal", "why": "Official SEC/EDGAR filings."},
        {"q": f'"{org}" contract award OR RFP OR "sources sought"', "type": "Regulatory/Legal", "why": "Gov procurement history and notices."},
        {"q": f'"{org}" breach OR incident OR "data exposure"', "type": "Incidents/Risk", "why": "News/trackers about incidents/leaks."},
        {"q": f'"{org}" site:linkedin.com', "type": "People/Profiles", "why": "Employees and org page."},
        {"q": f'"{org}" site:github.com OR site:gitlab.com', "type": "Code/Repo", "why": "Public repos under org name."},
        {"q": f'"{org}" filetype:pdf OR filetype:doc OR filetype:ppt OR filetype:xls', "type": "Docs/Collab", "why": "Documents carrying org name."},
        {"q": f'"{org}" site:docs.google.com OR site:trello.com', "type": "Docs/Collab", "why": "Potentially exposed docs/boards."},
    ]

def typed_dorks_for_location(loc: str) -> List[TypedDork]:
    return [
        {"q": f'"{loc}" incident OR protest OR outage', "type": "Incidents/Risk", "why": "Events/incidents tied to the place."},
        {"q": f'"{loc}" satellite imagery OR "before after"', "type": "Footprinting", "why": "Imagery context for geospatial checks."},
        {"q": f'"{loc}" site:news', "type": "Incidents/Risk", "why": "Recent news mentions for the place."},
        {"q": f'"{loc}" filetype:pdf report', "type": "Docs/Collab", "why": "Reports that reference the location."},
    ]

def typed_dorks_for_file(desc: str) -> List[TypedDork]:
    return [
        {"q": f'"{desc}" filetype:pdf OR filetype:doc OR filetype:xls OR filetype:ppt OR filetype:csv', "type": "Docs/Collab", "why": "Document hunting by keyword."},
        {"q": f'"{desc}" site:archive.org', "type": "Docs/Collab", "why": "Wayback/Archive artifacts."},
        {"q": f'"{desc}" intitle:"index of"', "type": "Directory/Index", "why": "Open listings that may contain files."},
    ]

TYPED_DORK_MAP: Dict[str, Any] = {
    "Email Address": typed_dorks_for_email,
    "Domain / Website": typed_dorks_for_domain,
    "IP Address": typed_dorks_for_ip,
    "Username / Handle": typed_dorks_for_username,
    "Named Individual": typed_dorks_for_person,
    "Organization / Company": typed_dorks_for_org,
    "Location": typed_dorks_for_location,
    "File / Image": typed_dorks_for_file,
}
# ---------------------------
# STEP 1: Explainer
# ---------------------------
def render_dorks_explainer(entity_type: str, entity_value: str):
    st.subheader("Step 1: Dork Explainer")
    st.caption("These are categorized OSINT search operators. Copy/paste into Google if you like; this app automates via DuckDuckGo to respect ToS.")
    with st.expander("Dork categories explained", expanded=False):
        for t, desc in DORK_TYPES.items():
            st.markdown(f"**{t}** — {desc}")
    builder = TYPED_DORK_MAP.get(entity_type)
    typed = builder(entity_value) if (builder and entity_value) else []
    if not typed:
        st.info("Enter an entity value above to see a tailored catalog.")
        return
    for d in typed:
        st.markdown(f"- **[{d['type']}]** `{d['q']}`")
        st.markdown(f"  <span class='small'>{d['why']}</span>", unsafe_allow_html=True)
# ---------------------------
# STEP 2: Advisor (LLM-powered with rules fallback)
# ---------------------------
# Goal weights for rules-based fallback / blending
GOAL_WEIGHTS: Dict[str, Dict[str, int]] = {
    "Map footprint / surface": {"Footprinting": 3, "Directory/Index": 2},
    "Find documents & spreadsheets": {"Docs/Collab": 3, "Directory/Index": 2},
    "Discover code & credentials": {"Code/Repo": 3, "Credentials/Secrets": 3, "Directory/Index": 2},
    "Identify breaches/leaks": {"Exposure/Leak": 3, "Credentials/Secrets": 2},
    "Find people & org info": {"People/Profiles": 3, "Regulatory/Legal": 2},
    "Track incidents / risk": {"Incidents/Risk": 3},
    "Academic/technical trails": {"Academic/Research": 3},
}
DEFAULT_GOALS = list(GOAL_WEIGHTS.keys())

MODEL_ID_MAP = {
    "qwen2.5-1.5b-instruct": "Qwen/Qwen2.5-1.5B-Instruct",
    "phi-3-mini-4k-instruct": "microsoft/phi-3-mini-4k-instruct",
    "gemma-2-2b-it": "google/gemma-2-2b-it",
}
# ---------------------------
# Known Facts Model
# ---------------------------
@dataclass
class KnownFacts:
    handles: List[str]
    real_names: List[str]
    emails: List[str]
    domains: List[str]
    ips: List[str]
    locations: List[str]
    orgs: List[str]
    context: str

    @classmethod
    def from_session(cls) -> "KnownFacts":
        return st.session_state.get("known_facts") or cls([], [], [], [], [], [], [], "")
def _parse_csv(s: str) -> List[str]:
    return [x.strip() for x in (s or "").split(",") if x.strip()]
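# Illustrative behavior (example only, not executed): _parse_csv trims whitespace and
# drops empty entries, e.g. _parse_csv(" alice , bob ,, ") -> ["alice", "bob"].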
def _known_facts_ui():
    st.subheader("Known Facts / Prior Intelligence")
    st.caption("Provide what you already know. This seeds scoring & generation.")
    col_a, col_b, col_c = st.columns(3)
    with col_a:
        handles = st.text_area("Handles / Usernames (comma)", key="kf_handles", height=70)
        emails = st.text_area("Emails (comma)", key="kf_emails", height=70)
        ips = st.text_area("IP addresses (comma)", key="kf_ips", height=70)
    with col_b:
        real_names = st.text_area("Real Names (comma)", key="kf_real_names", height=70, help="Full names or key name variants")
        domains = st.text_area("Domains (comma)", key="kf_domains", height=70)
        orgs = st.text_area("Organizations (comma)", key="kf_orgs", height=70)
    with col_c:
        locations = st.text_area("Locations (comma)", key="kf_locations", height=70)
        context = st.text_area("Context / Keywords", key="kf_context", height=160, help="Free-text mission context, tech stack, roles, etc.")
    if st.button("Save Known Facts", key="btn_save_facts"):
        facts = KnownFacts(
            handles=_parse_csv(handles),
            real_names=_parse_csv(real_names),
            emails=_parse_csv(emails),
            domains=_parse_csv(domains),
            ips=_parse_csv(ips),
            locations=_parse_csv(locations),
            orgs=_parse_csv(orgs),
            context=context.strip(),
        )
        st.session_state["known_facts"] = facts
        st.success("Facts saved (session only).")
    facts = KnownFacts.from_session()
    st.markdown(f"**Current facts loaded:** {len(facts.handles)} handles, {len(facts.emails)} emails, {len(facts.domains)} domains, {len(facts.real_names)} names.")
    st.markdown("---")
    st.markdown("### Candidate Generation")
    st.caption("Generate permutations / derived candidates from known facts.")
    if st.button("Generate Candidates", key="btn_gen_candidates"):
        facts = KnownFacts.from_session()
        usernames = set(facts.handles)
        # simple mutations
        for h in list(usernames):
            for suf in ["123", "01", "_sec", "_research", "-dev"]:
                usernames.add(h + suf)
            if h.isalpha():
                usernames.add(h + "1")
        # email permutations (if we have names and domains)
        emails = set(facts.emails)
        if facts.real_names and facts.domains:
            first = facts.real_names[0].split()[0].lower()
            last = facts.real_names[0].split()[-1].lower()
            for d in facts.domains[:3]:
                emails.update({
                    f"{first}.{last}@{d}",
                    f"{first}{last}@{d}",
                    f"{first[0]}{last}@{d}",
                    f"{first}_{last}@{d}",
                })
        # domain variants (very light)
        dom_vars = set(facts.domains)
        for d in facts.domains:
            if d.count('.') >= 1:
                root = d.split('.')[0]
                tld = d.split('.')[-1]
                dom_vars.add(root + "-dev." + tld)
                dom_vars.add(root + "-staging." + tld)
        st.session_state["generated_candidates"] = {
            "usernames": sorted(list(usernames))[:100],
            "emails": sorted(list(emails))[:100],
            "domains": sorted(list(dom_vars))[:100]
        }
        st.success("Candidates generated.")
    cand = st.session_state.get("generated_candidates")
    if cand:
        st.write("Usernames (sample)", cand["usernames"][:10])
        st.write("Emails (sample)", cand["emails"][:10])
        st.write("Domains (sample)", cand["domains"][:10])
        if st.button("Add All Candidates to Facts", key="btn_add_cand"):
            facts = KnownFacts.from_session()
            facts.handles = sorted(list(set(facts.handles + cand["usernames"])))
            facts.emails = sorted(list(set(facts.emails + cand["emails"])))
            facts.domains = sorted(list(set(facts.domains + cand["domains"])))
            st.session_state["known_facts"] = facts
            st.success("Candidates merged into facts.")
def _generate_investigation_plan(entity_type: str, entity_value: str, facts: KnownFacts) -> Dict[str, Any]:
    """Produce a structured investigation plan based on current facts and target type."""
    objectives = [
        "Establish definitive identifiers (emails, handles, domains) to anchor pivots",
        "Map exposed surface (sites, code, documents, credentials indicators)",
        "Correlate identities across platforms and artifacts",
        "Identify signs of exposure, breach, or sensitive data leakage",
        "Prioritize high-confidence findings for deeper manual review",
    ]
    # Gap analysis
    gaps = []
    if not facts.emails:
        gaps.append("No confirmed email addresses")
    if not facts.handles:
        gaps.append("No social/developer handles")
    if not facts.domains and entity_type != "Domain / Website":
        gaps.append("No related domains captured")
    if not facts.real_names and entity_type in ("Named Individual", "Organization / Company"):
        gaps.append("No individual name variants")
    if not facts.orgs and entity_type == "Named Individual":
        gaps.append("No employing organizations")
    if not facts.context:
        gaps.append("Context / mission keywords empty (reduces scoring nuance)")
    if not gaps:
        gaps = ["Current fact set sufficient for first enumeration pass"]
    # Phase recommendations
    phases: List[Dict[str, Any]] = []
    phases.append({
        "phase": "Phase 1 - Baseline & Fact Hardening",
        "goals": ["Normalize entity value", "Collect canonical facts", "Note obvious pivots"],
        "actions": [
            "Record primary identifier in Known Facts",
            "Add any immediately known emails, domains, handles",
            "Capture mission / context keywords (tech stack, industry, roles)",
            "Run Advisor for broad Footprinting and People queries"
        ]
    })
    phases.append({
        "phase": "Phase 2 - Surface Enumeration",
        "goals": ["Map public assets", "Discover documents & code"],
        "actions": [
            "Select dorks: site:, filetype:, intitle:'index of' variations",
            "Enumerate repo references (GitHub/GitLab) and note unique strings",
            "Pull down high-signal docs (PDF/DOCX) and extract metadata for hidden emails/handles"
        ]
    })
    phases.append({
        "phase": "Phase 3 - Identity Correlation",
        "goals": ["Link handles to emails", "Find cross-platform reuse"],
        "actions": [
            "Search handles with platform-specific queries (social + developer)",
            "Leverage resume / CV / speaker page dorks for name-email alignment",
            "Add newly confirmed identifiers back into Known Facts and re-score"
        ]
    })
    phases.append({
        "phase": "Phase 4 - Exposure & Risk Signals",
        "goals": ["Detect leak indicators", "Prioritize potential sensitive exposure"],
        "actions": [
            "Run leak / breach / paste oriented dorks including credential keywords",
            "Inspect any pastebin / gist / artifact snippets for policy or secret references",
            "Flag findings with multiple co-occurring identifiers for manual escalation"
        ]
    })
    phases.append({
        "phase": "Phase 5 - Consolidation & Reporting",
        "goals": ["Score & rank findings", "Produce exportable report"],
        "actions": [
            "Re-score after final fact enrichment",
            "Visualize graph to ensure high-score nodes connect multiple anchors",
            "Export HTML report and retain audit log",
            "Document residual gaps & next potential pivots (e.g., historical archives, certificate transparency)"
        ]
    })
    return {
        "entity_type": entity_type,
        "entity_value": entity_value,
        "objectives": objectives,
        "gaps": gaps,
        "phases": phases,
        "facts_snapshot": facts.__dict__,
    }
def render_investigation_plan(entity_type: str, entity_value: str):
    st.subheader("Investigation Plan")
    facts = KnownFacts.from_session()
    plan = _generate_investigation_plan(entity_type, entity_value, facts)
    st.markdown("### Core Objectives")
    for o in plan["objectives"]:
        st.markdown(f"- {o}")
    st.markdown("### Current Gaps")
    for g in plan["gaps"]:
        st.markdown(f"- {g}")
    st.markdown("### Phased Approach")
    for ph in plan["phases"]:
        with st.expander(ph["phase"], expanded=False):
            st.markdown("**Goals**")
            for g in ph["goals"]:
                st.markdown(f"- {g}")
            st.markdown("**Actions**")
            for a in ph["actions"]:
                st.markdown(f"- {a}")
    if st.button("Export Plan (Markdown)", key="btn_export_plan"):
        md_lines = [f"# Investigation Plan: {plan['entity_type']} — {plan['entity_value']}", "", "## Objectives"]
        md_lines += [f"- {o}" for o in plan["objectives"]]
        md_lines += ["", "## Gaps"] + [f"- {g}" for g in plan["gaps"]]
        md_lines += ["", "## Phases"]
        for ph in plan["phases"]:
            md_lines.append(f"### {ph['phase']}")
            md_lines.append("**Goals**")
            md_lines += [f"- {g}" for g in ph["goals"]]
            md_lines.append("**Actions**")
            md_lines += [f"- {a}" for a in ph["actions"]]
            md_lines.append("")
        md = "\n".join(md_lines)
        st.download_button("Download Plan", md, file_name="investigation_plan.md", mime="text/markdown")
def _score_dork_rule(d: TypedDork, goals: List[str], user_note: str) -> float:
    s = 1.0
    for g in goals:
        for cat, w in GOAL_WEIGHTS.get(g, {}).items():
            if d["type"] == cat:
                s += w
    note = (user_note or "").lower()
    if any(k in note for k in ["password", "credential", "secret", "token"]):
        if d["type"] in {"Credentials/Secrets", "Code/Repo", "Directory/Index"}:
            s += 1.5
    if any(k in note for k in ["resume", "cv", "employee", "contact"]):
        if d["type"] in {"People/Profiles"}:
            s += 1.0
    if any(k in note for k in ["breach", "leak", "dump", "paste"]):
        if d["type"] in {"Exposure/Leak", "Credentials/Secrets"}:
            s += 1.5
    if any(k in note for k in ["paper", "research", "doi", "citation"]):
        if d["type"] in {"Academic/Research"}:
            s += 1.0
    return s
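# Worked example (illustrative): with goals ["Discover code & credentials"] and a hint
# containing "password", a "Code/Repo" dork scores 1.0 (base) + 3 (goal weight)
# + 1.5 (credential keyword boost) = 5.5, so it ranks ahead of un-boosted categories.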
def _recommend_rules(entity_type: str, entity_value: str, goals: List[str], user_note: str, top_k: int = 10) -> List[TypedDork]:
    builder = TYPED_DORK_MAP.get(entity_type)
    typed = builder(entity_value) if (builder and entity_value) else []
    ranked = sorted(typed, key=lambda d: _score_dork_rule(d, goals, user_note), reverse=True)
    return ranked[:top_k]
def _safe_json_list(txt: str) -> List[Dict[str, Any]]:
    """Best-effort extraction of a JSON list from raw LLM text or user input.

    Strategy:
    1. Strip surrounding markdown code fences (with or without language tag).
    2. Attempt direct json.loads.
    3. Locate outermost '[' ... ']' span and attempt parse.
    Returns [] on any failure or non-list root.
    """
    if not txt:
        return []
    s = txt.strip()
    # Remove markdown fences like ```json ... ```
    if s.startswith("```"):
        lines = s.split("\n")
        # drop first fence line
        lines = lines[1:]
        # drop trailing fence line if present
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        s = "\n".join(lines).strip()
    # Try direct parse
    try:
        data = json.loads(s)
        if isinstance(data, list):
            return data  # type: ignore[return-value]
    except Exception:
        pass
    # Fallback: largest bracketed list slice
    start = s.find("[")
    end = s.rfind("]")
    if start != -1 and end != -1 and end > start:
        candidate = s[start:end+1]
        try:
            data = json.loads(candidate)
            if isinstance(data, list):
                return data  # type: ignore[return-value]
        except Exception:
            pass
    return []
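# Illustrative example (not executed): a fenced LLM reply such as
#   ```json
#   [{"q": "site:example.com", "type": "Footprinting", "why": "demo"}]
#   ```
# is unwrapped and parsed into a one-element list; prose-only replies yield [].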
def _hf_infer(model_id: str, prompt: str, max_new_tokens: int = 384, temperature: float = 0.2) -> Optional[str]:
    """Call Hugging Face Inference API if token & requests available.

    Returns generated text or None (which triggers rule-based fallback)."""
    if requests is None:
        st.warning("'requests' not installed; cannot call Hugging Face Inference API. Falling back to rules.")
        return None
    api_token = os.getenv("HF_API_TOKEN")
    if not api_token:
        st.warning("HF_API_TOKEN not set. Add it as a secret/environment variable to enable LLM advisor. Falling back to rules.")
        return None
    url = f"https://api-inference.huggingface.co/models/{model_id}"
    headers = {"Authorization": f"Bearer {api_token}"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_new_tokens,
            "temperature": temperature,
            "return_full_text": False,
        },
    }
    try:
        resp = requests.post(url, headers=headers, json=payload, timeout=90)
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, list) and data and isinstance(data[0], dict) and "generated_text" in data[0]:
            return data[0]["generated_text"]
        if isinstance(data, dict) and "generated_text" in data:
            return data["generated_text"]
        # Unknown shape: return serialized
        return json.dumps(data)
    except Exception as e:
        st.warning(f"HF inference error: {e}. Falling back to rules.")
        return None
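# Note (assumption, shapes can vary by model/task): the serverless text-generation
# endpoint typically replies with a list like [{"generated_text": "..."}], which is why
# both the list and dict shapes are handled above before falling back to raw JSON.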
def _build_llm_prompt(entity_type: str, entity_value: str, goals: List[str], hint: str, baseline: List[TypedDork], top_k: int) -> str:
    cat_list = ", ".join(sorted(DORK_TYPES.keys()))
    baseline_lines = "\n".join([f"- {d['type']}: {d['q']} // {d['why']}" for d in baseline[:25]])
    return f"""
You are an OSINT assistant that crafts focused Google dorks.
Given the entity type and value, the user's goals, and an optional hint, return a JSON array (and ONLY a JSON array) of up to {top_k} objects with this schema:
{{"q": "<google dork string>", "type": "<one of [{cat_list}]>", "why": "<1 sentence rationale>"}}
Rules:
- Prefer free, public sources; avoid paid services.
- Keep queries precise; quote exact strings; use site:, filetype:, inurl:, intitle:, and AROUND(n) when helpful.
- Use ONLY categories from the allowed list above.
- Output must be valid JSON (no prose, no markdown fences).
ENTITY_TYPE: {entity_type}
ENTITY_VALUE: {entity_value}
GOALS: {goals}
HINT: {hint or '(none)'}
BASELINE_CATALOG (for inspiration, don't just repeat):
{baseline_lines}
"""
def _recommend_llm(entity_type: str, entity_value: str, goals: List[str], hint: str, top_k: int) -> List[TypedDork]:
    builder = TYPED_DORK_MAP.get(entity_type)
    baseline = builder(entity_value) if (builder and entity_value) else []
    model_key = st.session_state.get("settings", {}).get("model", "qwen2.5-1.5b-instruct")
    model_id = MODEL_ID_MAP.get(model_key, model_key)
    prompt = _build_llm_prompt(entity_type, entity_value, goals, hint, baseline, top_k)
    raw = _hf_infer(model_id, prompt)
    if not raw:
        return []
    parsed = _safe_json_list(raw)
    out: List[TypedDork] = []
    for item in parsed:
        if not isinstance(item, dict):
            continue
        q = str(item.get("q", "")).strip()
        typ = str(item.get("type", "Footprinting")).strip()
        why = str(item.get("why", "Suggested by LLM")).strip()
        if not q:
            continue
        if typ not in DORK_TYPES:
            typ = "Footprinting"
        out.append({"q": q, "type": typ, "why": why})
    # Dedupe while preserving order
    seen = set()
    deduped: List[TypedDork] = []
    for d in out:
        if d["q"] in seen:
            continue
        seen.add(d["q"])
        deduped.append(d)
    return deduped[:top_k]
def render_dork_recommender(entity_type: str, entity_value: str):
    st.subheader("Step 2: Advisor")
    goals = st.multiselect("What are you trying to do?", DEFAULT_GOALS, default=["Map footprint / surface", "Find documents & spreadsheets"], key="advisor_goals")
    hint = st.text_input("Optional hint (e.g., 'credentials around build system', 'employee directory')", key="advisor_hint")
    top_k = st.slider("How many suggestions?", 3, 20, 10, key="advisor_topk")
    use_llm = st.checkbox("Use advisor LLM (Hugging Face Inference API)", value=False, key="use_llm_checkbox", help="Requires HF_API_TOKEN environment secret. Falls back to rules if unavailable.")
    if st.button("Suggest dorks", key="btn_suggest"):
        recs: List[TypedDork] = []
        if use_llm:
            recs = _recommend_llm(entity_type, entity_value, goals, hint, top_k)
        if not recs:
            recs = _recommend_rules(entity_type, entity_value, goals, hint, top_k)
        if not recs:
            st.warning("Enter a valid entity value first.")
            return
        st.session_state["dork_recs"] = recs
        st.markdown("#### Recommended dorks")
        for r in recs:
            st.markdown(f"- **[{r['type']}]** `{r['q']}`")
            st.markdown(f"  <span class='small'>{r['why']}</span>", unsafe_allow_html=True)
# ---------------------------
# STEP 3: Selection
# ---------------------------
def render_dork_selection(entity_type: str, entity_value: str):
    st.subheader("Step 3: Select dorks")
    recs = st.session_state.get("dork_recs", [])
    choice = st.radio("Select method", ["Accept advisor", "Pick from catalog", "Custom"], key="method_radio")
    final = []
    if choice == "Accept advisor":
        final = [r["q"] for r in recs]
    elif choice == "Pick from catalog":
        typed = TYPED_DORK_MAP[entity_type](entity_value)
        for idx, d in enumerate(typed):
            if st.checkbox(d["q"], key=f"pick_{idx}"):
                final.append(d["q"])
    elif choice == "Custom":
        txt = st.text_area("Enter custom dorks")
        if txt:
            final = [l.strip() for l in txt.splitlines() if l.strip()]
    st.session_state["selected_dorks"] = final
    st.write("Final Basket:", final)
# ---------------------------
# STEP 4: Execution + Metadata
# ---------------------------
def _audit_init():
    st.session_state.setdefault("audit", [])

def _audit_log(action: str, **details):
    if not st.session_state.get("settings", {}).get("logging", True):
        return
    _audit_init()
    st.session_state["audit"].append({"ts": datetime.utcnow().isoformat() + "Z", "action": action, **details})

def ddg_search(query: str, max_results: int = 5):
    if DDGS is None:
        return []
    with DDGS() as ddgs:
        return list(ddgs.text(query, max_results=max_results))
# ---------------------------
# Scoring
# ---------------------------
SOURCE_RELIABILITY = {
    "high": [".gov", ".mil", ".edu", "sec.gov", "reuters", "bloomberg", "nytimes", "wsj"],
    "med": ["github.com", "gitlab.com", "medium.com", "substack.com", "bbc"],
}

def _source_reliability(url: str) -> str:
    url_l = (url or "").lower()
    for kw in SOURCE_RELIABILITY["high"]:
        if kw in url_l:
            return "High"
    for kw in SOURCE_RELIABILITY["med"]:
        if kw in url_l:
            return "Medium"
    return "Low"

def _fuzzy_match(a: str, b: str) -> float:
    if not a or not b:
        return 0.0
    if a.lower() == b.lower():
        return 1.0
    if fuzz:
        return fuzz.ratio(a.lower(), b.lower()) / 100.0
    return 0.0
def score_finding(row: Dict[str, Any], facts: KnownFacts) -> Dict[str, Any]:
    title = row.get("title") or row.get("heading") or ""
    snippet = row.get("body") or row.get("snippet") or ""
    url = row.get("href") or row.get("link") or ""
    text = f"{title}\n{snippet}".lower()
    score = 0
    comps: List[Dict[str, Any]] = []

    def add(points: int, label: str, reason: str):
        nonlocal score
        score += points
        comps.append({"label": label, "points": points, "reason": reason})

    # Exact matches
    hits = 0
    for e in facts.emails:
        if e.lower() in text:
            add(25, "Email match", e)
            hits += 1
    for h in facts.handles:
        if h.lower() in text:
            add(15, "Handle match", h)
            hits += 1
    for d in facts.domains:
        if d.lower() in text:
            add(10, "Domain mention", d)
            hits += 1
    for ip in facts.ips:
        if ip and ip.lower() in text:
            add(10, "IP mention", ip)
            hits += 1
    for org in facts.orgs:
        if org.lower() in text:
            add(8, "Org mention", org)
            hits += 1
    for name in facts.real_names:
        if name.lower() in text:
            add(20, "Name mention", name)
            hits += 1
        else:
            # fuzzy
            for token in name.split():
                for word in text.split():
                    if _fuzzy_match(token, word) >= 0.9:
                        add(8, "Fuzzy name token", f"{token}->{word}")
                        hits += 1
                        break
    if hits >= 2:
        add(10, "Co-occurrence", f"{hits} fact tokens present")
    # Source reliability
    rel = _source_reliability(url)
    if rel == "High":
        add(10, "Source reliability", rel)
    elif rel == "Medium":
        add(5, "Source reliability", rel)
    # Context keywords basic
    ctx_hits = 0
    if facts.context:
        ctx_hits = sum(1 for kw in facts.context.lower().split() if kw and kw in text)
        if ctx_hits >= 3:
            add(10, "Context alignment", f"{ctx_hits} context keywords")
        elif ctx_hits == 2:
            add(6, "Context alignment", "2 context keywords")
        elif ctx_hits == 1:
            add(3, "Context alignment", "1 context keyword")
    # Optional embedding similarity (semantic relevance to context)
    if ctx_hits < 3 and st.session_state.get("settings", {}).get("enable_embeddings") and facts.context and SentenceTransformer:
        emb_model = st.session_state.get("_embed_model")
        if emb_model is None:
            with st.spinner("Loading embedding model (once)..."):
                try:
                    emb_model = SentenceTransformer("all-MiniLM-L6-v2")
                    st.session_state["_embed_model"] = emb_model
                except Exception:
                    emb_model = None
        if emb_model:
            try:
                q_emb = emb_model.encode([facts.context[:512]])[0]
                doc_emb = emb_model.encode([text[:1024]])[0]
                # cosine similarity
                dot = float((q_emb @ doc_emb) / ((q_emb**2).sum()**0.5 * (doc_emb**2).sum()**0.5))
                if dot > 0.35:
                    pts = int(min(20, (dot - 0.35) / 0.30 * 20))  # scale 0.35..0.65 -> 0..20
                    if pts > 0:
                        add(pts, "Semantic similarity", f"cos={dot:.2f}")
            except Exception:
                pass
    level = "High" if score >= 70 else ("Medium" if score >= 40 else "Low")
    explanation = "; ".join(f"{c['label']} +{c['points']} ({c['reason']})" for c in comps)
    return {
        **row,
        "score": score,
        "level": level,
        "explanation": explanation,
        "components": comps,
        "reliability": rel,
        "url": url,
        "title": title,
        "snippet": snippet,
    }

def score_all_findings(rows: List[Dict[str, Any]], facts: KnownFacts) -> List[Dict[str, Any]]:
    return [score_finding(r, facts) for r in rows]
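# Worked example (illustrative): a result whose title/snippet contains one known email
# (+25) and one known domain (+10) has two fact hits, so co-occurrence adds +10; if it is
# hosted on a .gov page, source reliability adds +10 for a total of 55, which falls in the
# Medium band (>= 40 and < 70).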
# File/Image metadata extraction
def extract_metadata(upload) -> Dict[str, Any]:
    info: Dict[str, Any] = {}
    if not upload:
        return info
    name = upload.name.lower()
    try:
        if name.endswith(".pdf") and PdfReader:
            reader = PdfReader(upload)
            info = {"Pages": len(reader.pages), "Meta": dict(reader.metadata)}
        elif name.endswith(".docx") and docx:
            doc = docx.Document(upload)
            cp = doc.core_properties
            info = {"Title": cp.title, "Author": cp.author, "Created": cp.created}
        elif (name.endswith(".doc") or name.endswith(".xls")) and olefile:
            if olefile.isOleFile(upload):
                info = {"OLE": "Legacy Office file detected"}
        elif name.endswith((".mp3", ".flac", ".ogg", ".m4a")) and MutagenFile:
            audio = MutagenFile(upload)
            info = dict(audio) if audio else {}
        elif name.endswith((".jpg", ".jpeg", ".png")) and exifread:
            tags = exifread.process_file(upload)
            info = {tag: str(val) for tag, val in tags.items()}
    except Exception as e:
        info = {"error": str(e)}
    return info
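# Illustrative output (assumed shapes, varies per file and library version): a PDF might
# yield {"Pages": 3, "Meta": {"/Author": "...", "/Producer": "..."}}, while a JPEG yields a
# flat dict of EXIF tag names mapped to their string values.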
# ---------------------------
# Graph Visualization
# ---------------------------
def build_graph(scored: List[Dict[str, Any]], facts: KnownFacts) -> Optional[str]:
    if not nx or not Network:
        return None
    G = nx.Graph()
    # Add fact nodes
    for email in facts.emails:
        G.add_node(email, type="email")
    for h in facts.handles:
        G.add_node(h, type="handle")
    for d in facts.domains:
        G.add_node(d, type="domain")
    for n in facts.real_names:
        G.add_node(n, type="name")
    # Add finding nodes & edges
    for f in scored[:300]:
        url = f.get("url") or "unknown"
        G.add_node(url, type="finding", score=f.get("score", 0))
        text = (f.get("title", "") + " " + f.get("snippet", ""))[:400].lower()
        linked = False
        for token in facts.emails + facts.handles + facts.domains + facts.real_names:
            if token.lower() and token.lower() in text:
                G.add_edge(token, url)
                linked = True
        if not linked and f.get("level") != "High":
            # drop isolated low/medium findings; unlinked high-score nodes are still included
            G.remove_node(url)
    # Visualize
    net = Network(height="550px", width="100%", bgcolor="#111", font_color="white")
    for n, data in G.nodes(data=True):
        color = {
            "email": "#ff7f50",
            "handle": "#1e90ff",
            "domain": "#32cd32",
            "name": "#daa520",
            "finding": "#888"
        }.get(data.get("type"), "#999")
        size = 15 if data.get("type") != "finding" else max(5, min(25, int(data.get("score", 10) / 4)))
        net.add_node(n, label=n[:30], color=color, title=n, size=size)
    for u, v in G.edges():
        net.add_edge(u, v)
    path = "graph.html"
    net.save_graph(path)  # write the HTML without trying to open a browser on the server
    try:
        with open(path, "r", encoding="utf-8") as f:
            return f.read()
    except Exception:
        return None
# ---------------------------
# Report Export
# ---------------------------
HTML_TEMPLATE = """<!doctype html><html><head><meta charset='utf-8'/><title>OSINT Report</title>
<style>body{font-family:Arial,Helvetica,sans-serif;margin:2rem;background:#111;color:#eee;} h1,h2{color:#ffcc66} table{border-collapse:collapse;width:100%;margin:1rem 0;} th,td{border:1px solid #444;padding:6px;font-size:0.85rem;} .high{color:#4caf50;font-weight:700}.medium{color:#ffc107}.low{color:#f44336} code{background:#222;padding:2px 4px;border-radius:4px;} .small{font-size:0.75rem;color:#ccc}</style>
</head><body>
<h1>OSINT Investigation Report</h1>
<h2>Summary</h2>
<p><b>Entity Type:</b> {{ entity_type }}<br/><b>Entity Value:</b> {{ entity_value }}<br/>
<b>Generated:</b> {{ generated }} UTC</p>
<h2>Known Facts</h2>
<pre>{{ facts_json }}</pre>
<h2>Findings (Top {{ findings|length }})</h2>
<table><thead><tr><th>Score</th><th>Level</th><th>Title</th><th>URL</th><th>Reliability</th><th>Explanation</th></tr></thead><tbody>
{% for f in findings %}
<tr><td>{{ f.score }}</td><td class='{{ f.level|lower }}'>{{ f.level }}</td><td>{{ f.title }}</td><td><a href='{{ f.url }}' target='_blank'>link</a></td><td>{{ f.reliability }}</td><td class='small'>{{ f.explanation }}</td></tr>
{% endfor %}
</tbody></table>
</body></html>"""

def export_report(entity_type: str, entity_value: str, facts: KnownFacts, scored: List[Dict[str, Any]]):
    if not Template:
        st.warning("jinja2 not installed; cannot build HTML report.")
        return
    tpl = Template(HTML_TEMPLATE)
    html = tpl.render(
        entity_type=entity_type,
        entity_value=entity_value,
        generated=datetime.utcnow().isoformat(),
        facts_json=json.dumps(facts.__dict__, indent=2),
        findings=scored[:200],
    )
    st.download_button("Download HTML Report", data=html.encode("utf-8"), file_name="osint_report.html", mime="text/html")
# ---------------------------
# Username Availability Probe (simple)
# ---------------------------
PLATFORM_PATTERNS: Dict[str, str] = {
    "GitHub": "https://github.com/{user}",
    "Twitter": "https://x.com/{user}",
    "Reddit": "https://www.reddit.com/user/{user}",
    "Medium": "https://medium.com/@{user}",
}
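# Extension sketch (hypothetical entries, not part of the default set): the Help tab
# suggests growing this mapping, e.g.
#   PLATFORM_PATTERNS["GitLab"] = "https://gitlab.com/{user}"
# Any added pattern only needs a "{user}" placeholder for probe_usernames() to format.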
def probe_usernames(users: List[str], limit: int = 10) -> List[Dict[str, str]]:
    out = []
    if requests is None:
        return out
    for u in users[:limit]:
        for plat, pattern in PLATFORM_PATTERNS.items():
            url = pattern.format(user=u)
            status = "?"
            try:
                r = requests.get(url, timeout=5)
                if r.status_code == 200:
                    status = "Exists"
                elif r.status_code == 404:
                    status = "Not Found"
                else:
                    status = str(r.status_code)
            except Exception:
                status = "Error"
            out.append({"platform": plat, "username": u, "status": status})
    return out
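# Illustrative result row (example handle only):
#   {"platform": "GitHub", "username": "octocat", "status": "Exists"}
# As the Help tab notes, this is an HTTP-status heuristic; "Exists" only means the profile
# URL returned 200, not that the handle belongs to the target.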
def render_step4_execution(entity_type: str, entity_value: str):
    st.subheader("Step 4: Execute & Metadata")
    final = st.session_state.get("selected_dorks", [])
    if not final:
        st.info("No dorks selected.")
        return
    max_per = st.slider("Max results", 3, 20, st.session_state.get("settings", {}).get("max_per", 10))
    if st.button("Run dorks"):
        # Progressive skeleton loader while executing each query
        placeholder = st.empty()
        results: List[Dict[str, Any]] = []
        total_expected = len(final) * max_per
        for i, q in enumerate(final, start=1):
            remaining = len(final) - i + 1
            est_remaining = remaining * max_per
            # Render skeletons representing expected remaining results (capped for performance)
            with placeholder.container():
                st.markdown("#### Running searches…")
                st.caption(f"Query {i}/{len(final)}: {q}")
                skel_blocks = min(est_remaining, 18)  # avoid huge DOM
                # Distribute size variations for visual interest
                sizes = ["sm", "md", "lg"]
                rows_html = []
                for j in range(skel_blocks):
                    size = sizes[j % len(sizes)]
                    # use the sk-line-* classes defined in HIDE_STREAMLIT_STYLE
                    rows_html.append(f'<div class="skeleton-block sk-line-{size}"></div>')
                st.markdown(
                    '<div class="skeleton-group">' + "".join(rows_html) + "</div>",
                    unsafe_allow_html=True,
                )
            # Execute the actual search
            rows = ddg_search(q, max_results=max_per)
            _audit_log("dork_run", dork=q, results=len(rows))
            results.extend(rows)
        # Clear placeholder after completion
        placeholder.empty()
        st.session_state["dork_results"] = results
        # compute scores after acquiring all results
        facts = KnownFacts.from_session()
        st.session_state["scored_results"] = score_all_findings(results, facts)
    if res := st.session_state.get("dork_results"):
        st.json(res)
        audit_str = "\n".join(json.dumps(ev) for ev in st.session_state.get("audit", []))
        st.download_button("Download audit", audit_str, "audit.jsonl")
    st.markdown("---")
    st.subheader("File/Image Metadata Extractor")
    upload = st.file_uploader("Upload a file (pdf, docx, mp3, jpg, etc.)")
    if upload:
        meta = extract_metadata(upload)
        st.json(meta)
| # --------------------------- | |
| # Main | |
| # --------------------------- | |
| def render_help_tab(): | |
| st.subheader("How To Use This OSINT Investigator Suite") | |
| st.markdown(""" | |
| This tab is a quick field manual. It shows the purpose of every tab, the workflow order, and pro tips. | |
| ### Recommended Workflow (Fast Path) | |
| 1. Known Facts – Load seed identifiers (handles, emails, domains, names). | |
| 2. Plan – Review the autogenerated phased investigation plan; adjust facts if gaps obvious. | |
| 3. Explainer – Learn the dork building logic for transparency (optional). | |
| 4. Advisor – Get recommended dorks (rule + optional LLM). Refine, then accept. | |
| 5. Selection – Curate / edit / remove dorks; finalize the set to run. | |
| 6. Execution – Run dorks (skeleton loaders show progress); extract file/image metadata if you have artifacts. | |
| 7. Scoring – Review confidence scores, filter, read explanations, iterate by adding new facts and re-scoring. | |
| 8. Graph – Visual relationship view (requires networkx + pyvis) to spot high‑intersection nodes. | |
| 9. Report – Export an HTML snapshot for stakeholders / evidence chain. | |
| 10. Usernames – Probe handle existence across common platforms. | |
| 11. Help – (This) reference card anytime. | |
| --- | |
| ### Tab Details & Tips | |
| **Known Facts** | |
| - Add all solid identifiers early; scoring & dork generation leverage them. | |
| - Handles & emails dramatically raise confidence when co-occurring in sources. | |
| - Update facts after each scouting loop (new domains from findings, etc.). | |
| **Plan** | |
| - Generated phases: Recon, Expansion, Correlation, Deep Dive, Reporting. | |
| - Use it as a narrative backbone for your final export or task tickets. | |
| **Explainer** | |
| - Shows how base + contextual tokens assemble into search dorks by entity type. | |
| - Use to justify methodology or teach newcomers. | |
| **Advisor** | |
| - Hybrid: deterministic heuristic rules plus optional LLM (if HF token + model set in settings). | |
| - Toggle embedding/semantic features in settings (if present) to enrich scoring later. | |
| - Accept the generated list to push candidates to Selection. | |
| **Selection** | |
| - Final edit surface. Remove noisy / redundant queries before execution. | |
| - Keep a balanced mix: broad footprint + specific leak/file/resource patterns. | |
| **Execution** | |
| - Click Run dorks: animated skeleton placeholders appear per batch while searches resolve. | |
| - Results cached in session: re-running overwrites (audit log tracks runs). | |
| - Metadata Extractor: Upload docs / images to pull EXIF, PDF metadata, docx core props, audio tags. | |
| **Scoring** | |
| - Each finding scored from component signals (exact identifiers, fuzzy tokens, co-occurrence, reliability, context keywords, semantic similarity). | |
| - Levels: High ≥70, Medium ≥40. Use filters + search bar to triage. | |
| - Re-score after updating Known Facts or enabling embeddings. | |
| - "Full Explanations" expands reasoning transparency for defensibility. | |
| **Graph** | |
| - Visual pivot map: nodes sized by aggregated score; edges for shared identifiers. | |
| - Use to spot central assets (good pivot candidates) quickly. | |
| - If the graph libraries are missing, you'll see an install hint (they're listed in requirements). | |
| **Report** | |
| - Generates a standalone HTML file (with styling and key metrics) for sharing. | |
| - Consider exporting after each major iteration to preserve state (version trail). | |
| **Usernames** | |
| - Lightweight existence probe (HTTP status heuristic). "Exists" ≠ ownership proof. | |
| - Add more platforms by extending PLATFORM_PATTERNS in code. | |
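| Illustrative only (check the actual PLATFORM_PATTERNS definition in this file before copying); assuming it maps a platform name to a profile-URL template, a new entry might look like: | |
| ```python | |
| # Hypothetical entry -- the key/template shape is an assumption, not the confirmed structure. | |
| PLATFORM_PATTERNS["mastodon.social"] = "https://mastodon.social/@{username}" | |
| ``` | |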
| **Chat Assistant (Floating)** | |
| - Noir-style guidance; quick buttons for common pivots. | |
| - If a model and token are configured, responses may blend LLM nuance with rule hints; otherwise replies are rule-based only. | |
| - Close with ✕; reopen with the 🕵️ button. | |
| **Light / Dark Toggle** | |
| - Sidebar toggle (if present) swaps theme classes; custom components auto-adapt. | |
| **Skeleton Loaders** | |
| - Shimmering bars appear during long search batches to indicate progress. | |
| --- | |
| ### Power User Tips | |
| - Iterative Loop: (Run) → (Score) → (Add new facts from findings) → (Re-score) → (Graph) → (Report). | |
| - High-value pivots: Rare email domains, unique handles in code repos, author names in PDF metadata. | |
| - Noise Control: Remove generic dorks that return unrelated trending content before executing. | |
| - Evidence Chain: Audit log (download on Execution tab) + HTML reports form a defensible trail. | |
| ### Performance Notes | |
| - Limiting Max results reduces API latency & keeps scoring responsive. | |
| - The embedding model loads lazily; the first semantic scoring pass may pause for a few seconds. | |
| - Graph view caps large result sets to avoid browser lockups. | |
| ### Glossary | |
| - Dork: Crafted search query combining identifiers + context tokens (see the example below). | |
| - Pivot: New investigative direction unlocked by a discovered unique attribute. | |
| - Co-occurrence: Multiple target identifiers appearing together in one source. | |
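| Example dork (illustrative, not necessarily what the Advisor emits): `"user@example.com" (site:pastebin.com OR site:github.com)`. | |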
| ### Ethics Reminder | |
| Public sources only. No credential stuffing, intrusion, or accessing private data stores. Respect rate limits & platform ToS. | |
| """) | |
| def main(): | |
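| """App entry point: render the brand bar, entity inputs, the tabbed investigation workflow, the floating chat widget, and the methodology/ethics expanders.""" | |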
| st.markdown(""" | |
| <div class='app-brand-bar'> | |
| <div style='font-size:28px'>🕵️</div> | |
| <div class='app-brand-title'>OSINT Investigator Suite</div> | |
| <div class='app-badge'>AI-Augmented</div> | |
| <div class='app-badge'>Heuristic Scoring</div> | |
| <div class='app-badge'>Report Export</div> | |
| </div> | |
| """, unsafe_allow_html=True) | |
| entity_type = st.selectbox("Entity type", list(TYPED_DORK_MAP.keys()), key="entity_type") | |
| entity_value = st.text_input("Entity value", "user@example.com", key="entity_value") | |
| if entity_type and entity_value: | |
| tabs = st.tabs(["Known Facts", "Plan", "Explainer", "Advisor", "Selection", "Execution", "Scoring", "Graph", "Report", "Usernames", "Help"]) | |
| with tabs[0]: | |
| _known_facts_ui() | |
| with tabs[1]: | |
| render_investigation_plan(entity_type, entity_value) | |
| with tabs[2]: | |
| render_dorks_explainer(entity_type, entity_value) | |
| with tabs[3]: | |
| render_dork_recommender(entity_type, entity_value) | |
| with tabs[4]: | |
| render_dork_selection(entity_type, entity_value) | |
| with tabs[5]: | |
| render_step4_execution(entity_type, entity_value) | |
| with tabs[6]: | |
| st.subheader("Scoring & Confidence") | |
| facts = KnownFacts.from_session() | |
| scored = st.session_state.get("scored_results") | |
| if not scored: | |
| st.info("Run dorks first to generate findings and scores.") | |
| else: | |
| high = sum(1 for r in scored if r["level"] == "High") | |
| med = sum(1 for r in scored if r["level"] == "Medium") | |
| low = sum(1 for r in scored if r["level"] == "Low") | |
| st.markdown("<div class='sticky-toolbar'><strong>Findings Overview</strong></div>", unsafe_allow_html=True) | |
| k1,k2,k3,k4 = st.columns(4) | |
| k1.metric("Total", len(scored)) | |
| k2.metric("High", high) | |
| k3.metric("Medium", med) | |
| k4.metric("Low", low) | |
| level_filter = st.multiselect("Levels", ["High", "Medium", "Low"], default=["High", "Medium", "Low"], key="lvl_filter") | |
| q = st.text_input("Search title/snippet", key="score_search") | |
| view = [r for r in scored if r["level"] in level_filter and (not q or q.lower() in (r.get("snippet", '')).lower() or q.lower() in (r.get("title", '')).lower())] | |
| rows_html = [] | |
| for r in view: | |
| lvl = r["level"].lower() | |
| badge = f"<span class='badge {lvl}'>{r['level']}</span>" | |
| title = (r.get('title',''))[:120] | |
| expl_short = (r.get('explanation',''))[:180] | |
| url = r.get('url') or '' | |
| rows_html.append(f"<tr><td>{r['score']}</td><td>{badge}</td><td>{title}</td><td><a href='{url}' target='_blank'>link</a></td><td>{r['reliability']}</td><td>{expl_short}</td></tr>") | |
| table_html = """ | |
| <div style='max-height:520px;overflow:auto;border:1px solid #262626;border-radius:12px;'> | |
| <table class='score-table'> | |
| <thead><tr><th>Score</th><th>Level</th><th>Title</th><th>URL</th><th>Reliab.</th><th>Explanation (truncated)</th></tr></thead> | |
| <tbody>{rows}</tbody> | |
| </table> | |
| </div> | |
| """.format(rows="".join(rows_html)) | |
| st.markdown(table_html, unsafe_allow_html=True) | |
| col_rescore, col_full, col_export = st.columns([1,2,1]) | |
| with col_rescore: | |
| if st.button("Re-score", key="btn_rescore_now"): | |
| rescored = score_all_findings(st.session_state.get("dork_results", []), facts) | |
| st.session_state["scored_results"] = rescored | |
| st.success("Re-scored.") | |
| with col_full: | |
| with st.expander("Full Explanations"): | |
| for r in view: | |
| st.markdown(f"**{r.get('title','')}** — {r['level']} ({r['score']})\n\n{r.get('explanation','')}") | |
| with col_export: | |
| if st.button("Export Report (HTML)", key="btn_export_report_inline"): | |
| export_report(entity_type, entity_value, facts, scored) | |
| with tabs[7]: | |
| st.subheader("Entity Graph") | |
| facts = KnownFacts.from_session() | |
| scored = st.session_state.get("scored_results") or [] | |
| if scored: | |
| html = build_graph(scored, facts) | |
| if html: | |
| st.components.v1.html(html, height=600, scrolling=True) | |
| else: | |
| st.info("Install networkx & pyvis for graph visualization.") | |
| else: | |
| st.info("No scored findings yet.") | |
| with tabs[8]: | |
| st.subheader("Report Export") | |
| facts = KnownFacts.from_session() | |
| scored = st.session_state.get("scored_results") or [] | |
| if scored: | |
| export_report(entity_type, entity_value, facts, scored) | |
| else: | |
| st.info("Run and score findings to export a report.") | |
| with tabs[9]: | |
| st.subheader("Username Availability Probe") | |
| facts = KnownFacts.from_session() | |
| sample_users = facts.handles[:10] or ([entity_value] if entity_type == "Username / Handle" else [])  # known handles take priority; fall back to the entity only when it is a username | |
| if not sample_users: | |
| st.info("Add handles in Known Facts or pick a username entity.") | |
| else: | |
| if st.button("Probe Platforms", key="btn_probe_users"): | |
| data = probe_usernames(sample_users) | |
| st.session_state["probe_results"] = data | |
| if pr := st.session_state.get("probe_results"): | |
| st.dataframe(pr, use_container_width=True) | |
| with tabs[10]: | |
| render_help_tab() | |
| # Floating chat widget render | |
| render_chat_widget(entity_type, entity_value) | |
| with st.expander("Methodology / Scoring Rubric", expanded=False): | |
| st.markdown(""" | |
| **Scoring Components** | |
| - Email (+25) / Name exact (+20) / Handle (+15) / Domain (+10) / IP (+10) / Org (+8) | |
| - Fuzzy name token (+8) / Co-occurrence (+10) | |
| - Source reliability High (+10) / Medium (+5) | |
| - Context alignment (1:+3 / 2:+6 / ≥3:+10) | |
| - Semantic similarity (0–20 scaled) if enabled | |
| **Levels:** High ≥70, Medium ≥40, else Low. | |
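| *Worked example (illustrative):* exact email (+25) + co-occurrence (+10) + high-reliability source (+10) + two context keywords (+6) = 51 → Medium. | |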
| """) | |
| with st.expander("Ethical Use Notice", expanded=False): | |
| st.markdown("Lawful OSINT only. No intrusion, auth bypass, or accessing non-public data. Respect platform ToS & privacy.") | |
| # --------------------------- | |
| # Chat Assistant | |
| # --------------------------- | |
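| # System prompt for the optional LLM-backed chat persona; when no model or HF token is configured, _rule_based_reply answers instead. | |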
| GUIDE_SYSTEM = ( | |
| "You are a noir-style seasoned OSINT investigator named 'The Analyst'. Speak like classic crime noir: terse, vivid metaphors, professional, never cheesy. " | |
| "Guide the user step-by-step in enumerating a digital entity using only ethical open sources. " | |
| "Each answer: <=150 words, 2-4 compact paragraphs or bullet fragments. Provide concrete next actions, pivot angles, and a light ethics reminder if user drifts. " | |
| "Avoid sensationalism. No illegal guidance. Occasionally finish with a brief noir tag line like 'That's the shape of the alley, kid.'" ) | |
| def _summarize_context(entity_type: str, entity_value: str) -> str: | |
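| """Condense session state (entity, fact counts, top high-confidence hit titles) into a one-line context string for prompts and replies.""" | |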
| facts: KnownFacts = KnownFacts.from_session() | |
| scored = st.session_state.get("scored_results") or [] | |
| high_titles = [s.get("title") for s in scored if s.get("level") == "High"][:5] | |
| parts = [f"Entity: {entity_type}={entity_value}"] | |
| if facts.handles: parts.append(f"Handles:{len(facts.handles)}") | |
| if facts.emails: parts.append(f"Emails:{len(facts.emails)}") | |
| if facts.domains: parts.append(f"Domains:{len(facts.domains)}") | |
| if high_titles: parts.append("HighHits:" + ";".join(high_titles)) | |
| return " | ".join(parts) | |
| def _rule_based_reply(user_msg: str, entity_type: str, entity_value: str) -> str: | |
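| """Deterministic fallback assistant: keyword-match the user's message to canned noir-style guidance and append a context snapshot.""" | |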
| msg = user_msg.lower() | |
| lines = [] | |
| ctx = _summarize_context(entity_type, entity_value) | |
| if any(k in msg for k in ["start", "hello", "hi", "first"]): | |
| lines.append("First we empty our pockets—handles, domains, emails. Solid identifiers become compass bearings.") | |
| if "dork" in msg or "search" in msg: | |
| lines.append("Open with wide footprint dorks. Then tighten: docs leaks, repo chatter, paste traces. Each query is a flashlight beam.") | |
| if "score" in msg or "confidence" in msg: | |
| lines.append("Confidence breathes when multiple facts collide in a clean source. Add precise emails or stable handles—re-score, watch the highs rise.") | |
| if "graph" in msg: | |
| lines.append("Graph shows the intersections. Nodes struck by multiple identifiers—those corners hide stories.") | |
| if "pivot" in msg or "next" in msg: | |
| lines.append("Pivot off unique anchors: a handle in a PDF, an email in a commit, a domain in a press note. Each pivot narrows the alley.") | |
| if not lines: | |
| lines.append("Playbook: 1) Lock facts 2) Advisor for 10 sharp dorks 3) Select & run 4) Score 5) Add new facts 6) Graph pivots 7) Export report.") | |
| lines.append(f"Context snapshot: {ctx}") | |
| lines.append("Stay clean—public sources only. That's the shape of the alley, kid.") | |
| return "\n\n".join(lines) | |
| def render_chat_widget(entity_type: str, entity_value: str): | |
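| """Render the floating chat assistant: minimized launcher when closed, otherwise header, message history, quick-action buttons, and reply handling.""" | |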
| # Session setup | |
| st.session_state.setdefault("chat_history", []) | |
| st.session_state.setdefault("chat_open", True) | |
| open_flag = st.session_state["chat_open"] | |
| # Mini open button (when closed) | |
| if not open_flag: | |
| if st.button("🕵️", key="open_chat_button"): | |
| st.session_state["chat_open"] = True | |
| # Style the button to float | |
| st.markdown(""" | |
| <style> | |
| div[data-testid='stButton'] button[kind='secondary'] {background:#222;border:2px solid #ffcc66;} | |
| </style> | |
| <div class='chat-mini-btn'></div> | |
| """, unsafe_allow_html=True) | |
| return | |
| # Build chat window | |
| messages = st.session_state["chat_history"] | |
| # Render HTML shell | |
| st.markdown("<div class='chat-window'>", unsafe_allow_html=True) | |
| # Header with close control | |
| c1, c2, c3 = st.columns([0.2, 0.65, 0.15]) | |
| with c1: | |
| st.markdown("<div class='chat-header' style='background:transparent;padding:4px 0 0 6px;'>🕵️</div>", unsafe_allow_html=True) | |
| with c2: | |
| st.markdown("<div class='chat-header' style='background:transparent;padding:4px 0;'> <span class='title'>Investigator</span></div>", unsafe_allow_html=True) | |
| with c3: | |
| if st.button("✕", key="close_chat_btn"): | |
| st.session_state["chat_open"] = False | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| return | |
| # Messages area | |
| # Use an empty container to emulate scroll (Streamlit limitation) | |
| msg_container = st.container() | |
| with msg_container: | |
| if messages: | |
| for turn in messages[-18:]: | |
| st.markdown(f"<p class='msg-user'><b>You:</b> {turn['user']}</p>", unsafe_allow_html=True) | |
| st.markdown(f"<p class='msg-bot'><b>Inv:</b> {turn['assistant']}</p>", unsafe_allow_html=True) | |
| else: | |
| st.markdown("<p class='msg-bot'>Need a lead? Ask me about dorks, scoring, or pivots.</p>", unsafe_allow_html=True) | |
| # Input form | |
| with st.form("chat_form", clear_on_submit=True): | |
| q = st.text_area("Message", key="chat_input_area", height=70, label_visibility="collapsed") | |
| col_a, col_b, col_c, col_d = st.columns(4) | |
| send = False | |
| with col_a: | |
| if st.form_submit_button("Send"): | |
| send = True | |
| with col_b: | |
| if st.form_submit_button("Dorks"): | |
| q = "What dorks should I run next?"; send = True | |
| with col_c: | |
| if st.form_submit_button("Confidence"): | |
| q = "How do I improve confidence now?"; send = True | |
| with col_d: | |
| if st.form_submit_button("Pivot"): | |
| q = "Give me a pivot strategy."; send = True | |
| if send and q.strip(): | |
| reply: Optional[str] = None | |
| if st.session_state.get("settings", {}).get("model") and os.getenv("HF_API_TOKEN"): | |
| convo = st.session_state["chat_history"][-6:] | |
| history_str = "\n".join([f"User: {h['user']}\nAssistant: {h['assistant']}" for h in convo if h.get('assistant')]) | |
| prompt = ( | |
| f"{GUIDE_SYSTEM}\nCurrentContext: {_summarize_context(entity_type, entity_value)}\n" + | |
| history_str + f"\nUser: {q}\nAssistant:") | |
| reply = _hf_infer(MODEL_ID_MAP.get(st.session_state["settings"]["model"], st.session_state["settings"]["model"]), prompt, max_new_tokens=190, temperature=0.35) | |
| if not reply: | |
| reply = _rule_based_reply(q, entity_type, entity_value) | |
| st.session_state["chat_history"].append({"user": q, "assistant": reply}) | |
| st.markdown("<div class='chat-input small'>Ethical OSINT only.🕵️♂️</div>", unsafe_allow_html=True) | |
| st.markdown("</div>", unsafe_allow_html=True) | |
| if __name__ == "__main__": | |
| main() | |