tb-dev-scripts: add ,video_loop_extractor.py

2025-12-05 10:47:05 -08:00
parent 1000663cb5
commit 41febdadfc
2 changed files with 347 additions and 0 deletions


@@ -13,5 +13,12 @@ pkgs.symlinkJoin {
"--prefix PATH : ${lib.makeBinPath [ pkgs.git ]}" "--prefix PATH : ${lib.makeBinPath [ pkgs.git ]}"
]; ];
} (builtins.readFile ./cmake_update_fetchcontent.py)) } (builtins.readFile ./cmake_update_fetchcontent.py))
(pkgs.writers.writePython3Bin ",video_loop_extractor" {
libraries = [ pkgs.python3Packages.librosa pkgs.python3Packages.numpy ];
makeWrapperArgs = [
"--prefix PATH : ${lib.makeBinPath [ pkgs.ffmpeg ]}"
];
} (builtins.readFile ./video_loop_extractor.py))
];
}


@@ -0,0 +1,340 @@
"""
Video Loop Detector and Extractor
Detects where a looped video starts repeating and extracts only the first
unique segment using audio-based analysis with librosa.
Usage:
python video_loop_extractor.py "video.webm"
# Output: video_loop1.webm
"""
import argparse
import json
import os
import subprocess
import sys
import tempfile
from pathlib import Path
import numpy as np
import librosa
def get_video_duration(video_path: str) -> float:
"""Get video duration in seconds using ffprobe."""
cmd = [
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_format",
video_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ffprobe failed: {result.stderr}")
data = json.loads(result.stdout)
return float(data["format"]["duration"])
def extract_audio(
video_path: str, output_path: str, sample_rate: int = 22050
) -> None:
"""Extract audio from video to WAV file using ffmpeg."""
cmd = [
"ffmpeg", "-y",
"-i", video_path,
"-vn", # No video
"-ac", "1", # Mono
"-ar", str(sample_rate),
"-f", "wav",
output_path
]
print(f" Extracting audio at {sample_rate}Hz mono...")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg audio extraction failed: {result.stderr}")
def compute_chroma_features(
audio_path: str, hop_length: int = 22050
) -> tuple[np.ndarray, int]:
"""
Compute chroma features from audio file.
Args:
audio_path: Path to audio file
hop_length: Samples between frames (~1 second at 22050Hz)
Returns:
Tuple of (chroma features array, sample rate)
"""
print(" Loading audio...")
y, sr = librosa.load(audio_path, sr=22050, mono=True)
duration_sec = len(y) / sr
print(f" Audio duration: {duration_sec/3600:.2f} hours "
f"({duration_sec:.0f} seconds)")
print(f" Computing chroma features (hop={hop_length} samples, "
f"~{hop_length/sr:.1f}s)...")
chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)
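    # chroma_cqt folds the constant-Q spectrum into 12 pitch classes per
    # frame, so the features track harmonic content rather than raw
    # amplitude; when the soundtrack loops, these vectors repeat almost
    # exactly, which is what the self-similarity analysis below relies on.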
print(f" Chroma shape: {chroma.shape} "
f"(12 pitch classes x {chroma.shape[1]} frames)")
return chroma, sr
def find_loop_period(
chroma: np.ndarray,
sr: int,
hop_length: int,
min_loop_sec: float = 900, # 15 minutes
max_loop_sec: float = 10800 # 3 hours
) -> tuple[float, float]:
"""
Find the loop period using recurrence matrix and lag analysis.
Args:
chroma: Chroma feature matrix (12 x n_frames)
sr: Sample rate
hop_length: Hop length used for chroma
min_loop_sec: Minimum loop length in seconds
max_loop_sec: Maximum loop length in seconds
Returns:
Tuple of (loop_period_seconds, confidence_score)
"""
n_frames = chroma.shape[1]
frame_duration = hop_length / sr
# Convert time constraints to frame indices
min_loop_frames = int(min_loop_sec / frame_duration)
max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames - 1)
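    # Worked example: with sr=22050 and hop_length=22050 each frame spans
    # exactly 1 second, so the default 900 s / 10800 s bounds translate to
    # frame indices 900 and 10800.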
print(" Building recurrence matrix...")
print(f" Looking for loops between {min_loop_sec/60:.0f} min "
f"and {max_loop_sec/3600:.1f} hr")
# Use time-delay embedding for cleaner results
chroma_stack = librosa.feature.stack_memory(chroma, n_steps=4, delay=2)
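    # stack_memory concatenates each frame with delayed copies of itself
    # (here delays of 0, 2, 4 and 6 frames), so similarity is judged over
    # several seconds of context instead of a single ~1 s frame, which
    # suppresses spurious matches between unrelated but similar frames.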
# Compute recurrence matrix with affinity mode for fuzzy matching
# Using cosine similarity which is robust to amplitude variations
rec = librosa.segment.recurrence_matrix(
chroma_stack,
mode='affinity',
metric='cosine',
sparse=False,
sym=True
)
print(f" Recurrence matrix shape: {rec.shape}")
# Convert to lag matrix - transforms diagonal patterns
# into horizontal bands
print(" Converting to lag matrix...")
lag = librosa.segment.recurrence_to_lag(rec)
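    # In the lag matrix, each diagonal of the recurrence matrix (a fixed
    # offset between repeats) becomes a constant-lag slice, so summing over
    # time gives a histogram of how much self-similarity the signal has at
    # each candidate loop length.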
# Sum along time axis to get lag histogram
# Strong peaks indicate dominant repetition periods
lag_histogram = np.sum(lag, axis=1)
# Only consider lags within our valid range
lag_histogram[:min_loop_frames] = 0
lag_histogram[max_loop_frames:] = 0
# Find the strongest peak
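    # Note: if the track repeats internally, shorter sub-periods can also
    # produce strong peaks; the min/max masking above is what keeps the
    # argmax anchored to a plausible full-loop length.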
best_lag_frame = np.argmax(lag_histogram)
best_score = lag_histogram[best_lag_frame]
# Normalize score (0-1 range)
max_possible = n_frames * 1.0 # Maximum possible sum
confidence = best_score / max_possible if max_possible > 0 else 0
loop_period_sec = best_lag_frame * frame_duration
print(f" Best lag: {best_lag_frame} frames "
f"= {loop_period_sec:.1f} seconds ({loop_period_sec/60:.1f} min)")
print(f" Confidence score: {confidence:.3f}")
return loop_period_sec, confidence
def find_loop_period_autocorr(
chroma: np.ndarray,
sr: int,
hop_length: int,
min_loop_sec: float = 900,
max_loop_sec: float = 10800
) -> tuple[float, float]:
"""
Alternative: Find loop period using autocorrelation of chroma features.
This method is faster and uses less memory than the full recurrence matrix.
"""
n_frames = chroma.shape[1]
frame_duration = hop_length / sr
min_loop_frames = int(min_loop_sec / frame_duration)
max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames // 2)
print(" Computing autocorrelation of chroma features...")
# Flatten chroma to 1D for autocorrelation (use mean across pitch classes)
chroma_mean = np.mean(chroma, axis=0)
# Normalize
chroma_mean = ((chroma_mean - np.mean(chroma_mean))
/ (np.std(chroma_mean) + 1e-8))
# Compute autocorrelation using FFT (efficient for long signals)
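    # (Wiener-Khinchin: the inverse FFT of the power spectrum is the
    # autocorrelation. Zero-padding to 2*n avoids circular wrap-around,
    # so the first n values are the linear autocorrelation.)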
n = len(chroma_mean)
fft = np.fft.fft(chroma_mean, n=2*n)
autocorr = np.fft.ifft(fft * np.conj(fft))[:n].real
autocorr = autocorr / autocorr[0] # Normalize
# Find peaks in valid range
autocorr[:min_loop_frames] = 0
autocorr[max_loop_frames:] = 0
best_lag_frame = np.argmax(autocorr)
confidence = autocorr[best_lag_frame]
loop_period_sec = best_lag_frame * frame_duration
print(f" Best lag: {best_lag_frame} frames "
f"= {loop_period_sec:.1f} seconds ({loop_period_sec/60:.1f} min)")
print(f" Autocorrelation confidence: {confidence:.3f}")
return loop_period_sec, confidence
def extract_segment(
video_path: str,
output_path: str,
duration_sec: float
) -> None:
"""Extract first segment of video using ffmpeg stream copy."""
cmd = [
"ffmpeg", "-y",
"-i", video_path,
"-t", str(duration_sec),
"-c", "copy", # Stream copy, no re-encoding
output_path
]
print(f" Extracting first {duration_sec:.1f} seconds "
f"({duration_sec/60:.1f} min)...")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg extraction failed: {result.stderr}")
def main():
parser = argparse.ArgumentParser(
description="Detect video loop point and extract first unique segment"
)
parser.add_argument("video", help="Input video file path")
parser.add_argument(
"-o", "--output",
help="Output video file path (default: <input>_loop1.<ext>)")
parser.add_argument(
"--min-loop", type=float, default=900,
help="Minimum loop length in seconds (default: 900 = 15 min)")
parser.add_argument(
"--max-loop", type=float, default=10800,
help="Maximum loop length in seconds (default: 10800 = 3 hr)")
parser.add_argument(
"--method", choices=["recurrence", "autocorr"],
default="autocorr",
help="Detection method: recurrence (accurate) or autocorr (fast)")
args = parser.parse_args()
video_path = args.video
if not os.path.exists(video_path):
print(f"Error: Video file not found: {video_path}", file=sys.stderr)
sys.exit(1)
# Determine output path
if args.output:
output_path = args.output
else:
video_stem = Path(video_path).stem
video_ext = Path(video_path).suffix
output_path = str(
Path(video_path).parent / f"{video_stem}_loop1{video_ext}")
print(f"Input: {video_path}")
print(f"Output: {output_path}")
print()
# Get video duration
print("[1/5] Getting video info...")
duration = get_video_duration(video_path)
print(f" Duration: {duration/3600:.2f} hours")
print()
# Extract audio to temporary file
print("[2/5] Extracting audio...")
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
audio_path = tmp.name
try:
extract_audio(video_path, audio_path)
print()
# Compute chroma features
print("[3/5] Computing audio features...")
# Use ~1 second hop for efficiency on long videos
hop_length = 22050
chroma, sr = compute_chroma_features(audio_path, hop_length=hop_length)
print()
# Find loop period
print("[4/5] Detecting loop period...")
if args.method == "recurrence":
loop_period, confidence = find_loop_period(
chroma, sr, hop_length,
min_loop_sec=args.min_loop,
max_loop_sec=args.max_loop
)
else:
loop_period, confidence = find_loop_period_autocorr(
chroma, sr, hop_length,
min_loop_sec=args.min_loop,
max_loop_sec=args.max_loop
)
print()
if loop_period < args.min_loop:
print(f"Warning: Detected loop period ({loop_period:.0f}s) "
f"is below minimum ({args.min_loop:.0f}s)")
print("The video may not be a simple loop, "
"or parameters need adjustment.")
sys.exit(1)
# Extract first segment
print("[5/5] Extracting first loop segment...")
extract_segment(video_path, output_path, loop_period)
print()
# Summary
print("=" * 50)
print("Done!")
print(f" Detected loop period: {loop_period:.1f} seconds "
f"({loop_period/60:.1f} min)")
print(f" Confidence: {confidence:.3f}")
print(f" Original duration: {duration:.1f} seconds "
f"({duration/60:.1f} min)")
print(f" Estimated repetitions: {duration/loop_period:.1f}x")
print(f" Output: {output_path}")
finally:
# Cleanup
if os.path.exists(audio_path):
os.unlink(audio_path)
if __name__ == "__main__":
main()