import gradio as gr import pandas as pd import requests from bs4 import BeautifulSoup from playwright.async_api import async_playwright import asyncio import re import time import random from urllib.parse import urljoin, urlparse try: from fake_useragent import UserAgent _UA = UserAgent() def rand_ua(): try: return _UA.random except: return None except Exception: _UA = None def rand_ua(): return None import spacy import json import os from typing import List, Dict, Tuple, Optional import logging import phonenumbers # ---------- Logging ---------- logging.basicConfig(level=logging.INFO) logger = logging.getLogger("imprint-scraper") # ---------- spaCy (optional) ---------- try: nlp = spacy.load("de_core_news_sm") except Exception: logger.warning("spaCy de_core_news_sm not found; owner extraction will rely on heuristics/regex only.") nlp = None # ---------- User agents ---------- UA_FALLBACKS = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 13_2) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.2 Safari/605.1.15", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", ] def any_user_agent() -> str: return rand_ua() or random.choice(UA_FALLBACKS) # ---------- Helpers ---------- IMPRINT_KEYWORDS = [ 'impressum','imprint','kontakt','contact','about','über uns','ueber uns', 'rechtliches','legal','rechtliches','anbieterkennzeichnung','anbieterkennung' ] OWNER_LABELS = [ 'inhaber','geschäftsführer','geschaeftsführer','geschäftsführerin','geschaeftsführerin', 'vertretungsberechtigt','eigentümer','betreiber','verantwortlich','ansprechpartner', 'inhabers','inhaberin','gründers','gruender','gruenderin','founder','owner','ceo','managing director' ] # Weight "impressum" above others when scoring KEYWORD_WEIGHTS = {'impressum': 5, 'imprint': 4, 'kontakt': 3, 'contact': 2, 'about': 1, 'über uns': 2, 'ueber uns': 2} COMMON_IMPRINT_PATHS = [ "/impressum", "/impressum.html", "/kontakt", "/kontakt.html", "/contact", "/contact.html", "/about", "/about.html", "/ueber-uns", "/ueberuns", "/ueber-uns.html", "/ueberuns.html", "/unternehmen", "/legal", "/legal-notice", "/rechtliches" ] def domain_region_hint(url: str) -> str: tld = urlparse(url).netloc.lower() if tld.endswith(".de"): return "DE" if tld.endswith(".at"): return "AT" if tld.endswith(".ch"): return "CH" return "DE" # default def normalize_phone_output(num: phonenumbers.PhoneNumber) -> str: # Return digits only, NO plus sign, per your preference. return re.sub(r"\D", "", phonenumbers.format_number(num, phonenumbers.PhoneNumberFormat.E164)).lstrip("+") def extract_text_keep_breaks(soup: BeautifulSoup) -> str: # Keep line breaks to help regex around labels/names for br in soup.find_all(["br", "p", "li", "tr", "div", "h1","h2","h3","h4","h5","h6"]): br.append("\n") text = soup.get_text(separator=" ") # Normalize whitespace text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n\s*\n+", "\n", text) return text def score_candidate(url: str, html_or_text: str) -> int: s = 0 low = html_or_text.lower() for k in IMPRINT_KEYWORDS: if k in url.lower(): s += KEYWORD_WEIGHTS.get(k,1) if k in low: s += KEYWORD_WEIGHTS.get(k,1) # extra bump for explicit phrases often on imprint pages for token in ["anbieterkennzeichnung", "haftungsausschluss", "ust-id", "steuer-nr", "amtsgericht", "registergericht", "hrb", "verantwortlich"]: if token in low: s += 1 return s # ---------- Scraper ---------- class ImprintScraper: def __init__(self, enable_js: bool = True, request_timeout: int = 20): self.enable_js = enable_js self.session = requests.Session() self.session.headers.update({ "User-Agent": any_user_agent(), "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Accept-Language": "de-DE,de;q=0.9,en;q=0.8", "Cache-Control": "no-cache", "Pragma": "no-cache", "DNT": "1", "Upgrade-Insecure-Requests": "1", "Connection": "keep-alive", }) self.timeout = request_timeout # Precompile self.re_email = re.compile(r'[\w\.-]+@[\w\.-]+\.\w+') # Relaxed; we delegate real validation to phonenumbers self.re_owner_line = re.compile( r'(?:(?:' + "|".join([re.escape(lbl) for lbl in OWNER_LABELS]) + r')\s*[:\-]?\s*)([A-ZÄÖÜ][A-Za-zÄÖÜäöüß\-\.\s]{2,80})', re.IGNORECASE ) # ---------- HTTP fetch ---------- def fetch_requests(self, url: str) -> Tuple[str, BeautifulSoup]: try: self.session.headers["User-Agent"] = any_user_agent() resp = self.session.get(url, timeout=self.timeout, allow_redirects=True) resp.raise_for_status() html = resp.text or resp.content.decode("utf-8", errors="ignore") soup = BeautifulSoup(html, "html.parser") return html, soup except Exception as e: logger.debug(f"Requests fetch failed for {url}: {e}") return "", BeautifulSoup("", "html.parser") async def fetch_playwright(self, url: str) -> Tuple[str, BeautifulSoup]: try: ua = any_user_agent() async with async_playwright() as p: browser = await p.chromium.launch(headless=True) context = await browser.new_context(user_agent=ua, viewport={'width': 1400, 'height': 900}) page = await context.new_page() await page.goto(url, wait_until="networkidle", timeout=40000) # Try to dismiss cookie banners (best-effort) cookie_texts = ["Alle akzeptieren","Akzeptieren","Zustimmen","Einverstanden","Accept all","OK","Alles akzeptieren"] for t in cookie_texts: try: btn = page.get_by_role("button", name=re.compile(t, re.IGNORECASE)) if await btn.count() > 0: await btn.first.click(timeout=1500) except Exception: pass await page.wait_for_load_state("networkidle") # Scroll to trigger lazy content for _ in range(3): await page.evaluate("window.scrollBy(0, document.body.scrollHeight/2)") await page.wait_for_timeout(700) html = await page.content() await browser.close() soup = BeautifulSoup(html, "html.parser") return html, soup except Exception as e: logger.debug(f"Playwright fetch failed for {url}: {e}") return "", BeautifulSoup("", "html.parser") async def smart_fetch(self, url: str) -> Tuple[str, BeautifulSoup]: html, soup = self.fetch_requests(url) # If page looks too small or has almost no links, try JS if self.enable_js: try_js = (len(html) < 3000) or (len(soup.find_all("a")) < 3) else: try_js = False if try_js: jhtml, jsoup = await self.fetch_playwright(url) if len(jhtml) > len(html): return jhtml, jsoup return html, soup # ---------- Find candidate links ---------- def find_imprint_like_links(self, soup: BeautifulSoup, base_url: str) -> List[str]: candidates = set() for a in soup.find_all("a", href=True): text = (a.get_text() or "").strip().lower() href = a["href"].strip() low_href = href.lower() if any(k in text for k in IMPRINT_KEYWORDS) or any(k in low_href for k in IMPRINT_KEYWORDS): candidates.add(urljoin(base_url, href)) # Look into footer specifically footer = soup.find("footer") if footer: for a in footer.find_all("a", href=True): text = (a.get_text() or "").strip().lower() href = a["href"].strip() if any(k in text for k in IMPRINT_KEYWORDS) or any(k in href.lower() for k in IMPRINT_KEYWORDS): candidates.add(urljoin(base_url, href)) return list(candidates) def fallback_paths(self, base_url: str) -> List[str]: base = f"{urlparse(base_url).scheme}://{urlparse(base_url).netloc}" return [urljoin(base, p) for p in COMMON_IMPRINT_PATHS] # ---------- Extraction helpers ---------- def extract_emails(self, html: str, soup: BeautifulSoup) -> List[str]: # include mailto: emails = set(self.re_email.findall(html)) for a in soup.find_all("a", href=True): if a["href"].lower().startswith("mailto:"): emails.add(a["href"].split(":",1)[1]) # filter fakes out = [ e for e in emails if not any(x in e.lower() for x in ["example.", "test.", "placeholder", "noreply@", "no-reply@"]) ] return sorted(set(out)) def extract_phones(self, text: str, region_hint: str) -> List[str]: out = [] for match in phonenumbers.PhoneNumberMatcher(text, region_hint): try: num = match.number if phonenumbers.is_valid_number(num): out.append(normalize_phone_output(num)) except Exception: pass return sorted(set(out)) def extract_owner_from_html(self, soup: BeautifulSoup) -> Optional[str]: # (1) dt/dd or label: value patterns for label in OWNER_LABELS: # strong/b near text strongs = soup.find_all(["strong","b"], string=re.compile(label, re.IGNORECASE)) for s in strongs: # Look right after the label node txt = s.find_next(string=True) if txt: cand = txt.strip() # Trim separators cand = re.sub(r'^[\s:\-–]+', '', cand).strip() # Cut at line breaks or pipes cand = re.split(r'[\n\|\/]', cand)[0].strip() if self._looks_like_person_name(cand): return cand # (2) Definition lists for dt in soup.find_all("dt"): if any(re.search(lbl, dt.get_text(strip=True), re.IGNORECASE) for lbl in OWNER_LABELS): dd = dt.find_next("dd") if dd: cand = dd.get_text(" ", strip=True) cand = re.split(r'[\n\|\/]', cand)[0].strip() if self._looks_like_person_name(cand): return cand # (3) Headings followed by a name for h in soup.find_all(re.compile("^h[1-6]$")): if any(re.search(lbl, h.get_text(" ", strip=True), re.IGNORECASE) for lbl in OWNER_LABELS): nxt = h.find_next(string=True) if nxt: cand = nxt.strip() cand = re.sub(r'^[\s:\-–]+','', cand) cand = re.split(r'[\n\|\/]', cand)[0].strip() if self._looks_like_person_name(cand): return cand return None def extract_owner_from_text(self, text: str) -> Optional[str]: # Label-based line m = self.re_owner_line.search(text) if m: cand = m.group(1).strip() cand = re.split(r'[\n\|\/]', cand)[0].strip() if self._looks_like_person_name(cand): return cand # spaCy fallback (PERSON near owner labels) if nlp: doc = nlp(text) low = text.lower() best = None best_dist = 99999 for ent in doc.ents: if ent.label_ == "PERSON": for lbl in OWNER_LABELS: pos = low.find(lbl) if pos != -1: d = abs(ent.start_char - pos) if d < best_dist and d < 250: best_dist = d best = ent.text.strip() if best and self._looks_like_person_name(best): return best return None def _looks_like_person_name(self, name: str) -> bool: if not name: return False # Exclude org suffixes bad = ["gmbh", "ag", "kg", "ug", "mbh", "gbr", "e.k.", "e.k", "ltd", "inc", "co.", "ohg", "kgaa"] if any(b in name.lower() for b in bad): return False # Simple heuristic: has at least one space and starts uppercase parts = [p for p in name.split() if p.strip()] if len(parts) < 1: return False # Accept "Vorname Nachname" or single family names sometimes present return True # ---------- Main ---------- async def extract_contact_info(self, url: str) -> Dict[str, str]: res = {'website': url, 'imprint_url': '', 'email': '', 'phone': '', 'owner': '', 'status': 'Processing...'} try: # 1) Load homepage (smart) html, soup = await self.smart_fetch(url) if not html: res['status'] = "Failed to load homepage" return res # 2) Find imprint-like links + fallback paths links = self.find_imprint_like_links(soup, url) links += self.fallback_paths(url) links = list(dict.fromkeys(links)) # dedupe, keep order # 3) Select best candidate by score (fetch each and score) candidates_scored = [] # Always include homepage as last resort links.append(url) seen = set() for link in links: if link in seen: continue seen.add(link) h2, s2 = await self.smart_fetch(link) if not h2: continue # quick text for scoring text2 = extract_text_keep_breaks(s2) score = score_candidate(link, h2 + "\n" + text2) candidates_scored.append((score, link, h2, s2, text2)) if not candidates_scored: res['status'] = "No pages found" return res candidates_scored.sort(key=lambda x: x[0], reverse=True) # choose highest scoring page best_score, best_url, best_html, best_soup, best_text = candidates_scored[0] res['imprint_url'] = best_url # 4) Extract data emails = self.extract_emails(best_html, best_soup) region = domain_region_hint(url) phones = self.extract_phones(best_text, region) owner = self.extract_owner_from_html(best_soup) or self.extract_owner_from_text(best_text) res['email'] = emails[0] if emails else '' res['phone'] = phones[0] if phones else '' res['owner'] = owner or '' res['status'] = "Success" if any([res['email'], res['phone'], res['owner']]) else "No contact info found" return res except Exception as e: logger.exception(f"Error processing {url}: {e}") res['status'] = f"Error: {e}" return res # ---------- Batch processing ---------- async def process_urls(urls: List[str], progress_callback=None, enable_js=True) -> pd.DataFrame: scraper = ImprintScraper(enable_js=enable_js) results = [] total = len(urls) for i, raw in enumerate(urls, start=1): if not raw: continue u = raw.strip() if not u.startswith(("http://", "https://")): u = "https://" + u if progress_callback: progress_callback(f"({i}/{total}) {u}") result = await scraper.extract_contact_info(u) results.append(result) if i < total: await asyncio.sleep(random.uniform(1.2, 2.5)) return pd.DataFrame(results) def parse_input_urls(text_input: str, file_input) -> List[str]: urls = [] if text_input and text_input.strip(): urls += [x.strip() for x in text_input.strip().splitlines() if x.strip()] if file_input: try: path = getattr(file_input, "name", None) or str(file_input) if str(path).lower().endswith(".csv"): df = pd.read_csv(path) chosen = None for col in df.columns: lc = col.lower() if any(k in lc for k in ["url","website","domain","link"]): chosen = col; break if chosen is None: chosen = df.columns[0] urls += [str(x).strip() for x in df[chosen].dropna().tolist()] else: with open(path, "r", encoding="utf-8", errors="ignore") as f: urls += [x.strip() for x in f if x.strip()] except Exception as e: logger.error(f"File read error: {e}") # unique preserve order seen = set(); out=[] for u in urls: if u not in seen: out.append(u); seen.add(u) return out async def scrape_websites(text_input: str, file_input, js_toggle: bool, progress=gr.Progress()) -> Tuple[pd.DataFrame, str]: urls = parse_input_urls(text_input, file_input) if not urls: return pd.DataFrame(), "No URLs provided." progress(0, desc="Starting...") def update(msg): progress(0.5, desc=msg) df = await process_urls(urls, progress_callback=update, enable_js=js_toggle) df = df.rename(columns={ 'website': 'Website', 'imprint_url': 'Imprint URL', 'email': 'Email', 'phone': 'Phone', 'owner': 'Owner', 'status': 'Status' }) out_path = "scraped_contacts.csv" df.to_csv(out_path, index=False, encoding="utf-8") progress(1.0, desc="Done") ok = (df["Status"] == "Success").sum() if not df.empty else 0 msg = f"Processed {len(df)} sites. Success: {ok} ({(ok/len(df)*100):.1f}%)." if len(df) else "No rows." return df, msg # ---------- Gradio UI ---------- def create_interface(): with gr.Blocks(title="🇩🇪 DACH Imprint Scraper (Fixed)") as app: gr.Markdown("# 🇩🇪 DACH Imprint Scraper\nFinds actual Impressum/Kontakt pages, extracts Email / Phone / Owner.") with gr.Row(): with gr.Column(): text_input = gr.Textbox(label="URLs (one per line)", lines=8, placeholder="https://example-shop.de\nhttps://another-store.at\nhttps://swiss-shop.ch") file_input = gr.File(label="Or upload CSV/TXT with URLs", file_types=[".csv",".txt"]) js_toggle = gr.Checkbox(value=True, label="Enable JavaScript (Playwright) fallback") run_btn = gr.Button("🚀 Start", variant="primary") with gr.Column(): status = gr.Textbox(label="Status", interactive=False) results = gr.Dataframe(label="Results", interactive=False) download = gr.File(label="Download CSV") run_btn.click(fn=scrape_websites, inputs=[text_input, file_input, js_toggle], outputs=[results, status]) \ .then(lambda df: "scraped_contacts.csv", inputs=[results], outputs=[download]) return app if __name__ == "__main__": app = create_interface() app.launch( share=True)