Skip to content
Merged
102 changes: 102 additions & 0 deletions operators/vid_vec_rep_clip/OPTIMIZATION_NOTES.md
Comment thread
aatmanvaidya marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you move this file to the benchmark folder?

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# Optimization Notes for `vid_vec_rep_clip`

## **Problem Statement**
The current implementation of the `vid_vec_rep_clip` operator lacks support for processing longer videos efficiently and reliably. Specifically, we wanted to investigate:
- Can the operator process longer videos (1min to 1hr) without breaking or exhausting system resources?
- Is the model itself a bottleneck, or is the limitation due to code inefficiencies?
- How does the operator perform in terms of CPU and memory usage for large video inputs?

## **Goals**
- Determine if the operator can process videos of varying lengths (1, 5, 10, 20, 30, 45, 60 mins).
- Profile memory and CPU usage during execution.
- Fix inefficiencies (if any) in the original implementation.
- Ensure output vector correctness post-refactor.

## Findings from Original Implementation

![alt text](image.png)
**Inconsistencies:** Memory usage for the 60-min video is unexpectedly lower than for the 30-min video, suggesting inefficient memory handling or potential leaks in intermediate steps.

## Results After Refactor

![alt text](<Screenshot 2025-05-02 at 6.12.31 AM.png>)
_Note: Longer videos show memory increase due to more efficient baseline measurement._

## Performance Comparison

### Memory-Optimized vs Original Implementation
![alt text](<Screenshot 2025-05-02 at 6.12.44 AM.png>)

### Memory Optimization Highlights:
- 81% reduction for 30-minute videos (1917MB → 365MB)

- 73.7% savings for 10-minute videos (1858MB → 488MB)

- More stable memory profile across all durations.

### Processing Tradeoffs:
- 37-121% longer processing for videos ≤30 minutes.

- 14-22% faster for very long videos (>45 minutes)

- More accurate performance measurements

## Key Improvements

### Efficient I-Frame Sampling
- Switched to extracting only I-frames using `ffmpeg`, reducing unnecessary frame processing and improving memory efficiency.

### Built-in Memory Profiling:
- Integrated `psutil` and `tracemalloc` to monitor memory usage before and after processing.

- Reports net memory change, helping diagnose scaling issues.

### Scalable to Long Videos:
- Successfully tested on videos up to 1 hour, showing stable memory growth.

- Reports net memory change, helping diagnose scaling issues.

### Enhanced Test Coverage:
- Includes test cases for:
- Local long videos (e.g., 1 hr)
- Sample short videos
- Remote video URLs

## Summary of Changes

This PR introduces the following enhancements to the `vid_vec_rep_clip` operator:

1. #### I-Frame Sampling Strategy:
Instead of decoding every frame or relying on precomputed metadata, the updated operator uses `ffmpeg` to extract only **I-frames** for vector representation. This reduces redundancy and improves scalability.

2. #### Streaming Feature Extraction:
Frames are now loaded and processed in a streaming manner (one at a time) using temporary storage, preventing memory bloat.

3. #### Detailed Profiling Added to Tests:

The `unittest` suite has been enhanced to capture:
- Memory usage before/after processing
- Net memory consumption
- CPU time and usage
- Peak memory (from `tracemalloc`)
- Total I-frames and vectors generated

4. #### Average Vector Addition:
The final output includes a mean vector of all I-frame features, maintaining consistency with prior behavior.


## 📌 Limitations
- **I-Frame Distribution:** The number of I-frames is determined by video encoding, so a shorter video could occasionally have more I-frames than a longer one. This is expected and valid behavior.

- **Processing Time:** The new implementation may take slightly longer for short videos due to I-frame extraction overhead, but this tradeoff is acceptable given the lower memory usage and improved scalability.

- **No Parallelism Yet:** Current implementation processes frames sequentially. There’s room for future speedup via batching or multithreading.

## Checklist

- ✅ Code handles long videos (1 min to 1 hour)
- ✅ Memory and CPU profiling included
- ✅ Documented tradeoffs (time vs. memory)
- ✅ Old and new results clearly documented
- ✅ Known limitations acknowledged

50 changes: 46 additions & 4 deletions operators/vid_vec_rep_clip/test.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import unittest
from unittest.case import skip

import os
import tracemalloc
import psutil
from feluda.models.media_factory import VideoFactory
from operators.vid_vec_rep_clip import vid_vec_rep_clip


import time
from pathlib import Path
class Test(unittest.TestCase):
@classmethod
def setUpClass(cls):
Expand All @@ -20,16 +22,56 @@ def tearDownClass(cls):
@skip
def test_sample_video_from_disk(self):
video_path = VideoFactory.make_from_file_on_disk(
r"core/operators/sample_data/sample-cat-video.mp4"
r"core/operators/sample_data/my_10min_video.mp4"
)
result = vid_vec_rep_clip.run(video_path)
for vec in result:
self.assertEqual(len(vec.get("vid_vec")), 512)

def test_large_video_profile(self):

rel_path = "5_min_video.mp4" # just the file name
abs_path = os.path.abspath(rel_path)
video_path = VideoFactory.make_from_file_on_disk(abs_path)

# Start CPU tracking (using psutil)
process = psutil.Process(os.getpid())
mem_before = process.memory_info().rss / 1024 / 1024 # in MB
cpu_times_before = process.cpu_times()
start = time.time()

result = vid_vec_rep_clip.run(video_path)
end_time = time.time()
cpu_end = process.cpu_times()

# Stop memory tracking and get the current and peak memory usage
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()


count = 0
for vec in result:
self.assertEqual(len(vec.get("vid_vec")), 512)
count += 1

end = time.time()
mem_after = process.memory_info().rss / 1024 / 1024
cpu_times_after = process.cpu_times()

print(f"Total I-frame vectors: {count - 1}")
print(f"Average vector included: True")
print(f"Total vectors generated (incl. avg): {count}")
print(f"Memory before processing: {mem_before:.2f} MB")
print(f"Memory after processing: {mem_after:.2f} MB")
print(f"Net memory change (test-side): {mem_after - mem_before:+.2f} MB")
print(f"CPU time used (user + system): {(cpu_times_after.user + cpu_times_after.system) - (cpu_times_before.user + cpu_times_before.system):.2f} seconds")
print(f"Processing time: {end - start:.2f} seconds")

# @skip
Comment thread
aatmanvaidya marked this conversation as resolved.
def test_sample_video_from_url(self):
video_url = "https://tattle-media.s3.amazonaws.com/test-data/tattle-search/cat_vid_2mb.mp4"
video_path = VideoFactory.make_from_url(video_url)
result = vid_vec_rep_clip.run(video_path)
for vec in result:
self.assertEqual(len(vec.get("vid_vec")), 512)

118 changes: 56 additions & 62 deletions operators/vid_vec_rep_clip/vid_vec_rep_clip.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
"""
Operator to extract video vector representations using CLIP-ViT-B-32.
"""


import psutil
import time
def initialize(param):
"""
Initializes the operator.
Expand All @@ -18,13 +18,12 @@ def initialize(param):
import os
import subprocess
import tempfile

import torch
from PIL import Image
from transformers import AutoProcessor, CLIPModel

# Load the model and processor
processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32")
processor = AutoProcessor.from_pretrained("openai/clip-vit-base-patch32", use_fast=True)
Comment thread
aatmanvaidya marked this conversation as resolved.
Outdated
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

# Set the device
Expand Down Expand Up @@ -60,16 +59,20 @@ class VideoAnalyzer:
A class for video feature extraction.
"""

def __init__(self, fname):
def __init__(self, fname, frame_sample_rate=1):
Comment thread
aatmanvaidya marked this conversation as resolved.
Outdated
"""
Constructor for the `VideoAnalyzer` class.

Args:
fname (str): Path to the video file
frame_sample_rate (int): Sample every Nth I-frame
"""
self.model = model
self.device = device
self.frame_images = []
self.processor = processor
self.frame_sample_rate = frame_sample_rate

self.fname = fname
self.feature_matrix = []
self.analyze(fname)

Expand All @@ -90,81 +93,72 @@ def analyze(self, fname):
Raises:
FileNotFoundError: If the file is not found
"""
# check if file exists
if not os.path.exists(fname):
raise FileNotFoundError(f"File not found: {fname}")

# Extract I-frames and features
self.frame_images = self.extract_frames(fname)
self.feature_matrix = self.extract_features(self.frame_images)
print(f"Analyzing video: {fname}")

def extract_frames(self, fname):
"""
Extracts I-frames from the video file using `ffmpeg`.
process = psutil.Process(os.getpid())
cpu_before = process.cpu_percent(interval=1.0)
mem_before = process.memory_info().rss / (1024 * 1024)

Args:
fname (str): Path to the video file
start_time = time.time()
self.feature_matrix = self.extract_features_streaming(fname)
end_time = time.time()

Returns:
list: List of PIL Images
"""
mem_after = process.memory_info().rss / (1024 * 1024)
cpu_after = process.cpu_percent(interval=1.0)

print(f"Total vectors generated: {len(self.feature_matrix)}")
print(f"Peak memory usage: {mem_after - mem_before:.2f} MB")
print(f"CPU usage before processing: {cpu_before:.1f}%")
print(f"CPU usage after processing: {cpu_after:.1f}%")
print(f"Processing time: {end_time - start_time:.2f} seconds")

def extract_features_streaming(self, fname):
feature_list = []
Comment thread
aatmanvaidya marked this conversation as resolved.
Outdated
with tempfile.TemporaryDirectory() as temp_dir:
# Command to extract I-frames using ffmpeg's command line tool
cmd = f"""
ffmpeg -i "{fname}" -vf "select=eq(pict_type\,I)" -vsync vfr "{temp_dir}/frame_%05d.jpg"
ffmpeg -i \"{fname}\" -vf \"select=eq(pict_type\\,I)\" -vsync vfr \"{temp_dir}/frame_%05d.jpg\"
Comment thread
aatmanvaidya marked this conversation as resolved.
Outdated
"""
with subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
) as process:
process.wait()
frames = []
for filename in os.listdir(temp_dir):
if filename.endswith((".jpg")):
image_path = os.path.join(temp_dir, filename)
with Image.open(image_path) as img:
frames.append(img.copy())
return frames

def extract_features(self, images):
"""
Extracts features from a list of images using pre-trained CLIP-ViT-B-32.

Args:
images (list): List of PIL Images

Returns:
torch.Tensor: Feature matrix of shape (batch, 512)
"""
inputs = processor(
images=images, return_tensors="pt", padding=True, truncation=True
)
inputs = {k: v.to(self.device) for k, v in inputs.items()} # move to device
print("Extracting I-frames with ffmpeg...")
subprocess.run(cmd, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

filenames = sorted([f for f in os.listdir(temp_dir) if f.endswith(".jpg")])
print(f"Total I-frames found: {len(filenames)}")
print(f"Sampling every {self.frame_sample_rate}th frame")

for i, filename in enumerate(filenames):
if i % self.frame_sample_rate != 0:
continue
image_path = os.path.join(temp_dir, filename)
with Image.open(image_path) as img:
img = img.convert("RGB")
feature = self.extract_single_feature(img)
feature_list.append(feature.cpu())

return torch.stack(feature_list)

def extract_single_feature(self, img):
inputs = self.processor(images=img, return_tensors="pt")
inputs = {k: v.to(self.device) for k, v in inputs.items()}
with torch.no_grad():
features = self.model.get_image_features(**inputs)
return features

feature = self.model.get_image_features(**inputs)
return feature.squeeze(0)

def run(file):
"""
Runs the operator.

Args:
file (dict): `VideoFactory` file object

Returns:
generator: Yields video and I-frame vector representations
"""
fname = file["path"]

try:
vid_analyzer = VideoAnalyzer(fname)
vid_analyzer = VideoAnalyzer(fname, frame_sample_rate=1) # Now processing all I-frames
return gendata(vid_analyzer)
finally:
os.remove(fname)

if file.get("is_temp", False) and os.path.exists(fname):
os.remove(fname)

def cleanup(param):
pass


def state():
pass