| | """
|
| | BOINC Client Integration
|
| | Handles distributed computing task submission and monitoring
|
| | """
|
| |
|
| | import os
|
| | import json
|
| | import time
|
| | import requests
|
| | from typing import Dict, List, Optional
|
| | from pathlib import Path
|
| | from dataclasses import dataclass, asdict
|
| | from datetime import datetime
|
| | import yaml
|
| |
|
@dataclass
class WorkUnit:
    """Represents a BOINC work unit"""

    id: str                 # unique identifier, e.g. "wu_<ms-timestamp>"
    name: str               # human-readable name for the work unit
    workunit_type: str      # analysis type (e.g. variant_calling, blast_search)
    input_file: str         # path to the input data file
    status: str             # lifecycle state: pending/running/completed/failed/cancelled
    created_at: str         # ISO-8601 timestamp of submission
    completed_at: Optional[str] = None   # ISO-8601 timestamp when finished
    result_file: Optional[str] = None    # path to the result file, if any
    error: Optional[str] = None          # error message when the task failed
|
| |
|
| | class BOINCClient:
|
| | """BOINC client for distributed computing integration"""
|
| |
|
| | def __init__(self, config_path: str = "config.yml"):
|
| | with open(config_path, 'r') as f:
|
| | self.config = yaml.safe_load(f)['boinc']
|
| |
|
| | self.project_url = self.config['project_url']
|
| | self.work_dir = Path(self.config['work_dir'])
|
| | self.work_dir.mkdir(parents=True, exist_ok=True)
|
| |
|
| | self.tasks_file = self.work_dir / "tasks.json"
|
| | self.tasks = self._load_tasks()
|
| |
|
| | def _load_tasks(self) -> Dict[str, WorkUnit]:
|
| | """Load existing tasks from disk"""
|
| | if self.tasks_file.exists():
|
| | with open(self.tasks_file, 'r') as f:
|
| | data = json.load(f)
|
| | return {k: WorkUnit(**v) for k, v in data.items()}
|
| | return {}
|
| |
|
| | def _save_tasks(self):
|
| | """Save tasks to disk"""
|
| | with open(self.tasks_file, 'w') as f:
|
| | data = {k: asdict(v) for k, v in self.tasks.items()}
|
| | json.dump(data, f, indent=2)
|
| |
|
| | def submit_task(
|
| | self,
|
| | workunit_type: str,
|
| | input_file: str,
|
| | name: Optional[str] = None
|
| | ) -> str:
|
| | """
|
| | Submit a new work unit to BOINC
|
| |
|
| | Args:
|
| | workunit_type: Type of analysis (variant_calling, blast_search, etc.)
|
| | input_file: Path to input data file
|
| | name: Optional custom name for the work unit
|
| |
|
| | Returns:
|
| | Work unit ID
|
| | """
|
| | task_id = f"wu_{int(time.time() * 1000)}"
|
| |
|
| | if name is None:
|
| | name = f"{workunit_type}_{task_id}"
|
| |
|
| |
|
| | work_unit = WorkUnit(
|
| | id=task_id,
|
| | name=name,
|
| | workunit_type=workunit_type,
|
| | input_file=input_file,
|
| | status="pending",
|
| | created_at=datetime.now().isoformat()
|
| | )
|
| |
|
| |
|
| |
|
| | self._simulate_submission(work_unit)
|
| |
|
| | self.tasks[task_id] = work_unit
|
| | self._save_tasks()
|
| |
|
| | return task_id
|
| |
|
| | def _simulate_submission(self, work_unit: WorkUnit):
|
| | """
|
| | Simulate BOINC submission (for development/demo purposes)
|
| | In production, replace with actual BOINC API calls
|
| | """
|
| |
|
| | task_dir = self.work_dir / work_unit.id
|
| | task_dir.mkdir(exist_ok=True)
|
| |
|
| |
|
| | input_path = Path(work_unit.input_file)
|
| | if input_path.exists():
|
| | import shutil
|
| | shutil.copy(input_path, task_dir / input_path.name)
|
| |
|
| |
|
| | metadata = {
|
| | "task_id": work_unit.id,
|
| | "type": work_unit.workunit_type,
|
| | "input": work_unit.input_file,
|
| | "submitted": work_unit.created_at
|
| | }
|
| |
|
| | with open(task_dir / "metadata.json", 'w') as f:
|
| | json.dump(metadata, f, indent=2)
|
| |
|
| | def get_task_status(self, task_id: str) -> Optional[WorkUnit]:
|
| | """Get status of a specific task"""
|
| | return self.tasks.get(task_id)
|
| |
|
| | def list_tasks(
|
| | self,
|
| | status: Optional[str] = None,
|
| | workunit_type: Optional[str] = None
|
| | ) -> List[WorkUnit]:
|
| | """
|
| | List all tasks with optional filtering
|
| |
|
| | Args:
|
| | status: Filter by status (pending, running, completed, failed)
|
| | workunit_type: Filter by work unit type
|
| | """
|
| | tasks = list(self.tasks.values())
|
| |
|
| | if status:
|
| | tasks = [t for t in tasks if t.status == status]
|
| |
|
| | if workunit_type:
|
| | tasks = [t for t in tasks if t.workunit_type == workunit_type]
|
| |
|
| | return sorted(tasks, key=lambda t: t.created_at, reverse=True)
|
| |
|
| | def update_task_status(self, task_id: str, status: str, **kwargs):
|
| | """Update task status and additional fields"""
|
| | if task_id in self.tasks:
|
| | self.tasks[task_id].status = status
|
| |
|
| | for key, value in kwargs.items():
|
| | if hasattr(self.tasks[task_id], key):
|
| | setattr(self.tasks[task_id], key, value)
|
| |
|
| | if status == "completed":
|
| | self.tasks[task_id].completed_at = datetime.now().isoformat()
|
| |
|
| | self._save_tasks()
|
| |
|
| | def cancel_task(self, task_id: str) -> bool:
|
| | """Cancel a pending or running task"""
|
| | if task_id in self.tasks:
|
| | task = self.tasks[task_id]
|
| | if task.status in ["pending", "running"]:
|
| | task.status = "cancelled"
|
| | self._save_tasks()
|
| | return True
|
| | return False
|
| |
|
| | def get_results(self, task_id: str) -> Optional[Path]:
|
| | """Get results file for a completed task"""
|
| | if task_id in self.tasks:
|
| | task = self.tasks[task_id]
|
| | if task.status == "completed" and task.result_file:
|
| | result_path = Path(task.result_file)
|
| | if result_path.exists():
|
| | return result_path
|
| | return None
|
| |
|
| | def get_statistics(self) -> Dict:
|
| | """Get overall statistics about BOINC tasks"""
|
| | total = len(self.tasks)
|
| | by_status = {}
|
| | by_type = {}
|
| |
|
| | for task in self.tasks.values():
|
| | by_status[task.status] = by_status.get(task.status, 0) + 1
|
| | by_type[task.workunit_type] = by_type.get(task.workunit_type, 0) + 1
|
| |
|
| | completed = [t for t in self.tasks.values() if t.completed_at]
|
| |
|
| | if completed:
|
| | avg_time = sum([
|
| | (datetime.fromisoformat(t.completed_at) -
|
| | datetime.fromisoformat(t.created_at)).total_seconds()
|
| | for t in completed
|
| | ]) / len(completed)
|
| | else:
|
| | avg_time = 0
|
| |
|
| | return {
|
| | "total_tasks": total,
|
| | "by_status": by_status,
|
| | "by_type": by_type,
|
| | "completed_tasks": len(completed),
|
| | "average_completion_time_seconds": avg_time
|
| | }
|
| |
|
| |
|
class BOINCTaskManager:
    """High-level task manager for common workflows"""

    def __init__(self):
        self.client = BOINCClient()

    def _submit_named(self, workunit_type: str, input_file: str, prefix: str) -> str:
        """Submit *input_file* with an auto-generated '<prefix>_<file stem>' name."""
        return self.client.submit_task(
            workunit_type=workunit_type,
            input_file=input_file,
            name=f"{prefix}_{Path(input_file).stem}"
        )

    def submit_variant_calling(self, fastq_file: str) -> str:
        """Submit variant calling task"""
        return self._submit_named("variant_calling", fastq_file, "variant_calling")

    def submit_blast_search(self, sequence_file: str) -> str:
        """Submit BLAST search task"""
        return self._submit_named("blast_search", sequence_file, "blast")

    def submit_alignment(self, fastq_file: str) -> str:
        """Submit sequence alignment task"""
        return self._submit_named("alignment", fastq_file, "alignment")

    def submit_annotation(self, vcf_file: str) -> str:
        """Submit variant annotation task"""
        return self._submit_named("annotation", vcf_file, "annotation")

    def batch_submit(
        self,
        workunit_type: str,
        input_files: List[str]
    ) -> List[str]:
        """Submit multiple tasks at once"""
        return [
            self.client.submit_task(workunit_type, input_file)
            for input_file in input_files
        ]
|
| |
|