diff --git a/tests/feluda_integration_tests/__init__.py b/tests/feluda_integration_tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/feluda_integration_tests/test_01_feluda_and_image_vec_rep_resnet.py b/tests/feluda_integration_tests/test_01_feluda_and_image_vec_rep_resnet.py deleted file mode 100644 index 1d85886b..00000000 --- a/tests/feluda_integration_tests/test_01_feluda_and_image_vec_rep_resnet.py +++ /dev/null @@ -1,123 +0,0 @@ -import contextlib -import tempfile -import unittest -from pathlib import Path -from unittest.mock import patch - -import numpy as np -import yaml -from requests.exceptions import ConnectTimeout - -from feluda import Feluda -from feluda.factory import ImageFactory - - -class TestFeludaImageVectorIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - """Create a temporary test configuration file that will be used for all tests.""" - cls.config = { - "operators": { - "label": "Operators", - "parameters": [ - { - "name": "image vectors", - "type": "image_vec_rep_resnet", - "parameters": {"index_name": "image"}, - } - ], - } - } - - # Create temporary config file using with statement to ensure proper resource cleanup - fd, cls.config_path = tempfile.mkstemp(suffix=".yml") - with open(fd, "w") as f: - yaml.dump(cls.config, f) - - # Initialize Feluda - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - - cls.test_image_url = "https://tattle-media.s3.amazonaws.com/test-data/tattle-search/text-in-image-test-hindi.png" - cls.expected_vector_dim = 512 - - def setUp(self): - """Set up test-specific feluda resources.""" - # Get operator reference in each test to ensure isolation - self.operator = self.feluda.operators.get()["image_vec_rep_resnet"] - - def test_image_vector_generation(self): - """Test that image vector generation works end-to-end.""" - image_obj = ImageFactory.make_from_url(self.test_image_url) - self.assertIsNotNone(image_obj, "Image object should be successfully created") - - image_vec = self.operator.run(image_obj) - - self.assertTrue( - isinstance(image_vec, (list, np.ndarray)), - "Vector should be a list or numpy array", - ) - self.assertTrue(len(image_vec) > 0, "Vector should not be empty") - self.assertEqual( - len(image_vec), - self.expected_vector_dim, - f"Vector should have dimension {self.expected_vector_dim}", - ) - - if isinstance(image_vec, np.ndarray): - self.assertFalse(np.all(image_vec == 0), "Vector should not be all zeros") - self.assertFalse( - np.any(np.isnan(image_vec)), "Vector should not contain NaN values" - ) - - def test_invalid_image_url(self): - """Test handling of invalid image URL.""" - invalid_url = "https://nonexistent-url/image.jpg" - - for exception in [ConnectTimeout]: - with self.subTest(exception=exception.__name__): - with patch("requests.get") as mock_get: - mock_get.side_effect = exception - result = ImageFactory.make_from_url(invalid_url) - self.assertIsNone(result) - - def test_operator_configuration(self): - """Test that operator is properly configured.""" - self.assertIsNotNone(self.operator, "Operator should be properly initialized") - self.assertTrue( - hasattr(self.operator, "run"), "Operator should have 'run' method" - ) - - @contextlib.contextmanager - def assertNoException(self, msg=None): - """Context manager to verify no exception is raised.""" - try: - yield - except Exception as e: - self.fail(f"{msg or 'Exception was raised'}: {e}") - - def test_image_vector_consistency(self): - """Test that generating vectors twice from the same image gives 
consistent results.""" - image_obj = ImageFactory.make_from_url(self.test_image_url) - - with self.assertNoException( - "First vector generation should not raise exceptions" - ): - vec1 = self.operator.run(image_obj) - - with self.assertNoException( - "Second vector generation should not raise exceptions" - ): - vec2 = self.operator.run(image_obj) - - np.testing.assert_array_equal( - vec1, vec2, "Vectors should be identical for the same image" - ) - - @classmethod - def tearDownClass(cls): - """Clean up temporary files after all tests are done.""" - try: - Path(cls.config_path).unlink(missing_ok=True) - except Exception as e: - print(f"Warning: Failed to delete temporary file: {e}") diff --git a/tests/feluda_integration_tests/test_02_feluda_and_vid_vec_rep_clip.py b/tests/feluda_integration_tests/test_02_feluda_and_vid_vec_rep_clip.py deleted file mode 100644 index fd2d45b5..00000000 --- a/tests/feluda_integration_tests/test_02_feluda_and_vid_vec_rep_clip.py +++ /dev/null @@ -1,107 +0,0 @@ -import os -import tempfile -import unittest -from pathlib import Path - -import numpy as np -import yaml - -from feluda import Feluda -from feluda.factory import VideoFactory - - -class TestFeludaVideoVectorIntegration(unittest.TestCase): - config_path = None - feluda = None - - @classmethod - def setUpClass(cls): - cls.config = { - "operators": { - "label": "Operators", - "parameters": [ - { - "name": "video vector", - "type": "vid_vec_rep_clip", - "parameters": {"index_name": "video"}, - } - ], - } - } - - fd, cls.config_path = tempfile.mkstemp(suffix=".yml", text=True) - with open(fd, "w") as f: - yaml.dump(cls.config, f) - - try: - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - except Exception as e: - if cls.config_path and Path(cls.config_path).exists(): - Path(cls.config_path).unlink() - raise RuntimeError(f"Feluda setup failed: {e}") from e - - cls.test_video_url = "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" - cls.expected_vector_dim = 512 - - def setUp(self): - self.operator = self.feluda.operators.get()["vid_vec_rep_clip"] - if not self.operator: - self.fail("Failed to get operator 'vid_vec_rep_clip' from Feluda instance") - - def test_video_vector_generation(self): - video_object = VideoFactory.make_from_url(self.test_video_url) - downloaded_path = video_object.get("path") - self.assertIsNotNone(downloaded_path, "VideoFactory did not return a path") - self.assertTrue( - Path(downloaded_path).exists(), - f"Downloaded file not found at {downloaded_path}", - ) - - vector_generator = self.operator.run(video_object) - - try: - first_output_item = next(vector_generator) - except Exception as e: - if Path(downloaded_path).exists(): - os.remove(downloaded_path) - self.fail(f"Calling next on generator raised an unexpected error: {e}") - - self.assertIsInstance( - first_output_item, dict, "Operator did not yield a dictionary" - ) - self.assertIn( - "vid_vec", first_output_item, "Yielded dictionary missing 'vid_vec' key" - ) - self.assertIn( - "is_avg", first_output_item, "Yielded dictionary missing 'is_avg' key" - ) - - actual_vector = first_output_item["vid_vec"] - is_average = first_output_item["is_avg"] - - self.assertTrue(is_average, "First yielded vector should have is_avg=True") - - self.assertIsInstance( - actual_vector, list, "Vector ('vid_vec') should be a list" - ) - self.assertTrue(len(actual_vector) > 0, "Vector should not be empty") - self.assertEqual( - len(actual_vector), - self.expected_vector_dim, - f"Vector should 
have dimension {self.expected_vector_dim}", - ) - - vector_np = np.array(actual_vector) - self.assertFalse(np.all(vector_np == 0), "Vector should not be all zeros") - self.assertFalse( - np.any(np.isnan(vector_np)), "Vector should not contain NaN values" - ) - - @classmethod - def tearDownClass(cls): - """Clean up temporary files after all tests are done.""" - try: - Path(cls.config_path).unlink(missing_ok=True) - except Exception as e: - print(f"Warning: Failed to delete temporary file: {e}") diff --git a/tests/feluda_integration_tests/test_03_feluda_and_cluster_embeddings.py b/tests/feluda_integration_tests/test_03_feluda_and_cluster_embeddings.py deleted file mode 100644 index 0794b369..00000000 --- a/tests/feluda_integration_tests/test_03_feluda_and_cluster_embeddings.py +++ /dev/null @@ -1,71 +0,0 @@ -import os -import tempfile -import unittest - -import yaml - -from feluda import Feluda -from feluda.factory import AudioFactory - - -class TestFeludaClusterEmbeddingsIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - """Set up the test environment.""" - cls.config = { - "operators": { - "label": "Operators", - "parameters": [ - { - "name": "Cluster Embeddings", - "type": "cluster_embeddings", - "parameters": {"index_name": "audio"}, - } - ], - } - } - - # Create a temporary configuration file - fd, cls.config_path = tempfile.mkstemp(suffix=".yml") - with open(fd, "w") as f: - yaml.dump(cls.config, f) - - # Initialize Feluda - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - - cls.test_audio_url = "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav" - - def test_cluster_embeddings(self): - """Test the cluster_embeddings operator.""" - audio_obj = AudioFactory.make_from_url(self.test_audio_url) - self.assertIsNotNone(audio_obj, "Audio object should be successfully created") - - print(f"Audio object: {audio_obj}") - - # Generate mock embeddings and payloads for testing - embedding_1 = [0.1, 0.2, 0.3] # Mock embedding for sample 1 - embedding_2 = [0.4, 0.5, 0.6] # Mock embedding for sample 2 - payload_1 = {"path": audio_obj["path"]} - payload_2 = {"path": audio_obj["path"]} - - # Prepare input_data with at least 2 samples - input_data = [ - {"embedding": embedding_1, "payload": payload_1}, - {"embedding": embedding_2, "payload": payload_2}, - ] - - operator = self.feluda.operators.get()["cluster_embeddings"] - result = operator.run(input_data=input_data, n_clusters=2, modality="audio") - - self.assertIn("cluster_0", result) - self.assertIn("cluster_1", result) - self.assertEqual(len(result), 2) - - @classmethod - def tearDownClass(cls): - """Clean up temporary files.""" - try: - os.remove(cls.config_path) - except Exception as e: - print(f"Warning: Failed to delete temporary file: {e}") diff --git a/tests/feluda_integration_tests/test_04_feluda_and_dimension_reduction.py b/tests/feluda_integration_tests/test_04_feluda_and_dimension_reduction.py deleted file mode 100644 index fc4896d6..00000000 --- a/tests/feluda_integration_tests/test_04_feluda_and_dimension_reduction.py +++ /dev/null @@ -1,306 +0,0 @@ -import contextlib -import math -import os -import tempfile -import unittest -from pathlib import Path - -import numpy as np -import yaml - -from feluda import Feluda - - -class TestFeludaDimensionReductionIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - """Create a temporary test configuration file that will be used for all tests.""" - cls.config = { - "operators": { - "label": 
"ReductionOperators", - "parameters": [ - { - "name": "tsne reduction", - "type": "dimension_reduction", - "parameters": { - "model_type": "tsne", - "n_components": 2, - "perplexity": 2, - "learning_rate": 100, - "max_iter": 250, - "random_state": 123, - }, - } - ], - } - } - - fd, cls.config_path = tempfile.mkstemp(suffix=".yml") - with os.fdopen(fd, "w") as f: - yaml.dump(cls.config, f) - - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - cls.sample_inputs = [ - {"payload": "A", "embedding": [1.0, 0.0, 0.0]}, - {"payload": "B", "embedding": [0.0, 1.0, 0.0]}, - {"payload": "C", "embedding": [0.0, 0.0, 1.0]}, - ] - cls.expected_dim = cls.config["operators"]["parameters"][0]["parameters"][ - "n_components" - ] - - def setUp(self): - """Fetch fresh operator reference for each test.""" - self.operator = self.feluda.operators.get()["dimension_reduction"] - - def test_end_to_end_reduction(self): - """Test that tsne reduction runs end-to-end and outputs correct shape and finite values.""" - # Some TSNE implementations may reject certain configurations; skip if unsupported - try: - out = self.operator.run(self.sample_inputs) - except RuntimeError as e: - self.skipTest(f"t-SNE reduction not supported: {e}") - - self.assertIsInstance(out, list) - self.assertEqual(len(out), len(self.sample_inputs)) - - for original, reduced in zip(self.sample_inputs, out): - self.assertIn("payload", reduced) - self.assertIn("reduced_embedding", reduced) - self.assertEqual(reduced["payload"], original["payload"]) - vec = reduced["reduced_embedding"] - self.assertIsInstance(vec, list) - self.assertEqual(len(vec), self.expected_dim) - for v in vec: - self.assertIsInstance(v, float) - self.assertTrue(math.isfinite(v), "reduced value should be finite") - - def test_consistency_with_fixed_seed(self): - """With a fixed random_state, repeated runs give identical results.""" - try: - out1 = self.operator.run(self.sample_inputs) - out2 = self.operator.run(self.sample_inputs) - except RuntimeError: - self.skipTest("t-SNE reduction not supported for consistency test") - - for v1, v2 in zip(out1, out2): - np.testing.assert_array_almost_equal( - np.array(v1["reduced_embedding"]), - np.array(v2["reduced_embedding"]), - decimal=6, - err_msg="Repeated reductions should be identical with fixed seed", - ) - - def test_invalid_input_empty(self): - """Test that passing an empty list raises ValueError.""" - with self.assertRaises(ValueError): - self.operator.run([]) - - def test_invalid_input_missing_keys(self): - """Test that missing 'embedding' or 'payload' keys raises KeyError.""" - with self.assertRaises(KeyError): - self.operator.run([{"payload": "only_payload"}]) - with self.assertRaises(KeyError): - self.operator.run([{"embedding": [1, 2, 3]}]) - - def test_operator_configuration(self): - """Test that operator initializes properly and has required methods.""" - self.assertIsNotNone(self.operator) - self.assertTrue(hasattr(self.operator, "run")) - self.assertTrue(hasattr(self.operator, "initialize")) - - def test_initialize_and_run_sequence(self): - """Ensure initialize + run sequence works, and parameters are bounded correctly.""" - new_inputs = [ - {"payload": "X", "embedding": [1, 2, 3]}, - {"payload": "Y", "embedding": [4, 5, 6]}, - ] - n_samples = len(new_inputs) - n_features = len(new_inputs[0]["embedding"]) if n_samples > 0 else 0 - valid_n_components = min(n_samples, n_features) - valid_perplexity = min(5, n_samples - 1) - params = { - "model_type": "tsne", - "n_components": valid_n_components, - "perplexity": 
valid_perplexity, - "random_state": 0, - } - - with self.assertNoException("initialize should not raise"): - self.operator.initialize(params) - with self.assertNoException("run should not raise after initialize"): - out = self.operator.run(new_inputs) - - for item in out: - self.assertEqual( - len(item["reduced_embedding"]), - valid_n_components, - "n_components should match initialized parameter", - ) - - def test_edge_case_high_components(self): - """Test that n_components > n_features or n_samples raises or is handled.""" - bad_params = { - "model_type": "tsne", - "n_components": max( - 1, len(self.sample_inputs) + len(self.sample_inputs[0]["embedding"]) - ), - "perplexity": 1, - "random_state": 0, - } - # Either initialize or run should error out - try: - self.operator.initialize(bad_params) - except (ValueError, RuntimeError): - return - with self.assertRaises((ValueError, RuntimeError)): - self.operator.run(self.sample_inputs) - - def test_unsupported_model_type(self): - """Test that unsupported model types raise ValueError.""" - with self.assertRaises(ValueError): - params = {"model_type": "unsupported_algorithm"} - self.operator.initialize(params) - - def test_different_tsne_parameters(self): - """Test that different TSNE parameters are correctly applied.""" - # Initialize with exact method instead of barnes_hut - params = { - "model_type": "tsne", - "n_components": 2, - "perplexity": 1, # Keep low for small test dataset - "method": "exact", # Different method - "random_state": 42, - "max_iter": 500, # Different max iterations - } - - with self.assertNoException("initialize with custom params should not raise"): - self.operator.initialize(params) - - try: - result = self.operator.run(self.sample_inputs) - self.assertEqual(len(result), len(self.sample_inputs)) - self.assertEqual(len(result[0]["reduced_embedding"]), 2) - except RuntimeError as e: - self.skipTest(f"t-SNE with custom params not supported: {e}") - - def test_different_input_dimensions(self): - """Test reduction with different input dimensions.""" - # 5D input - inputs_5d = [ - {"payload": "A", "embedding": [1.0, 2.0, 3.0, 4.0, 5.0]}, - {"payload": "B", "embedding": [5.0, 4.0, 3.0, 2.0, 1.0]}, - {"payload": "C", "embedding": [1.0, 1.0, 1.0, 1.0, 1.0]}, - ] - - params = { - "model_type": "tsne", - "n_components": 2, # Reduce to 2D - "perplexity": 1, # Valid perplexity for 3 samples - "random_state": 42, - } - - with self.assertNoException("initialize for 5D input should not raise"): - self.operator.initialize(params) - - try: - result = self.operator.run(inputs_5d) - self.assertEqual(len(result), len(inputs_5d)) - self.assertEqual(len(result[0]["reduced_embedding"]), 2) - except RuntimeError as e: - self.skipTest(f"t-SNE reduction for 5D input not supported: {e}") - - def test_invalid_embeddings_dimension(self): - """Test that mismatched embedding dimensions raise an error.""" - # Create an input with mismatched embedding dimensions - bad_inputs = [ - {"payload": "A", "embedding": [1.0, 2.0]}, - {"payload": "B", "embedding": [1.0, 2.0, 3.0]}, # Different dimension - ] - - with self.assertRaises((ValueError, RuntimeError)): - self.operator.run(bad_inputs) - - def test_direct_gen_data_function(self): - """Test the gen_data utility function directly.""" - # We need to import the module to test internal function - try: - from operators.dimension_reduction import dimension_reduction - - payloads = ["A", "B"] - embeddings = np.array([[1.0, 2.0], [3.0, 4.0]]) - - result = dimension_reduction.gen_data(payloads, embeddings) - - 
self.assertEqual(len(result), 2) - self.assertEqual(result[0]["payload"], "A") - self.assertEqual(result[1]["payload"], "B") - self.assertEqual(result[0]["reduced_embedding"], [1.0, 2.0]) - self.assertEqual(result[1]["reduced_embedding"], [3.0, 4.0]) - except ImportError: - self.skipTest("Could not import dimension_reduction module directly") - - def test_multiple_initializations(self): - """Test that multiple initializations work correctly.""" - params1 = { - "model_type": "tsne", - "n_components": 2, - "perplexity": 1, - "random_state": 42, - } - - params2 = { - "model_type": "tsne", - "n_components": 2, - "perplexity": 1, - "random_state": 43, # Different seed - } - - # First initialization - self.operator.initialize(params1) - - try: - result1 = self.operator.run(self.sample_inputs) - except RuntimeError: - self.skipTest("t-SNE not supported for first initialization") - - # Second initialization with different parameters - self.operator.initialize(params2) - - try: - result2 = self.operator.run(self.sample_inputs) - except RuntimeError: - self.skipTest("t-SNE not supported for second initialization") - - # Results should be different with different random seeds - any_different = False - for v1, v2 in zip(result1, result2): - try: - np.testing.assert_array_almost_equal( - np.array(v1["reduced_embedding"]), np.array(v2["reduced_embedding"]) - ) - except AssertionError: - any_different = True - break - - # With different random seeds, results should differ - self.assertTrue( - any_different, "Different random seeds should produce different results" - ) - - @contextlib.contextmanager - def assertNoException(self, msg=None): - """Context manager to verify no exception is raised.""" - try: - yield - except Exception as e: - self.fail(f"{msg or 'Exception was raised'}: {e}") - - @classmethod - def tearDownClass(cls): - """Clean up temporary file.""" - try: - Path(cls.config_path).unlink(missing_ok=True) - except Exception as e: - print(f"Warning: could not remove temp config: {e}") diff --git a/tests/feluda_integration_tests/test_05_feluda_and_classify_video_zero_shot.py b/tests/feluda_integration_tests/test_05_feluda_and_classify_video_zero_shot.py deleted file mode 100644 index c298a0bc..00000000 --- a/tests/feluda_integration_tests/test_05_feluda_and_classify_video_zero_shot.py +++ /dev/null @@ -1,237 +0,0 @@ -import tempfile -import unittest - -import numpy as np -import yaml - -from feluda import Feluda -from feluda.factory import VideoFactory - - -class TestFeludaClassifyVideoZeroShotIntegration(unittest.TestCase): - """ - Integration test for Feluda and classify-video-zero-shot operator. 
- This test validates the integration between: - - feluda (core) - - feluda-classify-video-zero-shot - """ - - @classmethod - def setUpClass(cls): - """Create a temporary test configuration file that will be used for all tests.""" - # Setup configuration with the video zero-shot classifier operator - cls.config = { - "operators": { - "label": "Operators", - "parameters": [ - { - "name": "video classifier", - "type": "classify_video_zero_shot", - "parameters": {}, - } - ], - } - } - - fd, cls.config_path = tempfile.mkstemp(suffix=".yml") - with open(fd, "w") as f: - yaml.dump(cls.config, f) - - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - - cls.test_video_url = "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" - - cls.sample_labels = ["cat", "dog", "car", "building", "people"] - - def setUp(self): - """Setup before each test method.""" - self.operator = self.feluda.operators.get()["classify_video_zero_shot"] - - def test_operator_availability(self): - """Test that the classify_video_zero_shot operator is available.""" - self.assertIsNotNone( - self.operator, "Operator should be available in the system" - ) - self.assertTrue( - hasattr(self.operator, "run"), "Operator should have a 'run' method" - ) - - def test_video_object_generation(self): - """Test that video object generation works end-to-end.""" - video_obj = VideoFactory.make_from_url(self.test_video_url) - - self.assertIsNotNone(video_obj, "Video object should not be None") - self.assertIn("path", video_obj, "Video object should have a path attribute") - self.assertTrue(video_obj["path"], "Video path should not be empty") - - def test_video_classification_structure(self): - """Test video classification structure using zero-shot classifier.""" - video_obj = VideoFactory.make_from_url(self.test_video_url) - result = self.operator.run(video_obj, self.sample_labels) - - # Verify result structure - self.assertIsNotNone(result, "Classification result should not be None") - self.assertTrue(isinstance(result, dict), "Result should be a dictionary") - - # Check that result contains the expected keys - self.assertIn("prediction", result, "Result should contain a 'prediction' key") - self.assertIn("probs", result, "Result should contain a 'probs' key") - - # Check prediction is a string and one of our labels - self.assertTrue( - isinstance(result["prediction"], str), "Prediction should be a string" - ) - self.assertIn( - result["prediction"], - self.sample_labels, - "Prediction should be one of the provided labels", - ) - - # Check probs is a list with the correct length - self.assertTrue(isinstance(result["probs"], list), "Probs should be a list") - self.assertEqual( - len(result["probs"]), - len(self.sample_labels), - "Length of probs should match number of labels", - ) - - # Check probabilities sum close to 1 (allowing for small floating point errors) - self.assertAlmostEqual( - sum(result["probs"]), - 1.0, - places=len(self.sample_labels), - msg="Probabilities should sum to approximately 1", - ) - # Check each probability is a float between 0 and 1 - for prob in result["probs"]: - self.assertTrue( - isinstance(prob, float), "Each probability should be a float" - ) - self.assertTrue( - 0 <= prob <= 1, "Each probability should be between 0 and 1" - ) - - def test_video_classification_results(self): - """Test video classification results using zero-shot classifier.""" - # Create video object - video_obj = VideoFactory.make_from_url(self.test_video_url) - - # Perform classification 
- result = self.operator.run(video_obj, self.sample_labels) - - # For a cat video, "cat" should be the highest probability label - self.assertEqual( - result["prediction"], "cat", "Prediction should be 'cat' for a cat video" - ) - - # The first probability (for "cat") should be highest - max_prob_index = np.argmax(result["probs"]) - self.assertEqual( - max_prob_index, 0, "Highest probability should be for 'cat' (first label)" - ) - - # The "cat" probability should be significantly higher than others - cat_prob = result["probs"][0] - other_probs = result["probs"][1:] - self.assertTrue( - all(cat_prob > p for p in other_probs), - "Cat probability should be higher than all other probabilities", - ) - - # The cat probability should be reasonably high (based on the sample output) - self.assertGreater(cat_prob, 0.5, "Cat probability should be at least 0.5") - - def test_result_consistency(self): - """Test that results are consistent across multiple runs.""" - # Create video object (reusing the same video) - video_obj = VideoFactory.make_from_url(self.test_video_url) - - # Run classification twice - result1 = self.operator.run(video_obj, self.sample_labels) - - # Need to create a new video object since the previous one's file was deleted - video_obj2 = VideoFactory.make_from_url(self.test_video_url) - result2 = self.operator.run(video_obj2, self.sample_labels) - - # Check that predictions are the same - self.assertEqual( - result1["prediction"], - result2["prediction"], - "Predictions should be consistent across runs", - ) - - # Check that probabilities are similar (may not be identical due to frame extraction) - for i, (p1, p2) in enumerate(zip(result1["probs"], result2["probs"])): - # Allow for some variation but should be close - self.assertAlmostEqual( - p1, - p2, - places=1, - msg=f"Probabilities for label {self.sample_labels[i]} should be similar", - ) - - def test_different_labels(self): - """Test classification with different labels.""" - # Create video object - video_obj = VideoFactory.make_from_url(self.test_video_url) - - # Different set of labels - different_labels = ["feline", "animal", "vehicle", "nature", "technology"] - - # Perform classification - result = self.operator.run(video_obj, different_labels) - - # Check basic structure - self.assertIn("prediction", result) - self.assertIn("probs", result) - self.assertEqual(len(result["probs"]), len(different_labels)) - - # For a cat video, "feline" or "animal" should be the top predictions - predicted_label = result["prediction"] - self.assertIn( - predicted_label, - ["feline", "animal"], - f"Prediction should be 'feline' or 'animal' for a cat video, got '{predicted_label}'", - ) - - # Probabilities for feline/animal should be higher than others - feline_index = different_labels.index("feline") - animal_index = different_labels.index("animal") - - feline_prob = result["probs"][feline_index] - animal_prob = result["probs"][animal_index] - - # Other probabilities - other_indices = [ - i - for i in range(len(different_labels)) - if i != feline_index and i != animal_index - ] - other_probs = [result["probs"][i] for i in other_indices] - - # Either feline or animal should have significantly higher probability - self.assertTrue( - feline_prob > max(other_probs) or animal_prob > max(other_probs), - "Either 'feline' or 'animal' should have higher probability than other labels", - ) - - def test_empty_labels_list(self): - """Test handling of empty labels list.""" - # Create video object - video_obj = 
VideoFactory.make_from_url(self.test_video_url) - - # Test with empty labels list - should raise ValueError - with self.assertRaises(ValueError): - self.operator.run(video_obj, []) - - def test_invalid_video_input(self): - """Test handling of invalid video input.""" - # Test with None input - with self.assertRaises(Exception): - self.operator.run(None, self.sample_labels) - - # Test with invalid video object (missing path) - invalid_video = {"type": "video", "url": "invalid"} - with self.assertRaises(Exception): - self.operator.run(invalid_video, self.sample_labels) diff --git a/tests/feluda_integration_tests/test_06_feluda_and_vid_vec_rep_clip_and_cluster_embeddings.py b/tests/feluda_integration_tests/test_06_feluda_and_vid_vec_rep_clip_and_cluster_embeddings.py deleted file mode 100644 index 6adcc391..00000000 --- a/tests/feluda_integration_tests/test_06_feluda_and_vid_vec_rep_clip_and_cluster_embeddings.py +++ /dev/null @@ -1,403 +0,0 @@ -import contextlib -import tempfile -import unittest -from pathlib import Path - -import numpy as np -import yaml - -from feluda import Feluda -from feluda.factory import VideoFactory - - -class TestFeludaMultiOperatorIntegration(unittest.TestCase): - """ - Integration test for multiple Feluda operators. - - This test validates the integration between: - - feluda (core) - - feluda-vid-vec-rep-clip - - feluda-cluster-embeddings - """ - - @classmethod - def setUpClass(cls): - """Create a temporary test configuration file that will be used for all tests.""" - try: - # Setup configuration with multiple operators - cls.config = { - "operators": { - "label": "Operators", - "parameters": [ - { - "name": "video vectors", - "type": "vid_vec_rep_clip", - "parameters": {"index_name": "video"}, - }, - { - "name": "cluster embeddings", - "type": "cluster_embeddings", - "parameters": {}, - }, - ], - } - } - - # Create temporary config file using with statement to ensure proper resource cleanup - fd, cls.config_path = tempfile.mkstemp(suffix=".yml") - with open(fd, "w") as f: - yaml.dump(cls.config, f) - - # Initialize Feluda with better error handling - cls.feluda = Feluda(cls.config_path) - - # Try to setup, but allow for failures - try: - cls.feluda.setup() - cls.setup_successful = True - except Exception as e: - print(f"Warning: Setup failed with error: {e}") - cls.setup_successful = False - cls.setup_error = str(e) - - # Sample video URL for testing - cls.test_video_url = "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" - cls.expected_vector_dim = 512 - - # Sample data for clustering tests - cls.sample_clustering_data = [ - { - "embedding": [1.0, 2.0, 3.0], - "payload": {"id": "item1", "metadata": "data1"}, - }, - { - "embedding": [1.1, 2.1, 3.1], - "payload": {"id": "item2", "metadata": "data2"}, - }, - { - "embedding": [10.0, 11.0, 12.0], - "payload": {"id": "item3", "metadata": "data3"}, - }, - { - "embedding": [10.2, 11.2, 12.2], - "payload": {"id": "item4", "metadata": "data4"}, - }, - { - "embedding": [20.0, 21.0, 22.0], - "payload": {"id": "item5", "metadata": "data5"}, - }, - ] - except Exception as e: - print(f"Error during test setup: {e}") - raise - - def setUp(self): - """Set up test-specific feluda resources.""" - # Skip all tests if setup was not successful - if not getattr(self.__class__, "setup_successful", False): - self.skipTest( - f"Setup was not successful: {getattr(self.__class__, 'setup_error', 'Unknown error')}" - ) - - # Get operator references - self.operators = self.feluda.operators.get() 
- - # Check which operators are available (some may not be available due to dependencies) - self.has_video_operator = "vid_vec_rep_clip" in self.operators - self.has_cluster_operator = "cluster_embeddings" in self.operators - - def test_video_vector_generation(self): - """Test that video vector generation works end-to-end.""" - if not self.has_video_operator: - self.skipTest("vid_vec_rep_clip operator not available") - - video_operator = self.operators["vid_vec_rep_clip"] - - video_obj = VideoFactory.make_from_url(self.test_video_url) - self.assertIsNotNone(video_obj, "Video object should be successfully created") - - video_vec_generator = video_operator.run(video_obj) - - # Test first vector (average vector) - first_vec = next(video_vec_generator) - self.assertTrue(isinstance(first_vec, dict), "Result should be a dictionary") - self.assertIn("vid_vec", first_vec, "Result should contain 'vid_vec' key") - self.assertIn("is_avg", first_vec, "Result should contain 'is_avg' key") - self.assertTrue( - first_vec["is_avg"], "First vector should be the average vector" - ) - - # Verify vector dimensions - vid_vec = first_vec["vid_vec"] - self.assertTrue( - isinstance(vid_vec, list), - "Vector should be a list", - ) - self.assertEqual( - len(vid_vec), - self.expected_vector_dim, - f"Vector should have dimension {self.expected_vector_dim}", - ) - - # Check for I-frame vectors - i_frame_vectors = [] - for vec_data in video_vec_generator: - self.assertFalse( - vec_data["is_avg"], "Subsequent vectors should be I-frame vectors" - ) - i_frame_vectors.append(vec_data["vid_vec"]) - - # There should be at least one I-frame - self.assertTrue( - len(i_frame_vectors) > 0, "Should have at least one I-frame vector" - ) - - # All vectors should have the same dimension - for vec in i_frame_vectors: - self.assertEqual( - len(vec), - self.expected_vector_dim, - f"All I-frame vectors should have dimension {self.expected_vector_dim}", - ) - - def test_video_vector_consistency(self): - """Test that generating vectors twice from the same video gives consistent results.""" - if not self.has_video_operator: - self.fail("vid_vec_rep_clip operator not available") - - video_operator = self.operators["vid_vec_rep_clip"] - - # First vector generation - video_obj = VideoFactory.make_from_url(self.test_video_url) - vec1_generator = video_operator.run(video_obj) - vec1 = next(vec1_generator)["vid_vec"] - - # Second vector generation with a new video object - video_obj = VideoFactory.make_from_url(self.test_video_url) - vec2_generator = video_operator.run(video_obj) - vec2 = next(vec2_generator)["vid_vec"] - - # Vectors should be nearly identical (floating point comparison) - np.testing.assert_almost_equal( - vec1, - vec2, - decimal=5, - err_msg="Vectors should be nearly identical for the same video", - ) - - def test_kmeans_clustering(self): - """Test KMeans clustering with audio modality.""" - if not self.has_cluster_operator: - self.fail("cluster_embeddings operator not available") - - cluster_operator = self.operators["cluster_embeddings"] - - n_clusters = 3 - modality = "audio" - - result = cluster_operator.run( - self.sample_clustering_data, n_clusters=n_clusters, modality=modality - ) - - # Verify the result structure - self.assertTrue(isinstance(result, dict), "Result should be a dictionary") - self.assertEqual(len(result), n_clusters, f"Should have {n_clusters} clusters") - - # Check that each cluster key follows the expected format - for key in result: - self.assertTrue( - key.startswith("cluster_"), "Cluster keys should 
start with 'cluster_'" - ) - self.assertTrue( - isinstance(result[key], list), "Cluster values should be lists" - ) - - # Check that all items are assigned to some cluster - all_items = [] - for cluster_items in result.values(): - all_items.extend(cluster_items) - self.assertEqual( - len(all_items), - len(self.sample_clustering_data), - "All items should be assigned to a cluster", - ) - - def test_agglomerative_clustering(self): - """Test Agglomerative clustering with video modality.""" - if not self.has_cluster_operator: - self.fail("cluster_embeddings operator not available") - - cluster_operator = self.operators["cluster_embeddings"] - - n_clusters = 2 - modality = "video" - - result = cluster_operator.run( - self.sample_clustering_data, n_clusters=n_clusters, modality=modality - ) - - # Verify the result structure - self.assertTrue(isinstance(result, dict), "Result should be a dictionary") - self.assertEqual(len(result), n_clusters, f"Should have {n_clusters} clusters") - - # All other checks are similar to KMeans test - all_items = [] - for key, cluster_items in result.items(): - self.assertTrue( - key.startswith("cluster_"), "Cluster keys should start with 'cluster_'" - ) - self.assertTrue( - isinstance(cluster_items, list), "Cluster values should be lists" - ) - all_items.extend(cluster_items) - - self.assertEqual( - len(all_items), - len(self.sample_clustering_data), - "All items should be assigned to a cluster", - ) - - def test_video_to_clusters_integration(self): - """Test integration between video vector generation and clustering.""" - if not (self.has_video_operator and self.has_cluster_operator): - self.skipTest("Required operators not available") - - video_operator = self.operators["vid_vec_rep_clip"] - cluster_operator = self.operators["cluster_embeddings"] - - # Generate video vectors - video_obj = VideoFactory.make_from_url(self.test_video_url) - self.assertIsNotNone(video_obj, "Video object should be successfully created") - - video_vec_generator = video_operator.run(video_obj) - first_vec = next(video_vec_generator) - avg_vector = first_vec["vid_vec"] - - # Collect I-frame vectors as well (up to 5) - i_frame_vectors = [] - for i, vec_data in enumerate(video_vec_generator): - if i >= 5: # Limit to 5 I-frames to keep test reasonable - break - i_frame_vectors.append(vec_data["vid_vec"]) - - # Prepare data for clustering - clustering_data = [ - {"embedding": avg_vector, "payload": {"id": "avg_frame", "type": "average"}} - ] - - for i, vec in enumerate(i_frame_vectors): - clustering_data.append( - {"embedding": vec, "payload": {"id": f"iframe_{i}", "type": "iframe"}} - ) - - # Cluster the vectors - n_clusters = 2 # Arbitrary, just to demonstrate - modality = "video" - - result = cluster_operator.run( - clustering_data, n_clusters=n_clusters, modality=modality - ) - - # Verify clustering results - self.assertTrue( - 1 <= len(result) <= n_clusters, - f"Should have between 1 and {n_clusters} clusters", - ) - - # All items should be in clusters - all_items = [] - for cluster_items in result.values(): - all_items.extend(cluster_items) - self.assertEqual( - len(all_items), - len(clustering_data), - "All items should be assigned to a cluster", - ) - - def test_full_pipeline(self): - """Test the full pipeline with video vector extraction and clustering.""" - if not (self.has_video_operator and self.has_cluster_operator): - self.skipTest("Required operators not available") - - # Get operator references - video_operator = self.operators["vid_vec_rep_clip"] - cluster_operator = 
self.operators["cluster_embeddings"] - - # 1. Generate video vector - video_obj = VideoFactory.make_from_url(self.test_video_url) - video_vec_generator = video_operator.run(video_obj) - video_avg_vec = next(video_vec_generator)["vid_vec"] - - # Collect some I-frame vectors - iframe_vectors = [] - for i, vec_data in enumerate(video_vec_generator): - if i >= 4: # Just get a few frames - break - iframe_vectors.append(vec_data["vid_vec"]) - - # 2. Prepare data for clustering - clustering_data = [ - { - "embedding": video_avg_vec, - "payload": {"id": "avg_vector", "type": "video_avg"}, - } - ] - - # Add I-frame vectors - for i, vec in enumerate(iframe_vectors): - clustering_data.append( - { - "embedding": vec, - "payload": {"id": f"iframe_{i}", "type": "video_iframe"}, - } - ) - - # Add some synthetic vectors to ensure meaningful clustering - for i in range(3): - clustering_data.append( - { - "embedding": [float(10 * (i + 1))] * self.expected_vector_dim, - "payload": {"id": f"synthetic_{i}", "type": "synthetic"}, - } - ) - - # 3. Cluster the vectors - n_clusters = 3 # Expect video frames in one cluster, synthetic in another - modality = "video" - - cluster_result = cluster_operator.run( - clustering_data, n_clusters=n_clusters, modality=modality - ) - - # 4. Verify results - self.assertTrue( - 1 <= len(cluster_result) <= n_clusters, - f"Should have between 1 and {n_clusters} clusters", - ) - - # All items should be in clusters - all_items = [] - for cluster_items in cluster_result.values(): - all_items.extend(cluster_items) - self.assertEqual( - len(all_items), - len(clustering_data), - "All items should be assigned to a cluster", - ) - - @contextlib.contextmanager - def assertNoException(self, msg=None): - """Context manager to verify no exception is raised.""" - try: - yield - except Exception as e: - self.fail(f"{msg or 'Exception was raised'}: {e}") - - @classmethod - def tearDownClass(cls): - """Clean up temporary files after all tests are done.""" - try: - if hasattr(cls, "config_path"): - Path(cls.config_path).unlink(missing_ok=True) - except Exception as e: - print(f"Warning: Failed to delete temporary file: {e}") diff --git a/tests/feluda_integration_tests/test_07_feluda_and_vid_vec_rep_clip_and_dimension_reduction.py b/tests/feluda_integration_tests/test_07_feluda_and_vid_vec_rep_clip_and_dimension_reduction.py deleted file mode 100644 index 92caf14b..00000000 --- a/tests/feluda_integration_tests/test_07_feluda_and_vid_vec_rep_clip_and_dimension_reduction.py +++ /dev/null @@ -1,156 +0,0 @@ -import contextlib -import os -import tempfile -import unittest -from pathlib import Path - -import numpy as np -import yaml - -from feluda import Feluda -from feluda.factory import VideoFactory - - -class TestFeludaVidTSNEReductionIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - # TSNE pipeline: video → 2D TSNE (perplexity=1, fixed seed) - cfg = { - "operators": { - "label": "Vid+TSNE", - "parameters": [ - {"name": "vid", "type": "vid_vec_rep_clip", "parameters": {}}, - { - "name": "tsne", - "type": "dimension_reduction", - "parameters": { - "perplexity": 1, - "n_components": 2, - }, - }, - ], - } - } - fd, cls.config_path = tempfile.mkstemp(suffix=".yml", text=True) - with os.fdopen(fd, "w") as fp: - yaml.dump(cfg, fp) - - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - ops = cls.feluda.operators.get() - cls.vid = ops["vid_vec_rep_clip"] - cls.dr = ops["dimension_reduction"] - - sample_url = 
"https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" - vecs = list(cls.vid.run(VideoFactory.make_from_url(sample_url))) - if len(vecs) < 3: - raise RuntimeError(f"Need ≥3 embeddings but got {len(vecs)}") - cls.avg_vec = vecs[0]["vid_vec"] - cls.frame_vecs = [vecs[1]["vid_vec"], vecs[2]["vid_vec"]] - cls.expected_dim = len(cls.avg_vec) - - @classmethod - def tearDownClass(cls): - Path(cls.config_path).unlink(missing_ok=True) - - @contextlib.contextmanager - def assertNoException(self, msg=None): - try: - yield - except Exception as e: - self.fail(f"{msg or 'Unexpected exception'}: {e}") - - def test_smoke_video_and_tsne(self): - """Smoke: two 512d → TSNE→ two 2d embeddings.""" - data = [ - {"payload": "avg", "embedding": self.avg_vec}, - {"payload": "f1", "embedding": self.frame_vecs[0]}, - ] - out = self.dr.run(data) - self.assertEqual(len(out), 2) - for item in out: - self.assertIn("reduced_embedding", item) - self.assertEqual(len(item["reduced_embedding"]), 2) - - def test_tsne_seed_consistency(self): - """Fixed-seed TSNE on the same two vectors yields identical outputs.""" - data = [ - {"payload": "avg", "embedding": self.avg_vec}, - {"payload": "f1", "embedding": self.frame_vecs[0]}, - ] - a = self.dr.run(data) - b = self.dr.run(data) - for x, y in zip(a, b): - np.testing.assert_allclose(x["reduced_embedding"], y["reduced_embedding"]) - - def test_full_video_to_tsne_pipeline(self): - """End-to-end video→TSNE preserves payloads and dims.""" - data = [ - {"payload": "avg", "embedding": self.avg_vec}, - {"payload": "f2", "embedding": self.frame_vecs[1]}, - ] - out = self.dr.run(data) - got = [o["payload"] for o in out] - self.assertCountEqual(got, ["avg", "f2"]) - for o in out: - self.assertEqual(len(o["reduced_embedding"]), 2) - - -class TestFeludaVidUMAPReductionIntegration(unittest.TestCase): - @classmethod - def setUpClass(cls): - # UMAP pipeline: video → 3D UMAP - cfg = { - "operators": { - "label": "Vid+UMAP", - "parameters": [ - {"name": "vid", "type": "vid_vec_rep_clip", "parameters": {}}, - { - "name": "umap", - "type": "dimension_reduction", - "parameters": { - "model_type": "umap", - "n_components": 3, - "n_neighbors": 2, - }, - }, - ], - } - } - fd, cls.config_path = tempfile.mkstemp(suffix=".yml", text=True) - with os.fdopen(fd, "w") as fp: - yaml.dump(cfg, fp) - - cls.feluda = Feluda(cls.config_path) - cls.feluda.setup() - ops = cls.feluda.operators.get() - cls.vid = ops["vid_vec_rep_clip"] - cls.dr = ops["dimension_reduction"] - - sample_url = "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" - vecs = list(cls.vid.run(VideoFactory.make_from_url(sample_url))) - needed = 5 - if len(vecs) < needed: - raise RuntimeError(f"Need ≥{needed} embeddings but got {len(vecs)}") - cls.samples = [v["vid_vec"] for v in vecs[:needed]] - - @classmethod - def tearDownClass(cls): - Path(cls.config_path).unlink(missing_ok=True) - - @contextlib.contextmanager - def assertNoException(self, msg=None): - try: - yield - except Exception as e: - self.fail(f"{msg or 'Unexpected exception'}: {e}") - - def test_umap_integration(self): - """UMAP: avg + frames → 3d embeddings.""" - data = [ - {"payload": f"p{i}", "embedding": vec} for i, vec in enumerate(self.samples) - ] - out = self.dr.run(data) - self.assertEqual(len(out), len(self.samples)) - for item in out: - self.assertEqual(len(item["reduced_embedding"]), 3) diff --git a/tests/feluda_integration_tests/test_classify_video_zero_shot.py 
b/tests/feluda_integration_tests/test_classify_video_zero_shot.py new file mode 100644 index 00000000..863b3f99 --- /dev/null +++ b/tests/feluda_integration_tests/test_classify_video_zero_shot.py @@ -0,0 +1,209 @@ +import numpy as np +import pytest +from classify_video_zero_shot import VideoClassifier + +from feluda.factory import VideoFactory + + +@pytest.fixture(scope="session") +def video_classifier_operator(): + """Fixture to provide video zero-shot classifier operator.""" + return VideoClassifier() + + +@pytest.fixture(scope="session") +def test_video_url(): + """Fixture to provide test video URL.""" + return "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" + + +@pytest.fixture(scope="session") +def sample_labels(): + """Fixture to provide sample labels for classification.""" + return ["cat", "dog", "car", "building", "people"] + + +class TestVideoZeroShotClassification: + """Test video zero-shot classification functionality.""" + + def test_operator_availability(self, video_classifier_operator): + """Test that the classify_video_zero_shot operator is available.""" + assert video_classifier_operator is not None, ( + "Operator should be available in the system" + ) + assert hasattr(video_classifier_operator, "run"), ( + "Operator should have a 'run' method" + ) + + def test_video_object_generation(self, test_video_url): + """Test that video object generation works end-to-end.""" + video_obj = VideoFactory.make_from_url(test_video_url) + + assert video_obj is not None, "Video object should not be None" + assert "path" in video_obj, "Video object should have a path attribute" + assert video_obj["path"], "Video path should not be empty" + + def test_video_classification_structure( + self, video_classifier_operator, test_video_url, sample_labels + ): + """Test video classification structure using zero-shot classifier.""" + video_obj = VideoFactory.make_from_url(test_video_url) + result = video_classifier_operator.run(video_obj, sample_labels) + + # Verify result structure + assert result is not None, "Classification result should not be None" + assert isinstance(result, dict), "Result should be a dictionary" + + # Check that result contains the expected keys + assert "prediction" in result, "Result should contain a 'prediction' key" + assert "probs" in result, "Result should contain a 'probs' key" + + # Check prediction is a string and one of our labels + assert isinstance(result["prediction"], str), "Prediction should be a string" + assert result["prediction"] in sample_labels, ( + "Prediction should be one of the provided labels" + ) + + # Check probs is a list with the correct length + assert isinstance(result["probs"], list), "Probs should be a list" + assert len(result["probs"]) == len(sample_labels), ( + "Length of probs should match number of labels" + ) + + # Check probabilities sum close to 1 (allowing for small floating point errors) + assert abs(sum(result["probs"]) - 1.0) < 1e-6, ( + "Probabilities should sum to approximately 1" + ) + + # Check each probability is a float between 0 and 1 + for prob in result["probs"]: + assert isinstance(prob, float), "Each probability should be a float" + assert 0 <= prob <= 1, "Each probability should be between 0 and 1" + + def test_video_classification_results( + self, video_classifier_operator, test_video_url, sample_labels + ): + """Test video classification results using zero-shot classifier.""" + # Create video object + video_obj = VideoFactory.make_from_url(test_video_url) + + # Perform 
classification + result = video_classifier_operator.run(video_obj, sample_labels) + + # For a cat video, "cat" should be the highest probability label + assert result["prediction"] == "cat", ( + "Prediction should be 'cat' for a cat video" + ) + + # The first probability (for "cat") should be highest + max_prob_index = np.argmax(result["probs"]) + assert max_prob_index == 0, ( + "Highest probability should be for 'cat' (first label)" + ) + + # The "cat" probability should be significantly higher than others + cat_prob = result["probs"][0] + other_probs = result["probs"][1:] + assert all(cat_prob > p for p in other_probs), ( + "Cat probability should be higher than all other probabilities" + ) + + # The cat probability should be reasonably high (based on the sample output) + assert cat_prob > 0.5, "Cat probability should be at least 0.5" + + def test_result_consistency( + self, video_classifier_operator, test_video_url, sample_labels + ): + """Test that results are consistent across multiple runs.""" + # Create video object (reusing the same video) + video_obj = VideoFactory.make_from_url(test_video_url) + + # Run classification twice + result1 = video_classifier_operator.run(video_obj, sample_labels) + + # Need to create a new video object since the previous one's file was deleted + video_obj2 = VideoFactory.make_from_url(test_video_url) + result2 = video_classifier_operator.run(video_obj2, sample_labels) + + # Check that predictions are the same + assert result1["prediction"] == result2["prediction"], ( + "Predictions should be consistent across runs" + ) + + # Check that probabilities are similar (may not be identical due to frame extraction) + for i, (p1, p2) in enumerate( + zip(result1["probs"], result2["probs"], strict=False) + ): + # Allow for some variation but should be close + assert abs(p1 - p2) < 0.1, ( + f"Probabilities for label {sample_labels[i]} should be similar" + ) + + def test_different_labels(self, video_classifier_operator, test_video_url): + """Test classification with different labels.""" + # Create video object + video_obj = VideoFactory.make_from_url(test_video_url) + + # Different set of labels + different_labels = ["feline", "animal", "vehicle", "nature", "technology"] + + # Perform classification + result = video_classifier_operator.run(video_obj, different_labels) + + # Check basic structure + assert "prediction" in result + assert "probs" in result + assert len(result["probs"]) == len(different_labels) + + # For a cat video, "feline" or "animal" should be the top predictions + predicted_label = result["prediction"] + assert predicted_label in ["feline", "animal"], ( + f"Prediction should be 'feline' or 'animal' for a cat video, got '{predicted_label}'" + ) + + # Probabilities for feline/animal should be higher than others + feline_index = different_labels.index("feline") + animal_index = different_labels.index("animal") + + feline_prob = result["probs"][feline_index] + animal_prob = result["probs"][animal_index] + + # Other probabilities + other_indices = [ + i + for i in range(len(different_labels)) + if i != feline_index and i != animal_index + ] + other_probs = [result["probs"][i] for i in other_indices] + + # Either feline or animal should have significantly higher probability + assert feline_prob > max(other_probs) or animal_prob > max(other_probs), ( + "Either 'feline' or 'animal' should have higher probability than other labels" + ) + + def test_empty_labels_list(self, video_classifier_operator, test_video_url): + """Test handling of empty labels 
list.""" + # Create video object + video_obj = VideoFactory.make_from_url(test_video_url) + + # Test with empty labels list - should raise ValueError + with pytest.raises(ValueError): + video_classifier_operator.run(video_obj, []) + + def test_invalid_video_input(self, video_classifier_operator, sample_labels): + """Test handling of invalid video input.""" + # Test with None input + with pytest.raises(Exception): + video_classifier_operator.run(None, sample_labels) + + # Test with invalid video object (missing path) + invalid_video = {"type": "video", "url": "invalid"} + with pytest.raises(Exception): + video_classifier_operator.run(invalid_video, sample_labels) + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_operators(video_classifier_operator): + """Cleanup operators after each test.""" + yield + video_classifier_operator.cleanup() diff --git a/tests/feluda_integration_tests/test_cluster_embeddings.py b/tests/feluda_integration_tests/test_cluster_embeddings.py new file mode 100644 index 00000000..efdeab00 --- /dev/null +++ b/tests/feluda_integration_tests/test_cluster_embeddings.py @@ -0,0 +1,55 @@ +import pytest +from cluster_embeddings import ClusterEmbeddings + +from feluda.factory import AudioFactory + + +@pytest.fixture(scope="session") +def cluster_operator(): + """Fixture to provide cluster embeddings operator.""" + return ClusterEmbeddings() + + +@pytest.fixture(scope="session") +def test_audio_url(): + """Fixture to provide test audio URL.""" + return "https://raw.githubusercontent.com/tattle-made/feluda/main/src/core/operators/sample_data/audio.wav" + + +class TestClusterEmbeddingsIntegration: + """Test cluster embeddings functionality.""" + + @pytest.skip("audio file is not available") + def test_cluster_embeddings(self, cluster_operator, test_audio_url): + """Test the cluster_embeddings operator.""" + audio_obj = AudioFactory.make_from_url(test_audio_url) + assert audio_obj is not None, "Audio object should be successfully created" + + print(f"Audio object: {audio_obj}") + + # Generate mock embeddings and payloads for testing + embedding_1 = [0.1, 0.2, 0.3] # Mock embedding for sample 1 + embedding_2 = [0.4, 0.5, 0.6] # Mock embedding for sample 2 + payload_1 = {"path": audio_obj["path"]} + payload_2 = {"path": audio_obj["path"]} + + # Prepare input_data with at least 2 samples + input_data = [ + {"embedding": embedding_1, "payload": payload_1}, + {"embedding": embedding_2, "payload": payload_2}, + ] + + result = cluster_operator.run( + input_data=input_data, n_clusters=2, modality="audio" + ) + + assert "cluster_0" in result + assert "cluster_1" in result + assert len(result) == 2 + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_operators(cluster_operator): + """Cleanup operators after each test.""" + yield + cluster_operator.cleanup() diff --git a/tests/feluda_integration_tests/test_dimension_reduction.py b/tests/feluda_integration_tests/test_dimension_reduction.py new file mode 100644 index 00000000..83f0d7c0 --- /dev/null +++ b/tests/feluda_integration_tests/test_dimension_reduction.py @@ -0,0 +1,210 @@ +import math + +import numpy as np +import pytest +from dimension_reduction import DimensionReduction + + +@pytest.fixture(scope="session") +def sample_inputs(): + """Fixture to provide sample inputs for testing.""" + return [ + {"payload": "A", "embedding": [1.0, 0.0, 0.0]}, + {"payload": "B", "embedding": [0.0, 1.0, 0.0]}, + {"payload": "C", "embedding": [0.0, 0.0, 1.0]}, + ] + + +@pytest.fixture(scope="session") +def expected_dim(): + 
"""Fixture to provide expected dimension.""" + return 2 + + +class TestDimensionReductionIntegration: + """Test dimension reduction functionality.""" + + def test_end_to_end_reduction(self, sample_inputs, expected_dim): + """Test that tsne reduction runs end-to-end and outputs correct shape and finite values.""" + operator = DimensionReduction( + "tsne", + { + "n_components": 2, + "perplexity": 2, + "learning_rate": 100, + "max_iter": 250, + "random_state": 123, + }, + ) + + # Some TSNE implementations may reject certain configurations; skip if unsupported + try: + out = operator.run(sample_inputs) + except RuntimeError as e: + pytest.skip(f"t-SNE reduction not supported: {e}") + + assert isinstance(out, list) + assert len(out) == len(sample_inputs) + + for original, reduced in zip(sample_inputs, out, strict=False): + assert "payload" in reduced + assert "reduced_embedding" in reduced + assert reduced["payload"] == original["payload"] + vec = reduced["reduced_embedding"] + assert isinstance(vec, list) + assert len(vec) == expected_dim + for v in vec: + assert isinstance(v, float) + assert math.isfinite(v), "reduced value should be finite" + + def test_consistency_with_fixed_seed(self, sample_inputs): + """With a fixed random_state, repeated runs give identical results.""" + operator = DimensionReduction( + "tsne", + { + "n_components": 2, + "perplexity": 2, + "learning_rate": 100, + "max_iter": 250, + "random_state": 123, + }, + ) + + try: + out1 = operator.run(sample_inputs) + out2 = operator.run(sample_inputs) + except RuntimeError: + pytest.skip("t-SNE reduction not supported for consistency test") + + for v1, v2 in zip(out1, out2, strict=False): + np.testing.assert_array_almost_equal( + np.array(v1["reduced_embedding"]), + np.array(v2["reduced_embedding"]), + decimal=6, + err_msg="Repeated reductions should be identical with fixed seed", + ) + + def test_invalid_input_empty(self): + """Test that passing an empty list raises ValueError.""" + operator = DimensionReduction("tsne", {"n_components": 2}) + with pytest.raises(ValueError): + operator.run([]) + + def test_invalid_input_missing_keys(self): + """Test that missing 'embedding' or 'payload' keys raises KeyError.""" + operator = DimensionReduction("tsne", {"n_components": 2}) + with pytest.raises(KeyError): + operator.run([{"payload": "only_payload"}]) + with pytest.raises(KeyError): + operator.run([{"embedding": [1, 2, 3]}]) + + def test_operator_configuration(self): + """Test that operator initializes properly and has required methods.""" + operator = DimensionReduction("tsne", {"n_components": 2}) + assert operator is not None + assert hasattr(operator, "run") + + def test_different_tsne_parameters(self, sample_inputs): + """Test that different TSNE parameters are correctly applied.""" + # Initialize with exact method instead of barnes_hut + operator = DimensionReduction( + "tsne", + { + "n_components": 2, + "perplexity": 1, # Keep low for small test dataset + "method": "exact", # Different method + "random_state": 42, + "max_iter": 500, # Different max iterations + }, + ) + + try: + result = operator.run(sample_inputs) + assert len(result) == len(sample_inputs) + assert len(result[0]["reduced_embedding"]) == 2 + except RuntimeError as e: + pytest.skip(f"t-SNE with custom params not supported: {e}") + + def test_different_input_dimensions(self): + """Test reduction with different input dimensions.""" + # 5D input + inputs_5d = [ + {"payload": "A", "embedding": [1.0, 2.0, 3.0, 4.0, 5.0]}, + {"payload": "B", "embedding": [5.0, 4.0, 
+            {"payload": "C", "embedding": [1.0, 1.0, 1.0, 1.0, 1.0]},
+        ]
+
+        operator = DimensionReduction(
+            "tsne",
+            {
+                "n_components": 2,  # Reduce to 2D
+                "perplexity": 1,  # Valid perplexity for 3 samples
+                "random_state": 42,
+            },
+        )
+
+        try:
+            result = operator.run(inputs_5d)
+            assert len(result) == len(inputs_5d)
+            assert len(result[0]["reduced_embedding"]) == 2
+        except RuntimeError as e:
+            pytest.skip(f"t-SNE reduction for 5D input not supported: {e}")
+
+    def test_invalid_embeddings_dimension(self):
+        """Test that mismatched embedding dimensions raise an error."""
+        operator = DimensionReduction("tsne", {"n_components": 2})
+
+        # Create an input with mismatched embedding dimensions
+        bad_inputs = [
+            {"payload": "A", "embedding": [1.0, 2.0]},
+            {"payload": "B", "embedding": [1.0, 2.0, 3.0]},  # Different dimension
+        ]
+
+        with pytest.raises((ValueError, RuntimeError)):
+            operator.run(bad_inputs)
+
+    def test_multiple_initializations(self, sample_inputs):
+        """Test that multiple initializations work correctly."""
+        operator1 = DimensionReduction(
+            "tsne",
+            {
+                "n_components": 2,
+                "perplexity": 1,
+                "random_state": 42,
+            },
+        )
+
+        operator2 = DimensionReduction(
+            "tsne",
+            {
+                "n_components": 2,
+                "perplexity": 1,
+                "random_state": 43,  # Different seed
+            },
+        )
+
+        # Run the first operator
+        try:
+            result1 = operator1.run(sample_inputs)
+        except RuntimeError:
+            pytest.skip("t-SNE not supported for first initialization")
+
+        # Run the second operator (same parameters, different seed)
+        try:
+            result2 = operator2.run(sample_inputs)
+        except RuntimeError:
+            pytest.skip("t-SNE not supported for second initialization")
+
+        # Results should be different with different random seeds
+        any_different = False
+        for v1, v2 in zip(result1, result2, strict=False):
+            try:
+                np.testing.assert_array_almost_equal(
+                    np.array(v1["reduced_embedding"]), np.array(v2["reduced_embedding"])
+                )
+            except AssertionError:
+                any_different = True
+                break
+
+        # With different random seeds, results should differ
+        assert any_different, "Different random seeds should produce different results"
diff --git a/tests/feluda_integration_tests/test_image_vec_rep_resnet.py b/tests/feluda_integration_tests/test_image_vec_rep_resnet.py
new file mode 100644
index 00000000..561349d9
--- /dev/null
+++ b/tests/feluda_integration_tests/test_image_vec_rep_resnet.py
@@ -0,0 +1,86 @@
+from unittest.mock import patch
+
+import numpy as np
+import pytest
+from image_vec_rep_resnet import ImageVecRepResnet
+from requests.exceptions import ConnectTimeout
+
+from feluda.factory import ImageFactory
+
+
+@pytest.fixture(scope="session")
+def image_operator():
+    """Fixture to provide image vector representation operator."""
+    return ImageVecRepResnet()
+
+
+@pytest.fixture(scope="session")
+def test_image_url():
+    """Fixture to provide test image URL."""
+    return "https://tattle-media.s3.amazonaws.com/test-data/tattle-search/text-in-image-test-hindi.png"
+
+
+@pytest.fixture(scope="session")
+def expected_vector_dim():
+    """Fixture to provide expected vector dimension."""
+    return 512
+
+
+class TestImageVectorIntegration:
+    """Test image vector representation functionality."""
+
+    def test_image_vector_generation(
+        self, image_operator, test_image_url, expected_vector_dim
+    ):
+        """Test that image vector generation works end-to-end."""
+        image_obj = ImageFactory.make_from_url(test_image_url)
+        assert image_obj is not None, "Image object should be successfully created"
+
+        image_vec = image_operator.run(image_obj)
+
+        assert isinstance(image_vec, (list, np.ndarray)), (
+            "Vector should be a list or numpy array"
+        )
+        assert len(image_vec) > 0, "Vector should not be empty"
+        assert len(image_vec) == expected_vector_dim, (
+            f"Vector should have dimension {expected_vector_dim}"
+        )
+
+        if isinstance(image_vec, np.ndarray):
+            assert not np.all(image_vec == 0), "Vector should not be all zeros"
+            assert not np.any(np.isnan(image_vec)), (
+                "Vector should not contain NaN values"
+            )
+
+    def test_invalid_image_url(self):
+        """Test handling of invalid image URL."""
+        invalid_url = "https://nonexistent-url/image.jpg"
+
+        for exception in [ConnectTimeout]:
+            with patch("requests.get") as mock_get:
+                mock_get.side_effect = exception
+                with pytest.raises(Exception, match="Request has timed out"):
+                    ImageFactory.make_from_url(invalid_url)
+
+    def test_operator_configuration(self, image_operator):
+        """Test that operator is properly configured."""
+        assert image_operator is not None, "Operator should be properly initialized"
+        assert hasattr(image_operator, "run"), "Operator should have 'run' method"
+
+    def test_image_vector_consistency(self, image_operator, test_image_url):
+        """Test that generating vectors twice from the same image gives consistent results."""
+        image_obj = ImageFactory.make_from_url(test_image_url)
+
+        vec1 = image_operator.run(image_obj)
+        vec2 = image_operator.run(image_obj)
+
+        np.testing.assert_array_equal(
+            vec1, vec2, "Vectors should be identical for the same image"
+        )
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_operators(image_operator):
+    """Clean up operators after the test session."""
+    yield
+    image_operator.cleanup()
diff --git a/tests/feluda_integration_tests/test_vid_vec_rep_clip.py b/tests/feluda_integration_tests/test_vid_vec_rep_clip.py
new file mode 100644
index 00000000..c9582c65
--- /dev/null
+++ b/tests/feluda_integration_tests/test_vid_vec_rep_clip.py
@@ -0,0 +1,99 @@
+import os
+from pathlib import Path
+
+import numpy as np
+import pytest
+from vid_vec_rep_clip import VidVecRepClip
+
+from feluda.factory import VideoFactory
+
+
+@pytest.fixture(scope="session")
+def video_operator():
+    """Fixture to provide video vector representation operator."""
+    return VidVecRepClip()
+
+
+@pytest.fixture(scope="session")
+def test_video_url():
+    """Fixture to provide test video URL."""
+    return "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4"
+
+
+@pytest.fixture(scope="session")
+def expected_vector_dim():
+    """Fixture to provide expected vector dimension."""
+    return 512
+
+
+class TestVideoVectorGeneration:
+    """Test video vector generation functionality."""
+
+    def test_video_vector_generation(
+        self, video_operator, test_video_url, expected_vector_dim
+    ):
+        """Test that video vector generation works end-to-end."""
+        video_object = VideoFactory.make_from_url(test_video_url)
+        downloaded_path = video_object.get("path")
+        assert downloaded_path is not None, "VideoFactory did not return a path"
+        assert Path(downloaded_path).exists(), (
+            f"Downloaded file not found at {downloaded_path}"
+        )
+
+        vector_generator = video_operator.run(video_object)
+
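+        # The operator returns a generator: the first yielded dict is expected
+        # to carry the average vector over the whole video (is_avg=True),
+        # followed by one dict per extracted I-frame (is_avg=False).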
dictionary missing 'vid_vec' key" + ) + assert "is_avg" in first_output_item, "Yielded dictionary missing 'is_avg' key" + + actual_vector = first_output_item["vid_vec"] + is_average = first_output_item["is_avg"] + + assert is_average, "First yielded vector should have is_avg=True" + + assert isinstance(actual_vector, list), "Vector ('vid_vec') should be a list" + assert len(actual_vector) > 0, "Vector should not be empty" + assert len(actual_vector) == expected_vector_dim, ( + f"Vector should have dimension {expected_vector_dim}" + ) + + vector_np = np.array(actual_vector) + assert not np.all(vector_np == 0), "Vector should not be all zeros" + assert not np.any(np.isnan(vector_np)), "Vector should not contain NaN values" + + def test_video_vector_consistency(self, video_operator, test_video_url): + """Test that generating vectors twice from the same video gives consistent results.""" + video_obj = VideoFactory.make_from_url(test_video_url) + vec1_generator = video_operator.run(video_obj) + vec1 = next(vec1_generator)["vid_vec"] + + video_obj = VideoFactory.make_from_url(test_video_url) + vec2_generator = video_operator.run(video_obj) + vec2 = next(vec2_generator)["vid_vec"] + + np.testing.assert_almost_equal( + vec1, + vec2, + decimal=5, + err_msg="Vectors should be nearly identical for the same video", + ) + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_operators(video_operator): + """Cleanup operators after each test.""" + yield + video_operator.cleanup() diff --git a/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_cluster_embeddings.py b/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_cluster_embeddings.py new file mode 100644 index 00000000..bd748e51 --- /dev/null +++ b/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_cluster_embeddings.py @@ -0,0 +1,301 @@ +import numpy as np +import pytest +from cluster_embeddings import ClusterEmbeddings +from vid_vec_rep_clip import VidVecRepClip + +from feluda.factory import VideoFactory + + +@pytest.fixture(scope="session") +def video_operator(): + """Fixture to provide video vector representation operator.""" + return VidVecRepClip() + + +@pytest.fixture(scope="session") +def cluster_operator(): + """Fixture to provide cluster embeddings operator.""" + return ClusterEmbeddings() + + +@pytest.fixture(scope="module") +def test_video_url(): + """Fixture to provide test video URL.""" + return "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4" + + +@pytest.fixture(scope="module") +def expected_vector_dim(): + """Fixture to provide expected vector dimension.""" + return 512 + + +@pytest.fixture(scope="module") +def sample_clustering_data(): + """Fixture to provide sample data for clustering tests.""" + return [ + { + "embedding": [1.0, 2.0, 3.0], + "payload": {"id": "item1", "metadata": "data1"}, + }, + { + "embedding": [1.1, 2.1, 3.1], + "payload": {"id": "item2", "metadata": "data2"}, + }, + { + "embedding": [10.0, 11.0, 12.0], + "payload": {"id": "item3", "metadata": "data3"}, + }, + { + "embedding": [10.2, 11.2, 12.2], + "payload": {"id": "item4", "metadata": "data4"}, + }, + { + "embedding": [20.0, 21.0, 22.0], + "payload": {"id": "item5", "metadata": "data5"}, + }, + ] + + +class TestVideoVectorGeneration: + """Test video vector generation functionality.""" + + def test_video_vector_generation( + self, video_operator, test_video_url, expected_vector_dim + ): + """Test that video vector generation works end-to-end.""" + video_obj = 
+    return [
+        {
+            "embedding": [1.0, 2.0, 3.0],
+            "payload": {"id": "item1", "metadata": "data1"},
+        },
+        {
+            "embedding": [1.1, 2.1, 3.1],
+            "payload": {"id": "item2", "metadata": "data2"},
+        },
+        {
+            "embedding": [10.0, 11.0, 12.0],
+            "payload": {"id": "item3", "metadata": "data3"},
+        },
+        {
+            "embedding": [10.2, 11.2, 12.2],
+            "payload": {"id": "item4", "metadata": "data4"},
+        },
+        {
+            "embedding": [20.0, 21.0, 22.0],
+            "payload": {"id": "item5", "metadata": "data5"},
+        },
+    ]
+
+
+class TestVideoVectorGeneration:
+    """Test video vector generation functionality."""
+
+    def test_video_vector_generation(
+        self, video_operator, test_video_url, expected_vector_dim
+    ):
+        """Test that video vector generation works end-to-end."""
+        video_obj = VideoFactory.make_from_url(test_video_url)
+        assert video_obj is not None, "Video object should be successfully created"
+
+        video_vec_generator = video_operator.run(video_obj)
+
+        # Test first vector (average vector)
+        first_vec = next(video_vec_generator)
+        assert isinstance(first_vec, dict), "Result should be a dictionary"
+        assert "vid_vec" in first_vec, "Result should contain 'vid_vec' key"
+        assert "is_avg" in first_vec, "Result should contain 'is_avg' key"
+        assert first_vec["is_avg"], "First vector should be the average vector"
+
+        # Verify vector dimensions
+        vid_vec = first_vec["vid_vec"]
+        assert isinstance(vid_vec, list), "Vector should be a list"
+        assert len(vid_vec) == expected_vector_dim, (
+            f"Vector should have dimension {expected_vector_dim}"
+        )
+
+        # Check for I-frame vectors
+        i_frame_vectors = []
+        for vec_data in video_vec_generator:
+            assert not vec_data["is_avg"], (
+                "Subsequent vectors should be I-frame vectors"
+            )
+            i_frame_vectors.append(vec_data["vid_vec"])
+
+        # There should be at least one I-frame
+        assert len(i_frame_vectors) > 0, "Should have at least one I-frame vector"
+
+        # All vectors should have the same dimension
+        for vec in i_frame_vectors:
+            assert len(vec) == expected_vector_dim, (
+                f"All I-frame vectors should have dimension {expected_vector_dim}"
+            )
+
+    def test_video_vector_consistency(self, video_operator, test_video_url):
+        """Test that generating vectors twice from the same video gives consistent results."""
+        # First vector generation
+        video_obj = VideoFactory.make_from_url(test_video_url)
+        vec1_generator = video_operator.run(video_obj)
+        vec1 = next(vec1_generator)["vid_vec"]
+
+        # Second vector generation with a new video object
+        video_obj = VideoFactory.make_from_url(test_video_url)
+        vec2_generator = video_operator.run(video_obj)
+        vec2 = next(vec2_generator)["vid_vec"]
+
+        # Vectors should be nearly identical (floating point comparison)
+        np.testing.assert_almost_equal(
+            vec1,
+            vec2,
+            decimal=5,
+            err_msg="Vectors should be nearly identical for the same video",
+        )
+
+
+class TestClusterEmbeddings:
+    """Test cluster embeddings functionality."""
+
+    def test_kmeans_clustering(self, cluster_operator, sample_clustering_data):
+        """Test KMeans clustering with audio modality."""
+        n_clusters = 3
+        modality = "audio"
+
+        result = cluster_operator.run(
+            sample_clustering_data, n_clusters=n_clusters, modality=modality
+        )
+
+        # Verify the result structure
+        assert isinstance(result, dict), "Result should be a dictionary"
+        assert len(result) == n_clusters, f"Should have {n_clusters} clusters"
+
+        # Check that each cluster key follows the expected format
+        for key in result:
+            assert key.startswith("cluster_"), (
+                "Cluster keys should start with 'cluster_'"
+            )
+            assert isinstance(result[key], list), "Cluster values should be lists"
+
+        # Check that all items are assigned to some cluster
+        all_items = []
+        for cluster_items in result.values():
+            all_items.extend(cluster_items)
+        assert len(all_items) == len(sample_clustering_data), (
+            "All items should be assigned to a cluster"
+        )
+
clusters" + + # All other checks are similar to KMeans test + all_items = [] + for key, cluster_items in result.items(): + assert key.startswith("cluster_"), ( + "Cluster keys should start with 'cluster_'" + ) + assert isinstance(cluster_items, list), "Cluster values should be lists" + all_items.extend(cluster_items) + + assert len(all_items) == len(sample_clustering_data), ( + "All items should be assigned to a cluster" + ) + + +class TestIntegration: + """Test integration between video vector generation and clustering.""" + + def test_video_to_clusters_integration( + self, video_operator, cluster_operator, test_video_url, expected_vector_dim + ): + """Test integration between video vector generation and clustering.""" + # Generate video vectors + video_obj = VideoFactory.make_from_url(test_video_url) + assert video_obj is not None, "Video object should be successfully created" + + video_vec_generator = video_operator.run(video_obj) + first_vec = next(video_vec_generator) + avg_vector = first_vec["vid_vec"] + + # Collect I-frame vectors as well (up to 5) + i_frame_vectors = [] + for i, vec_data in enumerate(video_vec_generator): + if i >= 5: # Limit to 5 I-frames to keep test reasonable + break + i_frame_vectors.append(vec_data["vid_vec"]) + + # Prepare data for clustering + clustering_data = [ + {"embedding": avg_vector, "payload": {"id": "avg_frame", "type": "average"}} + ] + + for i, vec in enumerate(i_frame_vectors): + clustering_data.append({ + "embedding": vec, + "payload": {"id": f"iframe_{i}", "type": "iframe"}, + }) + + # Cluster the vectors + n_clusters = 2 # Arbitrary, just to demonstrate + modality = "video" + + result = cluster_operator.run( + clustering_data, n_clusters=n_clusters, modality=modality + ) + + # Verify clustering results + assert 1 <= len(result) <= n_clusters, ( + f"Should have between 1 and {n_clusters} clusters" + ) + + # All items should be in clusters + all_items = [] + for cluster_items in result.values(): + all_items.extend(cluster_items) + assert len(all_items) == len(clustering_data), ( + "All items should be assigned to a cluster" + ) + + def test_full_pipeline( + self, video_operator, cluster_operator, test_video_url, expected_vector_dim + ): + """Test the full pipeline with video vector extraction and clustering.""" + # 1. Generate video vector + video_obj = VideoFactory.make_from_url(test_video_url) + video_vec_generator = video_operator.run(video_obj) + video_avg_vec = next(video_vec_generator)["vid_vec"] + + # Collect some I-frame vectors + iframe_vectors = [] + for i, vec_data in enumerate(video_vec_generator): + if i >= 4: # Just get a few frames + break + iframe_vectors.append(vec_data["vid_vec"]) + + # 2. Prepare data for clustering + clustering_data = [ + { + "embedding": video_avg_vec, + "payload": {"id": "avg_vector", "type": "video_avg"}, + } + ] + + # Add I-frame vectors + for i, vec in enumerate(iframe_vectors): + clustering_data.append({ + "embedding": vec, + "payload": {"id": f"iframe_{i}", "type": "video_iframe"}, + }) + + # Add some synthetic vectors to ensure meaningful clustering + for i in range(3): + clustering_data.append({ + "embedding": [float(10 * (i + 1))] * expected_vector_dim, + "payload": {"id": f"synthetic_{i}", "type": "synthetic"}, + }) + + # 3. Cluster the vectors + n_clusters = 3 # Expect video frames in one cluster, synthetic in another + modality = "video" + + cluster_result = cluster_operator.run( + clustering_data, n_clusters=n_clusters, modality=modality + ) + + # 4. 
+        # 4. Verify results
+        assert 1 <= len(cluster_result) <= n_clusters, (
+            f"Should have between 1 and {n_clusters} clusters"
+        )
+
+        # All items should be in clusters
+        all_items = []
+        for cluster_items in cluster_result.values():
+            all_items.extend(cluster_items)
+        assert len(all_items) == len(clustering_data), (
+            "All items should be assigned to a cluster"
+        )
+
+
+@pytest.fixture(scope="session", autouse=True)
+def cleanup_operators(video_operator, cluster_operator):
+    """Clean up operators after the test session."""
+    yield
+    video_operator.cleanup()
+    cluster_operator.cleanup()
diff --git a/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_dimension_reduction.py b/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_dimension_reduction.py
new file mode 100644
index 00000000..83a2bfb3
--- /dev/null
+++ b/tests/feluda_integration_tests/test_vid_vec_rep_clip_and_dimension_reduction.py
@@ -0,0 +1,174 @@
+import numpy as np
+import pytest
+from dimension_reduction import DimensionReduction
+from vid_vec_rep_clip import VidVecRepClip
+
+from feluda.factory import VideoFactory
+
+
+@pytest.fixture(scope="session")
+def video_operator():
+    """Fixture to provide video vector representation operator."""
+    return VidVecRepClip()
+
+
+@pytest.fixture(scope="session")
+def test_video_url():
+    """Fixture to provide test video URL."""
+    return "https://github.com/tattle-made/feluda_datasets/raw/main/feluda-sample-media/sample-cat-video.mp4"
+
+
+@pytest.fixture(scope="session")
+def video_vectors(video_operator, test_video_url):
+    """Fixture to provide video vectors for testing."""
+    vecs = list(video_operator.run(VideoFactory.make_from_url(test_video_url)))
+    if len(vecs) < 3:
+        raise RuntimeError(f"Need ≥3 embeddings but got {len(vecs)}")
+
+    return {
+        "avg_vec": vecs[0]["vid_vec"],
+        "frame_vecs": [vecs[1]["vid_vec"], vecs[2]["vid_vec"]],
+        "expected_dim": len(vecs[0]["vid_vec"]),
+    }
+
+
+@pytest.fixture(scope="session")
+def video_vectors_for_umap(video_operator, test_video_url):
+    """Fixture to provide video vectors for UMAP testing."""
+    vecs = list(video_operator.run(VideoFactory.make_from_url(test_video_url)))
+    needed = 5
+    if len(vecs) < needed:
+        raise RuntimeError(f"Need ≥{needed} embeddings but got {len(vecs)}")
+
+    return [v["vid_vec"] for v in vecs[:needed]]
+
+
+class TestTSNEIntegration:
+    """Test t-SNE dimension reduction integration."""
+
+    def test_smoke_video_and_tsne(self, video_vectors):
+        """Smoke: two 512-d vectors → TSNE → two 2-d embeddings."""
+        dr_operator = DimensionReduction(
+            "tsne", {"n_components": 2, "perplexity": 1, "random_state": 42}
+        )
+
+        data = [
+            {"payload": "avg", "embedding": video_vectors["avg_vec"]},
+            {"payload": "f1", "embedding": video_vectors["frame_vecs"][0]},
+        ]
+
+        out = dr_operator.run(data)
+        assert len(out) == 2
+        for item in out:
+            assert "reduced_embedding" in item
+            assert len(item["reduced_embedding"]) == 2
+
+    def test_tsne_seed_consistency(self, video_vectors):
+        """Fixed-seed TSNE on the same two vectors yields identical outputs."""
+        dr_operator = DimensionReduction(
+            "tsne", {"n_components": 2, "perplexity": 1, "random_state": 42}
+        )
+
+        data = [
+            {"payload": "avg", "embedding": video_vectors["avg_vec"]},
+            {"payload": "f1", "embedding": video_vectors["frame_vecs"][0]},
+        ]
+
+        a = dr_operator.run(data)
+        b = dr_operator.run(data)
+
+        for x, y in zip(a, b, strict=False):
+            np.testing.assert_allclose(x["reduced_embedding"], y["reduced_embedding"])
+
preserves payloads and dims.""" + dr_operator = DimensionReduction( + "tsne", {"n_components": 2, "perplexity": 1, "random_state": 42} + ) + + data = [ + {"payload": "avg", "embedding": video_vectors["avg_vec"]}, + {"payload": "f2", "embedding": video_vectors["frame_vecs"][1]}, + ] + + out = dr_operator.run(data) + got = [o["payload"] for o in out] + assert sorted(got) == sorted(["avg", "f2"]) + + for o in out: + assert len(o["reduced_embedding"]) == 2 + + +class TestUMAPIntegration: + """Test UMAP dimension reduction integration.""" + + def test_umap_integration(self, video_vectors_for_umap): + """UMAP: avg + frames → 3d embeddings.""" + dr_operator = DimensionReduction( + "umap", {"n_components": 3, "n_neighbors": 2, "random_state": 42} + ) + + data = [ + {"payload": f"p{i}", "embedding": vec} + for i, vec in enumerate(video_vectors_for_umap) + ] + + out = dr_operator.run(data) + assert len(out) == len(video_vectors_for_umap) + + for item in out: + assert len(item["reduced_embedding"]) == 3 + + +class TestDimensionReductionValidation: + """Test dimension reduction validation and error handling.""" + + def test_invalid_model_type(self): + """Test that invalid model type raises ValueError.""" + with pytest.raises(ValueError, match="Unsupported model type"): + DimensionReduction("bogus", {}) + + def test_empty_input_list(self): + """Test that empty input list raises ValueError.""" + operator = DimensionReduction("umap", {"n_components": 2}) + with pytest.raises(ValueError, match="non-empty list"): + operator.run([]) + + def test_missing_keys(self): + """Test that missing required keys raises KeyError.""" + operator = DimensionReduction("tsne", {"n_components": 2}) + + with pytest.raises(KeyError): + operator.run([{"embedding": [0.1, 0.2]}]) # Missing payload + + with pytest.raises(KeyError): + operator.run([{"payload": "x"}]) # Missing embedding + + def test_invalid_embeddings(self): + """Test that invalid embeddings raise appropriate errors.""" + operator = DimensionReduction("tsne", {"n_components": 2}) + + # Non-numeric embedding + with pytest.raises(ValueError): + operator.run([{"payload": "x", "embedding": ["a", "b", "c"]}]) + + # Embedding with NaN + with pytest.raises(ValueError): + operator.run([{"payload": "x", "embedding": [1.0, float("nan")]}]) + + # Embedding with inf + with pytest.raises(ValueError): + operator.run([{"payload": "x", "embedding": [1.0, float("inf")]}]) + + def test_single_point_error(self): + """Test that single point with high perplexity raises error.""" + operator = DimensionReduction("tsne", {"n_components": 2, "perplexity": 5}) + with pytest.raises(RuntimeError, match="must be less than n_samples"): + operator.run([{"payload": "x", "embedding": list(np.random.rand(50))}]) + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_operators(video_operator): + """Cleanup operators after each test.""" + yield + video_operator.cleanup()