| | """
|
| | FASTQ Processing Pipeline
|
| | Quality control and preprocessing of sequencing data
|
| | """
|
| |
|
| | from pathlib import Path
|
| | from typing import Dict, List, Optional
|
| | import yaml
|
| | import logging
|
| | from Bio import SeqIO
|
| | from Bio.SeqIO.QualityIO import FastqGeneralIterator
|
| |
|
| | logging.basicConfig(level=logging.INFO)
|
| | logger = logging.getLogger(__name__)
|
| |
|
| |
|
class FASTQProcessor:
    """Process FASTQ sequencing files.

    Reads thresholds from the ``pipeline.fastq`` section of a YAML config
    and provides quality filtering, adapter trimming, summary statistics
    and FASTA conversion. Quality strings are decoded as Phred+33
    (Illumina 1.8+); other encodings would need a different offset.
    """

    def __init__(self, config_path: str = "config.yml"):
        """Load configuration and ensure the output directory exists.

        Args:
            config_path: Path to a YAML file whose ``pipeline.fastq``
                section provides ``quality_threshold``, ``min_length``
                and ``output_dir``.

        Raises:
            OSError: If the config file cannot be opened.
            KeyError: If a required config key is missing.
        """
        with open(config_path, 'r') as f:
            self.config = yaml.safe_load(f)['pipeline']['fastq']

        self.quality_threshold = self.config['quality_threshold']
        self.min_length = self.config['min_length']
        self.output_dir = Path(self.config['output_dir'])
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def quality_filter(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Dict:
        """Filter FASTQ reads by mean quality score and minimum length.

        Args:
            input_file: Input FASTQ file.
            output_file: Output filtered FASTQ file; defaults to
                ``<output_dir>/<stem>_filtered.fastq``.

        Returns:
            Statistics dictionary (read/base counts and ``pass_rate``).
            On error, the partial counts gathered so far are returned.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_filtered.fastq"

        stats = {
            'total_reads': 0,
            'passed_reads': 0,
            'failed_reads': 0,
            'total_bases': 0,
            'passed_bases': 0
        }

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    stats['total_reads'] += 1
                    stats['total_bases'] += len(sequence)

                    # Phred+33 decoding of the quality string.
                    quality_scores = [ord(q) - 33 for q in quality]
                    # Guard zero-length records: without this a single empty
                    # read raises ZeroDivisionError and aborts the whole file.
                    if quality_scores:
                        avg_quality = sum(quality_scores) / len(quality_scores)
                    else:
                        avg_quality = 0.0

                    if avg_quality >= self.quality_threshold and len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")
                        stats['passed_reads'] += 1
                        stats['passed_bases'] += len(sequence)
                    else:
                        stats['failed_reads'] += 1

            stats['pass_rate'] = stats['passed_reads'] / stats['total_reads'] if stats['total_reads'] > 0 else 0

            logger.info(f"Filtered {input_file.name}: {stats['passed_reads']}/{stats['total_reads']} reads passed")
            return stats

        except Exception as e:
            # Best-effort: log and return the partial counts so the
            # pipeline can continue with other files.
            logger.error(f"Error filtering FASTQ: {e}")
            return stats

    def trim_adapters(
        self,
        input_file: Path,
        adapter_sequence: str,
        output_file: Optional[Path] = None
    ) -> Path:
        """Trim a literal adapter (and everything after it) from reads.

        Reads shorter than ``min_length`` after trimming are dropped.

        Args:
            input_file: Input FASTQ file.
            adapter_sequence: Exact adapter string to search for
                (no mismatches allowed; first occurrence is used).
            output_file: Output trimmed file; defaults to
                ``<output_dir>/<stem>_trimmed.fastq``.

        Returns:
            Path to the trimmed file, or ``input_file`` if trimming failed.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}_trimmed.fastq"

        trimmed_count = 0

        try:
            with open(input_file, 'r') as in_f, open(output_file, 'w') as out_f:
                for title, sequence, quality in FastqGeneralIterator(in_f):
                    adapter_pos = sequence.find(adapter_sequence)

                    if adapter_pos != -1:
                        # Keep only the bases before the adapter, and keep
                        # the quality string in sync with the sequence.
                        sequence = sequence[:adapter_pos]
                        quality = quality[:adapter_pos]
                        trimmed_count += 1

                    if len(sequence) >= self.min_length:
                        out_f.write(f"@{title}\n{sequence}\n+\n{quality}\n")

            logger.info(f"Trimmed adapters from {trimmed_count} reads")
            return output_file

        except Exception as e:
            logger.error(f"Error trimming adapters: {e}")
            return input_file

    def calculate_statistics(self, fastq_file: Path) -> Dict:
        """Calculate summary statistics for a FASTQ file.

        Returns:
            Dictionary with ``total_reads``, ``total_bases``, min/max/avg
            length, ``avg_quality`` (Phred+33) and ``gc_content`` (percent).
            For an empty file all values are 0 — ``min_length`` is
            normalized from its ``float('inf')`` sentinel.
        """
        stats = {
            'total_reads': 0,
            'total_bases': 0,
            'min_length': float('inf'),
            'max_length': 0,
            'avg_length': 0,
            'avg_quality': 0,
            'gc_content': 0
        }

        # Running sums instead of per-base lists: identical averages,
        # far less memory on large files.
        total_quality = 0
        gc_count = 0

        try:
            with open(fastq_file, 'r') as f:
                for title, sequence, quality in FastqGeneralIterator(f):
                    stats['total_reads'] += 1
                    seq_len = len(sequence)
                    stats['total_bases'] += seq_len

                    stats['min_length'] = min(stats['min_length'], seq_len)
                    stats['max_length'] = max(stats['max_length'], seq_len)

                    total_quality += sum(ord(q) - 33 for q in quality)

                    gc_count += sequence.count('G') + sequence.count('C')

            if stats['total_reads'] > 0:
                stats['avg_length'] = stats['total_bases'] / stats['total_reads']
                # Guard against all-empty reads (total_bases == 0).
                if stats['total_bases'] > 0:
                    stats['avg_quality'] = total_quality / stats['total_bases']
                    stats['gc_content'] = (gc_count / stats['total_bases']) * 100
            else:
                # No reads: report 0 rather than the float('inf') sentinel.
                stats['min_length'] = 0

            return stats

        except Exception as e:
            logger.error(f"Error calculating statistics: {e}")
            return stats

    def convert_to_fasta(
        self,
        input_file: Path,
        output_file: Optional[Path] = None
    ) -> Path:
        """Convert FASTQ to FASTA format (quality information is dropped).

        Args:
            input_file: Input FASTQ file.
            output_file: Output FASTA file; defaults to
                ``<output_dir>/<stem>.fasta``.

        Returns:
            Path to the FASTA file, or ``input_file`` if conversion failed.
        """
        if output_file is None:
            output_file = self.output_dir / f"{input_file.stem}.fasta"

        try:
            count = SeqIO.convert(str(input_file), "fastq", str(output_file), "fasta")
            logger.info(f"Converted {count} sequences to FASTA")
            return output_file

        except Exception as e:
            logger.error(f"Error converting to FASTA: {e}")
            return input_file
|
| |
|
| |
|
class FASTQQualityControl:
    """Quality control analysis for FASTQ files."""

    def __init__(self):
        # Delegates all per-file statistics work to the processor.
        self.processor = FASTQProcessor()

    def run_qc(self, fastq_file: Path) -> Dict:
        """Run comprehensive QC on a single FASTQ file.

        Returns:
            QC report dictionary with the file name, its statistics, a
            PASS/WARN verdict and a list of warning messages.
        """
        stats = self.processor.calculate_statistics(fastq_file)

        verdict = 'PASS'
        warnings = []

        if stats['avg_quality'] < 20:
            warnings.append('Low average quality score')
            verdict = 'WARN'

        if stats['avg_length'] < 50:
            warnings.append('Short average read length')
            verdict = 'WARN'

        # GC outside 30-70% is flagged but does not change the verdict.
        gc = stats['gc_content']
        if gc < 30 or gc > 70:
            warnings.append(f'Unusual GC content: {gc:.1f}%')

        return {
            'file': str(fastq_file),
            'statistics': stats,
            'quality_check': verdict,
            'warnings': warnings,
        }

    def generate_qc_report(self, fastq_files: List[Path]) -> Dict:
        """Generate a combined QC report for multiple FASTQ files.

        Returns:
            Dictionary with a PASS/WARN/FAIL summary and per-file reports
            keyed by file name.
        """
        file_reports = {path.name: self.run_qc(path) for path in fastq_files}

        def count_status(status: str) -> int:
            # Tally files whose verdict matches the given status.
            return sum(
                1 for rep in file_reports.values()
                if rep['quality_check'] == status
            )

        return {
            'summary': {
                'total_files': len(fastq_files),
                'passed': count_status('PASS'),
                'warnings': count_status('WARN'),
                'failed': count_status('FAIL'),
            },
            'file_reports': file_reports,
        }
|
| |
|