Spaces:
Sleeping
Sleeping
| import ast | |
| import re | |
| import astroid # Used for deep code analysis | |
class RuleBasedCodeDetector:
    """Heuristic security scanner for Python source code.

    Three passes are combined by :meth:`analyze`: line-oriented regex rules,
    a walk over the standard ``ast`` tree, and a walk over an ``astroid``
    tree. The findings are folded into a security score that starts at 100
    and never drops below 0.

    Every finding is a dict with the keys ``type``, ``severity``,
    ``message``, ``line``, ``fix`` and ``detector``.
    """

    # Lookup tables shared by every instance; built once, not on each call.
    _SEVERITIES = {
        'sql_injection': 'CRITICAL',
        'hardcoded_secret': 'CRITICAL',
        'command_injection': 'CRITICAL',
        'insecure_deserialization': 'HIGH',
        'xss_vulnerability': 'HIGH',
        'eval_usage': 'CRITICAL',
        'exec_usage': 'CRITICAL',
    }
    _MESSAGES = {
        'sql_injection': 'Potential SQL injection vulnerability',
        'hardcoded_secret': 'Hardcoded secret found in code',
        'xss_vulnerability': 'Potential Cross-Site Scripting (XSS) vulnerability',
        'command_injection': 'Potential command injection vulnerability',
        'insecure_deserialization': 'Insecure deserialization detected',
    }
    _FIXES = {
        'sql_injection': 'Use parameterized queries or ORM',
        'hardcoded_secret': 'Use environment variables',
        'xss_vulnerability': 'Escape user input',
        'command_injection': 'Avoid string concatenation in system calls',
        'insecure_deserialization': 'Use JSON for data exchange',
    }
    # Penalty weight per severity; each weight unit costs 20 score points.
    _SEVERITY_WEIGHTS = {'CRITICAL': 1.0, 'HIGH': 0.7, 'MEDIUM': 0.4, 'LOW': 0.1}

    def __init__(self):
        # Raw strings (r'') keep the regex backslashes intact.
        # Patterns are matched one source line at a time, case-insensitively.
        self.patterns = {
            'sql_injection': [
                r"f?[\"'][^\"']*(SELECT|INSERT|UPDATE|DELETE)[^\"']*\{\w+\}[^\"']*[\"']",
                r"execute\(f[\"']",
                r"cursor\.execute\(f[\"']",
                r"\%s.*\%\(.*\)",
                # The first rule stops at any quote inside the string, so it
                # misses the very common  ... = '{value}'  form. This rule
                # covers f-strings whose interpolation is wrapped in SQL quotes.
                r"f[\"'].*\b(SELECT|INSERT|UPDATE|DELETE)\b.*\{[^{}]+\}",
            ],
            'hardcoded_secret': [
                r'(api[_-]?key|password|secret|token|auth)\s*=\s*[\"\'][^\"\']+[\"\']',
                r'(aws[_-]?(access[_-]?key|secret[_-]?key))\s*=\s*[\"\'][^\"\']+[\"\']',
                r'[\"\'][A-Za-z0-9]{32,}[\"\']',
                r'sk_[a-zA-Z0-9]{24,}',
            ],
            'xss_vulnerability': [
                r'f["\'][^"\']*\{\w+\}[^"\']*["\']',
                r'Flask\.render_template_string\([^)]+\+\s*\w+',
                r'\.format\([^)]*request\.',
            ],
            'command_injection': [
                r'os\.system\([^)]*\+\s*\w+',
                r'subprocess\.(run|call|Popen)\([^)]*\+\s*\w+',
                r'exec\([^)]*\w+',
            ],
            'insecure_deserialization': [
                r'pickle\.loads\([^)]*\w+',
                r'yaml\.load\([^)]*\w+',
                r'marshal\.loads\([^)]*\w+',
            ],
        }

    def detect_syntax_errors(self, code: str) -> list:
        """Return a one-element list describing why *code* fails to parse.

        An empty list means ``ast.parse`` accepted the source.
        """
        try:
            ast.parse(code)
        except (SyntaxError, ValueError) as e:
            # ValueError: some interpreter versions reject NUL bytes this way.
            # ``lineno`` always exists on SyntaxError but may be None, and it
            # does not exist on ValueError, so fall back to line 1 for both.
            line = getattr(e, 'lineno', None) or 1
            return [{
                "type": "syntax_error",
                "severity": "CRITICAL",
                "message": f"Syntax Error: {str(e)}",
                "line": line,
                "fix": "Fix the syntax error before security analysis",
                "detector": "syntax_checker"
            }]
        return []

    def detect_by_patterns(self, code: str) -> list:
        """Scan *code* line by line against the regex rules in ``self.patterns``.

        At most one finding per vulnerability type is reported for a line,
        no matter how many of that type's patterns match it. Line numbers
        are 1-based.
        """
        # Compile and resolve the per-type metadata once per call rather than
        # once per line. ``self.patterns`` is read here, so rules added after
        # construction are still honoured.
        rules = [
            (
                vuln_type,
                [re.compile(pattern, re.IGNORECASE) for pattern in patterns],
                self._get_severity(vuln_type),
                self._get_message(vuln_type),
                self._get_fix(vuln_type),
            )
            for vuln_type, patterns in self.patterns.items()
        ]

        issues = []
        for line_no, line in enumerate(code.split('\n'), 1):
            for vuln_type, regexes, severity, message, fix in rules:
                if any(regex.search(line) for regex in regexes):
                    issues.append({
                        "type": vuln_type,
                        "severity": severity,
                        "message": message,
                        "line": line_no,
                        "fix": fix,
                        "detector": "rule_based"
                    })
        return issues

    def detect_with_astroid(self, code: str) -> list:
        """Report direct ``eval(...)`` calls found in the astroid tree of *code*.

        This pass is best-effort: if anything goes wrong while building or
        walking the tree, the findings collected so far are returned.
        """
        issues = []
        try:
            module = astroid.parse(code)
            for call in module.nodes_of_class(astroid.nodes.Call):
                if call.func.as_string() == 'eval':
                    issues.append({
                        "type": "eval_usage",
                        "severity": "CRITICAL",
                        "message": "Dangerous eval() call detected via Astroid",
                        "line": call.lineno,
                        "fix": "Use ast.literal_eval() instead",
                        "detector": "astroid_analysis"
                    })
        except Exception:
            # Deliberately broad: a failure in this optional pass must not
            # abort the whole scan.
            pass
        return issues

    def detect_ast_patterns(self, code: str) -> list:
        """Report calls to the bare names ``eval`` and ``exec`` in *code*.

        Only direct calls by name are recognised; attribute calls such as
        ``builtins.eval(...)`` are not. Unparsable source yields no findings.
        """
        try:
            tree = ast.parse(code)
        except (SyntaxError, ValueError):
            # Parse failures are reported by detect_syntax_errors instead.
            return []

        issues = []
        for node in ast.walk(tree):
            if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)):
                continue
            name = node.func.id
            if name in ('eval', 'exec'):
                issues.append({
                    "type": f"{name}_usage",
                    "severity": "CRITICAL",
                    "message": f"{name}() function detected",
                    "line": node.lineno,
                    "fix": "Remove or replace with safer alternatives",
                    "detector": "ast_analysis"
                })
        return issues

    def _get_severity(self, vuln_type: str) -> str:
        """Return the severity label for *vuln_type* ('MEDIUM' if unknown)."""
        return self._SEVERITIES.get(vuln_type, 'MEDIUM')

    def _get_message(self, vuln_type: str) -> str:
        """Return the human-readable description for *vuln_type*."""
        return self._MESSAGES.get(vuln_type, 'Security risk detected')

    def _get_fix(self, vuln_type: str) -> str:
        """Return the suggested remediation for *vuln_type*."""
        return self._FIXES.get(vuln_type, 'Review security documentation')

    def analyze(self, code: str) -> dict:
        """Run every detector on *code* and summarise the result.

        If the source does not parse, only the syntax finding is reported
        and the other passes are skipped.

        Returns a dict with ``issues`` (list of finding dicts),
        ``security_score`` (100 minus 20 points per severity-weight unit,
        floored at 0) and ``issue_count``.
        """
        # 1. Syntax check; the remaining passes need parsable source.
        all_issues = self.detect_syntax_errors(code)
        if not all_issues:
            # 2. Regex rules, 3. standard AST.
            all_issues.extend(self.detect_by_patterns(code))
            all_issues.extend(self.detect_ast_patterns(code))
            # 4. Astroid. It flags the same eval() calls as the AST pass, so
            # keep only findings not already reported for that type and line;
            # otherwise one call is penalised twice in the score.
            reported = {(issue['type'], issue['line']) for issue in all_issues}
            all_issues.extend(
                issue for issue in self.detect_with_astroid(code)
                if (issue['type'], issue['line']) not in reported
            )

        total_weight = sum(
            self._SEVERITY_WEIGHTS.get(issue['severity'], 0.1)
            for issue in all_issues
        )
        security_score = max(0, 100 - (total_weight * 20))
        return {
            "issues": all_issues,
            "security_score": security_score,
            "issue_count": len(all_issues)
        }
if __name__ == "__main__":
    # Demo: scan a small snippet containing a hardcoded key, an eval() call
    # and a string-built SQL query, then print the resulting report.
    detector = RuleBasedCodeDetector()
    # The sample must be validly indented Python; otherwise the scan stops
    # at the syntax check and none of the security detectors run.
    test_code = """
def save_data(user_input):
    api_key = "sk_live_abcdef123456789"
    eval(user_input)
    query = f"SELECT * FROM logs WHERE msg = '{user_input}'"
"""
    result = detector.analyze(test_code)
    # Plain literal: no placeholders, so no f-prefix is needed.
    print("--- Security Report ---")
    print(f"Score: {result['security_score']:.1f}/100")
    for issue in result['issues']:
        print(f"[{issue['severity']}] Line {issue['line']}: {issue['message']} ({issue['detector']})")