# zeracap2 / appnoapi.pi
# huijio's picture
# Rename app.py to appnoapi.pi
# e9456ae verified
import gradio as gr
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from PIL import Image
import numpy as np
import cv2
from scipy import stats
import requests
from io import BytesIO
import base64
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
from fastapi.middleware.cors import CORSMiddleware
import json
import warnings
warnings.filterwarnings('ignore')
# ==================== MODEL DEFINITIONS ====================
class DualPathSiamese(nn.Module):
    """Siamese network that fuses a CNN embedding with hand-crafted features.

    Each image goes through two paths whose outputs are concatenated and
    fused into one L2-normalised embedding:

    * a ResNet-50 trunk (final classification layer removed) followed by an
      MLP that maps 2048 values to ``embedding_dim``;
    * an MLP that maps 29 hand-crafted feature values to 64.

    Args:
        embedding_dim: Size of the CNN-path embedding and of the final fused
            embedding.
    """

    def __init__(self, embedding_dim=256):
        super().__init__()

        # Deep learning path. The trunk is created without pretrained weights
        # (weights=None); everything except the last child module is kept.
        trunk = models.resnet50(weights=None)
        trunk_layers = list(trunk.children())[:-1]
        self.cnn_backbone = nn.Sequential(*trunk_layers)
        self.cnn_embedding = nn.Sequential(
            nn.Linear(2048, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, embedding_dim),
        )

        # Traditional CV path: 29 hand-crafted values -> 64-d embedding.
        self.feature_embedding = nn.Sequential(
            nn.Linear(29, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, 64),
        )

        # Fusion layer over the concatenated (embedding_dim + 64) vector.
        self.fusion = nn.Sequential(
            nn.Linear(embedding_dim + 64, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, embedding_dim),
        )

    def forward_once(self, img, features):
        """Embed one image together with its hand-crafted feature vector.

        Args:
            img: Image batch fed to the ResNet trunk.
            features: Hand-crafted features for the same batch (29 columns).

        Returns:
            The fused embedding, L2-normalised along dim 1.
        """
        pooled = self.cnn_backbone(img)
        # Collapse everything after the batch dimension into one vector.
        flat = torch.flatten(pooled, 1)
        visual = self.cnn_embedding(flat)
        handcrafted = self.feature_embedding(features)
        fused = self.fusion(torch.cat([visual, handcrafted], dim=1))
        return F.normalize(fused, p=2, dim=1)

    def forward(self, img1, img2, features):
        """Embed both images of a pair.

        Args:
            img1: First image batch.
            img2: Second image batch.
            features: Hand-crafted features of both images side by side; the
                first 29 columns belong to ``img1``, the rest to ``img2``.

        Returns:
            Tuple ``(embedding1, embedding2)``.
        """
        first_feats = features[:, :29]
        second_feats = features[:, 29:]
        first_embedding = self.forward_once(img1, first_feats)
        second_embedding = self.forward_once(img2, second_feats)
        return first_embedding, second_embedding
class EnsembleSiamese:
    """Weighted ensemble of three Siamese embedding networks.

    The members are ``dualpath`` (``DualPathSiamese``), ``resnet50`` and
    ``efficientnet`` (EfficientNet-B3). Each member embeds the question image
    and an answer image; the pairwise distance between the two embeddings is
    that member's score, and the member scores are averaged with
    ``self.weights``.

    Attributes:
        device: Device string the models are moved to.
        models: Mapping of member name to its ``nn.Module``.
        model_names: Member names, in the same order as ``weights``.
        weights: Per-member weights for the ensemble averages.
        models_loaded: ``True`` only when every member was built AND its
            trained checkpoint was loaded. When ``False``,
            ``predict_detailed`` returns placeholder results instead of
            scores produced by untrained weights.
    """

    # Checkpoint file and log label for each ensemble member.
    _CHECKPOINTS = {
        'dualpath': ('ensemble_dualpath.pth', 'DualPath'),
        'resnet50': ('ensemble_resnet50.pth', 'ResNet50'),
        'efficientnet': ('ensemble_efficientnet.pth', 'EfficientNet'),
    }

    def __init__(self, device='cpu'):
        """Build the three member networks and try to load their checkpoints.

        Construction never raises: any error is printed and leaves
        ``models_loaded`` set to ``False``.

        Args:
            device: Device string passed to ``.to()`` and ``torch.load``.
        """
        self.device = device
        self.models = {}
        self.model_names = ['dualpath', 'resnet50', 'efficientnet']
        self.weights = [0.34, 0.33, 0.33]
        self.models_loaded = False
        try:
            # Load DualPath model
            self.models['dualpath'] = DualPathSiamese(embedding_dim=256).to(device)
            # Load ResNet50 model
            resnet = models.resnet50(weights=None)
            self.models['resnet50'] = self.create_resnet_siamese(resnet, 2048, 256).to(device)
            # Load EfficientNet model
            from torchvision.models import efficientnet_b3
            efficientnet = efficientnet_b3(weights=None)
            self.models['efficientnet'] = self.create_efficientnet_siamese(efficientnet, 256).to(device)
            # The flag mirrors the checkpoint outcome so the UI/API status is
            # truthful when weight files are missing or incompatible.
            self.models_loaded = self.load_weights()
            if self.models_loaded:
                print("βœ… Ensemble model initialized successfully!")
            else:
                print("⚠️ Ensemble initialized without trained weights; predictions are disabled")
        except Exception as e:
            print(f"❌ Error initializing models: {e}")
            self.models_loaded = False

    def create_resnet_siamese(self, resnet, in_features, embedding_dim):
        """Wrap a ResNet in a Siamese module with an MLP embedding head.

        Args:
            resnet: ResNet instance; its last child module is dropped.
            in_features: Width of the flattened trunk output.
            embedding_dim: Size of the produced embedding.

        Returns:
            A new ``nn.Module`` whose ``forward(img1, img2, features=None)``
            returns the two L2-normalised embeddings.
        """
        class ResNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = nn.Sequential(*list(resnet.children())[:-1])
                self.embedding = nn.Sequential(
                    nn.Linear(in_features, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.5),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # `features` is accepted for call compatibility and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return ResNetSiam()

    def create_efficientnet_siamese(self, efficientnet, embedding_dim):
        """Wrap an EfficientNet in a Siamese module with an MLP embedding head.

        Args:
            efficientnet: EfficientNet instance; only ``.features`` is used.
            embedding_dim: Size of the produced embedding.

        Returns:
            A new ``nn.Module`` whose ``forward(img1, img2, features=None)``
            returns the two L2-normalised embeddings.
        """
        class EfficientNetSiam(nn.Module):
            def __init__(self):
                super().__init__()
                self.backbone = efficientnet.features
                self.avgpool = nn.AdaptiveAvgPool2d(1)
                self.embedding = nn.Sequential(
                    nn.Linear(1536, 512),
                    nn.BatchNorm1d(512),
                    nn.ReLU(),
                    nn.Dropout(0.4),
                    nn.Linear(512, embedding_dim)
                )

            def forward_once(self, x):
                x = self.backbone(x)
                x = self.avgpool(x)
                x = x.view(x.size(0), -1)
                x = self.embedding(x)
                return F.normalize(x, p=2, dim=1)

            def forward(self, img1, img2, features=None):
                # `features` is accepted for call compatibility and ignored.
                return self.forward_once(img1), self.forward_once(img2)

        return EfficientNetSiam()

    def load_weights(self):
        """Load each member's trained checkpoint independently.

        A failure for one member no longer disturbs the others: only the
        members whose checkpoint could not be loaded are re-initialised with
        ``_init_weights``. Errors are printed, never raised.

        Returns:
            ``True`` if every member's checkpoint was loaded, else ``False``.
        """
        failed = []
        for name, model in self.models.items():
            path, label = self._CHECKPOINTS.get(name, (f'ensemble_{name}.pth', name))
            try:
                # NOTE(security): weights_only=False unpickles arbitrary
                # objects; only point this at trusted checkpoint files.
                checkpoint = torch.load(path, map_location=self.device, weights_only=False)
                model.load_state_dict(checkpoint['model_state_dict'])
                print(f"βœ… {label} weights loaded")
            except Exception as e:
                print(f"⚠️ Partial weight loading error ({label}): {e}")
                failed.append(name)

        # Re-initialise only the members that failed, so checkpoints that did
        # load are not overwritten with random values.
        for name in failed:
            self.models[name].apply(self._init_weights)
        if failed:
            print(f"πŸ”„ Models initialized with random weights: {', '.join(failed)}")
        return bool(self.models) and not failed

    def _init_weights(self, m):
        """Xavier-initialise ``nn.Linear`` weights and set biases to 0.01."""
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def extract_handcrafted_features(self, img_array):
        """Extract 29 traditional CV features from an RGB image array.

        Layout: 3 channels x 8-bin normalised histogram (24 values), the mean
        of each HSV channel (3), the Canny edge-map sum divided by the pixel
        count (1) and the variance of the Laplacian (1).

        Args:
            img_array: 3-channel ``uint8`` image array in RGB order, or
                ``None``.

        Returns:
            ``float32`` array of length 29; all zeros when ``img_array`` is
            ``None`` or extraction fails (the error is printed).
        """
        if img_array is None:
            return np.zeros(29, dtype=np.float32)
        try:
            features = []
            # Color histogram
            for i in range(3):
                hist = cv2.calcHist([img_array], [i], None, [8], [0, 256])
                features.extend(hist.flatten() / (hist.sum() + 1e-6))
            # HSV features
            hsv = cv2.cvtColor(img_array, cv2.COLOR_RGB2HSV)
            features.extend([hsv[:, :, i].mean() for i in range(3)])
            # Edge density
            gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
            edges = cv2.Canny(gray, 50, 150)
            features.append(edges.sum() / (edges.size + 1e-6))
            # Texture
            features.append(cv2.Laplacian(gray, cv2.CV_64F).var())
            return np.array(features, dtype=np.float32)
        except Exception as e:
            print(f"Feature extraction error: {e}")
            return np.zeros(29, dtype=np.float32)

    def _fallback_result(self, answer_index):
        """Return the placeholder result for an answer that was not scored."""
        return {
            'answer_index': answer_index,
            'model_predictions': {
                name: {'distance': 1.0, 'confidence': 0.0, 'is_match': False}
                for name in self.model_names
            },
            'ensemble_confidence': 0.0,
            'ensemble_distance': 1.0,
            'ensemble_match': False,
            'final_decision': False,
        }

    def predict_detailed(self, question_img, answer_imgs, threshold=0.312):
        """Score every answer image against the question image.

        Args:
            question_img: PIL image of the question.
            answer_imgs: Sequence of PIL answer images.
            threshold: A distance strictly below this value counts as a match.

        Returns:
            One dict per answer, in input order, with keys ``answer_index``,
            ``model_predictions`` (per member: ``distance``, ``confidence``,
            ``is_match``), ``ensemble_distance``, ``ensemble_confidence``,
            ``ensemble_match`` and ``final_decision``. Answers that cannot be
            scored (models not loaded, or an error while processing) get a
            placeholder with distance 1.0 and confidence 0.0.
        """
        if not self.models_loaded:
            return [self._fallback_result(i) for i in range(len(answer_imgs))]

        transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        for model in self.models.values():
            model.eval()
        # Pair weights with members by name rather than by iteration order.
        weight_by_name = dict(zip(self.model_names, self.weights))

        # The question is the same for every answer, so preprocess it once.
        # The same RGB-converted image feeds both the tensor and the
        # hand-crafted features, so RGBA / palette / grayscale inputs no
        # longer degrade to all-zero features.
        try:
            question_rgb = question_img.convert('RGB')
            q_img = transform(question_rgb).unsqueeze(0).to(self.device)
            q_features = self.extract_handcrafted_features(np.array(question_rgb))
        except Exception as e:
            print(f"Error processing question image: {e}")
            return [self._fallback_result(i) for i in range(len(answer_imgs))]

        all_results = []
        for answer_idx, answer_img in enumerate(answer_imgs):
            try:
                # Preprocess the answer image
                answer_rgb = answer_img.convert('RGB')
                a_img = transform(answer_rgb).unsqueeze(0).to(self.device)
                a_features = self.extract_handcrafted_features(np.array(answer_rgb))
                features = np.concatenate([q_features, a_features])
                features_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(self.device)

                # Get predictions from all models
                model_predictions = {}
                weighted_distance = 0.0
                weighted_confidence = 0.0
                with torch.no_grad():
                    for name, model in self.models.items():
                        if name == 'dualpath':
                            out1, out2 = model(q_img, a_img, features_tensor)
                        else:
                            out1, out2 = model(q_img, a_img)
                        dist = F.pairwise_distance(out1, out2).item()
                        # Confidence is clamped at 0 for distances above 1.
                        confidence = max(0.0, 100 * (1 - dist))
                        model_predictions[name] = {
                            'distance': float(dist),
                            'confidence': float(confidence),
                            'is_match': bool(dist < threshold)
                        }
                        weighted_distance += weight_by_name[name] * dist
                        weighted_confidence += weight_by_name[name] * confidence

                is_match = bool(weighted_distance < threshold)
                all_results.append({
                    'answer_index': answer_idx,
                    'model_predictions': model_predictions,
                    'ensemble_distance': float(weighted_distance),
                    'ensemble_confidence': float(weighted_confidence),
                    'ensemble_match': is_match,
                    'final_decision': is_match
                })
            except Exception as e:
                print(f"Error processing answer {answer_idx}: {e}")
                # Keep the result list aligned with the input answers.
                all_results.append(self._fallback_result(answer_idx))
        return all_results
# ==================== INITIALIZE MODEL ====================
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"πŸ”§ Using device: {device}")
# Single module-level ensemble instance used by the Gradio callbacks and the
# FastAPI endpoints defined in this file; it is built at import time.
ensemble_model = EnsembleSiamese(device=device)
# ==================== GRADIO INTERFACE ====================
def format_detailed_results(results):
    """Render ensemble prediction dicts as a Markdown report.

    Args:
        results: List of per-answer dicts as returned by
            ``ensemble_model.predict_detailed``.

    Returns:
        A Markdown string containing the best match, a per-answer / per-model
        breakdown and a model-agreement summary. A fixed warning is returned
        instead when the ensemble reports ``models_loaded`` as false, and a
        fixed error message when no result has a confidence above zero.
    """
    if not ensemble_model.models_loaded:
        return "⚠️ **MODELS NOT PROPERLY LOADED**\n\nPlease check that all model files are uploaded:\n- ensemble_dualpath.pth\n- ensemble_resnet50.pth\n- ensemble_efficientnet.pth\n\nCurrently using fallback mode with random weights."

    # Only results with a positive ensemble confidence compete for best match.
    scored = [entry for entry in results if entry['ensemble_confidence'] > 0]
    if len(scored) == 0:
        return "❌ No valid predictions could be made. Please check your images."

    top = max(scored, key=lambda entry: entry['ensemble_confidence'])
    top_index = top['answer_index']

    # Collect the report piece by piece and join once at the end.
    pieces = [
        "🎯 **FINAL PREDICTION RESULTS** 🎯\n\n",
        f"**Best Match: Answer {top_index + 1}** \n",
        f"**Overall Confidence: {top['ensemble_confidence']:.2f}%** \n",
        f"**Distance: {top['ensemble_distance']:.4f}** \n",
        f"**Match: {'βœ… YES' if top['final_decision'] else '❌ NO'}** \n\n",
        "---\n\n",
        "**πŸ“Š DETAILED MODEL BREAKDOWN:**\n\n",
    ]

    # Per-answer section: ensemble line followed by one line per model.
    for entry in results:
        pieces.append(f"## **Answer {entry['answer_index'] + 1}**\n")
        pieces.append(
            f"**Ensemble:** {entry['ensemble_confidence']:.2f}% | Distance: {entry['ensemble_distance']:.4f} | {'βœ… MATCH' if entry['final_decision'] else '❌ NO MATCH'}\n\n"
        )
        for model_name, prediction in entry['model_predictions'].items():
            if prediction['is_match']:
                emoji = "🟒"
            else:
                emoji = "πŸ”΄"
            pieces.append(
                f" - **{model_name.upper()}:** {emoji} {prediction['confidence']:.2f}% | Distance: {prediction['distance']:.4f}\n"
            )
        pieces.append("\n")

    # Model agreement analysis: share of models that flagged a match.
    pieces.append("---\n\n")
    pieces.append("**🀝 MODEL AGREEMENT ANALYSIS:**\n\n")
    for entry in results:
        votes = entry['model_predictions']
        matches = sum(1 for pred in votes.values() if pred['is_match'])
        total_models = len(votes)
        agreement = (matches / total_models) * 100
        if agreement > 66:
            consensus_emoji = "🟒"
        elif agreement > 33:
            consensus_emoji = "🟑"
        else:
            consensus_emoji = "πŸ”΄"
        pieces.append(
            f"**Answer {entry['answer_index'] + 1}:** {consensus_emoji} {matches}/{total_models} models agree ({agreement:.1f}% consensus)\n"
        )

    # Add warning if any single model reports a confidence above 99.9.
    suspicious = any(
        pred['confidence'] > 99.9
        for entry in results
        for pred in entry['model_predictions'].values()
    )
    if suspicious:
        pieces.append("\n---\n\n")
        pieces.append("⚠️ **WARNING:** Some models are showing unusually high confidence scores. This may indicate model loading issues.\n")

    return "".join(pieces)
def predict_captcha_detailed(question_image, *answer_images):
    """Gradio callback: score the answer images and return a Markdown report.

    Args:
        question_image: The question image, or ``None`` when nothing was
            uploaded.
        *answer_images: Answer images; ``None`` entries are dropped.

    Returns:
        The formatted report followed by a technical-details section, or a
        one-line error message when inputs are missing or prediction raises.
    """
    # Filter out None images
    answer_imgs = [candidate for candidate in answer_images if candidate is not None]
    if not (question_image and answer_imgs):
        return "❌ Please upload both question and answer images"

    try:
        print(f"πŸ” Processing: 1 question + {len(answer_imgs)} answers")
        # Get detailed predictions and format them
        results = ensemble_model.predict_detailed(question_image, answer_imgs)
        report = format_detailed_results(results)
        # Technical details appended after the formatted report.
        technical = [
            "\n---\n\n",
            "**βš™οΈ TECHNICAL DETAILS:**\n\n",
            "- **Threshold:** 0.312 (optimized during training)\n",
            "- **Models:** DualPath (CNN + Handcrafted), ResNet50, EfficientNet-B3\n",
            "- **Ensemble Weights:** DualPath(34%), ResNet50(33%), EfficientNet(33%)\n",
            "- **Training Accuracy:** 98.67%\n",
            f"- **Device:** {device.upper()}\n",
            f"- **Models Loaded:** {'βœ… YES' if ensemble_model.models_loaded else '❌ NO'}\n",
        ]
        return report + "".join(technical)
    except Exception as exc:
        return f"❌ Error during prediction: {exc}"
# ==================== FASTAPI SETUP ====================
# FastAPI application object that the route decorators below attach to.
app = FastAPI(title="CAPTCHA Solver API", version="1.0")
# Add CORS middleware
# Any origin, method and header is allowed. Credentials are disabled because
# the FastAPI CORS documentation states that allow_origins, allow_methods and
# allow_headers cannot be ["*"] when allow_credentials is True, and none of
# the endpoints in this file use cookies or authentication.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request body accepted by POST /api/predict.
# (Documented with comments rather than a docstring so the generated API
# schema is left untouched.)
class Base64PredictionRequest(BaseModel):
    # Base64 text of the question image; base64_to_image() also accepts a
    # leading "data:...," URL prefix.
    question_base64: str
    # Base64 text of each answer image, in answer order.
    answers_base64: list[str]
def base64_to_image(base64_string):
    """Convert a base64 string to a fully decoded PIL Image.

    Args:
        base64_string: Base64 text, optionally preceded by a data-URL prefix
            (everything up to the first comma is discarded).

    Returns:
        The decoded ``PIL.Image.Image``, or ``None`` when the input is not
        valid base64 or not a readable image (the error is printed).
    """
    try:
        # Remove data URL prefix if present
        if ',' in base64_string:
            base64_string = base64_string.split(',')[1]
        image_data = base64.b64decode(base64_string)
        image = Image.open(BytesIO(image_data))
        # Image.open() is lazy and only identifies the file. Force the pixel
        # data to be read now so truncated or corrupt payloads are reported
        # here (as None) instead of failing later during prediction.
        image.load()
        return image
    except Exception as e:
        print(f"Error decoding base64: {e}")
        return None
# POST /api/predict
# Decodes the base64 question/answer images, scores every answer with the
# ensemble and returns a JSON-serialisable dict. Failures are reported as
# {"success": False, "error": ...} rather than raised.
# `best_match` is the ORIGINAL answer index of the highest-confidence result
# (previously it was a position in the filtered list of valid results, which
# pointed at the wrong answer whenever a zero-confidence result was skipped).
# The docstring below is left unchanged because it is exposed in the API docs.
@app.post("/api/predict")
async def api_predict_endpoint(request: Base64PredictionRequest):
    """API endpoint for userscript with base64 images"""
    try:
        print(f"πŸ“₯ Received API request: {len(request.answers_base64)} answers")
        # Convert base64 to images
        question_img = base64_to_image(request.question_base64)
        if question_img is None:
            return {"success": False, "error": "Failed to decode question image"}

        answer_imgs = []
        for i, base64_str in enumerate(request.answers_base64):
            img = base64_to_image(base64_str)
            if img is not None:
                answer_imgs.append(img)
                print(f"βœ… Decoded answer {i+1}")
            else:
                print(f"❌ Failed to decode answer {i+1}")
                # Use a blank fallback image so answer indices stay aligned
                # with the request.
                answer_imgs.append(Image.new('RGB', (100, 100), color='white'))
        if not answer_imgs:
            return {"success": False, "error": "No answer images could be decoded"}

        # Make prediction (runs synchronously inside this coroutine)
        results = ensemble_model.predict_detailed(question_img, answer_imgs)

        # Find best match among results with a positive confidence
        valid_results = [r for r in results if r['ensemble_confidence'] > 0]
        if not valid_results:
            return {"success": False, "error": "No valid predictions could be made"}
        best_result = max(valid_results, key=lambda r: r['ensemble_confidence'])
        best_index = int(best_result['answer_index'])

        # Prepare response
        response_data = {
            'success': True,
            'predictions': [
                {
                    'answer_index': r['answer_index'],
                    'ensemble_confidence': r['ensemble_confidence'],
                    'ensemble_distance': r['ensemble_distance'],
                    'ensemble_match': r['ensemble_match'],
                    'model_predictions': r['model_predictions']
                } for r in results
            ],
            'best_match': best_index,
            'best_confidence': float(best_result['ensemble_confidence']),
            'best_distance': float(best_result['ensemble_distance']),
            'models_loaded': ensemble_model.models_loaded
        }
        print(f"βœ… API Prediction complete. Best match: {best_index} with {best_result['ensemble_confidence']:.2f}% confidence")
        return response_data
    except Exception as e:
        print(f"❌ API error: {str(e)}")
        return {"success": False, "error": str(e)}
# GET / — static banner plus the ensemble's current models_loaded flag.
@app.get("/")
async def root():
    banner = dict(
        message="CAPTCHA Solver API is running!",
        version="1.0",
        accuracy="98.67%",
        models_loaded=ensemble_model.models_loaded,
    )
    return banner
# GET /health — always reports "healthy", together with the ensemble's
# models_loaded flag and the selected device string.
@app.get("/health")
async def health_check():
    report = dict(
        status="healthy",
        models_loaded=ensemble_model.models_loaded,
        device=device,
    )
    return report
# ==================== GRADIO UI ====================
# Gradio Blocks UI: one question image, up to five answer images, and a
# Markdown pane that shows the text returned by predict_captcha_detailed.
# NOTE(review): SOURCE has no indentation, so the nesting of rows/columns
# below is reconstructed from the component order — confirm against the
# intended layout.
with gr.Blocks(title="CAPTCHA Solver - Ensemble AI", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
# πŸ” CAPTCHA Solver - Ensemble Siamese Network
### **Achieved 98.67% Accuracy during Training**
This system uses an ensemble of three advanced neural networks to solve CAPTCHA challenges.
""")
    # Status indicator (evaluated once, when the UI is built)
    status = gr.Markdown(
        value=f"**Status:** {'βœ… Models Loaded Successfully' if ensemble_model.models_loaded else '⚠️ Models Not Properly Loaded - Using Fallback Mode'}"
    )
    with gr.Row():
        # Left column: image inputs and the trigger button.
        with gr.Column(scale=1):
            gr.Markdown("### πŸ“Έ Upload Images")
            question = gr.Image(label="Question Image", type="pil", height=200)
            gr.Markdown("### 🎯 Answer Images")
            with gr.Row():
                answer1 = gr.Image(label="Answer 1", type="pil", height=150)
                answer2 = gr.Image(label="Answer 2", type="pil", height=150)
            with gr.Row():
                answer3 = gr.Image(label="Answer 3", type="pil", height=150)
                answer4 = gr.Image(label="Answer 4", type="pil", height=150)
            with gr.Row():
                answer5 = gr.Image(label="Answer 5", type="pil", height=150)
            predict_btn = gr.Button("πŸš€ Analyze CAPTCHA", variant="primary", size="lg")
        # Right column: prediction report.
        with gr.Column(scale=2):
            gr.Markdown("### πŸ“Š Prediction Results")
            output = gr.Markdown(
                label="Detailed Analysis",
                value="πŸ‘† Upload images and click 'Analyze CAPTCHA' to see predictions here..."
            )
    # Connect the prediction function: the question image is the first
    # argument and the five answer slots arrive as *answer_images.
    predict_btn.click(
        fn=predict_captcha_detailed,
        inputs=[question, answer1, answer2, answer3, answer4, answer5],
        outputs=output
    )
# ==================== RUN APPLICATION ====================
# Script entry point: start the Gradio UI on all interfaces, port 7860.
# NOTE(review): only `demo` is launched here; nothing in this file serves the
# FastAPI `app` (for example through uvicorn), so its routes are presumably
# unreachable when the file is run this way — confirm whether that is intended.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)