# Provenance: this script was added by the git patch
#   41febdadfc34117f5ff110a58b905726bb4d2a09
#   From: Timo Bingmann
#   Date: Fri, 5 Dec 2025 10:47:05 -0800
#   Subject: [PATCH] tb-dev-scripts: add ,video_loop_extractor.py
# as pkgs/tb-dev-scripts/video_loop_extractor.py.  The same patch registers
# it in pkgs/tb-dev-scripts/default.nix, which is where the runtime
# dependencies (librosa, numpy, and ffmpeg on PATH) come from:
#
#   (pkgs.writers.writePython3Bin ",video_loop_extractor" {
#     libraries = [
#       pkgs.python3Packages.librosa
#       pkgs.python3Packages.numpy
#     ];
#     makeWrapperArgs = [
#       "--prefix PATH : ${lib.makeBinPath [ pkgs.ffmpeg ]}"
#     ];
#   } (builtins.readFile ./video_loop_extractor.py))
"""
Video Loop Detector and Extractor

Detects where a looped video starts repeating and extracts only the first
unique segment using audio-based analysis with librosa.

Usage:
    python video_loop_extractor.py "video.webm"
    # Output: video_loop1.webm

The ``ffmpeg`` and ``ffprobe`` executables must be available on PATH.
"""

import argparse
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path

import numpy as np

# librosa is imported inside the functions that use it: the import is only
# needed once analysis starts, so ``--help`` and argument errors do not have
# to pay for it.

SAMPLE_RATE = 22050  # Hz; audio is analysed as mono at this rate
DEFAULT_HOP_LENGTH = 22050  # samples per chroma frame (~1 s at 22050 Hz)
DEFAULT_MIN_LOOP_SEC = 900  # 15 minutes
DEFAULT_MAX_LOOP_SEC = 10800  # 3 hours


def get_video_duration(video_path: str) -> float:
    """Get video duration in seconds using ffprobe.

    Args:
        video_path: Path to the media file to inspect.

    Returns:
        The ``format.duration`` value from ffprobe's JSON output, as float.

    Raises:
        RuntimeError: If ffprobe exits non-zero, or its output contains no
            usable duration.
    """
    cmd = [
        "ffprobe",
        # "error" instead of "quiet": quiet would also silence the
        # diagnostics that are put into the exception message below.
        "-v", "error",
        "-print_format", "json",
        "-show_format",
        video_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {result.stderr}")

    try:
        return float(json.loads(result.stdout)["format"]["duration"])
    except (KeyError, TypeError, ValueError) as err:
        # json.JSONDecodeError is a ValueError, so malformed output lands
        # here as well.
        raise RuntimeError(
            f"ffprobe reported no usable duration for {video_path}"
        ) from err


def extract_audio(
    video_path: str, output_path: str, sample_rate: int = SAMPLE_RATE
) -> None:
    """Extract audio from video to WAV file using ffmpeg.

    Args:
        video_path: Input media file.
        output_path: WAV file to write; an existing file is overwritten.
        sample_rate: Output sample rate in Hz.

    Raises:
        RuntimeError: If ffmpeg exits non-zero.
    """
    cmd = [
        "ffmpeg", "-y",
        "-nostdin",  # never read interactive commands from our stdin
        "-i", video_path,
        "-vn",  # No video
        "-ac", "1",  # Mono
        "-ar", str(sample_rate),
        "-f", "wav",
        output_path,
    ]
    print(f" Extracting audio at {sample_rate}Hz mono...")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg audio extraction failed: {result.stderr}")


def compute_chroma_features(
    audio_path: str, hop_length: int = DEFAULT_HOP_LENGTH
) -> tuple[np.ndarray, int]:
    """
    Compute chroma features from audio file.

    Args:
        audio_path: Path to audio file
        hop_length: Samples between frames (~1 second at 22050Hz)

    Returns:
        Tuple of (chroma features array, sample rate)
    """
    import librosa

    print(" Loading audio...")
    y, sr = librosa.load(audio_path, sr=SAMPLE_RATE, mono=True)

    duration_sec = len(y) / sr
    print(f" Audio duration: {duration_sec/3600:.2f} hours "
          f"({duration_sec:.0f} seconds)")

    print(f" Computing chroma features (hop={hop_length} samples, "
          f"~{hop_length/sr:.1f}s)...")
    chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)

    print(f" Chroma shape: {chroma.shape} "
          f"(12 pitch classes x {chroma.shape[1]} frames)")

    return chroma, sr


def find_loop_period(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = DEFAULT_MIN_LOOP_SEC,  # 15 minutes
    max_loop_sec: float = DEFAULT_MAX_LOOP_SEC,  # 3 hours
) -> tuple[float, float]:
    """
    Find the loop period using recurrence matrix and lag analysis.

    A dense recurrence matrix (``sparse=False``) is built over all frames,
    so memory use grows with the square of the number of frames.

    Args:
        chroma: Chroma feature matrix (12 x n_frames)
        sr: Sample rate
        hop_length: Hop length used for chroma
        min_loop_sec: Minimum loop length in seconds
        max_loop_sec: Maximum loop length in seconds

    Returns:
        Tuple of (loop_period_seconds, confidence_score). ``(0.0, 0.0)``
        means no lag inside the requested range could be evaluated.
    """
    n_frames = chroma.shape[1]
    frame_duration = hop_length / sr

    # Convert time constraints to frame indices
    min_loop_frames = max(0, int(min_loop_sec / frame_duration))
    max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames - 1)

    print(" Building recurrence matrix...")
    print(f" Looking for loops between {min_loop_sec/60:.0f} min "
          f"and {max_loop_sec/3600:.1f} hr")

    if max_loop_frames <= min_loop_frames:
        # No lag can fall inside [min, max): skip the expensive matrices,
        # the answer would be lag 0 with score 0 anyway.
        print(" Audio is too short for the requested loop range.")
        return 0.0, 0.0

    import librosa

    # Use time-delay embedding for cleaner results
    chroma_stack = librosa.feature.stack_memory(chroma, n_steps=4, delay=2)

    # Compute recurrence matrix with affinity mode for fuzzy matching
    # Using cosine similarity which is robust to amplitude variations
    rec = librosa.segment.recurrence_matrix(
        chroma_stack,
        mode="affinity",
        metric="cosine",
        sparse=False,
        sym=True,
    )
    print(f" Recurrence matrix shape: {rec.shape}")

    # Convert to lag matrix - transforms diagonal patterns
    # into horizontal bands
    print(" Converting to lag matrix...")
    lag = librosa.segment.recurrence_to_lag(rec)

    # Sum along time axis to get lag histogram
    # Strong peaks indicate dominant repetition periods
    lag_histogram = np.sum(lag, axis=1)

    # Only consider lags within our valid range
    lag_histogram[:min_loop_frames] = 0
    lag_histogram[max_loop_frames:] = 0

    # Find the strongest peak
    best_lag_frame = int(np.argmax(lag_histogram))
    best_score = float(lag_histogram[best_lag_frame])

    # Normalize score by the number of frames, used here as the maximum
    # possible row sum; n_frames >= 2 is guaranteed by the guard above.
    confidence = best_score / n_frames

    loop_period_sec = best_lag_frame * frame_duration

    print(f" Best lag: {best_lag_frame} frames "
          f"= {loop_period_sec:.1f} seconds ({loop_period_sec/60:.1f} min)")
    print(f" Confidence score: {confidence:.3f}")

    return loop_period_sec, confidence


def find_loop_period_autocorr(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = DEFAULT_MIN_LOOP_SEC,
    max_loop_sec: float = DEFAULT_MAX_LOOP_SEC,
) -> tuple[float, float]:
    """
    Alternative: Find loop period using autocorrelation of chroma features.
    This method is faster and uses less memory than the full recurrence
    matrix.

    The chroma matrix is averaged over pitch classes into one value per
    frame; the lag with the highest normalized autocorrelation inside
    ``[min_loop_sec, max_loop_sec)`` (capped at half the signal) wins.

    Args:
        chroma: Chroma feature matrix (12 x n_frames)
        sr: Sample rate
        hop_length: Hop length used for chroma
        min_loop_sec: Minimum loop length in seconds
        max_loop_sec: Maximum loop length in seconds

    Returns:
        Tuple of (loop_period_seconds, confidence). ``(0.0, 0.0)`` means no
        period could be determined: the signal is too short for the range,
        or it is constant (for example silence).
    """
    n_frames = chroma.shape[1]
    frame_duration = hop_length / sr

    min_loop_frames = max(0, int(min_loop_sec / frame_duration))
    max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames // 2)

    print(" Computing autocorrelation of chroma features...")

    if max_loop_frames <= min_loop_frames:
        # Also covers an empty feature matrix, which the FFT would reject.
        print(" Audio is too short for the requested loop range.")
        return 0.0, 0.0

    # Flatten chroma to 1D for autocorrelation (use mean across pitch
    # classes), then standardize it.
    chroma_mean = np.mean(chroma, axis=0)
    chroma_mean = ((chroma_mean - np.mean(chroma_mean))
                   / (np.std(chroma_mean) + 1e-8))

    # Compute autocorrelation using FFT (efficient for long signals).
    # Zero-padding to 2n keeps the result linear instead of circular.
    n = len(chroma_mean)
    spectrum = np.fft.fft(chroma_mean, n=2 * n)
    autocorr = np.fft.ifft(spectrum * np.conj(spectrum))[:n].real

    zero_lag = autocorr[0]
    if not np.isfinite(zero_lag) or zero_lag <= 0:
        # A constant signal has no energy after mean removal; dividing by
        # it would fill the array with NaN and make argmax meaningless.
        print(" Audio features are constant; no loop can be detected.")
        return 0.0, 0.0
    autocorr = autocorr / zero_lag  # Normalize

    # Find peaks in valid range
    autocorr[:min_loop_frames] = 0
    autocorr[max_loop_frames:] = 0

    best_lag_frame = int(np.argmax(autocorr))
    confidence = float(autocorr[best_lag_frame])

    loop_period_sec = best_lag_frame * frame_duration

    print(f" Best lag: {best_lag_frame} frames "
          f"= {loop_period_sec:.1f} seconds ({loop_period_sec/60:.1f} min)")
    print(f" Autocorrelation confidence: {confidence:.3f}")

    return loop_period_sec, confidence


def extract_segment(
    video_path: str,
    output_path: str,
    duration_sec: float
) -> None:
    """Extract first segment of video using ffmpeg stream copy.

    Args:
        video_path: Input media file.
        output_path: File to write; an existing file is overwritten.
        duration_sec: Length of the segment to keep, in seconds.

    Raises:
        RuntimeError: If ffmpeg exits non-zero.
    """
    cmd = [
        "ffmpeg", "-y",
        "-nostdin",  # never read interactive commands from our stdin
        "-i", video_path,
        "-t", str(duration_sec),
        "-c", "copy",  # Stream copy, no re-encoding
        output_path,
    ]
    print(f" Extracting first {duration_sec:.1f} seconds "
          f"({duration_sec/60:.1f} min)...")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"ffmpeg extraction failed: {result.stderr}")


def main() -> None:
    """Command-line entry point.

    Parses arguments, analyses the audio track of the input video, and
    writes the first detected loop iteration to the output path.  Exits
    with status 1 when the input is missing, an external tool fails, or no
    loop of at least ``--min-loop`` seconds is found; argument errors exit
    through ``argparse``.
    """
    parser = argparse.ArgumentParser(
        description="Detect video loop point and extract first unique segment"
    )
    parser.add_argument("video", help="Input video file path")
    parser.add_argument(
        "-o", "--output",
        help="Output video file path (default: NAME_loop1.EXT)")
    parser.add_argument(
        "--min-loop", type=float, default=DEFAULT_MIN_LOOP_SEC,
        help="Minimum loop length in seconds (default: 900 = 15 min)")
    parser.add_argument(
        "--max-loop", type=float, default=DEFAULT_MAX_LOOP_SEC,
        help="Maximum loop length in seconds (default: 10800 = 3 hr)")
    parser.add_argument(
        "--method", choices=["recurrence", "autocorr"],
        default="autocorr",
        help="Detection method: recurrence (accurate) or autocorr (fast)")

    args = parser.parse_args()

    # A zero minimum would let lag 0 win and later divide by zero; this
    # chained comparison also rejects NaN.
    if not 0 < args.min_loop < args.max_loop:
        parser.error("--min-loop must be positive and below --max-loop")

    video_path = args.video
    if not os.path.isfile(video_path):
        print(f"Error: Video file not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    # Determine output path
    if args.output:
        output_path = args.output
    else:
        source = Path(video_path)
        output_path = str(
            source.parent / f"{source.stem}_loop1{source.suffix}")

    print(f"Input: {video_path}")
    print(f"Output: {output_path}")
    print()

    detect = (find_loop_period if args.method == "recurrence"
              else find_loop_period_autocorr)

    try:
        # Get video duration
        print("[1/5] Getting video info...")
        duration = get_video_duration(video_path)
        print(f" Duration: {duration/3600:.2f} hours")
        print()

        # Extract audio to a temporary file.  The directory (and the WAV in
        # it) is removed as soon as the features are in memory.
        print("[2/5] Extracting audio...")
        with tempfile.TemporaryDirectory() as tmp_dir:
            audio_path = os.path.join(tmp_dir, "audio.wav")
            extract_audio(video_path, audio_path)
            print()

            # Compute chroma features
            print("[3/5] Computing audio features...")
            # Use ~1 second hop for efficiency on long videos
            hop_length = DEFAULT_HOP_LENGTH
            chroma, sr = compute_chroma_features(
                audio_path, hop_length=hop_length)
            print()

        # Find loop period
        print("[4/5] Detecting loop period...")
        loop_period, confidence = detect(
            chroma, sr, hop_length,
            min_loop_sec=args.min_loop,
            max_loop_sec=args.max_loop,
        )
        print()

        if loop_period < args.min_loop:
            print(f"Warning: Detected loop period ({loop_period:.0f}s) "
                  f"is below minimum ({args.min_loop:.0f}s)")
            print("The video may not be a simple loop, "
                  "or parameters need adjustment.")
            sys.exit(1)

        # Extract first segment
        print("[5/5] Extracting first loop segment...")
        extract_segment(video_path, output_path, loop_period)
        print()
    except RuntimeError as err:
        # ffprobe/ffmpeg failures: report them without a traceback.
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)

    # Summary
    print("=" * 50)
    print("Done!")
    print(f" Detected loop period: {loop_period:.1f} seconds "
          f"({loop_period/60:.1f} min)")
    print(f" Confidence: {confidence:.3f}")
    print(f" Original duration: {duration:.1f} seconds "
          f"({duration/60:.1f} min)")
    print(f" Estimated repetitions: {duration/loop_period:.1f}x")
    print(f" Output: {output_path}")


if __name__ == "__main__":
    main()