"""CAPTCHA solver: an ensemble of Siamese networks behind a Gradio UI and a FastAPI API."""

import base64
import json
import warnings
from io import BytesIO

import cv2
import gradio as gr
import numpy as np
import requests
import torch
import torch.nn as nn
import torch.nn.functional as F
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
from pydantic import BaseModel
from scipy import stats
from torchvision import models, transforms

warnings.filterwarnings('ignore')

# Length of the handcrafted feature vector:
# 3 channels x 8 histogram bins + 3 HSV means + edge density + Laplacian variance.
HANDCRAFTED_FEATURE_DIM = 29

# Default distance threshold below which a question/answer pair counts as a match.
DEFAULT_THRESHOLD = 0.312


# ==================== MODEL DEFINITIONS ====================

class DualPathSiamese(nn.Module):
    """Siamese network fusing a ResNet50 embedding with handcrafted CV features.

    Attribute names are part of the checkpoint format (state-dict keys) and
    must not be renamed.
    """

    def __init__(self, embedding_dim=256):
        super().__init__()

        # Deep learning path - use weights parameter instead of pretrained
        resnet = models.resnet50(weights=None)
        # Drop the final classification layer; keep everything up to pooling.
        self.cnn_backbone = nn.Sequential(*list(resnet.children())[:-1])
        self.cnn_embedding = nn.Sequential(
            nn.Linear(2048, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, embedding_dim)
        )

        # Traditional CV path
        self.feature_embedding = nn.Sequential(
            nn.Linear(HANDCRAFTED_FEATURE_DIM, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64)
        )

        # Fusion layer
        self.fusion = nn.Sequential(
            nn.Linear(embedding_dim + 64, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, embedding_dim)
        )

    def forward_once(self, img, features):
        """Embed one image plus its handcrafted features; output is L2-normalized."""
        cnn_out = self.cnn_backbone(img)
        cnn_out = cnn_out.view(cnn_out.size(0), -1)
        cnn_embed = self.cnn_embedding(cnn_out)

        feat_embed = self.feature_embedding(features)

        combined = torch.cat([cnn_embed, feat_embed], dim=1)
        output = self.fusion(combined)
        return F.normalize(output, p=2, dim=1)

    def forward(self, img1, img2, features):
        """Embed both images.

        ``features`` holds the two handcrafted vectors concatenated along
        dim 1: the first 29 columns belong to ``img1``, the rest to ``img2``.
        """
        feat1 = features[:, :HANDCRAFTED_FEATURE_DIM]
        feat2 = features[:, HANDCRAFTED_FEATURE_DIM:]
        output1 = self.forward_once(img1, feat1)
        output2 = self.forward_once(img2, feat2)
        return output1, output2


class EnsembleSiamese:
    """Weighted ensemble of three Siamese models (DualPath, ResNet50, EfficientNet-B3).

    ``models_loaded`` is True only when every model was constructed AND its
    trained checkpoint was loaded; otherwise predictions fall back to
    "no match" placeholders instead of scoring with random weights.
    """

    def __init__(self, device='cpu'):
        self.device = device
        self.models = {}
        self.model_names = ['dualpath', 'resnet50', 'efficientnet']
        self.weights = [0.34, 0.33, 0.33]
        self.models_loaded = False

        try:
            # Load DualPath model
            self.models['dualpath'] = DualPathSiamese(embedding_dim=256).to(device)

            # Load ResNet50 model
            resnet = models.resnet50(weights=None)
            self.models['resnet50'] = self.create_resnet_siamese(resnet, 2048, 256).to(device)

            # Load EfficientNet model
            from torchvision.models import efficientnet_b3
            efficientnet = efficientnet_b3(weights=None)
            self.models['efficientnet'] = self.create_efficientnet_siamese(
                efficientnet, 256
            ).to(device)

            # Only report the ensemble as loaded when the trained weights are
            # actually in place; randomly initialized models are not usable.
            self.models_loaded = self.load_weights()
            if self.models_loaded:
                print("✅ Ensemble model initialized successfully!")
            else:
                print("⚠️ Ensemble constructed, but trained weights are missing")

        except Exception as e:
            print(f"❌ Error initializing models: {e}")
            self.models_loaded = False

    def create_resnet_siamese(self, resnet, in_features, embedding_dim):
        """Wrap a torchvision ResNet as a Siamese embedding network."""

        class ResNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = nn.Sequential(*list(resnet.children())[:-1])
                self.embedding = nn.Sequential(
                    nn.Linear(in_features, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.5),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` is accepted for signature parity and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return ResNetSiam()

    def create_efficientnet_siamese(self, efficientnet, embedding_dim):
        """Wrap a torchvision EfficientNet-B3 as a Siamese embedding network."""

        class EfficientNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = efficientnet.features
                self.avgpool = nn.AdaptiveAvgPool2d(1)
                self.embedding = nn.Sequential(
                    nn.Linear(1536, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.4),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = self.avgpool(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # ``features`` is accepted for signature parity and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return EfficientNetSiam()

    def load_weights(self):
        """Load the trained checkpoint of every model.

        Each checkpoint is loaded independently, so one missing or corrupt
        file no longer discards weights that loaded fine. A model whose
        checkpoint fails is re-initialized with random weights.

        Returns:
            True if every checkpoint loaded, False otherwise.
        """
        checkpoints = {
            'dualpath': ('ensemble_dualpath.pth', 'DualPath'),
            'resnet50': ('ensemble_resnet50.pth', 'ResNet50'),
            'efficientnet': ('ensemble_efficientnet.pth', 'EfficientNet'),
        }

        all_loaded = True
        for name, (path, label) in checkpoints.items():
            try:
                # NOTE(security): weights_only=False unpickles arbitrary
                # objects; only load checkpoint files from a trusted source.
                state = torch.load(path, map_location=self.device, weights_only=False)
                self.models[name].load_state_dict(state['model_state_dict'])
                print(f"✅ {label} weights loaded")
            except Exception as e:
                all_loaded = False
                print(f"⚠️ Partial weight loading error: {e}")
                self.models[name].apply(self._init_weights)
                print(f"🔄 {label} initialized with random weights")

        return all_loaded

    def _init_weights(self, m):
        """Initialize weights for models"""
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def _fallback_result(self, answer_index):
        """Return the placeholder "no match" result for one answer."""
        return {
            'answer_index': answer_index,
            'model_predictions': {
                name: {'distance': 1.0, 'confidence': 0.0, 'is_match': False}
                for name in self.model_names
            },
            'ensemble_confidence': 0.0,
            'ensemble_distance': 1.0,
            'ensemble_match': False,
            'final_decision': False
        }

    def extract_handcrafted_features(self, img_array):
        """Extract traditional CV features from a numpy image array.

        Returns a float32 vector of length 29; all zeros when ``img_array``
        is None or extraction fails.
        """
        if img_array is None:
            return np.zeros(HANDCRAFTED_FEATURE_DIM, dtype=np.float32)

        try:
            # Normalize the channel layout: the histogram and colour
            # conversions below need exactly three channels.
            if img_array.ndim == 2:
                img_array = cv2.cvtColor(img_array, cv2.COLOR_GRAY2RGB)
            elif img_array.shape[2] == 4:
                img_array = cv2.cvtColor(img_array, cv2.COLOR_RGBA2RGB)

            features = []

            # Color histogram
            for i in range(3):
                hist = cv2.calcHist([img_array], [i], None, [8], [0, 256])
                features.extend(hist.flatten() / (hist.sum() + 1e-6))

            # HSV features
            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
            features.extend([hsv[:, :, i].mean() for i in range(3)])

            # Edge density
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            features.append(edges.sum() / (edges.size + 1e-6))

            # Texture
            features.append(cv2.Laplacian(gray, cv2.CV_64F).var())

            return np.array(features, dtype=np.float32)
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return np.zeros(HANDCRAFTED_FEATURE_DIM, dtype=np.float32)

    def predict_detailed(self, question_img, answer_imgs, threshold=DEFAULT_THRESHOLD):
        """Score every answer image against the question image.

        Args:
            question_img: PIL image of the question.
            answer_imgs: sequence of PIL answer images.
            threshold: distance below which a pair counts as a match.

        Returns:
            One dict per answer with per-model predictions and the weighted
            ensemble distance/confidence. Answers that cannot be processed
            (or all answers, when the models are not loaded) get the
            placeholder result with zero confidence.
        """
        if not self.models_loaded:
            return [self._fallback_result(i) for i in range(len(answer_imgs))]

        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        # The question is the same for every answer: preprocess it once.
        try:
            question_rgb = question_img.convert('RGB')
            q_img = transform(question_rgb).unsqueeze(0).to(self.device)
            # Features come from the RGB-converted image so RGBA, palette
            # and grayscale inputs do not degrade to an all-zero vector.
            q_features = self.extract_handcrafted_features(np.array(question_rgb))
        except Exception as e:
            print(f"Error processing question image: {e}")
            return [self._fallback_result(i) for i in range(len(answer_imgs))]

        for model in self.models.values():
            model.eval()

        weight_by_name = dict(zip(self.model_names, self.weights))
        all_results = []

        for answer_idx, answer_img in enumerate(answer_imgs):
            try:
                # Preprocess the answer image
                answer_rgb = answer_img.convert('RGB')
                a_img = transform(answer_rgb).unsqueeze(0).to(self.device)

                # Extract features
                a_features = self.extract_handcrafted_features(np.array(answer_rgb))
                features = np.concatenate([q_features, a_features])
                features_tensor = torch.tensor(
                    features, dtype=torch.float32
                ).unsqueeze(0).to(self.device)

                # Get predictions from all models
                model_predictions = {}
                weighted_distance = 0.0
                weighted_confidence = 0.0

                with torch.no_grad():
                    for name, model in self.models.items():
                        if name == 'dualpath':
                            out1, out2 = model(q_img, a_img, features_tensor)
                        else:
                            out1, out2 = model(q_img, a_img)

                        distance = float(F.pairwise_distance(out1, out2).item())
                        confidence = float(max(0, 100 * (1 - distance)))

                        model_predictions[name] = {
                            'distance': distance,
                            'confidence': confidence,
                            'is_match': bool(distance < threshold)
                        }

                        # Weighted average
                        weighted_distance += weight_by_name[name] * distance
                        weighted_confidence += weight_by_name[name] * confidence

                is_match = bool(weighted_distance < threshold)

                all_results.append({
                    'answer_index': answer_idx,
                    'model_predictions': model_predictions,
                    'ensemble_distance': float(weighted_distance),
                    'ensemble_confidence': float(weighted_confidence),
                    'ensemble_match': is_match,
                    'final_decision': is_match
                })

            except Exception as e:
                print(f"Error processing answer {answer_idx}: {e}")
                # Add fallback result
                all_results.append(self._fallback_result(answer_idx))

        return all_results


# ==================== INITIALIZE MODEL ====================

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"🔧 Using device: {device}")

ensemble_model = EnsembleSiamese(device=device)


# ==================== GRADIO INTERFACE ====================

def format_detailed_results(results):
    """Format prediction results as Markdown with a per-model breakdown."""
    if not ensemble_model.models_loaded:
        return "⚠️ **MODELS NOT PROPERLY LOADED**\n\nPlease check that all model files are uploaded:\n- ensemble_dualpath.pth\n- ensemble_resnet50.pth\n- ensemble_efficientnet.pth\n\nCurrently using fallback mode with random weights."

    # Find best match
    valid_results = [r for r in results if r['ensemble_confidence'] > 0]
    if not valid_results:
        return "❌ No valid predictions could be made. Please check your images."

    best_match = max(valid_results, key=lambda x: x['ensemble_confidence'])
    best_index = best_match['answer_index']

    parts = [
        "🎯 **FINAL PREDICTION RESULTS** 🎯\n\n",
        f"**Best Match: Answer {best_index + 1}** \n",
        f"**Overall Confidence: {best_match['ensemble_confidence']:.2f}%** \n",
        f"**Distance: {best_match['ensemble_distance']:.4f}** \n",
        f"**Match: {'✅ YES' if best_match['final_decision'] else '❌ NO'}** \n\n",
        "---\n\n",
        "**📊 DETAILED MODEL BREAKDOWN:**\n\n",
    ]

    for result in results:
        parts.append(f"## **Answer {result['answer_index'] + 1}**\n")
        parts.append(
            f"**Ensemble:** {result['ensemble_confidence']:.2f}% | Distance: {result['ensemble_distance']:.4f} | {'✅ MATCH' if result['final_decision'] else '❌ NO MATCH'}\n\n"
        )

        for model_name, prediction in result['model_predictions'].items():
            emoji = "🟢" if prediction['is_match'] else "🔴"
            parts.append(
                f" - **{model_name.upper()}:** {emoji} {prediction['confidence']:.2f}% | Distance: {prediction['distance']:.4f}\n"
            )

        parts.append("\n")

    # Model agreement analysis
    parts.append("---\n\n")
    parts.append("**🤝 MODEL AGREEMENT ANALYSIS:**\n\n")

    for result in results:
        predictions = result['model_predictions'].values()
        matches = sum(1 for pred in predictions if pred['is_match'])
        total_models = len(result['model_predictions'])
        agreement = (matches / total_models) * 100

        consensus_emoji = "🟢" if agreement > 66 else "🟡" if agreement > 33 else "🔴"
        parts.append(
            f"**Answer {result['answer_index'] + 1}:** {consensus_emoji} {matches}/{total_models} models agree ({agreement:.1f}% consensus)\n"
        )

    # Add warning if models show suspicious behavior
    suspicious = any(
        pred['confidence'] > 99.9
        for r in results
        for pred in r['model_predictions'].values()
    )
    if suspicious:
        parts.append("\n---\n\n")
        parts.append(
            "⚠️ **WARNING:** Some models are showing unusually high confidence scores. This may indicate model loading issues.\n"
        )

    return "".join(parts)


def predict_captcha_detailed(question_image, *answer_images):
    """Gradio callback: run the ensemble and return a Markdown report."""
    # Filter out None images
    answer_imgs = [img for img in answer_images if img is not None]

    if question_image is None or not answer_imgs:
        return "❌ Please upload both question and answer images"

    try:
        print(f"🔍 Processing: 1 question + {len(answer_imgs)} answers")

        # Get detailed predictions
        results = ensemble_model.predict_detailed(question_image, answer_imgs)

        # Format output
        output = format_detailed_results(results)

        # Add technical details
        output += "\n---\n\n"
        output += "**⚙️ TECHNICAL DETAILS:**\n\n"
        output += f"- **Threshold:** {DEFAULT_THRESHOLD} (optimized during training)\n"
        output += "- **Models:** DualPath (CNN + Handcrafted), ResNet50, EfficientNet-B3\n"
        output += "- **Ensemble Weights:** DualPath(34%), ResNet50(33%), EfficientNet(33%)\n"
        output += "- **Training Accuracy:** 98.67%\n"
        output += f"- **Device:** {device.upper()}\n"
        output += f"- **Models Loaded:** {'✅ YES' if ensemble_model.models_loaded else '❌ NO'}\n"

        return output

    except Exception as e:
        return f"❌ Error during prediction: {str(e)}"


# ==================== FASTAPI SETUP ====================

app = FastAPI(title="CAPTCHA Solver API", version="1.0")

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; restrict allow_origins if the API is ever exposed with cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class Base64PredictionRequest(BaseModel):
    """Request body of ``/api/predict``: base64-encoded question and answer images."""

    question_base64: str
    answers_base64: list[str]


def base64_to_image(base64_string):
    """Convert a base64 string (optionally a data URL) to a PIL Image.

    Returns None when the string cannot be decoded or opened as an image.
    """
    try:
        # Remove data URL prefix if present
        if ',' in base64_string:
            base64_string = base64_string.split(',', 1)[1]

        image_data = base64.b64decode(base64_string)
        return Image.open(BytesIO(image_data))
    except Exception as e:
        print(f"Error decoding base64: {e}")
        return None


@app.post("/api/predict")
async def api_predict_endpoint(request: Base64PredictionRequest):
    """API endpoint for userscript with base64 images.

    Failures are reported as ``{"success": False, "error": ...}`` rather than
    as HTTP errors.
    """
    try:
        print(f"📥 Received API request: {len(request.answers_base64)} answers")

        # Convert base64 to images
        question_img = base64_to_image(request.question_base64)
        if not question_img:
            return {"success": False, "error": "Failed to decode question image"}

        answer_imgs = []
        for i, base64_str in enumerate(request.answers_base64):
            img = base64_to_image(base64_str)
            if img:
                answer_imgs.append(img)
                print(f"✅ Decoded answer {i+1}")
            else:
                print(f"❌ Failed to decode answer {i+1}")
                # Use fallback image so answer indices stay aligned with the request
                answer_imgs.append(Image.new('RGB', (100, 100), color='white'))

        if not answer_imgs:
            return {"success": False, "error": "No answer images could be decoded"}

        # Make prediction
        results = ensemble_model.predict_detailed(question_img, answer_imgs)

        # Find best match
        valid_results = [r for r in results if r['ensemble_confidence'] > 0]
        if not valid_results:
            return {"success": False, "error": "No valid predictions could be made"}

        best_result = max(valid_results, key=lambda r: r['ensemble_confidence'])
        # Report the index of the answer in the request, not its position in
        # the filtered list (the two differ once any answer is filtered out).
        best_index = int(best_result['answer_index'])

        # Prepare response
        response_data = {
            'success': True,
            'predictions': [
                {
                    'answer_index': r['answer_index'],
                    'ensemble_confidence': r['ensemble_confidence'],
                    'ensemble_distance': r['ensemble_distance'],
                    'ensemble_match': r['ensemble_match'],
                    'model_predictions': r['model_predictions']
                }
                for r in results
            ],
            'best_match': best_index,
            'best_confidence': float(best_result['ensemble_confidence']),
            'best_distance': float(best_result['ensemble_distance']),
            'models_loaded': ensemble_model.models_loaded
        }

        print(f"✅ API Prediction complete. Best match: {best_index} with {best_result['ensemble_confidence']:.2f}% confidence")
        return response_data

    except Exception as e:
        print(f"❌ API error: {str(e)}")
        return {"success": False, "error": str(e)}


@app.get("/")
async def root():
    """Return basic service information."""
    return {
        "message": "CAPTCHA Solver API is running!",
        "version": "1.0",
        "accuracy": "98.67%",
        "models_loaded": ensemble_model.models_loaded
    }


@app.get("/health")
async def health_check():
    """Return service health, model-loading state and the active device."""
    return {
        "status": "healthy",
        "models_loaded": ensemble_model.models_loaded,
        "device": device
    }


# ==================== GRADIO UI ====================

with gr.Blocks(title="CAPTCHA Solver - Ensemble AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🔍 CAPTCHA Solver - Ensemble Siamese Network
    ### **Achieved 98.67% Accuracy during Training**

    This system uses an ensemble of three advanced neural networks to solve CAPTCHA challenges.
    """)

    # Status indicator
    status = gr.Markdown(
        value=f"**Status:** {'✅ Models Loaded Successfully' if ensemble_model.models_loaded else '⚠️ Models Not Properly Loaded - Using Fallback Mode'}"
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 📸 Upload Images")
            question = gr.Image(label="Question Image", type="pil", height=200)

            gr.Markdown("### 🎯 Answer Images")
            with gr.Row():
                answer1 = gr.Image(label="Answer 1", type="pil", height=150)
                answer2 = gr.Image(label="Answer 2", type="pil", height=150)
            with gr.Row():
                answer3 = gr.Image(label="Answer 3", type="pil", height=150)
                answer4 = gr.Image(label="Answer 4", type="pil", height=150)
            with gr.Row():
                answer5 = gr.Image(label="Answer 5", type="pil", height=150)

            predict_btn = gr.Button("🚀 Analyze CAPTCHA", variant="primary", size="lg")

        with gr.Column(scale=2):
            gr.Markdown("### 📊 Prediction Results")
            output = gr.Markdown(
                label="Detailed Analysis",
                value="👆 Upload images and click 'Analyze CAPTCHA' to see predictions here..."
            )

    # Connect the prediction function
    predict_btn.click(
        fn=predict_captcha_detailed,
        inputs=[question, answer1, answer2, answer3, answer4, answer5],
        outputs=output
    )


# ==================== RUN APPLICATION ====================

if __name__ == "__main__":
    # NOTE(review): only the Gradio UI is launched here; nothing in this file
    # serves the FastAPI ``app`` - confirm it is mounted or served elsewhere.
    demo.launch(server_name="0.0.0.0", server_port=7860)