Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| import requests | |
| import re | |
| import os | |
| import base64 | |
| import logging | |
| from typing import Dict, Optional | |
# Flask application instance; route handlers below are registered on it.
app = Flask(__name__)

# Configure logging
# Module-level logger shared by MirrorScraper and the route handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MirrorScraper:
    """Obtain a single-use proxied URL from cdn.youtubeunblocked.live.

    Flow (three HTTP steps, each depending on the previous):
      1. GET /            -> extract a CSRF token from a hidden form field.
      2. POST /servers    -> extract a *new* CSRF token (and a server id).
      3. POST /requests   -> expect a 302 whose Location header is the URL.
    All methods log failures and return None / error dicts rather than raise.
    """

    def __init__(self):
        # Mirror site that issues the proxied URLs.
        self.base_url = "https://cdn.youtubeunblocked.live"
        # Destination URL we ask the mirror to proxy.
        self.target_url = "https://adnade.net/ptp/?user=alllogin&subid=143"
        # Browser-like headers so the mirror serves the normal HTML pages.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Linux; Android 14; SM-X205 Build/UP1A.231005.007) AppleWebKit/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        }

    def extract_csrf_from_homepage(self) -> Optional[str]:
        """Extract a fresh CSRF token from the homepage.

        Returns:
            The token string, or None on any failure (logged, never raised).
        """
        homepage_url = f"{self.base_url}/"
        logger.info("Extracting CSRF token from homepage...")
        try:
            response = requests.get(homepage_url, headers=self.headers, timeout=10)
            if response.status_code != 200:
                logger.error("Failed to get homepage. Status: %s", response.status_code)
                return None
            # Token lives in a hidden <input name="csrf" ... value="..."> field.
            csrf_match = re.search(r'name="csrf"[^>]*value="([^"]+)"', response.text)
            if not csrf_match:
                logger.error("CSRF token not found in homepage")
                return None
            logger.info("CSRF token extracted from homepage")
            return csrf_match.group(1)
        except Exception as e:
            # Best-effort scraper boundary: log and signal failure to caller.
            logger.error("Error extracting CSRF: %s", e)
            return None

    def get_server_data_and_csrf(self) -> tuple:
        """POST to the servers page; extract a server id and a new CSRF token.

        Returns:
            (server_id, csrf_token) on success, (None, None) on failure.
        """
        servers_url = f"{self.base_url}/servers"
        # The servers page requires the CSRF token from the homepage form.
        homepage_csrf = self.extract_csrf_from_homepage()
        if not homepage_csrf:
            return None, None
        data = {
            "url": self.target_url,
            "csrf": homepage_csrf,
        }
        headers = {
            **self.headers,
            "Content-Type": "application/x-www-form-urlencoded",
            "Origin": self.base_url,
            "Referer": self.base_url + "/",
        }
        logger.info("Getting server data and new CSRF token...")
        try:
            response = requests.post(servers_url, data=data, headers=headers, timeout=10)
            if response.status_code != 200:
                logger.error("Failed to get servers page. Status: %s", response.status_code)
                return None, None
            # Fresh token embedded as a data-csrf="..." attribute.
            # NOTE(review): original pattern had doubled quotes (data-csrf=""...""),
            # which cannot match standard HTML; fixed to a plain attribute match.
            csrf_match = re.search(r'data-csrf="([^"]+)"', response.text)
            if csrf_match:
                new_csrf = csrf_match.group(1)
                logger.info("New CSRF token extracted")
            else:
                # Stale hard-coded token; may be rejected by the server.
                logger.warning("New CSRF token not found, using fallback")
                new_csrf = "RXFOUlV0ZjFxdHg5L1VGd1ZrazRKVFRDUGdNQ1ZhUG01N2l3ZitXTFFMUU81Z2tUb1JXY3JoOXJveWtaSHA2clNBRHZhSFNIN1N0Rk8rak13c3dJNFBEcEcwa3VjYndCMVo5emEyd0ZUQ2RzK0U3RCtNR3dYcGVVSkxTamx6bzl4M2orNkN0Q2JwLzZHU0F6NHkwSTlRPT0="
            # Extract server ID (use first one from the list or fallback)
            server_id = "161"  # Fallback to known working server
            return server_id, new_csrf
        except Exception as e:
            logger.error("Error getting server data: %s", e)
            return None, None

    def get_complete_url(self) -> Dict[str, str]:
        """Run the full flow and return {"success": bool, "url"/"error": str}."""
        try:
            # Get server ID and NEW CSRF token from the servers page.
            server_id, new_csrf = self.get_server_data_and_csrf()
            if not server_id or not new_csrf:
                return {"success": False, "error": "Cannot proceed without server data"}
            # Use the NEW CSRF token for the final request.
            url = f"{self.base_url}/requests?fso="
            data = {
                "url": self.target_url,
                "proxyServerId": server_id,
                "csrf": new_csrf,
                "demo": "0",
                "frontOrigin": self.base_url,
            }
            headers = {
                **self.headers,
                "Content-Type": "application/x-www-form-urlencoded",
                "Origin": self.base_url,
                "Referer": self.base_url + "/servers",
            }
            logger.info("Getting complete URL from mirror site...")
            # The generated URL arrives as a redirect target, so do not follow it.
            response = requests.post(url, data=data, headers=headers, allow_redirects=False, timeout=10)
            logger.info("Status: %s", response.status_code)
            if response.status_code == 302 and 'location' in response.headers:
                complete_url = response.headers['location']
                logger.info("Complete URL obtained")
                return {
                    "success": True,
                    "url": complete_url,
                    "message": "URL successfully generated",
                }
            error_msg = f"Failed. Status: {response.status_code}"
            if 'location' not in response.headers:
                error_msg += " - No location header in response"
            logger.error("%s", error_msg)
            return {"success": False, "error": error_msg}
        except Exception as e:
            logger.error("Exception occurred: %s", e)
            return {"success": False, "error": f"Internal server error: {str(e)}"}
# Initialize the scraper
# Module-level singleton shared by all request handlers.
scraper = MirrorScraper()
@app.route('/')
def home():
    """Root endpoint: service banner plus a listing of available endpoints.

    NOTE(review): the handler was defined but never registered with Flask
    (no route decorator visible in the source); @app.route('/') added so the
    endpoint listed in its own response actually exists.
    """
    return jsonify({
        "message": "Mirror URL Generator API - huijio/urltemplate",
        "status": "active",
        "version": "1.0",
        "endpoints": {
            "/generate": "GET - Generate a new URL",
            "/health": "GET - Health check",
            "/docs": "GET - API documentation"
        },
        "example_usage": "GET https://huijio-urltemplate.hf.space/generate"
    })
@app.route('/health')
def health():
    """Liveness probe; always returns a healthy status JSON.

    NOTE(review): route decorator was missing in the source; added so the
    /health endpoint advertised elsewhere in this file is reachable.
    """
    return jsonify({"status": "healthy", "service": "mirror-url-generator"})
@app.route('/docs')
def docs():
    """Static JSON API documentation for this service.

    NOTE(review): route decorator was missing in the source; added so the
    /docs endpoint advertised elsewhere in this file is reachable.
    """
    return jsonify({
        "api_documentation": {
            "base_url": "https://huijio-urltemplate.hf.space",
            "endpoints": {
                "/generate": {
                    "method": "GET",
                    "description": "Generate a new proxy URL",
                    "response": {
                        "success": "boolean",
                        "url": "string (the generated URL)",
                        "message": "string"
                    }
                },
                "/health": {
                    "method": "GET",
                    "description": "Health check endpoint",
                    "response": {"status": "string"}
                }
            }
        }
    })
@app.route('/generate')
def generate_url():
    """API endpoint to generate the complete URL.

    Delegates to the module-level MirrorScraper; on success returns its
    {"success": True, "url": ...} dict, on internal failure a 500 JSON error.
    NOTE(review): route decorator was missing in the source; added so the
    /generate endpoint advertised elsewhere in this file is reachable.
    """
    try:
        result = scraper.get_complete_url()
        return jsonify(result)
    except Exception as e:
        # Defensive boundary: get_complete_url already catches, but never
        # let an unexpected error escape as an HTML 500 page.
        return jsonify({
            "success": False,
            "error": f"Internal server error: {str(e)}"
        }), 500
if __name__ == '__main__':
    # Bind on all interfaces, port 7860 (the conventional Hugging Face
    # Spaces port); debug off for production-like behavior.
    app.run(host='0.0.0.0', port=7860, debug=False)