# ai-code-scanner-ui / rule_detector.py
# mubi-613's picture
# Initial clean commit: Fixed security issues and removed heavy checkpoints
# 168ae1c
import ast
import logging
import re

import astroid  # Used for deep code analysis
class RuleBasedCodeDetector:
    """Heuristic scanner that flags risky constructs in Python source text.

    ``analyze`` combines three detectors: line-by-line regular expressions,
    a walk over the standard ``ast`` tree, and a walk over an ``astroid``
    tree.  Every finding is a dict with the keys ``type``, ``severity``,
    ``message``, ``line``, ``fix`` and ``detector``.
    """

    # Lookup tables for the regex-based findings.  They are constants, so
    # they live on the class instead of being rebuilt on every call.
    _SEVERITIES = {
        'sql_injection': 'CRITICAL',
        'hardcoded_secret': 'CRITICAL',
        'command_injection': 'CRITICAL',
        'insecure_deserialization': 'HIGH',
        'xss_vulnerability': 'HIGH',
        'eval_usage': 'CRITICAL',
        'exec_usage': 'CRITICAL',
    }
    _MESSAGES = {
        'sql_injection': 'Potential SQL injection vulnerability',
        'hardcoded_secret': 'Hardcoded secret found in code',
        'xss_vulnerability': 'Potential Cross-Site Scripting (XSS) vulnerability',
        'command_injection': 'Potential command injection vulnerability',
        'insecure_deserialization': 'Insecure deserialization detected',
    }
    _FIXES = {
        'sql_injection': 'Use parameterized queries or ORM',
        'hardcoded_secret': 'Use environment variables',
        'xss_vulnerability': 'Escape user input',
        'command_injection': 'Avoid string concatenation in system calls',
        'insecure_deserialization': 'Use JSON for data exchange',
    }
    # Penalty weight per severity; each weight unit costs 20 score points.
    _SEVERITY_WEIGHTS = {'CRITICAL': 1.0, 'HIGH': 0.7, 'MEDIUM': 0.4, 'LOW': 0.1}

    def __init__(self):
        """Set up ``self.patterns``: vulnerability type -> list of regex strings."""
        # Using raw strings (r'') to ensure regex backslashes are handled correctly
        self.patterns = {
            'sql_injection': [
                r"f?[\"'][^\"']*(SELECT|INSERT|UPDATE|DELETE)[^\"']*\{\w+\}[^\"']*[\"']",
                # The pattern above excludes *both* quote characters inside the
                # literal, so it misses the most common injection shape, where
                # the placeholder is wrapped in the other quote style, e.g.
                # f"... WHERE msg = '{value}'".  This one only excludes the
                # quote that opened the f-string (backreference \1).
                r"f([\"'])(?:(?!\1).)*\b(?:SELECT|INSERT|UPDATE|DELETE)\b(?:(?!\1).)*\{[^{}]+\}",
                r"execute\(f[\"']",
                r"cursor\.execute\(f[\"']",
                r"\%s.*\%\(.*\)"
            ],
            'hardcoded_secret': [
                r'(api[_-]?key|password|secret|token|auth)\s*=\s*[\"\'][^\"\']+[\"\']',
                r'(aws[_-]?(access[_-]?key|secret[_-]?key))\s*=\s*[\"\'][^\"\']+[\"\']',
                r'[\"\'][A-Za-z0-9]{32,}[\"\']',
                r'sk_[a-zA-Z0-9]{24,}'
            ],
            'xss_vulnerability': [
                r'f["\'][^"\']*\{\w+\}[^"\']*["\']',
                r'Flask\.render_template_string\([^)]+\+\s*\w+',
                r'\.format\([^)]*request\.'
            ],
            'command_injection': [
                r'os\.system\([^)]*\+\s*\w+',
                r'subprocess\.(run|call|Popen)\([^)]*\+\s*\w+',
                r'exec\([^)]*\w+'
            ],
            'insecure_deserialization': [
                r'pickle\.loads\([^)]*\w+',
                r'yaml\.load\([^)]*\w+',
                r'marshal\.loads\([^)]*\w+'
            ]
        }

    def detect_syntax_errors(self, code: str):
        """Return a one-element list describing why ``code`` fails to parse.

        Returns an empty list when ``ast.parse`` accepts the source.
        """
        try:
            ast.parse(code)
        except (SyntaxError, ValueError) as e:
            # ValueError: ast.parse rejects NUL bytes that way before
            # Python 3.12.  ``lineno`` can be missing or None, so fall back
            # to line 1 rather than reporting ``None``.
            return [{
                "type": "syntax_error",
                "severity": "CRITICAL",
                "message": f"Syntax Error: {str(e)}",
                "line": getattr(e, 'lineno', None) or 1,
                "fix": "Fix the syntax error before security analysis",
                "detector": "syntax_checker"
            }]
        return []

    def detect_by_patterns(self, code: str):
        """Scan ``code`` line by line with the regexes in ``self.patterns``.

        Matching is case-insensitive.  At most one finding is reported per
        vulnerability type per line, with 1-based line numbers.
        """
        # Compile once per scan instead of looking every pattern up per line.
        # Built here (not in __init__) so later edits to self.patterns count.
        compiled = [
            (vuln_type, [re.compile(pattern, re.IGNORECASE) for pattern in patterns])
            for vuln_type, patterns in self.patterns.items()
        ]
        issues = []
        for line_number, line in enumerate(code.split('\n'), 1):
            for vuln_type, regexes in compiled:
                if any(regex.search(line) for regex in regexes):
                    issues.append({
                        "type": vuln_type,
                        "severity": self._get_severity(vuln_type),
                        "message": self._get_message(vuln_type),
                        "line": line_number,
                        "fix": self._get_fix(vuln_type),
                        "detector": "rule_based"
                    })
        return issues

    def detect_with_astroid(self, code: str):
        """Report every call whose callee renders as ``eval`` in an astroid tree.

        Best-effort: if the astroid pass fails for any reason, the findings
        collected so far are returned and the failure is logged at DEBUG level.
        """
        issues = []
        try:
            module = astroid.parse(code)
            for call in module.nodes_of_class(astroid.nodes.Call):
                if call.func.as_string() == 'eval':
                    issues.append({
                        "type": "eval_usage",
                        "severity": "CRITICAL",
                        "message": "Dangerous eval() call detected via Astroid",
                        "line": call.lineno,
                        "fix": "Use ast.literal_eval() instead",
                        "detector": "astroid_analysis"
                    })
        except Exception:
            # Deliberately broad: this detector is supplementary and must not
            # abort the scan.  Log instead of discarding the error silently.
            logging.getLogger(__name__).debug(
                "astroid analysis skipped", exc_info=True
            )
        return issues

    def detect_ast_patterns(self, code: str):
        """Report direct ``eval(...)`` / ``exec(...)`` calls found via ``ast``.

        Only calls through a bare name are matched (not attribute access).
        Returns an empty list when ``code`` cannot be parsed.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # ValueError covers NUL bytes on Python < 3.12.
            return []
        issues = []
        for node in ast.walk(tree):
            if not isinstance(node, ast.Call):
                continue
            func = node.func
            if isinstance(func, ast.Name) and func.id in ('eval', 'exec'):
                issues.append({
                    "type": f"{func.id}_usage",
                    "severity": "CRITICAL",
                    "message": f"{func.id}() function detected",
                    "line": node.lineno,
                    "fix": "Remove or replace with safer alternatives",
                    "detector": "ast_analysis"
                })
        return issues

    def _get_severity(self, vuln_type: str) -> str:
        """Return the severity label for ``vuln_type`` ('MEDIUM' if unknown)."""
        return self._SEVERITIES.get(vuln_type, 'MEDIUM')

    def _get_message(self, vuln_type: str) -> str:
        """Return the human-readable message for ``vuln_type``."""
        return self._MESSAGES.get(vuln_type, 'Security risk detected')

    def _get_fix(self, vuln_type: str) -> str:
        """Return the remediation hint for ``vuln_type``."""
        return self._FIXES.get(vuln_type, 'Review security documentation')

    def analyze(self, code: str):
        """Run all detectors on ``code`` and compute a 0-100 security score.

        When the source does not parse, only the syntax finding is reported.
        Returns a dict with ``issues`` (list of findings), ``security_score``
        (float, 100 minus 20 points per severity-weight unit, floored at 0)
        and ``issue_count``.
        """
        # 1. Check syntax
        all_issues = self.detect_syntax_errors(code)
        if not all_issues:
            # 2. Pattern matching
            all_issues.extend(self.detect_by_patterns(code))
            # 3. Standard AST
            all_issues.extend(self.detect_ast_patterns(code))
            # 4. Astroid analysis.  It overlaps with the AST pass (both flag
            # eval), so skip findings already reported for the same type and
            # line; otherwise a single call is penalised twice in the score.
            reported = {(issue["type"], issue["line"]) for issue in all_issues}
            all_issues.extend(
                issue for issue in self.detect_with_astroid(code)
                if (issue["type"], issue["line"]) not in reported
            )
        # Calculate security score (always a float, whatever path was taken)
        total_weight = sum(
            self._SEVERITY_WEIGHTS.get(issue['severity'], 0.1)
            for issue in all_issues
        )
        security_score = max(0.0, 100.0 - total_weight * 20)
        return {
            "issues": all_issues,
            "security_score": security_score,
            "issue_count": len(all_issues)
        }
if __name__ == "__main__":
    # Demo: scan a small, deliberately vulnerable snippet and print the report.
    detector = RuleBasedCodeDetector()
    # The sample must be valid Python (function body indented under its def);
    # otherwise analyze() stops at the syntax check and reports nothing else.
    test_code = """
def save_data(user_input):
    api_key = "sk_live_abcdef123456789"
    eval(user_input)
    query = f"SELECT * FROM logs WHERE msg = '{user_input}'"
"""
    result = detector.analyze(test_code)
    print("--- Security Report ---")
    print(f"Score: {result['security_score']:.1f}/100")
    for issue in result['issues']:
        print(f"[{issue['severity']}] Line {issue['line']}: {issue['message']} ({issue['detector']})")