Spaces:
Sleeping
Sleeping
| import sys | |
| import ast | |
| import logging | |
| import traceback | |
| from pathlib import Path | |
| from mcp.server.fastmcp import FastMCP | |
| import json | |
| from services.code_generation_service import CodeGenerator | |
| from services.project_service import ProjectAnalyzer | |
| from services.refactoring_service import RefactoringAdvisor | |
| import os | |
| from dotenv import load_dotenv | |
| from services.sequence_service import ProjectSequenceAnalyzer, SequenceDiagramService | |
| from services.usecase_service import UseCaseDiagramService | |
| load_dotenv() | |
| # --- π‘οΈ PROTOCOL PROTECTION & LOGGING SETUP π‘οΈ --- | |
| os.environ.setdefault("MODAL_TOKEN_ID", os.getenv("MODAL_TOKEN_ID", "")) | |
| os.environ.setdefault("MODAL_TOKEN_SECRET", os.getenv("MODAL_TOKEN_SECRET", "")) | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| stream=sys.stderr, | |
| format='%(message)s' | |
| ) | |
| for lib in ["httpx", "httpcore", "asyncio"]: | |
| logging.getLogger(lib).setLevel(logging.WARNING) | |
| # Add parent directory to path for imports | |
| sys.path.insert(0, str(Path(__file__).parent)) | |
| # 1. Initialize MCP Server | |
| mcp = FastMCP("ArchitectAI", dependencies=["langchain-openai", "langchain-core"]) | |
| # 2. Import Core Logic | |
| try: | |
| from core.llm_factory import create_sambanova_llm, create_nebius_llm, create_openai_llm, create_gemini_llm | |
| from services.filesystem_service import FileSystemVisitor, TreeFormatter | |
| from services.architecture_service import ( | |
| ArchitectureVisitor, FastTypeEnricher, DeterministicPlantUMLConverter | |
| ) | |
| logging.info("β All services imported successfully") | |
| except ImportError as e: | |
| logging.error(f"β Critical Import Error: {e}") | |
| raise | |
| try: | |
| import modal | |
| modal_function = modal.Function.from_name("architect-ai-surgeon", "safe_refactor_and_test") | |
| logging.info("βοΈ Modal Function imported successfully.") | |
| except ImportError: | |
| logging.warning("β οΈ Could not import modal_executor function. Cloud features disabled.") | |
| modal_function = None | |
| # --- π‘οΈ SANDBOX CONFIGURATION π‘οΈ --- | |
| def _find_project_root(start_path: Path) -> Path: | |
| """ | |
| Searches upwards for a marker that indicates the project root | |
| (like .git, pyproject.toml, or requirements.txt). | |
| Falls back to CWD if nothing is found. | |
| """ | |
| current = start_path.resolve() | |
| for parent in [current] + list(current.parents): | |
| # Check for common project markers | |
| if (parent / ".git").exists() or \ | |
| (parent / "pyproject.toml").exists() or \ | |
| (parent / "requirements.txt").exists(): | |
| return parent | |
| # Fallback: If no marker found, use the Current Working Directory (where you ran the command) | |
| return Path.cwd().resolve() | |
| # 1. Start searching from the location of this script | |
| _script_location = Path(__file__).parent | |
| # 2. Determine the actual project root dynamically | |
| SANDBOX_ROOT = _find_project_root(_script_location) | |
| logging.info(f"π Project Root Detected at: {SANDBOX_ROOT}") | |
| BLOCKED_PATHS = { | |
| SANDBOX_ROOT / ".venv", | |
| SANDBOX_ROOT / "venv", | |
| SANDBOX_ROOT / ".git", | |
| SANDBOX_ROOT / ".env", | |
| SANDBOX_ROOT / "__pycache__", | |
| } | |
| # --- π§ SINGLETON LLM CLIENT π§ --- | |
| class LLMClientSingleton: | |
| """ | |
| Singleton pattern for LLM client. | |
| Ensures only ONE instance of the LLM client is created and reused. | |
| """ | |
| _instance = None | |
| _llm_client = None | |
| _current_provider = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| logging.info("ποΈ Creating LLM Singleton instance...") | |
| return cls._instance | |
| def get_client(self, preferred_provider: str = "sambanova", temperature: float = 0.0): | |
| """ | |
| Get or initialize the LLM client. | |
| Only creates a new client if provider changes. | |
| """ | |
| # Return cached client if provider hasn't changed | |
| if self._llm_client is not None and self._current_provider == preferred_provider: | |
| logging.debug(f"β»οΈ Reusing cached {preferred_provider} client") | |
| return self._llm_client | |
| # Create new client if provider changed | |
| logging.info(f"π Initializing {preferred_provider} LLM client...") | |
| strategies = [ | |
| ("sambanova", create_sambanova_llm), | |
| ("gemini", create_gemini_llm), | |
| ("nebius", create_nebius_llm), | |
| ("openai", create_openai_llm) | |
| ] | |
| if preferred_provider == "gemini": | |
| strategies = [ | |
| ("gemini", create_gemini_llm), | |
| ("sambanova", create_sambanova_llm), | |
| ("nebius", create_nebius_llm), | |
| ("openai", create_openai_llm) | |
| ] | |
| elif preferred_provider == "nebius": | |
| strategies = [ | |
| ("nebius", create_nebius_llm), | |
| ("gemini", create_gemini_llm), | |
| ("openai", create_openai_llm), | |
| ("sambanova", create_sambanova_llm) | |
| ] | |
| elif preferred_provider == "openai": | |
| strategies = [ | |
| ("openai", create_openai_llm), | |
| ("gemini", create_gemini_llm), | |
| ("nebius", create_nebius_llm), | |
| ("sambanova", create_sambanova_llm) | |
| ] | |
| for name, factory in strategies: | |
| try: | |
| logging.info(f"π Attempting to connect to {name}...") | |
| self._llm_client = factory(temperature=temperature) | |
| self._current_provider = name | |
| logging.info(f"β Connected to {name} (cached for reuse)") | |
| return self._llm_client | |
| except Exception as e: | |
| logging.warning(f"β οΈ {name} failed: {str(e)[:100]}...") | |
| logging.error("β No LLM provider available!") | |
| self._llm_client = None | |
| self._current_provider = None | |
| return None | |
| def reset(self): | |
| """Force reset the cached client (useful for testing).""" | |
| logging.info("π Resetting LLM Singleton...") | |
| self._llm_client = None | |
| self._current_provider = None | |
| # Global singleton instance | |
| _llm_singleton = LLMClientSingleton() | |
| def _validate_path(user_path: str, operation: str = "read") -> tuple[bool, Path, str]: | |
| """ | |
| Validates that a user-provided path is within the sandbox. | |
| Returns: | |
| (is_valid, resolved_path, error_message) | |
| """ | |
| try: | |
| requested = Path(user_path).resolve() | |
| sandbox_root = SANDBOX_ROOT.resolve() | |
| try: | |
| requested.relative_to(sandbox_root) | |
| except ValueError: | |
| return False, requested, f"β Access Denied: Path '{user_path}' is outside project directory" | |
| if requested in BLOCKED_PATHS or any(requested.is_relative_to(bp) for bp in BLOCKED_PATHS if bp.exists()): | |
| return False, requested, f"β Access Denied: Cannot access '{requested.name}' (protected directory)" | |
| if not requested.exists(): | |
| return False, requested, f"β Path not found: '{user_path}'" | |
| if operation == "read": | |
| if requested.is_dir(): | |
| return False, requested, f"β '{user_path}' is a directory. Use list_project_structure() instead" | |
| if not requested.is_file(): | |
| return False, requested, f"β '{user_path}' is not a regular file" | |
| elif operation == "list": | |
| if not requested.is_dir(): | |
| return False, requested, f"β '{user_path}' is not a directory" | |
| logging.debug(f"β Path validation passed: {requested}") | |
| return True, requested, "" | |
| except Exception as e: | |
| return False, Path(user_path), f"β Path validation error: {str(e)}" | |
| # --- MCP TOOLS --- | |
| def generate_architecture_diagram(code: str, enrich: bool = True, provider: str = "sambanova") -> str: | |
| """ | |
| Analyzes Python code and generates a PlantUML Class Diagram. | |
| Uses singleton LLM client for efficiency. | |
| """ | |
| try: | |
| # 1. Static Analysis (Fast & Deterministic) | |
| try: | |
| tree = ast.parse(code) | |
| visitor = ArchitectureVisitor() | |
| visitor.visit(tree) | |
| logging.info(f"β Static analysis: {len(visitor.structure)} classes found") | |
| except SyntaxError as se: | |
| return f"β Syntax Error in code: {se}" | |
| # 2. AI Enrichment (Optional & Hybrid) - Using Singleton | |
| if enrich and visitor.structure: | |
| # π§ GET SINGLETON CLIENT | |
| llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) | |
| if llm: | |
| try: | |
| enricher = FastTypeEnricher(llm) | |
| visitor.structure = enricher.enrich(code, visitor.structure) | |
| logging.info("β Type enrichment complete") | |
| except Exception as e: | |
| logging.error(f"β οΈ Enrichment failed (skipping): {e}") | |
| else: | |
| logging.warning("β οΈ Skipping enrichment: No LLM provider available") | |
| # 3. Visualization (Deterministic) | |
| converter = DeterministicPlantUMLConverter() | |
| result = converter.convert(visitor.structure) | |
| logging.info("β Diagram generated successfully") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Unexpected error: {e}") | |
| traceback.print_exc(file=sys.stderr) | |
| return f"β Processing failed: {str(e)}" | |
| def list_project_structure(path: str = ".", style: str = "tree") -> str: | |
| """ | |
| Deterministically maps the folder structure. | |
| Args: | |
| path: The root directory to list. | |
| style: 'tree' for visual string, 'json' for structured data (easier for coding). | |
| π‘οΈ Sandboxed: Can only list directories within the project. | |
| """ | |
| try: | |
| # π‘οΈ SANDBOX CHECK | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: | |
| logging.warning(f"π« Sandbox violation attempt: {error_msg}") | |
| return error_msg | |
| # 1. Get Raw Data (Dictionary) | |
| visitor = FileSystemVisitor() | |
| tree_data = visitor.visit(str(resolved_path)) | |
| # 2. Format Output based on Style | |
| if style == "json": | |
| result = json.dumps(tree_data, indent=2) | |
| else: | |
| formatter = TreeFormatter() | |
| result = formatter.format(tree_data) | |
| logging.info(f"β Listed directory: {resolved_path} (style={style})") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Directory listing failed: {e}") | |
| return f"Error listing directory: {e}" | |
| def read_file(path: str) -> str: | |
| """ | |
| Reads the full content of a specific file. | |
| Use this to inspect code before generating diagrams. | |
| π‘οΈ Sandboxed: Can only read files within the project. | |
| """ | |
| try: | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="read") | |
| if not is_valid: | |
| logging.warning(f"π« Sandbox violation attempt: {error_msg}") | |
| return error_msg | |
| content = resolved_path.read_text(encoding='utf-8', errors='replace') | |
| logging.info(f"β Read file: {resolved_path.name} ({len(content)} chars)") | |
| return content | |
| except Exception as e: | |
| logging.error(f"β Read failed: {e}") | |
| return f"β Error reading file: {str(e)}" | |
| def generate_full_project_diagram(path: str = ".", enrich: bool = False, provider: str = "sambanova") -> str: | |
| """ | |
| Analyzes the ENTIRE project directory and generates a massive Class Diagram. | |
| Args: | |
| path: Root directory to analyze. | |
| enrich: If True, uses AI to infer types (Can be slow for big projects). | |
| """ | |
| try: | |
| # 1. Security Check | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: | |
| return error_msg | |
| # 2. Analyze Project (The Aggregation) | |
| analyzer = ProjectAnalyzer(resolved_path) | |
| full_structure = analyzer.analyze() | |
| if not full_structure: | |
| return "β οΈ No Python code found in this directory." | |
| # 3. AI Enrichment (Optional Batch Processing) | |
| # Warning: For huge projects, we might want to limit this or do it in chunks. | |
| if enrich: | |
| # We construct a "Virtual" code context combining all files? | |
| # OR we just skip enrichment for the full map to save tokens. | |
| # For now, let's SKIP enrichment for the full view to be fast. | |
| logging.info("βΉοΈ Skipping AI enrichment for full project to ensure speed.") | |
| pass | |
| # 4. Convert to PlantUML | |
| converter = DeterministicPlantUMLConverter() | |
| puml_code = converter.convert(full_structure) | |
| logging.info(f"β Generated full project diagram ({len(full_structure)} items)") | |
| return puml_code | |
| except Exception as e: | |
| logging.error(f"β Project analysis failed: {e}") | |
| return f"Error analyzing project: {str(e)}" | |
| def propose_architecture_refactoring(path: str = ".") -> str: | |
| """ | |
| Analyzes the project and returns a JSON proposal with 'proposed_uml'. | |
| Use this to visualize changes BEFORE applying them. | |
| """ | |
| try: | |
| # 1. Scan Project | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: return error_msg | |
| analyzer = ProjectAnalyzer(resolved_path) | |
| structure = analyzer.analyze() | |
| if not structure: return "Error: No code found." | |
| # 2. Generate Proposal | |
| advisor = RefactoringAdvisor() | |
| proposal = advisor.propose_improvement(structure) | |
| # Return as pretty string for the Agent/User to read | |
| return json.dumps(proposal, indent=2) | |
| except Exception as e: | |
| return f"Error generating proposal: {e}" | |
| def apply_refactoring(file_path: str, instruction: str) -> str: | |
| """ | |
| APPLIES CHANGES TO THE FILE. | |
| WARNING: This overwrites the file. Use only on 'ai_architect - Copy'. | |
| Args: | |
| file_path: Relative path (e.g., 'core/llm_factory.py'). | |
| instruction: What to do (e.g., 'Refactor to Singleton pattern'). | |
| """ | |
| try: | |
| # 1. Security Check | |
| target_file = SANDBOX_ROOT / file_path | |
| if not target_file.exists(): | |
| return f"β File not found: {file_path}" | |
| original_code = target_file.read_text(encoding='utf-8') | |
| generator = CodeGenerator() | |
| new_code = generator.generate_refactored_code(original_code, instruction, file_path) | |
| saved_path = generator.save_code(file_path, new_code, SANDBOX_ROOT) | |
| return f"β File updated successfully: {saved_path}" | |
| except Exception as e: | |
| return f"β Error: {e}" | |
| def apply_refactoring_safely( | |
| file_path: str, | |
| instruction: str, | |
| test_file: str = None | |
| ) -> str: | |
| """ | |
| Safely refactors code with automatic testing. | |
| Uses Modal sandbox if available, falls back to local LLM if not. | |
| Args: | |
| file_path: Path to file to refactor | |
| instruction: Refactoring instructions | |
| test_file: Optional path to test file | |
| """ | |
| try: | |
| # 1. Read original code | |
| target = SANDBOX_ROOT / file_path | |
| if not target.exists(): | |
| return f"β File not found: {file_path}" | |
| original_code = target.read_text(encoding='utf-8') | |
| # 2. Read tests if provided | |
| test_code = None | |
| if test_file: | |
| test_path = SANDBOX_ROOT / test_file | |
| if test_path.exists(): | |
| test_code = test_path.read_text(encoding='utf-8') | |
| # 3. Try Modal sandbox first | |
| if modal_function is not None: | |
| try: | |
| logging.info("π Sending to Modal sandbox...") | |
| result = modal_function.remote( | |
| original_code, | |
| instruction, | |
| test_code | |
| ) | |
| # 4. Check results | |
| if not result["success"]: | |
| return f"β Refactoring failed: {result['error']}" | |
| if not result["test_results"]["passed"]: | |
| return f""" | |
| β οΈ Refactoring succeeded but tests FAILED: | |
| {result['test_results']['output']} | |
| Code was NOT saved. Review and fix tests first. | |
| """ | |
| # 5. Save only if tests passed | |
| target.write_text(result["new_code"], encoding='utf-8') | |
| return f""" | |
| β Refactoring completed successfully in Modal sandbox! | |
| π Tests: PASSED β | |
| πΎ File saved: {file_path} | |
| Test output: | |
| {result['test_results']['output']} | |
| """ | |
| except Exception as modal_error: | |
| logging.warning(f"β οΈ Modal sandbox failed: {str(modal_error)[:100]}... Falling back to local refactoring.") | |
| # 4. Fallback: Local LLM refactoring (no testing) | |
| logging.info("π§ Using local LLM for refactoring (testing disabled)...") | |
| llm = _llm_singleton.get_client(preferred_provider="openai", temperature=0.0) | |
| if llm is None: | |
| return "β No LLM available for refactoring (Modal and local LLM both failed)" | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| system_msg = SystemMessage(content= | |
| "You are a Senior Python Refactoring Engineer. " | |
| "Rewrite the provided code to meet the user's architectural instructions " | |
| "while preserving the original business logic." | |
| ) | |
| user_msg = HumanMessage(content=f""" | |
| Refactor this Python code according to these instructions: | |
| **Instructions:** {instruction} | |
| **Original Code:** | |
| ```python | |
| {original_code} | |
| ``` | |
| Return ONLY the refactored code, no explanations. | |
| """) | |
| try: | |
| response = llm.invoke([system_msg, user_msg]) | |
| new_code = response.content | |
| # Clean up markdown if present | |
| if "```python" in new_code: | |
| new_code = new_code.split("```python")[1].split("```")[0].strip() | |
| elif "```" in new_code: | |
| new_code = new_code.split("```")[1].split("```")[0].strip() | |
| # Save the refactored code | |
| target.write_text(new_code, encoding='utf-8') | |
| return f""" | |
| β Refactoring completed successfully (Local LLM)! | |
| πΎ File saved: {file_path} | |
| β οΈ Note: Testing was skipped (Modal not available). Please verify manually. | |
| Refactored code preview: | |
| ```python | |
| {new_code[:500]}... | |
| ``` | |
| """ | |
| except Exception as llm_error: | |
| return f"β Local LLM refactoring failed: {str(llm_error)}" | |
| except Exception as e: | |
| logging.error(f"β Refactoring error: {e}") | |
| return f"β Error: {e}" | |
| def generate_usecase_diagram(code: str, enrich: bool = True, provider: str = "sambanova") -> str: | |
| """ | |
| Analyzes Python code and generates a PlantUML Use Case Diagram. | |
| Detects: | |
| - Service classes (patterns: *Service, *Controller, *Handler, *API) | |
| - FastAPI/Flask endpoints (@app.get, @router.post, etc.) | |
| - Public methods as potential use cases | |
| Args: | |
| code: Python source code to analyze | |
| enrich: If True, uses AI to infer actors and relationships | |
| provider: LLM provider ("sambanova", "nebius", "openai") | |
| Returns: | |
| PlantUML use case diagram string | |
| """ | |
| try: | |
| # Get LLM client (using singleton) | |
| llm = None | |
| if enrich: | |
| llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) | |
| # Generate diagram | |
| service = UseCaseDiagramService(llm=llm) | |
| result = service.generate(code, enrich=enrich) | |
| logging.info("β Use case diagram generated successfully") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Use case diagram failed: {e}") | |
| traceback.print_exc(file=sys.stderr) | |
| return f"' β Error: {str(e)}" | |
| def generate_full_project_usecase_diagram(path: str = ".", enrich: bool = True, provider: str = "sambanova") -> str: | |
| """ | |
| Analyzes the ENTIRE project directory and generates a Use Case Diagram. | |
| Scans all .py files for services, controllers, and endpoints. | |
| Args: | |
| path: Root directory to analyze | |
| enrich: If True, uses AI to infer actors | |
| provider: LLM provider | |
| Returns: | |
| PlantUML use case diagram string | |
| """ | |
| try: | |
| # Security check | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: | |
| return error_msg | |
| # Collect all Python files | |
| combined_code = [] | |
| file_count = 0 | |
| for file_path in resolved_path.rglob("*.py"): | |
| parts = file_path.parts | |
| if any(p.startswith(".") or p in ["venv", "env", "__pycache__", "node_modules"] for p in parts): | |
| continue | |
| try: | |
| code = file_path.read_text(encoding='utf-8', errors='replace') | |
| combined_code.append(f"# === File: {file_path.name} ===\n{code}") | |
| file_count += 1 | |
| except Exception: | |
| continue | |
| if not combined_code: | |
| return "' β οΈ No Python files found in directory" | |
| # Get LLM client | |
| llm = None | |
| if enrich: | |
| llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) | |
| # Generate diagram from combined code | |
| service = UseCaseDiagramService(llm=llm) | |
| # Limit code size to avoid token limits | |
| full_code = "\n\n".join(combined_code)[:50000] | |
| result = service.generate(full_code, enrich=enrich) | |
| logging.info(f"β Project use case diagram generated ({file_count} files)") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Project use case diagram failed: {e}") | |
| return f"' β Error: {str(e)}" | |
| def generate_sequence_diagram( | |
| code: str, | |
| entry_method: str = None, | |
| enrich: bool = True, | |
| provider: str = "sambanova" | |
| ) -> str: | |
| """ | |
| Analyzes Python code and generates a PlantUML Sequence Diagram. | |
| Traces method calls through: | |
| - self.dependency.method() patterns | |
| - Control flow (if/else, loops, try/except) | |
| - Async operations | |
| Args: | |
| code: Python source code to analyze | |
| entry_method: Starting point (e.g., "OrderService.create_order") | |
| If None, auto-detects the method with most calls | |
| enrich: If True, uses AI to improve readability | |
| provider: LLM provider ("sambanova", "nebius", "openai") | |
| Returns: | |
| PlantUML sequence diagram string | |
| """ | |
| try: | |
| # Get LLM client | |
| llm = None | |
| if enrich: | |
| llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) | |
| # Generate diagram | |
| service = SequenceDiagramService(llm=llm) | |
| result = service.generate(code, entry_method=entry_method, enrich=enrich) | |
| logging.info(f"β Sequence diagram generated (entry: {entry_method or 'auto'})") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Sequence diagram failed: {e}") | |
| traceback.print_exc(file=sys.stderr) | |
| return f"' β Error: {str(e)}" | |
| def generate_full_project_sequence_diagram( | |
| path: str = ".", | |
| entry_method: str = None, | |
| enrich: bool = False, | |
| provider: str = "sambanova" | |
| ) -> str: | |
| """ | |
| Analyzes ENTIRE project and generates a cross-file Sequence Diagram. | |
| Scans all .py files and traces method calls across modules. | |
| Args: | |
| path: Root directory to analyze | |
| entry_method: Starting point (e.g., "OrderService.create_order") | |
| enrich: If True, uses AI to improve readability | |
| provider: LLM provider | |
| Returns: | |
| PlantUML sequence diagram string | |
| """ | |
| try: | |
| # Security check | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: | |
| return error_msg | |
| # Collect all Python files | |
| file_contents = {} | |
| for file_path in resolved_path.rglob("*.py"): | |
| parts = file_path.parts | |
| if any(p.startswith(".") or p in ["venv", "env", "__pycache__", ".git"] for p in parts): | |
| continue | |
| try: | |
| file_contents[file_path.name] = file_path.read_text(encoding='utf-8', errors='replace') | |
| except Exception: | |
| continue | |
| if not file_contents: | |
| return "' β οΈ No Python files found in directory" | |
| # Get LLM client | |
| llm = None | |
| if enrich: | |
| llm = _llm_singleton.get_client(preferred_provider=provider, temperature=0.0) | |
| # Analyze project | |
| analyzer = ProjectSequenceAnalyzer(llm=llm) | |
| analyzer.analyze_files(file_contents) | |
| # Auto-detect entry if not provided | |
| if not entry_method: | |
| candidates = [ | |
| (m, len(c)) | |
| for m, c in analyzer.global_visitor.call_sequences.items() | |
| if c and not m.startswith('_') | |
| ] | |
| if candidates: | |
| candidates.sort(key=lambda x: x[1], reverse=True) | |
| entry_method = candidates[0][0] | |
| if not entry_method: | |
| # List available methods | |
| methods = list(analyzer.global_visitor.call_sequences.keys())[:10] | |
| return f"' β οΈ No entry method specified. Available: {methods}" | |
| result = analyzer.generate_diagram(entry_method, enrich=enrich) | |
| logging.info(f"β Project sequence diagram generated ({len(file_contents)} files)") | |
| return result | |
| except Exception as e: | |
| logging.error(f"β Project sequence diagram failed: {e}") | |
| return f"' β Error: {str(e)}" | |
| def list_sequence_entry_points(code: str) -> str: | |
| """ | |
| Lists all available methods that can be used as sequence diagram entry points. | |
| Use this to discover what methods exist before generating a sequence diagram. | |
| Args: | |
| code: Python source code to analyze | |
| Returns: | |
| JSON list of available entry methods with their call counts | |
| """ | |
| try: | |
| from services.sequence_service import CallGraphVisitor | |
| tree = ast.parse(code) | |
| visitor = CallGraphVisitor() | |
| visitor.visit(tree) | |
| # Build list with call counts | |
| entry_points = [] | |
| for method, calls in visitor.call_sequences.items(): | |
| if method.startswith('_'): | |
| continue | |
| entry_points.append({ | |
| 'method': method, | |
| 'calls_count': len(calls), | |
| 'calls_to': list(set( | |
| c['target_type'] for c in calls if c.get('target_type') | |
| ))[:5] | |
| }) | |
| # Sort by call count (most interesting methods first) | |
| entry_points.sort(key=lambda x: x['calls_count'], reverse=True) | |
| logging.info(f"β Found {len(entry_points)} potential entry points") | |
| return json.dumps({ | |
| 'total_methods': len(entry_points), | |
| 'entry_points': entry_points[:20] | |
| }, indent=2) | |
| except SyntaxError as se: | |
| return json.dumps({'error': f'Syntax error: {se}'}) | |
| except Exception as e: | |
| return json.dumps({'error': str(e)}) | |
| def list_project_entry_points(path: str = ".") -> str: | |
| """ | |
| Lists all available methods across the ENTIRE project for sequence diagrams. | |
| Args: | |
| path: Root directory to analyze | |
| Returns: | |
| JSON list of available entry methods with their call counts | |
| """ | |
| try: | |
| from services.sequence_service import CallGraphVisitor | |
| # Security check | |
| is_valid, resolved_path, error_msg = _validate_path(path, operation="list") | |
| if not is_valid: | |
| return json.dumps({'error': error_msg}) | |
| # Analyze all files | |
| global_visitor = CallGraphVisitor() | |
| file_count = 0 | |
| for file_path in resolved_path.rglob("*.py"): | |
| parts = file_path.parts | |
| if any(p.startswith(".") or p in ["venv", "env", "__pycache__"] for p in parts): | |
| continue | |
| try: | |
| code = file_path.read_text(encoding='utf-8', errors='replace') | |
| tree = ast.parse(code) | |
| file_visitor = CallGraphVisitor() | |
| file_visitor.visit(tree) | |
| # Merge | |
| global_visitor.classes.update(file_visitor.classes) | |
| for method, calls in file_visitor.call_sequences.items(): | |
| global_visitor.call_sequences[method].extend(calls) | |
| file_count += 1 | |
| except Exception: | |
| continue | |
| # Build entry points list | |
| entry_points = [] | |
| for method, calls in global_visitor.call_sequences.items(): | |
| if method.startswith('_') or '.visit_' in method: | |
| continue | |
| entry_points.append({ | |
| 'method': method, | |
| 'calls_count': len(calls), | |
| 'calls_to': list(set( | |
| c['target_type'] for c in calls if c.get('target_type') | |
| ))[:5] | |
| }) | |
| entry_points.sort(key=lambda x: x['calls_count'], reverse=True) | |
| logging.info(f"β Scanned {file_count} files, found {len(entry_points)} entry points") | |
| return json.dumps({ | |
| 'files_analyzed': file_count, | |
| 'total_methods': len(entry_points), | |
| 'entry_points': entry_points[:30] | |
| }, indent=2) | |
| except Exception as e: | |
| return json.dumps({'error': str(e)}) | |
| # --- ENTRY POINT --- | |
| if __name__ == "__main__": | |
| logging.info("π MCP Server 'ArchitectAI' starting...") | |
| logging.info(f"π Sandbox Root: {SANDBOX_ROOT}") | |
| logging.info(f"π« Protected Paths: {len(BLOCKED_PATHS)}") | |
| logging.info("π§ LLM Client: Singleton Pattern (created once, reused)") | |
| logging.info("π Available Diagram Types:") | |
| logging.info(" β’ Class Diagrams β generate_architecture_diagram()") | |
| logging.info(" β’ Use Case Diagrams β generate_usecase_diagram()") | |
| logging.info(" β’ Sequence Diagrams β generate_sequence_diagram()") | |
| logging.info("Available providers: sambanova, nebius, openai") | |
| try: | |
| mcp.run() | |
| except KeyboardInterrupt: | |
| logging.info("π Server stopped manually.") | |
| except Exception as e: | |
| logging.critical(f"β Fatal server error: {e}") | |
| sys.exit(1) |