# youtube_tutorial/smolagent_processor.py
"""
SmoLAgent processor for YouTube transcripts.
Handles transcript processing and step extraction.
"""
import re
import logging
from typing import Any, Dict, List
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SmoLAgentProcessor:
"""
    Processor for YouTube tutorial transcripts.
    This class uses regex-based heuristics to extract meaningful steps and
    code snippets from tutorial videos: step-indicator phrases mark steps,
    and code-like patterns flag snippets along with their likely language.
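    Example (illustrative usage; transcript segments are dicts with the
    "text", "start", and "duration" keys that process_transcript reads):
        processor = SmoLAgentProcessor()
        steps = processor.process_transcript(transcript, chapters)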
"""
def __init__(self):
"""Initialize the SmoLAgentProcessor."""
logger.info("Initializing SmoLAgentProcessor")
# Regular expressions for code detection
self.code_patterns = [
# Python patterns
r'import\s+[\w\s,\.]+',
r'from\s+[\w\.]+\s+import\s+[\w\s,\.]+',
r'def\s+\w+\s*\([^)]*\)\s*:',
r'class\s+\w+(\s*\([^)]*\))?\s*:',
r'if\s+.*:\s*$',
r'for\s+.*:\s*$',
r'while\s+.*:\s*$',
r'try\s*:\s*$',
r'except\s+.*:\s*$',
r'return\s+.*',
r'print\s*\(',
r'with\s+.*:\s*$',
r'lambda\s+.*:',
r'@\w+',
# JavaScript patterns
r'function\s+\w+\s*\([^)]*\)\s*{',
r'const\s+\w+\s*=',
r'let\s+\w+\s*=',
r'var\s+\w+\s*=',
r'import\s+{[^}]*}\s+from',
r'export\s+',
r'=>\s*{',
r'document\.querySelector',
r'async\s+function',
r'await\s+',
# HTML patterns
r'<\w+[^>]*>',
r'</\w+>',
# CSS patterns
r'\.\w+\s*{',
r'#\w+\s*{',
r'@media',
r'@keyframes',
            # Shell/Command line patterns (\b-anchored so fragments of ordinary
            # words such as "tools" or "storm" are not flagged as commands)
            r'\bnpm\s+install',
            r'\bpip\s+install',
            r'\bgit\s+',
            r'\bdocker\s+',
            r'\bcd\s+',
            r'\bmkdir\s+',
            r'\btouch\s+',
            r'\bls\s+',
            r'\brm\s+',
# General code indicators
r'```\w*',
r'`[^`]+`',
r'\$\s+\w+',
]
# Compile patterns for efficiency
self.compiled_patterns = [re.compile(pattern, re.IGNORECASE) for pattern in self.code_patterns]
# Step indicator patterns
self.step_indicators = [
r'step\s+\d+',
r'first\s+step',
r'next\s+step',
r'final\s+step',
r'let\'s\s+start',
r'now\s+we',
r'next\s+we',
r'first\s+we',
r'finally\s+we',
r'let\'s\s+do',
r'we\s+need\s+to',
r'you\s+need\s+to',
r'we\'re\s+going\s+to',
r'i\'m\s+going\s+to',
r'let\'s\s+create',
r'let\'s\s+add',
r'let\'s\s+implement',
r'let\'s\s+build',
r'let\'s\s+make',
r'let\'s\s+set\s+up',
r'let\'s\s+configure',
r'let\'s\s+install',
r'let\'s\s+initialize',
r'let\'s\s+define',
r'let\'s\s+write',
r'let\'s\s+move\s+on\s+to',
r'moving\s+on\s+to',
r'now\s+let\'s',
r'the\s+next\s+thing',
r'after\s+that',
r'once\s+you\'ve',
r'once\s+we\'ve',
r'now\s+that\s+we',
r'now\s+that\s+you',
r'to\s+begin',
r'to\s+start',
r'to\s+get\s+started',
r'first\s+thing',
r'second\s+thing',
r'third\s+thing',
r'lastly',
r'finally',
r'in\s+conclusion',
r'to\s+summarize',
r'to\s+wrap\s+up',
]
# Compile step indicators for efficiency
self.compiled_step_indicators = [re.compile(pattern, re.IGNORECASE) for pattern in self.step_indicators]
# Programming language detection patterns
self.language_patterns = {
'python': [
r'import\s+[\w\s,\.]+',
r'from\s+[\w\.]+\s+import\s+[\w\s,\.]+',
r'def\s+\w+\s*\([^)]*\)\s*:',
r'class\s+\w+(\s*\([^)]*\))?\s*:',
r'print\s*\(',
r'if\s+.*:\s*$',
r'for\s+.*:\s*$',
r'while\s+.*:\s*$',
],
'javascript': [
r'function\s+\w+\s*\([^)]*\)\s*{',
r'const\s+\w+\s*=',
r'let\s+\w+\s*=',
r'var\s+\w+\s*=',
r'import\s+{[^}]*}\s+from',
r'export\s+',
r'=>\s*{',
r'document\.',
r'window\.',
],
'html': [
r'<html',
r'<head',
r'<body',
r'<div',
r'<span',
r'<p>',
r'<a\s+href',
r'<img\s+src',
r'<script',
r'<style',
],
'css': [
r'\.\w+\s*{',
r'#\w+\s*{',
r'@media',
r'@keyframes',
r'margin:',
r'padding:',
r'color:',
r'background:',
],
'shell': [
                r'\bnpm\s+install',
                r'\bpip\s+install',
                r'\bgit\s+',
                r'\bdocker\s+',
                r'\bcd\s+',
                r'\bmkdir\s+',
                r'\btouch\s+',
                r'\bls\s+',
                r'\brm\s+',
],
}
# Compile language patterns for efficiency
self.compiled_language_patterns = {
lang: [re.compile(pattern, re.IGNORECASE) for pattern in patterns]
for lang, patterns in self.language_patterns.items()
}
def process_transcript(self, transcript: List[Dict[str, Any]], chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Process the transcript to extract steps.
Args:
transcript: List of transcript segments with text and timestamps
chapters: List of chapters with title, start_time, end_time
Returns:
List of steps with timestamp, text, and code information
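        Example (illustrative): a single segment {"text": "pip install requests",
        "start": 0.0, "duration": 3.0} with no chapters yields one step whose
        "is_code" flag is True and whose "code_language" is "shell".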
"""
if not transcript:
logger.warning("Empty transcript provided")
return []
logger.info(f"Processing transcript with {len(transcript)} segments and {len(chapters)} chapters")
# Merge adjacent transcript segments
merged_segments = self._merge_adjacent_segments(transcript)
logger.info(f"Merged into {len(merged_segments)} segments")
# Extract steps from merged segments
steps = self._extract_steps(merged_segments, chapters)
logger.info(f"Extracted {len(steps)} steps")
# Detect code in steps
steps_with_code = self._detect_code_in_steps(steps)
logger.info(f"Detected code in steps, final count: {len(steps_with_code)}")
return steps_with_code
def _merge_adjacent_segments(self, transcript: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Merge adjacent transcript segments that are part of the same sentence.
Args:
transcript: List of transcript segments
Returns:
List of merged transcript segments
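        Example (illustrative): segments {"text": "now we", "start": 0.0,
        "duration": 2.0} and {"text": "install flask.", "start": 2.0,
        "duration": 2.0} merge into a single segment with text
        "now we install flask." and duration 4.0, because the gap is under
        two seconds and the first segment does not end a sentence.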
"""
if not transcript:
return []
merged = []
current_segment = transcript[0].copy()
for i in range(1, len(transcript)):
segment = transcript[i]
# Check if segments are close in time (within 2 seconds)
time_gap = segment["start"] - (current_segment["start"] + current_segment.get("duration", 0))
# Check if the current segment ends with a sentence-ending punctuation
current_text_ends_sentence = re.search(r'[.!?]\s*$', current_segment["text"])
if time_gap < 2 and not current_text_ends_sentence:
# Merge segments
current_segment["text"] += " " + segment["text"]
current_segment["duration"] = segment["start"] + segment.get("duration", 0) - current_segment["start"]
else:
# Start a new segment
merged.append(current_segment)
current_segment = segment.copy()
# Add the last segment
merged.append(current_segment)
return merged
def _extract_steps(self, segments: List[Dict[str, Any]], chapters: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Extract steps from transcript segments.
Args:
segments: List of transcript segments
chapters: List of chapters
Returns:
List of steps with timestamp and text
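        Example (illustrative): with no chapters and no segments matching a
        step indicator or code pattern, a 10-minute transcript falls back to
        time-interval steps, targeting a step roughly every two minutes (with
        a minimum of five targets).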
"""
steps = []
# If we have chapters, use them as the primary structure
if chapters:
logger.info("Using chapters as primary structure for steps")
for chapter in chapters:
chapter_start = chapter["start_time"]
chapter_end = chapter.get("end_time", float("inf"))
# Find segments that belong to this chapter
chapter_segments = [
s for s in segments
if s["start"] >= chapter_start and s["start"] < chapter_end
]
if not chapter_segments:
continue
# Add chapter as a step
steps.append({
"timestamp": self._format_timestamp(chapter_start),
"text": f"## {chapter['title']}",
"start_seconds": chapter_start,
"is_chapter": True
})
# Extract steps within this chapter
chapter_steps = self._extract_steps_from_segments(chapter_segments)
# If no steps found within chapter, add the first segment as a step
if not chapter_steps and chapter_segments:
chapter_steps = [{
"timestamp": self._format_timestamp(chapter_segments[0]["start"]),
"text": chapter_segments[0]["text"],
"start_seconds": chapter_segments[0]["start"],
"is_chapter": False
}]
steps.extend(chapter_steps)
else:
# No chapters, extract steps directly from segments
logger.info("No chapters available, extracting steps directly from segments")
steps = self._extract_steps_from_segments(segments)
# If no steps found, create steps based on time intervals
if not steps and segments:
logger.info("No clear steps found, creating steps based on time intervals")
# Get total duration
if len(segments) > 1:
total_duration = segments[-1]["start"] + segments[-1].get("duration", 0) - segments[0]["start"]
else:
total_duration = segments[0].get("duration", 300) # Default to 5 minutes if only one segment
# Create steps every 2 minutes or at least 5 steps
step_count = max(5, int(total_duration / 120))
interval = total_duration / step_count
            seen_segment_starts = set()
            for i in range(step_count):
                target_time = segments[0]["start"] + i * interval
                # Find the closest segment, skipping segments already used as steps
                closest_segment = min(segments, key=lambda s: abs(s["start"] - target_time))
                if closest_segment["start"] in seen_segment_starts:
                    continue
                seen_segment_starts.add(closest_segment["start"])
                steps.append({
                    "timestamp": self._format_timestamp(closest_segment["start"]),
                    "text": closest_segment["text"],
                    "start_seconds": closest_segment["start"],
                    "is_chapter": False
                })
# Sort steps by timestamp
steps.sort(key=lambda x: x["start_seconds"])
return steps
def _extract_steps_from_segments(self, segments: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Extract steps from transcript segments based on step indicators.
Args:
segments: List of transcript segments
Returns:
List of steps with timestamp and text
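        Example (illustrative): a segment whose text is "Now let's install the
        dependencies" matches the "now let's" step indicator and is returned
        as a step; segments matching neither a step indicator nor a code
        pattern are dropped.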
"""
steps = []
for segment in segments:
text = segment["text"]
# Check if the segment contains a step indicator
is_step = any(pattern.search(text) for pattern in self.compiled_step_indicators)
# Check if the segment contains code
is_code = any(pattern.search(text) for pattern in self.compiled_patterns)
# Add as a step if it's a step indicator or contains code
if is_step or is_code:
steps.append({
"timestamp": self._format_timestamp(segment["start"]),
"text": text,
"start_seconds": segment["start"],
"is_chapter": False
})
return steps
def _detect_code_in_steps(self, steps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Detect code snippets in steps.
Args:
steps: List of steps
Returns:
List of steps with code information
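        Example (illustrative): a step whose text is "def fetch_data(url):" is
        returned with "is_code" set to True, "code_language" set to "python",
        and "code_content" holding the original text.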
"""
steps_with_code = []
for step in steps:
text = step["text"]
# Skip chapter headings for code detection
if step.get("is_chapter", False):
steps_with_code.append(step)
continue
# Check if the text contains code
is_code = any(pattern.search(text) for pattern in self.compiled_patterns)
if is_code:
# Detect programming language
language = self._detect_language(text)
steps_with_code.append({
**step,
"is_code": True,
"code_language": language,
"code_content": text
})
else:
steps_with_code.append({
**step,
"is_code": False
})
return steps_with_code
def _detect_language(self, text: str) -> str:
"""
Detect the programming language of a code snippet.
Args:
text: Code snippet text
Returns:
Detected programming language
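        Example (illustrative): "const data = await fetch(url)" scores highest
        on the JavaScript patterns and returns "javascript"; text that matches
        no language pattern returns "text".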
"""
language_scores = {}
for lang, patterns in self.compiled_language_patterns.items():
score = sum(1 for pattern in patterns if pattern.search(text))
language_scores[lang] = score
if not language_scores or max(language_scores.values()) == 0:
return "text"
return max(language_scores.items(), key=lambda x: x[1])[0]
def _format_timestamp(self, seconds: float) -> str:
"""
        Format seconds as an M:SS timestamp, or H:MM:SS for times of an hour or more.
Args:
seconds: Time in seconds
Returns:
Formatted timestamp string
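        Example (illustrative): 125 seconds formats as "2:05".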
"""
        total_seconds = int(seconds)
        hours, remainder = divmod(total_seconds, 3600)
        minutes, secs = divmod(remainder, 60)
        if hours:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        return f"{minutes}:{secs:02d}"