341 lines
10 KiB
Python
Executable File
341 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
"""
Video Loop Detector and Extractor

Detects where a looped video starts repeating and extracts only the first
unique segment using audio-based analysis with librosa.

Usage:
    python loop_extractor.py "video.webm"
    # Output: video_loop1.webm
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import librosa
|
|
|
|
|
|
def get_video_duration(video_path: str) -> float:
    """Return the duration of ``video_path`` in seconds, as reported by ffprobe.

    ffprobe is asked for its container-level ("format") metadata as JSON and
    the ``duration`` field of that section is converted to a float.

    Args:
        video_path: Path of the media file to inspect.

    Returns:
        The value of ``format.duration`` from ffprobe's JSON output.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
    """
    probe = subprocess.run(
        [
            "ffprobe", "-v", "quiet",
            "-print_format", "json",
            "-show_format",
            video_path,
        ],
        capture_output=True,
        text=True,
    )
    if probe.returncode != 0:
        raise RuntimeError(f"ffprobe failed: {probe.stderr}")

    container_info = json.loads(probe.stdout)["format"]
    return float(container_info["duration"])
|
|
|
|
|
|
def extract_audio(
    video_path: str, output_path: str, sample_rate: int = 22050
) -> None:
    """Write the audio of ``video_path`` to ``output_path`` as a mono WAV file.

    Args:
        video_path: Source media file handed to ffmpeg via ``-i``.
        output_path: Destination file; overwritten without asking (``-y``).
        sample_rate: Value passed to ffmpeg's ``-ar`` option.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    # -vn drops the video stream, -ac 1 downmixes to a single channel.
    audio_options = ["-vn", "-ac", "1", "-ar", str(sample_rate), "-f", "wav"]
    command = ["ffmpeg", "-y", "-i", video_path, *audio_options, output_path]

    print(f" Extracting audio at {sample_rate}Hz mono...")
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode == 0:
        return
    raise RuntimeError(
        f"ffmpeg audio extraction failed: {completed.stderr}")
|
|
|
|
|
|
def compute_chroma_features(
    audio_path: str, hop_length: int = 22050
) -> tuple[np.ndarray, int]:
    """
    Load an audio file and compute its CQT-based chroma features.

    The audio is always loaded as mono at 22050 Hz, so the default
    ``hop_length`` of 22050 samples corresponds to about one second per frame.

    Args:
        audio_path: Path to audio file
        hop_length: Samples between consecutive chroma frames

    Returns:
        Tuple of (chroma features array, sample rate used for loading)
    """
    print(" Loading audio...")
    samples, rate = librosa.load(audio_path, sr=22050, mono=True)

    total_seconds = len(samples) / rate
    print(f" Audio duration: {total_seconds/3600:.2f} hours "
          f"({total_seconds:.0f} seconds)")

    print(f" Computing chroma features (hop={hop_length} samples, "
          f"~{hop_length/rate:.1f}s)...")
    # NOTE(review): some librosa releases appear to require CQT hop lengths to
    # be a multiple of 2**(n_octaves - 1); 22050 contains a single factor of 2.
    # Confirm this hop is accepted by the installed librosa version.
    features = librosa.feature.chroma_cqt(
        y=samples, sr=rate, hop_length=hop_length)

    frame_count = features.shape[1]
    print(f" Chroma shape: {features.shape} "
          f"(12 pitch classes x {frame_count} frames)")

    return features, rate
|
|
|
|
|
|
def find_loop_period(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = 900,  # 15 minutes
    max_loop_sec: float = 10800  # 3 hours
) -> tuple[float, float]:
    """
    Estimate the loop period from a recurrence matrix collapsed over lags.

    The chroma frames are time-delay embedded, compared pairwise into an
    affinity recurrence matrix, converted to lag coordinates and summed per
    lag. The lag with the largest sum inside the allowed range is reported.

    Args:
        chroma: Chroma feature matrix (12 x n_frames)
        sr: Sample rate
        hop_length: Hop length used for chroma
        min_loop_sec: Minimum loop length in seconds
        max_loop_sec: Maximum loop length in seconds

    Returns:
        Tuple of (loop_period_seconds, confidence_score). The confidence is
        the winning lag's summed affinity divided by the number of frames.
    """
    total_frames = chroma.shape[1]
    seconds_per_frame = hop_length / sr

    # Express the allowed loop lengths as frame counts.
    shortest_lag = int(min_loop_sec / seconds_per_frame)
    longest_lag = min(int(max_loop_sec / seconds_per_frame), total_frames - 1)

    print(" Building recurrence matrix...")
    print(f" Looking for loops between {min_loop_sec/60:.0f} min "
          f"and {max_loop_sec/3600:.1f} hr")

    # Time-delay embedding: each frame is stacked with delayed copies of
    # itself before frames are compared.
    embedded = librosa.feature.stack_memory(chroma, n_steps=4, delay=2)

    # Affinity mode gives graded (fuzzy) matches; the cosine metric compares
    # direction only, so overall level differences do not matter.
    recurrence = librosa.segment.recurrence_matrix(
        embedded,
        mode='affinity',
        metric='cosine',
        sparse=False,
        sym=True,
    )
    print(f" Recurrence matrix shape: {recurrence.shape}")

    # In lag coordinates a repetition at a fixed offset lines up along one row.
    print(" Converting to lag matrix...")
    lag_matrix = librosa.segment.recurrence_to_lag(recurrence)

    # One score per lag: total affinity accumulated over time.
    lag_scores = np.sum(lag_matrix, axis=1)

    # Discard lags outside the requested window.
    lag_scores[:shortest_lag] = 0
    lag_scores[longest_lag:] = 0

    peak_lag = np.argmax(lag_scores)
    peak_score = lag_scores[peak_lag]

    # Scale the score by the frame count to land roughly in the 0-1 range.
    ceiling = total_frames * 1.0
    if ceiling > 0:
        confidence = peak_score / ceiling
    else:
        confidence = 0

    period_seconds = peak_lag * seconds_per_frame

    print(f" Best lag: {peak_lag} frames "
          f"= {period_seconds:.1f} seconds ({period_seconds/60:.1f} min)")
    print(f" Confidence score: {confidence:.3f}")

    return period_seconds, confidence
|
|
|
|
|
|
def find_loop_period_autocorr(
    chroma: np.ndarray,
    sr: int,
    hop_length: int,
    min_loop_sec: float = 900,
    max_loop_sec: float = 10800
) -> tuple[float, float]:
    """
    Alternative: Find loop period using autocorrelation of chroma features.
    This method is faster and uses less memory than the full recurrence matrix.

    The chroma matrix is reduced to one value per frame (mean over pitch
    classes), standardized, and autocorrelated via FFT. The strongest
    positive autocorrelation among the allowed lags is reported.

    Args:
        chroma: Chroma feature matrix (pitch classes x n_frames)
        sr: Sample rate
        hop_length: Hop length used for chroma
        min_loop_sec: Minimum loop length in seconds
        max_loop_sec: Maximum loop length in seconds

    Returns:
        Tuple of (loop_period_seconds, confidence). ``(0.0, 0.0)`` is returned
        when no loop can be identified: no lag fits the allowed range, the
        signal is (numerically) constant or non-finite, or no allowed lag has
        a positive autocorrelation.
    """
    n_frames = chroma.shape[1]
    frame_duration = hop_length / sr

    # Lag 0 always correlates perfectly with itself, so it is never a loop.
    min_loop_frames = max(int(min_loop_sec / frame_duration), 1)
    # Lags are searched in [min_loop_frames, max_loop_frames), capped at half
    # the signal length.
    max_loop_frames = min(int(max_loop_sec / frame_duration), n_frames // 2)

    print(" Computing autocorrelation of chroma features...")

    best_lag_frame = 0
    confidence = 0.0

    if min_loop_frames < max_loop_frames:
        # Flatten chroma to 1D for autocorrelation (mean across pitch classes)
        chroma_mean = np.mean(chroma, axis=0)
        spread = np.std(chroma_mean)

        # A constant (e.g. silent) or NaN-containing signal has no usable
        # autocorrelation: normalizing it would divide by ~0 and produce NaN
        # or rounding noise that argmax would report as a loop.
        if np.isfinite(spread) and spread > 1e-8:
            chroma_mean = (chroma_mean - np.mean(chroma_mean)) / (spread + 1e-8)

            # Autocorrelation via FFT; zero-padding to 2n avoids wrap-around.
            n = len(chroma_mean)
            fft = np.fft.fft(chroma_mean, n=2 * n)
            autocorr = np.fft.ifft(fft * np.conj(fft))[:n].real

            # Search only the valid lags instead of zeroing the rest, so
            # out-of-range entries can never win the argmax.
            candidates = autocorr[min_loop_frames:max_loop_frames] / autocorr[0]
            peak = int(np.argmax(candidates))
            if candidates[peak] > 0:
                best_lag_frame = min_loop_frames + peak
                confidence = float(candidates[peak])

    loop_period_sec = best_lag_frame * frame_duration

    print(f" Best lag: {best_lag_frame} frames "
          f"= {loop_period_sec:.1f} seconds ({loop_period_sec/60:.1f} min)")
    print(f" Autocorrelation confidence: {confidence:.3f}")

    return loop_period_sec, confidence
|
|
|
|
|
|
def extract_segment(
    video_path: str,
    output_path: str,
    duration_sec: float
) -> None:
    """Copy the first ``duration_sec`` seconds of a video into a new file.

    ffmpeg is run with ``-c copy``, so streams are copied rather than
    re-encoded.

    Args:
        video_path: Source video handed to ffmpeg via ``-i``.
        output_path: Destination file; overwritten without asking (``-y``).
        duration_sec: Length of the leading segment, passed to ``-t``.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    command = ["ffmpeg", "-y", "-i", video_path]
    command += ["-t", str(duration_sec)]
    command += ["-c", "copy"]  # stream copy, no re-encoding
    command.append(output_path)

    print(f" Extracting first {duration_sec:.1f} seconds "
          f"({duration_sec/60:.1f} min)...")
    completed = subprocess.run(command, capture_output=True, text=True)
    if completed.returncode == 0:
        return
    raise RuntimeError(f"ffmpeg extraction failed: {completed.stderr}")
|
|
|
|
|
|
def main():
    """Command-line entry point: detect the loop period and cut the first loop.

    Steps: read the video duration, extract the audio to a temporary WAV
    file, compute chroma features, detect the loop period with the selected
    method, and stream-copy the first loop into the output file. The
    temporary WAV file is removed on every exit path.

    Exits with status 1 when the input file does not exist or when no usable
    loop period is detected.
    """
    parser = argparse.ArgumentParser(
        description="Detect video loop point and extract first unique segment"
    )
    parser.add_argument("video", help="Input video file path")
    parser.add_argument(
        "-o", "--output",
        help="Output video file path (default: <input>_loop1.<ext>)")
    parser.add_argument(
        "--min-loop", type=float, default=900,
        help="Minimum loop length in seconds (default: 900 = 15 min)")
    parser.add_argument(
        "--max-loop", type=float, default=10800,
        help="Maximum loop length in seconds (default: 10800 = 3 hr)")
    parser.add_argument(
        "--method", choices=["recurrence", "autocorr"],
        default="autocorr",
        help="Detection method: recurrence (accurate) or autocorr (fast)")

    args = parser.parse_args()

    video_path = args.video
    if not os.path.exists(video_path):
        print(f"Error: Video file not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    # Determine output path
    if args.output:
        output_path = args.output
    else:
        source = Path(video_path)
        output_path = str(
            source.parent / f"{source.stem}_loop1{source.suffix}")

    print(f"Input: {video_path}")
    print(f"Output: {output_path}")
    print()

    # Get video duration
    print("[1/5] Getting video info...")
    duration = get_video_duration(video_path)
    print(f" Duration: {duration/3600:.2f} hours")
    print()

    # Reserve a temporary file name for the extracted audio; the file is
    # closed immediately so ffmpeg can overwrite it, and deleted in `finally`.
    print("[2/5] Extracting audio...")
    with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
        audio_path = tmp.name

    try:
        extract_audio(video_path, audio_path)
        print()

        # Compute chroma features
        print("[3/5] Computing audio features...")
        # Use ~1 second hop for efficiency on long videos
        hop_length = 22050
        chroma, sr = compute_chroma_features(audio_path, hop_length=hop_length)
        print()

        # Find loop period
        print("[4/5] Detecting loop period...")
        if args.method == "recurrence":
            detect = find_loop_period
        else:
            detect = find_loop_period_autocorr
        loop_period, confidence = detect(
            chroma, sr, hop_length,
            min_loop_sec=args.min_loop,
            max_loop_sec=args.max_loop
        )
        print()

        if loop_period < args.min_loop:
            print(f"Warning: Detected loop period ({loop_period:.0f}s) "
                  f"is below minimum ({args.min_loop:.0f}s)")
            print("The video may not be a simple loop, "
                  "or parameters need adjustment.")
            sys.exit(1)

        # A period of zero passes the check above when --min-loop is 0; it
        # would produce an empty output file and a division by zero in the
        # summary, so treat it as "no loop found".
        if loop_period <= 0:
            print("Warning: No repeating loop was detected.")
            print("The video may not be a simple loop, "
                  "or parameters need adjustment.")
            sys.exit(1)

        # Extract first segment
        print("[5/5] Extracting first loop segment...")
        extract_segment(video_path, output_path, loop_period)
        print()

        # Summary
        print("=" * 50)
        print("Done!")
        print(f" Detected loop period: {loop_period:.1f} seconds "
              f"({loop_period/60:.1f} min)")
        print(f" Confidence: {confidence:.3f}")
        print(f" Original duration: {duration:.1f} seconds "
              f"({duration/60:.1f} min)")
        print(f" Estimated repetitions: {duration/loop_period:.1f}x")
        print(f" Output: {output_path}")

    finally:
        # Cleanup (also runs when sys.exit() is called above)
        if os.path.exists(audio_path):
            os.unlink(audio_path)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
|