#!/usr/bin/env python3
"""
Video Loop Detector and Extractor

Detects where a looped video starts repeating and extracts only the first
unique segment using audio-based analysis with librosa.

Usage:
    python loop_extractor.py "video.webm"
    # Output: video_loop1.webm
"""

import argparse
import importlib.util
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path

import numpy as np

# NOTE: librosa is imported inside the functions that need it. Deferring the
# import keeps `--help` and argument errors fast and lets the NumPy-only
# autocorrelation detector be imported without librosa installed.


def get_video_duration(video_path: str) -> float:
    """Return the duration of *video_path* in seconds, as reported by ffprobe.

    Args:
        video_path: Path to the input media file.

    Returns:
        The container ("format") duration in seconds.

    Raises:
        RuntimeError: If ffprobe exits non-zero or its output contains no
            usable duration.
    """
    cmd = [
        "ffprobe",
        # "error" instead of "quiet": quiet suppresses the stderr text that is
        # interpolated into the failure message below.
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr}")
    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (KeyError, TypeError, ValueError) as err:
        # json.JSONDecodeError is a ValueError, so malformed output lands here.
        raise RuntimeError(
            f"ffprobe reported no usable duration for: {video_path}"
        ) from err


def extract_audio(video_path: str, output_path: str, sample_rate: int = 22050) -> None:
    """Extract the audio track of a video into a mono WAV file using ffmpeg.

    Args:
        video_path: Path to the input media file.
        output_path: Path of the WAV file to write (overwritten if present).
        sample_rate: Sample rate of the written audio in Hz.

    Raises:
        RuntimeError: If ffmpeg exits non-zero.
    """
    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-vn",  # No video
        "-ac", "1",  # Mono
        "-ar", str(sample_rate),
        "-f", "wav",
        output_path,
    ]
    print(f" Extracting audio at {sample_rate}Hz mono...")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg audio extraction failed: {result.stderr}")


def compute_chroma_features(
    audio_path: str,
    hop_length: int = 22050,
    sample_rate: int = 22050,
) -> tuple[np.ndarray, int]:
    """Compute chroma features from an audio file.

    Args:
        audio_path: Path to audio file.
        hop_length: Samples between frames (~1 second at 22050Hz).
        sample_rate: Rate the audio is loaded at, in Hz. Defaults to the rate
            extract_audio() writes by default.

    Returns:
        Tuple of (chroma features array, sample rate).
    """
    import librosa  # deferred: see the note below the module imports

    print(" Loading audio...")
    y, sr = librosa.load(audio_path, sr=sample_rate, mono=True)
    duration_sec = len(y) / sr
    print(f" Audio duration: {duration_sec/3600:.2f} hours ({duration_sec:.0f} seconds)")

    print(f" Computing chroma features (hop={hop_length} samples, ~{hop_length/sr:.1f}s)...")
    # NOTE(review): some older librosa releases appear to require the CQT
    # hop_length to be a multiple of 2**(n_octaves - 1); 22050 is not a
    # multiple of 64. Confirm against the installed librosa version.
    chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)
    print(f" Chroma shape: {chroma.shape} (12 pitch classes x {chroma.shape[1]} frames)")

    return chroma, sr


def find_loop_period(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = 900,  # 15 minutes
    max_loop_sec: float = 10800,  # 3 hours
) -> tuple[float, float]:
    """Find the loop period using a recurrence matrix and lag analysis.

    Args:
        chroma: Chroma feature matrix (12 x n_frames).
        sr: Sample rate.
        hop_length: Hop length used for chroma.
        min_loop_sec: Minimum loop length in seconds.
        max_loop_sec: Maximum loop length in seconds.

    Returns:
        Tuple of (loop_period_seconds, confidence_score). Both are 0 when no
        lag inside the requested range shows a positive, finite score.
    """
    import librosa  # deferred: see the note below the module imports

    n_frames = chroma.shape[1]
    frame_duration = hop_length / sr

    # Convert time constraints to frame indices. Lag 0 compares the signal
    # with itself and is never a loop, so the lower bound is at least 1.
    min_loop_frames = max(1, int(min_loop_sec / frame_duration))
    max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames - 1)

    print(" Building recurrence matrix...")
    print(f" Looking for loops between {min_loop_sec/60:.0f} min and {max_loop_sec/3600:.1f} hr")

    best_lag_frame = 0
    confidence = 0.0

    if min_loop_frames < max_loop_frames:
        # Use time-delay embedding for cleaner results
        chroma_stack = librosa.feature.stack_memory(chroma, n_steps=4, delay=2)

        # Compute recurrence matrix with affinity mode for fuzzy matching.
        # Using cosine similarity which is robust to amplitude variations.
        # The matrix is dense (n_frames x n_frames), so memory grows
        # quadratically with the number of frames.
        rec = librosa.segment.recurrence_matrix(
            chroma_stack,
            mode='affinity',
            metric='cosine',
            sparse=False,
            sym=True,
        )
        print(f" Recurrence matrix shape: {rec.shape}")

        # Convert to lag matrix - this transforms diagonal patterns into
        # horizontal bands
        print(" Converting to lag matrix...")
        lag = librosa.segment.recurrence_to_lag(rec)

        # Sum along time axis to get lag histogram.
        # Strong peaks indicate dominant repetition periods.
        lag_histogram = np.sum(lag, axis=1)

        # Search only the valid lag window instead of zeroing the rest and
        # hoping argmax lands inside it.
        window = lag_histogram[min_loop_frames:max_loop_frames]
        peak = int(np.argmax(window))
        best_score = window[peak]

        # argmax returns a NaN slot if one exists; treat that (and an
        # all-zero window) as "no loop found".
        if np.isfinite(best_score) and best_score > 0:
            best_lag_frame = min_loop_frames + peak
            # Normalize score against the frame count (0-1 range)
            confidence = float(best_score / n_frames)

    loop_period_sec = best_lag_frame * frame_duration

    print(f" Best lag: {best_lag_frame} frames = {loop_period_sec:.1f} seconds "
          f"({loop_period_sec/60:.1f} min)")
    print(f" Confidence score: {confidence:.3f}")

    return loop_period_sec, confidence


def find_loop_period_autocorr(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = 900,
    max_loop_sec: float = 10800,
) -> tuple[float, float]:
    """Alternative: find the loop period via autocorrelation of chroma features.

    This method is faster and uses less memory than the full recurrence
    matrix.

    Args:
        chroma: Chroma feature matrix (pitch classes x n_frames).
        sr: Sample rate.
        hop_length: Hop length used for chroma.
        min_loop_sec: Minimum loop length in seconds.
        max_loop_sec: Maximum loop length in seconds.

    Returns:
        Tuple of (loop_period_seconds, confidence). Both are 0 when the lag
        range is empty, the signal has no variation (e.g. silent audio), or
        no lag in range has a positive correlation.
    """
    n_frames = chroma.shape[1]
    frame_duration = hop_length / sr

    # Lag 0 always correlates perfectly with itself; never report it.
    min_loop_frames = max(1, int(min_loop_sec / frame_duration))
    # A loop must fit at least twice, hence the n_frames // 2 cap.
    max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames // 2)

    print(" Computing autocorrelation of chroma features...")

    best_lag_frame = 0
    confidence = 0.0

    if min_loop_frames < max_loop_frames:
        # Flatten chroma to 1D for autocorrelation (mean across pitch classes)
        chroma_mean = np.mean(chroma, axis=0)

        # Normalize to zero mean / unit variance
        chroma_mean = ((chroma_mean - np.mean(chroma_mean)) /
                       (np.std(chroma_mean) + 1e-8))

        # Compute autocorrelation using FFT (efficient for long signals).
        # Zero-padding to 2n avoids circular wrap-around.
        n = len(chroma_mean)
        spectrum = np.fft.fft(chroma_mean, n=2 * n)
        autocorr = np.fft.ifft(spectrum * np.conj(spectrum))[:n].real

        # A constant signal has zero energy. Dividing by it would turn every
        # lag into NaN, and argmax would then report the first in-range lag
        # as a "loop" with confidence nan.
        energy = autocorr[0]
        if np.isfinite(energy) and energy > 0:
            window = autocorr[min_loop_frames:max_loop_frames] / energy
            peak = int(np.argmax(window))
            if window[peak] > 0:
                best_lag_frame = min_loop_frames + peak
                confidence = float(window[peak])

    loop_period_sec = best_lag_frame * frame_duration

    print(f" Best lag: {best_lag_frame} frames = {loop_period_sec:.1f} seconds "
          f"({loop_period_sec/60:.1f} min)")
    print(f" Autocorrelation confidence: {confidence:.3f}")

    return loop_period_sec, confidence


def extract_segment(
    video_path: str,
    output_path: str,
    duration_sec: float,
) -> None:
    """Extract the first segment of a video using ffmpeg stream copy.

    Args:
        video_path: Path to the input media file.
        output_path: Path of the file to write (overwritten if present).
        duration_sec: Length of the leading segment to keep, in seconds.

    Raises:
        ValueError: If duration_sec is not positive.
        RuntimeError: If ffmpeg exits non-zero.
    """
    if not duration_sec > 0:
        raise ValueError(f"duration_sec must be positive, got {duration_sec}")

    cmd = [
        "ffmpeg", "-y",
        "-i", video_path,
        "-t", str(duration_sec),
        "-c", "copy",  # Stream copy, no re-encoding
        output_path,
    ]
    print(f" Extracting first {duration_sec:.1f} seconds "
          f"({duration_sec/60:.1f} min)...")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg extraction failed: {result.stderr}")


def main():
    """Command-line entry point: detect the loop period and cut the first loop."""
    parser = argparse.ArgumentParser(
        description="Detect video loop point and extract first unique segment"
    )
    parser.add_argument("video", help="Input video file path")
    parser.add_argument(
        "-o", "--output",
        help="Output video file path (default: <input>_loop1<ext>)")
    parser.add_argument(
        "--min-loop", type=float, default=900,
        help="Minimum loop length in seconds (default: 900 = 15 min)")
    parser.add_argument(
        "--max-loop", type=float, default=10800,
        help="Maximum loop length in seconds (default: 10800 = 3 hr)")
    parser.add_argument(
        "--method", choices=["recurrence", "autocorr"], default="autocorr",
        help="Detection method: recurrence (accurate, slow) or autocorr (fast)")

    args = parser.parse_args()

    # A zero/negative minimum would allow a zero-length "loop", which breaks
    # both the extraction and the repetition estimate below.
    if args.min_loop <= 0:
        parser.error("--min-loop must be greater than 0")
    if args.max_loop <= args.min_loop:
        parser.error("--max-loop must be greater than --min-loop")

    video_path = args.video
    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    # Determine output path
    if args.output:
        output_path = args.output
    else:
        source = Path(video_path)
        output_path = str(source.parent / f"{source.stem}_loop1{source.suffix}")

    # ffmpeg is run with -y; writing onto the input would destroy it.
    if Path(output_path).resolve() == Path(video_path).resolve():
        print("Error: Output path must differ from the input video",
              file=sys.stderr)
        sys.exit(1)

    # librosa is imported lazily; check for it now so a missing install is
    # reported before the audio extraction rather than after it.
    if importlib.util.find_spec("librosa") is None:
        print("Error: librosa is required but not installed", file=sys.stderr)
        sys.exit(1)

    print(f"Input: {video_path}")
    print(f"Output: {output_path}")
    print()

    # Get video duration
    print("[1/5] Getting video info...")
    duration = get_video_duration(video_path)
    print(f" Duration: {duration/3600:.2f} hours")
    print()

    # Extract audio to temporary file
    print("[2/5] Extracting audio...")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        audio_path = tmp.name

    try:
        extract_audio(video_path, audio_path)
        print()

        # Compute chroma features
        print("[3/5] Computing audio features...")
        # Use ~1 second hop for efficiency on long videos
        hop_length = 22050
        chroma, sr = compute_chroma_features(audio_path, hop_length=hop_length)
        print()

        # Find loop period
        print("[4/5] Detecting loop period...")
        if args.method == "recurrence":
            loop_period, confidence = find_loop_period(
                chroma, sr, hop_length,
                min_loop_sec=args.min_loop,
                max_loop_sec=args.max_loop
            )
        else:
            loop_period, confidence = find_loop_period_autocorr(
                chroma, sr, hop_length,
                min_loop_sec=args.min_loop,
                max_loop_sec=args.max_loop
            )
        print()

        # The detectors return a period of 0 when nothing was found, which
        # this check also catches because --min-loop is positive.
        if loop_period < args.min_loop:
            print(f"Warning: Detected loop period ({loop_period:.0f}s) "
                  f"is below minimum ({args.min_loop:.0f}s)")
            print("The video may not be a simple loop, "
                  "or parameters need adjustment.")
            sys.exit(1)

        # Extract first segment
        print("[5/5] Extracting first loop segment...")
        extract_segment(video_path, output_path, loop_period)
        print()

        # Summary
        print("=" * 50)
        print("Done!")
        print(f" Detected loop period: {loop_period:.1f} seconds "
              f"({loop_period/60:.1f} min)")
        print(f" Confidence: {confidence:.3f}")
        print(f" Original duration: {duration:.1f} seconds "
              f"({duration/60:.1f} min)")
        print(f" Estimated repetitions: {duration/loop_period:.1f}x")
        print(f" Output: {output_path}")

    finally:
        # Cleanup (also runs when sys.exit() is called above)
        if os.path.exists(audio_path):
            os.unlink(audio_path)


if __name__ == "__main__":
    main()