| | """
|
| | Neo4j Database Manager
|
| | Handle graph database connections and operations
|
| | """
|
| |
|
| | from neo4j import GraphDatabase
|
| | from typing import Dict, List, Optional, Any
|
| | import yaml
|
| | import logging
|
| |
|
| | logging.basicConfig(level=logging.INFO)
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
| | class DatabaseManager:
|
| | """Manage Neo4j database connections and schema"""
|
| |
|
| | def __init__(self, config_path: str = "config.yml"):
|
| | with open(config_path, 'r') as f:
|
| | self.config = yaml.safe_load(f)['neo4j']
|
| |
|
| | self.driver = GraphDatabase.driver(
|
| | self.config['uri'],
|
| | auth=(self.config['username'], self.config['password'])
|
| | )
|
| |
|
| | logger.info(f"Connected to Neo4j at {self.config['uri']}")
|
| |
|
| | def close(self):
|
| | """Close database connection"""
|
| | self.driver.close()
|
| |
|
| | def __enter__(self):
|
| | return self
|
| |
|
| | def __exit__(self, exc_type, exc_val, exc_tb):
|
| | self.close()
|
| |
|
| | def execute_query(self, query: str, parameters: Optional[Dict] = None) -> List[Dict]:
|
| | """Execute a Cypher query and return results"""
|
| | with self.driver.session() as session:
|
| | result = session.run(query, parameters or {})
|
| | return [record.data() for record in result]
|
| |
|
| | def initialize_schema(self):
|
| | """Initialize database schema with constraints and indexes"""
|
| | queries = [
|
| |
|
| | "CREATE CONSTRAINT gene_id IF NOT EXISTS FOR (g:Gene) REQUIRE g.gene_id IS UNIQUE",
|
| | "CREATE CONSTRAINT mutation_id IF NOT EXISTS FOR (m:Mutation) REQUIRE m.mutation_id IS UNIQUE",
|
| | "CREATE CONSTRAINT patient_id IF NOT EXISTS FOR (p:Patient) REQUIRE p.patient_id IS UNIQUE",
|
| | "CREATE CONSTRAINT cancer_type_id IF NOT EXISTS FOR (c:CancerType) REQUIRE c.cancer_type_id IS UNIQUE",
|
| |
|
| |
|
| | "CREATE INDEX gene_symbol IF NOT EXISTS FOR (g:Gene) ON (g.symbol)",
|
| | "CREATE INDEX mutation_position IF NOT EXISTS FOR (m:Mutation) ON (m.chromosome, m.position)",
|
| | "CREATE INDEX patient_project IF NOT EXISTS FOR (p:Patient) ON (p.project_id)",
|
| | ]
|
| |
|
| | with self.driver.session() as session:
|
| | for query in queries:
|
| | try:
|
| | session.run(query)
|
| | logger.info(f"Executed: {query[:50]}...")
|
| | except Exception as e:
|
| | logger.warning(f"Schema query failed (may already exist): {e}")
|
| |
|
| | logger.info("Database schema initialized")
|
| |
|
| | def clear_database(self):
|
| | """Clear all nodes and relationships (use with caution!)"""
|
| | query = "MATCH (n) DETACH DELETE n"
|
| | with self.driver.session() as session:
|
| | session.run(query)
|
| | logger.info("Database cleared")
|
| |
|
| |
|
| | class GeneRepository:
|
| | """Repository for Gene nodes"""
|
| |
|
| | def __init__(self, db_manager: DatabaseManager):
|
| | self.db = db_manager
|
| |
|
| | def create_gene(self, gene_data: Dict) -> Dict:
|
| | """Create a Gene node"""
|
| | query = """
|
| | MERGE (g:Gene {gene_id: $gene_id})
|
| | SET g.symbol = $symbol,
|
| | g.name = $name,
|
| | g.chromosome = $chromosome,
|
| | g.start_position = $start_position,
|
| | g.end_position = $end_position,
|
| | g.strand = $strand,
|
| | g.gene_type = $gene_type
|
| | RETURN g
|
| | """
|
| | result = self.db.execute_query(query, gene_data)
|
| | return result[0]['g'] if result else {}
|
| |
|
| | def get_gene_by_symbol(self, symbol: str) -> Optional[Dict]:
|
| | """Find gene by symbol"""
|
| | query = """
|
| | MATCH (g:Gene {symbol: $symbol})
|
| | RETURN g
|
| | """
|
| | result = self.db.execute_query(query, {'symbol': symbol})
|
| | return result[0]['g'] if result else None
|
| |
|
| | def get_gene_mutations(self, gene_id: str) -> List[Dict]:
|
| | """Get all mutations for a gene"""
|
| | query = """
|
| | MATCH (g:Gene {gene_id: $gene_id})<-[:AFFECTS]-(m:Mutation)
|
| | RETURN m
|
| | ORDER BY m.position
|
| | """
|
| | result = self.db.execute_query(query, {'gene_id': gene_id})
|
| | return [r['m'] for r in result]
|
| |
|
| |
|
| | class MutationRepository:
|
| | """Repository for Mutation nodes"""
|
| |
|
| | def __init__(self, db_manager: DatabaseManager):
|
| | self.db = db_manager
|
| |
|
| | def create_mutation(self, mutation_data: Dict, gene_id: str) -> Dict:
|
| | """Create a Mutation node and link to Gene"""
|
| | query = """
|
| | MATCH (g:Gene {gene_id: $gene_id})
|
| | MERGE (m:Mutation {mutation_id: $mutation_id})
|
| | SET m.chromosome = $chromosome,
|
| | m.position = $position,
|
| | m.reference = $reference,
|
| | m.alternate = $alternate,
|
| | m.consequence = $consequence,
|
| | m.variant_type = $variant_type,
|
| | m.quality = $quality
|
| | MERGE (m)-[:AFFECTS]->(g)
|
| | RETURN m
|
| | """
|
| | params = {**mutation_data, 'gene_id': gene_id}
|
| | result = self.db.execute_query(query, params)
|
| | return result[0]['m'] if result else {}
|
| |
|
| | def link_mutation_to_patient(self, mutation_id: str, patient_id: str, properties: Optional[Dict] = None):
|
| | """Create HAS_MUTATION relationship"""
|
| | query = """
|
| | MATCH (p:Patient {patient_id: $patient_id})
|
| | MATCH (m:Mutation {mutation_id: $mutation_id})
|
| | MERGE (p)-[r:HAS_MUTATION]->(m)
|
| | SET r.allele_frequency = $allele_frequency,
|
| | r.depth = $depth
|
| | RETURN r
|
| | """
|
| | params = {
|
| | 'patient_id': patient_id,
|
| | 'mutation_id': mutation_id,
|
| | 'allele_frequency': properties.get('allele_frequency', 0) if properties else 0,
|
| | 'depth': properties.get('depth', 0) if properties else 0
|
| | }
|
| | self.db.execute_query(query, params)
|
| |
|
| | def get_mutation_frequency(self, mutation_id: str) -> Dict:
|
| | """Calculate mutation frequency across patients"""
|
| | query = """
|
| | MATCH (m:Mutation {mutation_id: $mutation_id})
|
| | MATCH (p:Patient)-[:HAS_MUTATION]->(m)
|
| | OPTIONAL MATCH (all:Patient)
|
| | WITH m, count(DISTINCT p) as patients_with_mutation, count(DISTINCT all) as total_patients
|
| | RETURN m.mutation_id as mutation_id,
|
| | patients_with_mutation,
|
| | total_patients,
|
| | toFloat(patients_with_mutation) / total_patients as frequency
|
| | """
|
| | result = self.db.execute_query(query, {'mutation_id': mutation_id})
|
| | return result[0] if result else {}
|
| |
|
| |
|
| | class PatientRepository:
|
| | """Repository for Patient nodes"""
|
| |
|
| | def __init__(self, db_manager: DatabaseManager):
|
| | self.db = db_manager
|
| |
|
| | def create_patient(self, patient_data: Dict) -> Dict:
|
| | """Create a Patient node"""
|
| | query = """
|
| | MERGE (p:Patient {patient_id: $patient_id})
|
| | SET p.project_id = $project_id,
|
| | p.age = $age,
|
| | p.gender = $gender,
|
| | p.race = $race,
|
| | p.ethnicity = $ethnicity,
|
| | p.vital_status = $vital_status
|
| | RETURN p
|
| | """
|
| | result = self.db.execute_query(query, patient_data)
|
| | return result[0]['p'] if result else {}
|
| |
|
| | def link_patient_to_cancer_type(self, patient_id: str, cancer_type_id: str, properties: Optional[Dict] = None):
|
| | """Create DIAGNOSED_WITH relationship"""
|
| | query = """
|
| | MATCH (p:Patient {patient_id: $patient_id})
|
| | MATCH (c:CancerType {cancer_type_id: $cancer_type_id})
|
| | MERGE (p)-[r:DIAGNOSED_WITH]->(c)
|
| | SET r.stage = $stage,
|
| | r.grade = $grade,
|
| | r.diagnosis_date = $diagnosis_date
|
| | RETURN r
|
| | """
|
| | params = {
|
| | 'patient_id': patient_id,
|
| | 'cancer_type_id': cancer_type_id,
|
| | 'stage': properties.get('stage') if properties else None,
|
| | 'grade': properties.get('grade') if properties else None,
|
| | 'diagnosis_date': properties.get('diagnosis_date') if properties else None
|
| | }
|
| | self.db.execute_query(query, params)
|
| |
|
| | def get_patient_mutations(self, patient_id: str) -> List[Dict]:
|
| | """Get all mutations for a patient"""
|
| | query = """
|
| | MATCH (p:Patient {patient_id: $patient_id})-[r:HAS_MUTATION]->(m:Mutation)-[:AFFECTS]->(g:Gene)
|
| | RETURN m, g, r.allele_frequency as allele_frequency, r.depth as depth
|
| | ORDER BY g.symbol
|
| | """
|
| | result = self.db.execute_query(query, {'patient_id': patient_id})
|
| | return result
|
| |
|
| |
|
| | class CancerTypeRepository:
|
| | """Repository for CancerType nodes"""
|
| |
|
| | def __init__(self, db_manager: DatabaseManager):
|
| | self.db = db_manager
|
| |
|
| | def create_cancer_type(self, cancer_data: Dict) -> Dict:
|
| | """Create a CancerType node"""
|
| | query = """
|
| | MERGE (c:CancerType {cancer_type_id: $cancer_type_id})
|
| | SET c.name = $name,
|
| | c.tissue = $tissue,
|
| | c.disease_type = $disease_type
|
| | RETURN c
|
| | """
|
| | result = self.db.execute_query(query, cancer_data)
|
| | return result[0]['c'] if result else {}
|
| |
|
| | def get_common_mutations(self, cancer_type_id: str, limit: int = 10) -> List[Dict]:
|
| | """Get most common mutations for a cancer type"""
|
| | query = """
|
| | MATCH (c:CancerType {cancer_type_id: $cancer_type_id})<-[:DIAGNOSED_WITH]-(p:Patient)
|
| | MATCH (p)-[:HAS_MUTATION]->(m:Mutation)-[:AFFECTS]->(g:Gene)
|
| | WITH m, g, count(DISTINCT p) as patient_count
|
| | RETURN m, g, patient_count
|
| | ORDER BY patient_count DESC
|
| | LIMIT $limit
|
| | """
|
| | result = self.db.execute_query(query, {'cancer_type_id': cancer_type_id, 'limit': limit})
|
| | return result
|
| |
|
| | def get_statistics(self, cancer_type_id: str) -> Dict:
|
| | """Get statistics for a cancer type"""
|
| | query = """
|
| | MATCH (c:CancerType {cancer_type_id: $cancer_type_id})<-[:DIAGNOSED_WITH]-(p:Patient)
|
| | OPTIONAL MATCH (p)-[:HAS_MUTATION]->(m:Mutation)
|
| | WITH c, count(DISTINCT p) as total_patients, count(DISTINCT m) as total_mutations
|
| | RETURN c.name as cancer_type,
|
| | total_patients,
|
| | total_mutations,
|
| | CASE WHEN total_patients > 0
|
| | THEN toFloat(total_mutations) / total_patients
|
| | ELSE 0
|
| | END as avg_mutations_per_patient
|
| | """
|
| | result = self.db.execute_query(query, {'cancer_type_id': cancer_type_id})
|
| | return result[0] if result else {}
|
| |
|