# NOTE(review): the three lines here ("Spaces:", "Runtime error", "Runtime error")
# are residue from scraping a Hugging Face Spaces page, not program text; kept
# only as a comment so the file remains valid Python.
| """ | |
| TTS Dataset Collection Tool with Font Support and Enhanced Error Handling | |
| """ | |
| import os | |
| import json | |
| import nltk | |
| import gradio as gr | |
| from datetime import datetime | |
| from pathlib import Path | |
| import shutil | |
| import logging | |
| from typing import Dict, List, Tuple, Optional | |
| import traceback | |
# Fetch the NLTK sentence-tokenizer data ('punkt') at import time so that
# nltk.sent_tokenize is usable as soon as text is loaded.
try:
    nltk.download('punkt', quiet=True)
except Exception as e:
    print(f"Warning: Failed to download NLTK data: {str(e)}")
    print("Downloading from alternative source...")
    # Retry with SSL certificate verification disabled — some hosts
    # (corporate proxies, minimal containers) fail the verified download.
    try:
        import ssl
        try:
            _create_unverified_https_context = ssl._create_unverified_context
        except AttributeError:
            # Older ssl builds lack the hook; nothing to patch.
            pass
        else:
            ssl._create_default_https_context = _create_unverified_https_context
        nltk.download('punkt', quiet=True)
    except Exception as e:
        print(f"Critical error downloading NLTK data: {str(e)}")
        raise

# Module-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Font configurations: maps a style key to a display name, a CSS font
# family, and the ready-to-inject CSS declaration used when rendering
# sentences in the UI.
FONT_STYLES = {
    "english_serif": {
        "name": "Times New Roman",
        "family": "serif",
        "css": "font-family: 'Times New Roman', serif;",
    },
    "english_sans": {
        "name": "Arial",
        "family": "sans-serif",
        "css": "font-family: Arial, sans-serif;",
    },
    # Urdu / Arabic script styles; the .ttf files are expected under
    # the dataset's fonts/ directory (see create_interface's @font-face).
    "nastaliq": {
        "name": "Nastaliq",
        "family": "Jameel Noori Nastaleeq",
        "css": "font-family: 'Jameel Noori Nastaleeq', serif;",
    },
    "naskh": {
        "name": "Naskh",
        "family": "Traditional Arabic",
        "css": "font-family: 'Traditional Arabic', serif;",
    },
}
class TTSDatasetCollector:
    """Manages TTS dataset collection and organization with enhanced features.

    Text is loaded (pasted or from a .txt file) and split into sentences
    with NLTK; sentences are presented one at a time for recording.
    Audio files, transcriptions, and JSON metadata are stored under a
    ``dataset`` directory next to this module.
    """

    def __init__(self):
        """Initialize the TTS Dataset Collector and create its directories."""
        self.root_path = Path(os.path.dirname(os.path.abspath(__file__))) / "dataset"
        self.sentences: List[str] = []   # tokenized sentences of the loaded text
        self.current_index: int = 0      # index of the sentence being recorded
        self.current_font: str = "english_serif"  # key into FONT_STYLES
        self.setup_directories()
        logger.info("TTS Dataset Collector initialized")

    def setup_directories(self) -> None:
        """Create necessary directory structure with logging.

        Raises:
            RuntimeError: if the directories or log file cannot be created.
        """
        try:
            # parents=True so a missing parent directory (fresh checkout,
            # relocated module) does not abort initialization.
            self.root_path.mkdir(parents=True, exist_ok=True)
            for subdir in ['audio', 'transcriptions', 'metadata', 'fonts']:
                (self.root_path / subdir).mkdir(parents=True, exist_ok=True)
            # Seed the flat-file operation log on first run only.
            log_file = self.root_path / 'dataset_log.txt'
            if not log_file.exists():
                with open(log_file, 'w', encoding='utf-8') as f:
                    f.write(f"Dataset collection initialized on {datetime.now().isoformat()}\n")
            logger.info("Directory structure created successfully")
        except Exception as e:
            logger.error(f"Failed to create directory structure: {str(e)}")
            logger.error(traceback.format_exc())
            raise RuntimeError("Failed to initialize directory structure")

    def log_operation(self, message: str, level: str = "info") -> None:
        """Log operations with timestamp and level.

        Writes to both the on-disk dataset log and the module logger; a
        failure to write the file log is itself logged but never raised.
        """
        try:
            log_file = self.root_path / 'dataset_log.txt'
            timestamp = datetime.now().isoformat()
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(f"[{timestamp}] [{level.upper()}] {message}\n")
            if level.lower() == "error":
                logger.error(message)
            else:
                logger.info(message)
        except Exception as e:
            logger.error(f"Failed to log operation: {str(e)}")

    def process_text(self, text: str) -> Tuple[bool, str]:
        """Process pasted or loaded text with error handling.

        Tokenizes ``text`` into sentences and resets navigation to the
        first sentence.

        Returns:
            (success, message) — message is user-facing in both cases.
        """
        try:
            if not text.strip():
                return False, "Text is empty"
            self.sentences = nltk.sent_tokenize(text.strip())
            if not self.sentences:
                return False, "No valid sentences found in text"
            self.current_index = 0
            self.log_operation(f"Processed text with {len(self.sentences)} sentences")
            return True, f"Successfully loaded {len(self.sentences)} sentences"
        except Exception as e:
            error_msg = f"Error processing text: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
            return False, error_msg

    def load_text_file(self, file) -> Tuple[bool, str]:
        """Process and load text file with enhanced error handling.

        Args:
            file: an object with a ``.name`` path attribute (Gradio file).

        Returns:
            (success, message) tuple from :meth:`process_text`.
        """
        if not file:
            return False, "No file provided"
        try:
            # Case-insensitive extension check so ".TXT" uploads also work.
            if not file.name.lower().endswith('.txt'):
                return False, "Only .txt files are supported"
            with open(file.name, 'r', encoding='utf-8') as f:
                text = f.read()
            return self.process_text(text)
        except UnicodeDecodeError:
            error_msg = "File encoding error. Please ensure the file is UTF-8 encoded"
            self.log_operation(error_msg, "error")
            return False, error_msg
        except Exception as e:
            error_msg = f"Error loading file: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
            return False, error_msg

    def get_styled_text(self, text: str) -> str:
        """Return ``text`` wrapped in a div carrying the current font's CSS."""
        font_css = FONT_STYLES[self.current_font]['css']
        return f"<div style='{font_css}'>{text}</div>"

    def set_font(self, font_style: str) -> Tuple[bool, str]:
        """Set the current font style; rejects keys not in FONT_STYLES."""
        if font_style not in FONT_STYLES:
            return False, f"Invalid font style. Available styles: {', '.join(FONT_STYLES.keys())}"
        self.current_font = font_style
        return True, f"Font style set to {font_style}"

    def generate_filenames(self, dataset_name: str, speaker_id: str) -> Tuple[str, str]:
        """Generate unique paired filenames for the audio and text files.

        The pair shares one base name:
        ``<dataset>_<speaker>_<NNNN sentence index>_<timestamp>``.
        """
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        sentence_id = f"{self.current_index+1:04d}"
        base_name = f"{dataset_name}_{speaker_id}_{sentence_id}_{timestamp}"
        return f"{base_name}.wav", f"{base_name}.txt"

    def save_recording(self, audio_file, speaker_id: str, dataset_name: str) -> Tuple[bool, str]:
        """Save recording with enhanced error handling and logging.

        Copies the recorded audio into the speaker's audio directory,
        writes the paired transcription, and updates dataset metadata.

        Returns:
            (success, message) — message is user-facing in both cases.
        """
        if not all([audio_file, speaker_id, dataset_name]):
            missing = []
            if not audio_file: missing.append("audio recording")
            if not speaker_id: missing.append("speaker ID")
            if not dataset_name: missing.append("dataset name")
            return False, f"Missing required information: {', '.join(missing)}"
        try:
            # Validate inputs
            if not speaker_id.strip().isalnum():
                return False, "Speaker ID must contain only letters and numbers"
            if not dataset_name.strip().isalnum():
                return False, "Dataset name must contain only letters and numbers"
            # Guard: without a loaded sentence there is nothing to transcribe
            # (previously this surfaced as an IndexError swallowed below).
            if not self.sentences or self.current_index >= len(self.sentences):
                return False, "No sentence loaded. Please load text before saving"
            # Generate filenames
            audio_name, text_name = self.generate_filenames(dataset_name, speaker_id)
            # Create per-speaker directories (parents=True for robustness
            # even if the top-level tree was removed mid-session).
            audio_dir = self.root_path / 'audio' / speaker_id
            text_dir = self.root_path / 'transcriptions' / speaker_id
            audio_dir.mkdir(parents=True, exist_ok=True)
            text_dir.mkdir(parents=True, exist_ok=True)
            # Save audio file (copy2 preserves timestamps of the temp file)
            audio_path = audio_dir / audio_name
            shutil.copy2(audio_file, audio_path)
            # Save transcription
            text_path = text_dir / text_name
            self.save_transcription(
                text_path,
                self.sentences[self.current_index],
                {
                    'speaker_id': speaker_id,
                    'dataset_name': dataset_name,
                    'timestamp': datetime.now().isoformat(),
                    'audio_file': audio_name,
                    'font_style': self.current_font
                }
            )
            # Update metadata
            self.update_metadata(speaker_id, dataset_name)
            # Log success
            self.log_operation(
                f"Saved recording: Speaker={speaker_id}, Dataset={dataset_name}, "
                f"Audio={audio_name}, Text={text_name}"
            )
            return True, f"Recording saved successfully as {audio_name}"
        except Exception as e:
            error_msg = f"Error saving recording: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())
            return False, error_msg

    def save_transcription(self, file_path: Path, text: str, metadata: Dict) -> None:
        """Save transcription with an inline [METADATA]/[TEXT] header block."""
        content = f"""[METADATA]
Recording_ID: {metadata['audio_file']}
Speaker_ID: {metadata['speaker_id']}
Dataset_Name: {metadata['dataset_name']}
Timestamp: {metadata['timestamp']}
Font_Style: {metadata['font_style']}
[TEXT]
{text}
"""
        with open(file_path, 'w', encoding='utf-8') as f:
            f.write(content)

    def update_metadata(self, speaker_id: str, dataset_name: str) -> None:
        """Update dataset metadata with error handling.

        Maintains per-speaker / per-dataset recording counts, timestamps,
        and the list of font styles used, in metadata/dataset_info.json.
        Failures are logged but never raised, so a metadata problem does
        not lose an already-saved recording.
        """
        metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
        try:
            if metadata_file.exists():
                with open(metadata_file, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
            else:
                metadata = {'speakers': {}, 'last_updated': None}
            # Update speaker data
            if speaker_id not in metadata['speakers']:
                metadata['speakers'][speaker_id] = {
                    'total_recordings': 0,
                    'datasets': {}
                }
            if dataset_name not in metadata['speakers'][speaker_id]['datasets']:
                metadata['speakers'][speaker_id]['datasets'][dataset_name] = {
                    'recordings': 0,
                    'sentences': len(self.sentences),
                    'first_recording': datetime.now().isoformat(),
                    'last_recording': None,
                    'font_styles_used': []
                }
            # Update counts and timestamps
            metadata['speakers'][speaker_id]['total_recordings'] += 1
            dataset_entry = metadata['speakers'][speaker_id]['datasets'][dataset_name]
            dataset_entry['recordings'] += 1
            dataset_entry['last_recording'] = datetime.now().isoformat()
            # Update font styles
            if self.current_font not in dataset_entry['font_styles_used']:
                dataset_entry['font_styles_used'].append(self.current_font)
            metadata['last_updated'] = datetime.now().isoformat()
            # Save updated metadata
            with open(metadata_file, 'w', encoding='utf-8') as f:
                json.dump(metadata, f, indent=2)
            self.log_operation(f"Updated metadata for {speaker_id} in {dataset_name}")
        except Exception as e:
            error_msg = f"Error updating metadata: {str(e)}"
            self.log_operation(error_msg, "error")
            logger.error(traceback.format_exc())

    def get_navigation_info(self) -> Dict[str, Optional[str]]:
        """Get current and next sentence information.

        Returns:
            Dict with 'current' and 'next' styled-HTML sentences (None when
            unavailable) and a human-readable 'progress' string.
        """
        if not self.sentences:
            return {
                'current': None,
                'next': None,
                'progress': "No text loaded"
            }
        current = self.get_styled_text(self.sentences[self.current_index])
        next_text = None
        if self.current_index < len(self.sentences) - 1:
            next_text = self.get_styled_text(self.sentences[self.current_index + 1])
        progress = f"Sentence {self.current_index + 1} of {len(self.sentences)}"
        return {
            'current': current,
            'next': next_text,
            'progress': progress
        }

    def navigate(self, direction: str) -> Dict[str, Optional[str]]:
        """Navigate through sentences.

        Args:
            direction: "next" or "prev"; movement clamps at either end.
        """
        if not self.sentences:
            return {
                'current': None,
                'next': None,
                'progress': "No text loaded",
                'status': "⚠️ Please load a text file first"
            }
        if direction == "next" and self.current_index < len(self.sentences) - 1:
            self.current_index += 1
        elif direction == "prev" and self.current_index > 0:
            self.current_index -= 1
        nav_info = self.get_navigation_info()
        nav_info['status'] = "✅ Navigation successful"
        return nav_info

    def get_dataset_statistics(self) -> Dict:
        """Get current dataset statistics (empty dict if none or on error)."""
        try:
            metadata_file = self.root_path / 'metadata' / 'dataset_info.json'
            if not metadata_file.exists():
                return {}
            with open(metadata_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            logger.error(f"Error reading dataset statistics: {str(e)}")
            return {}
def create_interface():
    """Build and return the Gradio Blocks UI for dataset collection."""
    # Base stylesheet: widen the container and make the sentence panes
    # and record controls easier to read / hit.
    custom_css = """
    .gradio-container {
        max-width: 1200px !important;
    }
    .record-button {
        font-size: 1.2em !important;
        padding: 20px !important;
    }
    .sentence-display {
        font-size: 1.4em !important;
        padding: 15px !important;
        border: 1px solid #ddd !important;
        border-radius: 8px !important;
        margin: 10px 0 !important;
        min-height: 100px !important;
    }
    """
    # Append @font-face rules for the non-Latin fonts shipped in fonts/.
    for style_key, info in FONT_STYLES.items():
        if style_key in ['nastaliq', 'naskh']:
            custom_css += f"""
            @font-face {{
                font-family: '{info["family"]}';
                src: url('fonts/{info["family"]}.ttf') format('truetype');
            }}
            """

    collector = TTSDatasetCollector()

    with gr.Blocks(title="TTS Dataset Collection Tool", css=custom_css) as interface:
        gr.Markdown("# TTS Dataset Collection Tool")

        with gr.Row():
            # Left column: text input and session configuration.
            with gr.Column():
                text_input = gr.Textbox(
                    label="Paste Text",
                    placeholder="Paste your text here...",
                    lines=5,
                )
                file_input = gr.File(
                    label="Or Upload Text File (.txt)",
                    file_types=[".txt"],
                )
                speaker_id = gr.Textbox(
                    label="Speaker ID",
                    placeholder="Enter unique speaker identifier (letters and numbers only)",
                )
                dataset_name = gr.Textbox(
                    label="Dataset Name",
                    placeholder="Enter dataset name (letters and numbers only)",
                )
                font_select = gr.Dropdown(
                    choices=list(FONT_STYLES.keys()),
                    value="english_serif",
                    label="Select Font Style",
                )
            # Right column: sentence display and recorder.
            with gr.Column():
                current_text = gr.HTML(
                    label="Current Sentence",
                    elem_classes=["sentence-display"],
                )
                audio_recorder = gr.Audio(
                    label="Record Audio",
                    type="filepath",
                    elem_classes=["record-button"],
                )
                next_text = gr.HTML(
                    label="Next Sentence",
                    elem_classes=["sentence-display"],
                )

        # Navigation / save controls.
        with gr.Row():
            prev_btn = gr.Button("Previous", variant="secondary")
            next_btn = gr.Button("Next", variant="primary")
            save_btn = gr.Button("Save Recording", variant="primary", elem_classes=["record-button"])

        # Status and progress readouts.
        with gr.Row():
            progress = gr.Textbox(
                label="Progress",
                interactive=False,
            )
            status = gr.Textbox(
                label="Status",
                interactive=False,
                max_lines=3,
            )

        # Live dataset statistics.
        with gr.Row():
            dataset_info = gr.JSON(
                label="Dataset Statistics",
                value={},
            )

        def process_pasted_text(text):
            """Handle pasted text input."""
            if not text:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: "⚠️ No text provided",
                    dataset_info: collector.get_dataset_statistics(),
                }
            ok, msg = collector.process_text(text)
            if not ok:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: f"❌ {msg}",
                    dataset_info: collector.get_dataset_statistics(),
                }
            nav = collector.get_navigation_info()
            return {
                current_text: nav['current'],
                next_text: nav['next'],
                progress: nav['progress'],
                status: f"✅ {msg}",
                dataset_info: collector.get_dataset_statistics(),
            }

        def update_font(font_style):
            """Update font and refresh the sentence display."""
            ok, msg = collector.set_font(font_style)
            if not ok:
                return {status: msg}
            nav = collector.get_navigation_info()
            return {
                current_text: nav['current'],
                next_text: nav['next'],
                status: f"Font updated to {font_style}",
            }

        def load_file(file):
            """Handle file loading with enhanced error reporting."""
            if not file:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: "⚠️ No file selected",
                    dataset_info: collector.get_dataset_statistics(),
                }
            ok, msg = collector.load_text_file(file)
            if not ok:
                return {
                    current_text: "",
                    next_text: "",
                    progress: "",
                    status: f"❌ {msg}",
                    dataset_info: collector.get_dataset_statistics(),
                }
            nav = collector.get_navigation_info()
            return {
                current_text: nav['current'],
                next_text: nav['next'],
                progress: nav['progress'],
                status: f"✅ {msg}",
                dataset_info: collector.get_dataset_statistics(),
            }

        def save_current_recording(audio_file, speaker_id_value, dataset_name_value):
            """Handle saving the current recording, then auto-advance."""
            if not audio_file:
                return {status: "⚠️ Please record audio first"}
            ok, msg = collector.save_recording(
                audio_file, speaker_id_value, dataset_name_value
            )
            if not ok:
                return {
                    status: f"❌ {msg}",
                    dataset_info: collector.get_dataset_statistics(),
                }
            # Auto-advance to the next sentence after a successful save.
            nav = collector.navigate("next")
            return {
                current_text: nav['current'],
                next_text: nav['next'],
                progress: nav['progress'],
                status: f"✅ {msg}",
                dataset_info: collector.get_dataset_statistics(),
            }

        def navigate_sentences(direction):
            """Handle navigation between sentences."""
            nav = collector.navigate(direction)
            return {
                current_text: nav['current'],
                next_text: nav['next'],
                progress: nav['progress'],
                status: nav['status'],
            }

        # Wire UI events to the handlers above.
        text_input.change(
            process_pasted_text,
            inputs=[text_input],
            outputs=[current_text, next_text, progress, status, dataset_info],
        )
        file_input.upload(
            load_file,
            inputs=[file_input],
            outputs=[current_text, next_text, progress, status, dataset_info],
        )
        font_select.change(
            update_font,
            inputs=[font_select],
            outputs=[current_text, next_text, status],
        )
        save_btn.click(
            save_current_recording,
            inputs=[audio_recorder, speaker_id, dataset_name],
            outputs=[current_text, next_text, progress, status, dataset_info],
        )
        prev_btn.click(
            lambda: navigate_sentences("prev"),
            outputs=[current_text, next_text, progress, status],
        )
        next_btn.click(
            lambda: navigate_sentences("next"),
            outputs=[current_text, next_text, progress, status],
        )

        # Seed the statistics panel with whatever is already on disk.
        dataset_info.value = collector.get_dataset_statistics()

    return interface
| if __name__ == "__main__": | |
| try: | |
| # Set up any required environment variables | |
| os.environ["GRADIO_SERVER_NAME"] = "0.0.0.0" | |
| os.environ["GRADIO_SERVER_PORT"] = "7860" | |
| # Create and launch the interface | |
| interface = create_interface() | |
| interface.queue() # Enable queuing for better handling of concurrent users | |
| interface.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=True, | |
| debug=True, | |
| show_error=True | |
| ) | |
| except Exception as e: | |
| logger.error(f"Failed to launch interface: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| raise |