CodeRAG / src /coderag /api /routes.py
Sebastiangmz's picture
Initial CodeRAG deploy
d557d77
"""REST API routes."""
import json
from datetime import datetime
from typing import Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from coderag.api.schemas import (
IndexRepositoryRequest,
IndexRepositoryResponse,
QueryRequest,
QueryResponse,
ListRepositoriesResponse,
RepositoryInfo,
CitationResponse,
RetrievedChunkResponse,
ErrorResponse,
)
from coderag.config import get_settings
from coderag.generation.generator import ResponseGenerator
from coderag.indexing.embeddings import EmbeddingGenerator
from coderag.indexing.vectorstore import VectorStore
from coderag.ingestion.chunker import CodeChunker
from coderag.ingestion.filter import FileFilter
from coderag.ingestion.loader import RepositoryLoader
from coderag.ingestion.validator import GitHubURLValidator, ValidationError
from coderag.logging import get_logger
from coderag.models.document import Document
from coderag.models.query import Query as QueryModel
from coderag.models.repository import Repository, RepositoryStatus
# Module-level logger and the router the endpoints in this module register on.
logger = get_logger(__name__)
router = APIRouter()
# Global state (in production, use a proper database).
settings = get_settings()
# JSON file under the configured data directory that holds the persisted
# repository records.
repos_file = settings.data_dir / "repositories.json"
# In-memory registry of repository records, keyed by repository id.
repositories: dict[str, Repository] = {}
def load_repositories() -> None:
    """Populate the in-memory repository registry from the JSON file on disk.

    Does nothing when the file does not exist. Any failure while reading or
    parsing the file is logged, and the registry is left as it was.
    """
    global repositories
    if not repos_file.exists():
        return
    try:
        raw_entries = json.loads(repos_file.read_text())
        loaded = {}
        for entry in raw_entries:
            loaded[entry["id"]] = Repository.from_dict(entry)
        # Rebind only after every entry has been parsed successfully.
        repositories = loaded
    except Exception as exc:
        logger.error("Failed to load repositories", error=str(exc))
def save_repositories() -> None:
    """Persist the in-memory repository registry to the JSON file on disk.

    The payload is written to a sibling temporary file first and then moved
    over the real file, so an interrupted write cannot leave a truncated
    registry file behind (which would fail to parse on the next load).
    """
    repos_file.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps([r.to_dict() for r in repositories.values()], indent=2)
    # Same directory as the target so the final rename stays on one filesystem.
    tmp_file = repos_file.with_name(repos_file.name + ".tmp")
    tmp_file.write_text(payload)
    tmp_file.replace(repos_file)
# Load on startup: this call runs when the module is imported, so the
# registry is filled from disk (if the file exists) before requests are served.
load_repositories()
async def index_repository_task(
    url: str,
    repo_id: str,
    branch: Optional[str],
    include_patterns: Optional[list[str]],
    exclude_patterns: Optional[list[str]],
) -> None:
    """Background task to index a repository.

    Validates the URL, clones the repository, filters and loads its files,
    chunks them, embeds the chunks and stores them in the vector store. The
    repository record is updated and persisted as the task progresses; on any
    failure the record is marked ``ERROR`` with the exception message.

    Args:
        url: Repository URL to validate and clone.
        repo_id: Key of the repository record in ``repositories``.
        branch: Branch to index; when falsy, the branch reported by the
            validator is used, falling back to ``"main"``.
        include_patterns: Passed to ``FileFilter`` as ``include_patterns``.
        exclude_patterns: Passed to ``FileFilter`` as ``exclude_patterns``.
    """
    repo = repositories.get(repo_id)
    if repo is None:
        # The record may have been deleted between scheduling and execution.
        logger.warning("Repository record missing, skipping indexing", repo_id=repo_id)
        return
    try:
        # Validate and clone
        validator = GitHubURLValidator()
        repo_info = await validator.validate_repository(url)
        branch = branch or repo_info.branch or "main"
        loader = RepositoryLoader()
        repo_path = loader.clone_repository(repo_info, branch)
        repo.clone_path = repo_path
        # Record the branch that was actually cloned; the record was created
        # with a "main" placeholder when the request did not name a branch.
        repo.branch = branch
        repo.status = RepositoryStatus.INDEXING
        save_repositories()

        # Filter files
        file_filter = FileFilter(
            include_patterns=include_patterns,
            exclude_patterns=exclude_patterns,
        )
        files = list(file_filter.filter_files(repo_path))

        # Load documents; a file that fails to load is skipped, not fatal.
        documents = []
        for file_path in files:
            try:
                doc = Document.from_file(file_path, repo_path, repo.id)
                documents.append(doc)
            except Exception as e:
                logger.warning("Failed to load file", path=str(file_path), error=str(e))

        # Chunk
        chunker = CodeChunker()
        chunks = [
            chunk
            for doc in documents
            for chunk in chunker.chunk_document(doc)
        ]

        # Embed and store
        if chunks:
            vectorstore = VectorStore()
            # Drop any chunks already stored for this repo before adding new ones.
            vectorstore.delete_repo_chunks(repo.id)
            embedder = EmbeddingGenerator()
            embedded_chunks = embedder.embed_chunks(chunks)
            vectorstore.add_chunks(embedded_chunks)

        # Update status
        repo.chunk_count = len(chunks)
        repo.indexed_at = datetime.now()
        repo.status = RepositoryStatus.READY
        save_repositories()
        logger.info("Repository indexed", repo_id=repo_id, chunks=len(chunks))
    except Exception as e:
        # Top-level boundary of the background task: record the failure on
        # the repository so it is visible through the API.
        logger.error("Indexing failed", repo_id=repo_id, error=str(e))
        repo.status = RepositoryStatus.ERROR
        repo.error_message = str(e)
        save_repositories()
@router.post("/repos/index", response_model=IndexRepositoryResponse, status_code=202)
async def index_repository(
    request: IndexRepositoryRequest,
    background_tasks: BackgroundTasks,
) -> IndexRepositoryResponse:
    """Register a GitHub repository and schedule its indexing.

    A ``PENDING`` record is created and persisted immediately; the indexing
    itself is handed to ``index_repository_task`` as a background task.
    """
    # Create and persist the repository record.
    record = Repository(
        url=request.url,
        branch=request.branch or "main",
        status=RepositoryStatus.PENDING,
    )
    repositories[record.id] = record
    save_repositories()

    # Queue the background indexing job with the caller's options.
    task_args = (
        request.url,
        record.id,
        request.branch,
        request.include_patterns,
        request.exclude_patterns,
    )
    background_tasks.add_task(index_repository_task, *task_args)

    accepted = IndexRepositoryResponse(
        repo_id=record.id,
        status=record.status.value,
        message="Repository indexing started",
    )
    return accepted
@router.post("/query", response_model=QueryResponse)
async def query_repository(request: QueryRequest) -> QueryResponse:
    """Query a repository.

    Responds with 404 when the repository id is unknown, 400 when the
    repository is not in the ``READY`` state, and 500 when generating the
    answer fails.
    """
    # Check repository exists (single lookup instead of `in` + index).
    repo = repositories.get(request.repo_id)
    if repo is None:
        raise HTTPException(status_code=404, detail="Repository not found")
    if repo.status != RepositoryStatus.READY:
        raise HTTPException(
            status_code=400,
            detail=f"Repository not ready (status: {repo.status.value})",
        )
    try:
        # Generate response
        generator = ResponseGenerator()
        query = QueryModel(
            question=request.question,
            repo_id=request.repo_id,
            top_k=request.top_k,
        )
        response = generator.generate(query)
        # Convert to API schema
        return QueryResponse(
            answer=response.answer,
            citations=[
                CitationResponse(
                    file_path=c.file_path,
                    start_line=c.start_line,
                    end_line=c.end_line,
                )
                for c in response.citations
            ],
            retrieved_chunks=[
                RetrievedChunkResponse(
                    chunk_id=c.chunk_id,
                    file_path=c.file_path,
                    start_line=c.start_line,
                    end_line=c.end_line,
                    relevance_score=c.relevance_score,
                    chunk_type=c.chunk_type,
                    name=c.name,
                    content=c.content,
                )
                for c in response.retrieved_chunks
            ],
            grounded=response.grounded,
            query_id=response.query_id,
        )
    except Exception as e:
        logger.error("Query failed", error=str(e))
        # Chain the original exception so its traceback is kept as the cause.
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/repos", response_model=ListRepositoriesResponse)
async def list_repositories() -> ListRepositoriesResponse:
    """List all repositories currently held in the registry."""
    infos = [
        RepositoryInfo(
            id=entry.id,
            url=entry.url,
            branch=entry.branch,
            chunk_count=entry.chunk_count,
            status=entry.status.value,
            indexed_at=entry.indexed_at,
            error_message=entry.error_message,
        )
        for entry in repositories.values()
    ]
    return ListRepositoriesResponse(repositories=infos)
@router.get("/repos/{repo_id}", response_model=RepositoryInfo)
async def get_repository(repo_id: str) -> RepositoryInfo:
    """Get the details of one repository, or respond 404 if it is unknown."""
    entry = repositories.get(repo_id)
    if entry is None:
        raise HTTPException(status_code=404, detail="Repository not found")
    details = RepositoryInfo(
        id=entry.id,
        url=entry.url,
        branch=entry.branch,
        chunk_count=entry.chunk_count,
        status=entry.status.value,
        indexed_at=entry.indexed_at,
        error_message=entry.error_message,
    )
    return details
@router.delete("/repos/{repo_id}")
async def delete_repository(repo_id: str) -> dict:
    """Delete a repository.

    Removes the repository's chunks from the vector store, drops its record
    from the registry and persists the registry. Responds with 404 when the
    repository id is unknown and 500 when deletion fails.
    """
    # Single lookup instead of `in` followed by indexing.
    repo = repositories.get(repo_id)
    if repo is None:
        raise HTTPException(status_code=404, detail="Repository not found")
    try:
        # Delete from vector store
        vectorstore = VectorStore()
        vectorstore.delete_repo_chunks(repo_id)
        # Delete from records
        del repositories[repo_id]
        save_repositories()
        return {"message": f"Repository {repo.full_name} deleted"}
    except Exception as e:
        logger.error("Delete failed", error=str(e))
        # Chain the original exception so its traceback is kept as the cause.
        raise HTTPException(status_code=500, detail=str(e)) from e