"""REST API routes.""" import json from datetime import datetime from typing import Optional from fastapi import APIRouter, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse from coderag.api.schemas import ( IndexRepositoryRequest, IndexRepositoryResponse, QueryRequest, QueryResponse, ListRepositoriesResponse, RepositoryInfo, CitationResponse, RetrievedChunkResponse, ErrorResponse, ) from coderag.config import get_settings from coderag.generation.generator import ResponseGenerator from coderag.indexing.embeddings import EmbeddingGenerator from coderag.indexing.vectorstore import VectorStore from coderag.ingestion.chunker import CodeChunker from coderag.ingestion.filter import FileFilter from coderag.ingestion.loader import RepositoryLoader from coderag.ingestion.validator import GitHubURLValidator, ValidationError from coderag.logging import get_logger from coderag.models.document import Document from coderag.models.query import Query as QueryModel from coderag.models.repository import Repository, RepositoryStatus logger = get_logger(__name__) router = APIRouter() # Global state (in production, use a proper database) settings = get_settings() repos_file = settings.data_dir / "repositories.json" repositories: dict[str, Repository] = {} def load_repositories() -> None: """Load repositories from disk.""" global repositories if repos_file.exists(): try: data = json.loads(repos_file.read_text()) repositories = {r["id"]: Repository.from_dict(r) for r in data} except Exception as e: logger.error("Failed to load repositories", error=str(e)) def save_repositories() -> None: """Save repositories to disk.""" repos_file.parent.mkdir(parents=True, exist_ok=True) data = [r.to_dict() for r in repositories.values()] repos_file.write_text(json.dumps(data, indent=2)) # Load on startup load_repositories() async def index_repository_task( url: str, repo_id: str, branch: Optional[str], include_patterns: Optional[list[str]], exclude_patterns: Optional[list[str]], ) -> None: """Background task to index a repository.""" repo = repositories[repo_id] try: # Validate and clone validator = GitHubURLValidator() repo_info = await validator.validate_repository(url) branch = branch or repo_info.branch or "main" loader = RepositoryLoader() repo_path = loader.clone_repository(repo_info, branch) repo.clone_path = repo_path repo.status = RepositoryStatus.INDEXING save_repositories() # Filter files file_filter = FileFilter( include_patterns=include_patterns, exclude_patterns=exclude_patterns, ) files = list(file_filter.filter_files(repo_path)) # Load documents documents = [] for file_path in files: try: doc = Document.from_file(file_path, repo_path, repo.id) documents.append(doc) except Exception as e: logger.warning("Failed to load file", path=str(file_path), error=str(e)) # Chunk chunker = CodeChunker() chunks = [] for doc in documents: for chunk in chunker.chunk_document(doc): chunks.append(chunk) # Embed and store if chunks: vectorstore = VectorStore() vectorstore.delete_repo_chunks(repo.id) embedder = EmbeddingGenerator() embedded_chunks = embedder.embed_chunks(chunks) vectorstore.add_chunks(embedded_chunks) # Update status repo.chunk_count = len(chunks) repo.indexed_at = datetime.now() repo.status = RepositoryStatus.READY save_repositories() logger.info("Repository indexed", repo_id=repo_id, chunks=len(chunks)) except Exception as e: logger.error("Indexing failed", repo_id=repo_id, error=str(e)) repo.status = RepositoryStatus.ERROR repo.error_message = str(e) save_repositories() @router.post("/repos/index", response_model=IndexRepositoryResponse, status_code=202) async def index_repository( request: IndexRepositoryRequest, background_tasks: BackgroundTasks, ) -> IndexRepositoryResponse: """Index a GitHub repository.""" # Create repository record repo = Repository( url=request.url, branch=request.branch or "main", status=RepositoryStatus.PENDING, ) repositories[repo.id] = repo save_repositories() # Start background indexing background_tasks.add_task( index_repository_task, request.url, repo.id, request.branch, request.include_patterns, request.exclude_patterns, ) return IndexRepositoryResponse( repo_id=repo.id, status=repo.status.value, message="Repository indexing started", ) @router.post("/query", response_model=QueryResponse) async def query_repository(request: QueryRequest) -> QueryResponse: """Query a repository.""" # Check repository exists if request.repo_id not in repositories: raise HTTPException(status_code=404, detail="Repository not found") repo = repositories[request.repo_id] if repo.status != RepositoryStatus.READY: raise HTTPException( status_code=400, detail=f"Repository not ready (status: {repo.status.value})", ) try: # Generate response generator = ResponseGenerator() query = QueryModel( question=request.question, repo_id=request.repo_id, top_k=request.top_k, ) response = generator.generate(query) # Convert to API schema return QueryResponse( answer=response.answer, citations=[ CitationResponse( file_path=c.file_path, start_line=c.start_line, end_line=c.end_line, ) for c in response.citations ], retrieved_chunks=[ RetrievedChunkResponse( chunk_id=c.chunk_id, file_path=c.file_path, start_line=c.start_line, end_line=c.end_line, relevance_score=c.relevance_score, chunk_type=c.chunk_type, name=c.name, content=c.content, ) for c in response.retrieved_chunks ], grounded=response.grounded, query_id=response.query_id, ) except Exception as e: logger.error("Query failed", error=str(e)) raise HTTPException(status_code=500, detail=str(e)) @router.get("/repos", response_model=ListRepositoriesResponse) async def list_repositories() -> ListRepositoriesResponse: """List all repositories.""" return ListRepositoriesResponse( repositories=[ RepositoryInfo( id=repo.id, url=repo.url, branch=repo.branch, chunk_count=repo.chunk_count, status=repo.status.value, indexed_at=repo.indexed_at, error_message=repo.error_message, ) for repo in repositories.values() ] ) @router.get("/repos/{repo_id}", response_model=RepositoryInfo) async def get_repository(repo_id: str) -> RepositoryInfo: """Get repository details.""" if repo_id not in repositories: raise HTTPException(status_code=404, detail="Repository not found") repo = repositories[repo_id] return RepositoryInfo( id=repo.id, url=repo.url, branch=repo.branch, chunk_count=repo.chunk_count, status=repo.status.value, indexed_at=repo.indexed_at, error_message=repo.error_message, ) @router.delete("/repos/{repo_id}") async def delete_repository(repo_id: str) -> dict: """Delete a repository.""" if repo_id not in repositories: raise HTTPException(status_code=404, detail="Repository not found") repo = repositories[repo_id] try: # Delete from vector store vectorstore = VectorStore() vectorstore.delete_repo_chunks(repo_id) # Delete from records del repositories[repo_id] save_repositories() return {"message": f"Repository {repo.full_name} deleted"} except Exception as e: logger.error("Delete failed", error=str(e)) raise HTTPException(status_code=500, detail=str(e))