diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml
index b5ba485d9..a9348e846 100644
--- a/.github/workflows/cicd.yml
+++ b/.github/workflows/cicd.yml
@@ -99,10 +99,10 @@ jobs:
       - name: Install dependencies
         run: |
           python -m pip install --upgrade pip wheel pytest-timeout
-          pip install ./stac_fastapi/core[catalogs]
+          pip install ./stac_fastapi/core
           pip install ./stac_fastapi/sfeos_helpers
-          pip install ./stac_fastapi/elasticsearch[dev,server]
-          pip install ./stac_fastapi/opensearch[dev,server]
+          pip install ./stac_fastapi/elasticsearch[catalogs,dev,server,validator]
+          pip install ./stac_fastapi/opensearch[catalogs,dev,server,validator]

       - name: Run test suite
         run: |
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d33e15e84..73c216750 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,10 +9,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

 ### Added

+- Added `ENABLE_STAC_VALIDATOR` environment variable to enable strict STAC schema validation on ingestion via the Python `stac-validator` package. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)
+- Added `[validator]` installation extra to `stac-fastapi-core`, `elasticsearch`, and `opensearch` packages. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)
 - Added test for conformance endpoint in catalogs extension. [#727](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/727)

 ### Changed

+- Updated STAC validation from batch_validator to FastValidator for improved performance using fastjsonschema. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)
+
 ### Fixed

 ### Removed
diff --git a/Makefile b/Makefile
index 2ea5c76ae..4b91c0d35 100644
--- a/Makefile
+++ b/Makefile
@@ -75,6 +75,11 @@ test-elasticsearch-catalogs: image-es-os
 	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest extensions/test_catalogs.py -v'
 	docker compose down

+.PHONY: test-elasticsearch-validation
+test-elasticsearch-validation: image-es-os
+	-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest api/test_api_stac_validator.py -v'
+	docker compose down
+
 .PHONY: test-opensearch
 test-opensearch: image-es-os
 	-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest'
diff --git a/README.md b/README.md
index f34658056..f57c93c98 100644
--- a/README.md
+++ b/README.md
@@ -31,13 +31,13 @@ The following organizations have contributed time and/or funding to support the
 - **11/07/2025:** 🌍 The SFEOS STAC Viewer is now available at: https://healy-hyperspatial.github.io/sfeos-web. Use this site to examine your data and test your STAC API!
 - **10/24/2025:** Added `previous_token` pagination using Redis for efficient navigation. This feature allows users to navigate backwards through large result sets by storing pagination state in Redis. To use this feature, ensure Redis is configured (see [Redis for navigation](#redis-for-navigation)) and set `REDIS_ENABLE=true` in your environment.
 - **10/23/2025:** The `EXCLUDED_FROM_QUERYABLES` environment variable was added to exclude fields from the `queryables` endpoint. See [docs](#excluding-fields-from-queryables).
-- **10/15/2025:** 🚀 SFEOS Tools v0.1.0 Released!
- The new `sfeos-tools` CLI is now available on [PyPI](https://pypi.org/project/sfeos-tools/) -- **10/15/2025:** Added `reindex` command to **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** for zero-downtime index updates when changing mappings or settings. The new `reindex` command makes it easy to apply mapping changes, update index settings, or migrate to new index structures without any service interruption, ensuring high availability of your STAC API during maintenance operations.
View Older News (Click to Expand) ------------- +- **10/15/2025:** 🚀 SFEOS Tools v0.1.0 Released! - The new `sfeos-tools` CLI is now available on [PyPI](https://pypi.org/project/sfeos-tools/) +- **10/15/2025:** Added `reindex` command to **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** for zero-downtime index updates when changing mappings or settings. The new `reindex` command makes it easy to apply mapping changes, update index settings, or migrate to new index structures without any service interruption, ensuring high availability of your STAC API during maintenance operations. - **10/12/2025:** Collections search **bbox** functionality added! The collections search extension now supports bbox queries. Collections will need to be updated via the API or with the new **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** CLI package to support geospatial discoverability. 🙏 Thanks again to **CloudFerro** for their sponsorship of this work! - **10/04/2025:** The **[CloudFerro](https://cloudferro.com/)** logo has been added to the sponsors and supporters list above. Their sponsorship of the ongoing collections search extension work has been invaluable. This is in addition to the many other important changes and updates their developers have added to the project. - **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with the Transactions Extension, and enhances collections search with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments. @@ -106,6 +106,7 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI - [Using Pre-built Docker Images](#using-pre-built-docker-images) - [Using Docker Compose](#using-docker-compose) - [Configuration Reference](#configuration-reference) + - [STAC Validation](#stac-validation) - [Free-Text Search (`q` parameter)](#free-text-search-q-parameter) - [Queryables Endpoint](#queryables-endpoint) - [Root Queryables Configuration](#root-queryables-configuration) @@ -741,6 +742,7 @@ You can customize additional settings in your `.env` file: | `ENABLE_COLLECTIONS_SEARCH_ROUTE` | Enable the custom `/collections-search` endpoint (both GET and POST methods). When disabled, the custom endpoint will not be available, but collection search extensions will still be available on the core `/collections` endpoint if `ENABLE_COLLECTIONS_SEARCH` is true. | `false` | Optional | | `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. This is useful for deployments where mutating the catalog via the API should be prevented. If set to `true`, the POST `/collections` route for search will be unavailable in the API. | `true` | Optional | | `ENABLE_CATALOGS_ROUTE` | Enable the **/catalogs** endpoint for hierarchical catalog browsing and navigation. **Note:** Requires the catalogs extension to be installed via `stac-fastapi-elasticsearch[catalogs]`, `stac-fastapi-opensearch[catalogs]`, or `stac-fastapi-core[catalogs]`. See [Catalogs Route](#catalogs-route) for installation instructions. | `false` | Optional | +| `ENABLE_STAC_VALIDATOR` | Enable [stac-validator](https://github.com/stac-utils/stac-validator) to validate STAC items and collections on ingestion. 
This is especially useful for items or collections that use extensions. | `false` | Optional |
| `STAC_INDEX_ASSETS` | Controls if Assets are indexed when added to Elasticsearch/Opensearch. This allows asset fields to be included in search queries. | `false` | Optional |

### 5. Limits & Performance
@@ -794,6 +796,71 @@ You can customize additional settings in your `.env` file:
 > [!NOTE]
 > The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

+## STAC Validation
+
+STAC FastAPI provides a flexible, two-tier validation architecture for STAC items and collections on ingestion. This ensures data quality and compliance with the STAC specification while allowing you to balance strict schema enforcement against high-throughput ingestion performance.
+
+### 1. Native Pydantic Validation (Always Enabled)
+
+By default, all STAC items and collections are validated using **Pydantic** (via `stac-pydantic`) at the API routing layer. This validation:
+
+- Enforces required STAC fields and correct data types.
+- Validates spatial and temporal properties.
+- Provides extremely fast, built-in validation without external dependencies.
+
+This validation is always enabled and happens automatically before data reaches the database or the Redis queue.
+
+### 2. Python STAC Validator
+
+If you require strict validation beyond Pydantic's type checking, you can enable the Python-based `stac-validator` package with its FastValidator backend, which uses `fastjsonschema` for fast JSON Schema validation.
+
+#### Enabling STAC Validator with FastValidator
+
+1. **Install the validator**:
+   ```bash
+   pip install stac-fastapi-core[validator]
+   # or
+   pip install stac-fastapi-elasticsearch[validator]
+   # or
+   pip install stac-fastapi-opensearch[validator]
+   ```
+
+2. **Enable validation via environment variable**:
+   ```bash
+   export ENABLE_STAC_VALIDATOR=true
+   ```
+
+When enabled, FastValidator will:
+- Validate items and collections against the official STAC JSON schemas, using `fastjsonschema` for performance
+- Check compliance with STAC extensions (e.g., EO, SAR, Projection)
+- Catch schema violations that Pydantic doesn't enforce
+- Provide detailed error messages grouped by validation error type
+- Support batch validation of FeatureCollections with per-item error reporting
+
+#### Example: Validation in Action
+
+```bash
+# Enable STAC validator
+export ENABLE_STAC_VALIDATOR=true
+
+# Now POST/PUT requests will validate against STAC schemas
+curl -X POST http://localhost:8000/collections \
+  -H "Content-Type: application/json" \
+  -d @collection.json
+```
+
+If validation fails, you'll receive a detailed error response:
+```json
+{
+  "detail": "STAC validation failed: 'eo:bands' does not match any of the regexes: '^(?!eo:)'. Error is in assets -> SR_B2"
+}
+```
+
+#### Performance Considerations
+
+- **Pydantic validation**: Very fast and always enabled
+- **STAC validator with FastValidator** (`ENABLE_STAC_VALIDATOR`): Uses `fastjsonschema` for efficient schema validation, with batch processing support for FeatureCollections
+
 ## Free-Text Search (`q` parameter)

 The free-text search feature allows users to discover items and collections using keywords or phrases. By default, the search targets core fields: `id`, `collection`, `properties.title`, `properties.description`, and `properties.keywords`.
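+
+A minimal Python sketch of a free-text query (the host, search term, and use of the `requests` package are illustrative assumptions, not part of this project):
+
+```python
+import requests
+
+# Free-text search over the default fields (id, collection, title,
+# description, keywords) described above.
+resp = requests.get(
+    "http://localhost:8000/search",
+    params={"q": "sentinel", "limit": 5},
+)
+resp.raise_for_status()
+for feature in resp.json()["features"]:
+    print(feature["id"])
+```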
diff --git a/compose.yml b/compose.yml
index bdbd63a1b..5fd1b2c85 100644
--- a/compose.yml
+++ b/compose.yml
@@ -24,6 +24,7 @@ services:
       - DATABASE_REFRESH=true
       - ENABLE_COLLECTIONS_SEARCH_ROUTE=true
       - ENABLE_CATALOGS_ROUTE=true
+      - ENABLE_STAC_VALIDATOR=true
       - REDIS_ENABLE=true
       - REDIS_HOST=redis
       - REDIS_PORT=6379
@@ -34,8 +35,10 @@
     volumes:
       - ./scripts:/app/scripts
       - ./esdata:/usr/share/elasticsearch/data
     depends_on:
-      - elasticsearch
-      - redis
+      elasticsearch:
+        condition: service_started
+      redis:
+        condition: service_started
     command: bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app"
@@ -65,6 +68,7 @@
       - STAC_FASTAPI_RATE_LIMIT=200/minute
       - ENABLE_COLLECTIONS_SEARCH_ROUTE=true
       - ENABLE_CATALOGS_ROUTE=true
+      - ENABLE_STAC_VALIDATOR=true
       - REDIS_ENABLE=true
       - REDIS_HOST=redis
       - REDIS_PORT=6379
@@ -75,8 +79,10 @@
     volumes:
       - ./scripts:/app/scripts
       - ./osdata:/usr/share/opensearch/data
     depends_on:
-      - opensearch
-      - redis
+      opensearch:
+        condition: service_started
+      redis:
+        condition: service_started
     command: bash -c "./scripts/wait-for-it-es.sh os-container:9202 && python -m stac_fastapi.opensearch.app"
diff --git a/dockerfiles/Dockerfile.dev.es-os b/dockerfiles/Dockerfile.dev.es-os
index 46575c46d..f1f1ae7ee 100644
--- a/dockerfiles/Dockerfile.dev.es-os
+++ b/dockerfiles/Dockerfile.dev.es-os
@@ -14,5 +14,5 @@ COPY . /app
 RUN pip install --no-cache-dir -e ./stac_fastapi/core
 RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers
-RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,catalogs]
-RUN pip install --no-cache-dir -e ./stac_fastapi/opensearch[dev,server,catalogs]
+RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,catalogs,validator]
+RUN pip install --no-cache-dir -e ./stac_fastapi/opensearch[dev,server,catalogs,validator]
diff --git a/scripts/item_queue_worker.py b/scripts/item_queue_worker.py
index bf3ae471d..7be8b20f4 100755
--- a/scripts/item_queue_worker.py
+++ b/scripts/item_queue_worker.py
@@ -21,6 +21,8 @@
 from redis.exceptions import LockError

 from stac_fastapi.core.redis_utils import AsyncRedisQueueManager, ItemQueueSettings
+from stac_fastapi.core.utilities import get_bool_env
+from stac_fastapi.core.validate import async_validate_with_fast_validator

 logger = logging.getLogger(__name__)
@@ -167,6 +169,10 @@ async def _flush_collection(self, collection_id: str) -> None:

         The lock TTL is periodically refreshed by a background task to prevent
         expiration during long-running batch processing.
+
+        If strict validation is enabled via `ENABLE_STAC_VALIDATOR`, items are
+        validated using FastValidator before database insertion. Invalid items are
+        routed to the Dead Letter Queue (DLQ), and only valid items are inserted.
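+
+        A short sketch of the per-batch flow (names as defined in this
+        module; the error-mapping shape comes from
+        ``async_validate_with_fast_validator``)::
+
+            valid_items, errors = await async_validate_with_fast_validator(items)
+            # errors maps error message -> [item_id, ...]
+            invalid_ids = {iid for ids in errors.values() for iid in ids}
+            # invalid items go to the DLQ; valid_items continue to bulk_async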
""" state = self._get_state(collection_id) @@ -206,54 +212,106 @@ async def _flush_collection(self, collection_id: str) -> None: break batch_num += 1 - item_ids = [item["id"] for item in items] logger.info( - f"Collection '{collection_id}' batch #{batch_num}: flushing {len(items)} items" + f"Collection '{collection_id}' batch #{batch_num}: pulled {len(items)} items from queue" ) + # VALIDATION LAYER: Use batch validation for efficiency (if enabled) + if get_bool_env("ENABLE_STAC_VALIDATOR"): + ( + valid_items, + validation_errors, + ) = await async_validate_with_fast_validator(items) + + # Extract invalid item IDs from grouped validation errors + invalid_item_ids = set() + for error_msg, item_ids in validation_errors.items(): + for item_id in item_ids: + invalid_item_ids.add(item_id) + logger.error( + f"Worker validation failed for '{item_id}' in collection '{collection_id}': {error_msg}" + ) + else: + # Skip STAC validation when disabled + valid_items = items + invalid_item_ids = set() + + # Handle invalid items (Dead Letter Queue) + if invalid_item_ids: + try: + await self.queue_manager.save_failed_items( + collection_id, list(invalid_item_ids) + ) + await self.queue_manager.mark_items_processed( + collection_id, list(invalid_item_ids) + ) + except Exception: + logger.exception( + f"Collection '{collection_id}': failed to save {len(invalid_item_ids)} invalid items to DLQ" + ) + + # If entire batch was invalid, skip database call + if not valid_items: + logger.warning( + f"Collection '{collection_id}' batch #{batch_num}: All {len(items)} items failed STAC validation. Skipping DB insert." + ) + state.last_flush_time = time.monotonic() + if len(items) < batch_size: + break + continue + + # DATABASE INSERTION: Only valid items reach the database try: success, errors = await self.db.bulk_async( collection_id=collection_id, - processed_items=items, + processed_items=valid_items, op_type="index", ) except Exception: logger.exception( - f"Collection '{collection_id}' batch #{batch_num}: bulk_async failed ({len(items)} items)" + f"Collection '{collection_id}' batch #{batch_num}: bulk_async failed ({len(valid_items)} valid items)" ) break - failed_ids = self._extract_failed_item_ids(errors) if errors else set() - successful_ids = [iid for iid in item_ids if iid not in failed_ids] + # Handle database errors + failed_db_ids = ( + self._extract_failed_item_ids(errors) if errors else set() + ) + successful_db_ids = [ + item["id"] + for item in valid_items + if item["id"] not in failed_db_ids + ] if errors: logger.error( f"Collection '{collection_id}' batch #{batch_num}: " - f"{len(failed_ids)} item(s) failed, saving to DLQ. " + f"{len(failed_db_ids)} DB insert(s) failed, saving to DLQ. 
" f"Bulk errors: {errors}" ) - if successful_ids: + if successful_db_ids: await self.queue_manager.mark_items_processed( - collection_id, successful_ids + collection_id, successful_db_ids ) - if failed_ids: + if failed_db_ids: try: await self.queue_manager.save_failed_items( - collection_id, list(failed_ids) + collection_id, list(failed_db_ids) ) await self.queue_manager.mark_items_processed( - collection_id, list(failed_ids) + collection_id, list(failed_db_ids) ) except Exception: logger.exception( - f"Collection '{collection_id}': failed to save {len(failed_ids)} item(s) to DLQ; items remain in pending queue" + f"Collection '{collection_id}': failed to save {len(failed_db_ids)} DB failures to DLQ" ) logger.info( - f"Collection '{collection_id}' batch #{batch_num}: {success} succeeded, {len(errors)} errors" + f"Collection '{collection_id}' batch #{batch_num}: {success} succeeded DB insert, " + f"{len(invalid_item_ids)} failed STAC validation, {len(failed_db_ids)} failed DB insert." ) state.last_flush_time = time.monotonic() diff --git a/stac_fastapi/core/pyproject.toml b/stac_fastapi/core/pyproject.toml index 1e03ed798..956ff324f 100644 --- a/stac_fastapi/core/pyproject.toml +++ b/stac_fastapi/core/pyproject.toml @@ -56,6 +56,9 @@ sentry = [ catalogs = [ "stac-fastapi-catalogs-extension==0.2.0", ] +validator = [ + "stac-valid~=4.2.2" +] [project.urls] Homepage = "https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch" diff --git a/stac_fastapi/core/stac_fastapi/core/core.py b/stac_fastapi/core/stac_fastapi/core/core.py index 10f20d43f..99dc50b1d 100644 --- a/stac_fastapi/core/stac_fastapi/core/core.py +++ b/stac_fastapi/core/stac_fastapi/core/core.py @@ -35,6 +35,10 @@ ) from stac_fastapi.core.session import Session from stac_fastapi.core.utilities import filter_fields, get_bool_env +from stac_fastapi.core.validate import ( + async_validate_stac, + async_validate_with_fast_validator, +) from stac_fastapi.extensions.core.transaction import AsyncBaseTransactionsClient from stac_fastapi.extensions.core.transaction.request import ( PartialCollection, @@ -995,129 +999,351 @@ class TransactionsClient(AsyncBaseTransactionsClient): settings: ApiBaseSettings = attr.ib() session: Session = attr.ib(default=attr.Factory(Session.create_from_env)) + async def _validate_single_item(self, item_dict: dict) -> None: + """Validate a single STAC item. + + Args: + item_dict: The item dictionary to validate. + + Raises: + HTTPException: If validation fails. + """ + try: + await async_validate_stac(item_dict) + except (ValidationError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid item: {e}") + + async def _validate_feature_collection( + self, features: list[dict], skip_validation: bool + ) -> tuple[list[dict], dict[str, list[str]]]: + """Validate a collection of STAC features. + + Args: + features: List of feature dictionaries to validate. + skip_validation: Whether to skip validation (e.g., when using queue). + + Returns: + Tuple of (valid_features, validation_errors) where validation_errors + maps error messages to lists of affected item IDs. + """ + if get_bool_env("ENABLE_STAC_VALIDATOR") and not skip_validation: + return await async_validate_with_fast_validator(features) + return features, {} + @overrides async def create_item( self, collection_id: str, item: Item | ItemCollection, **kwargs - ) -> stac_types.Item | str: - """ - Create an item or a feature collection of items in the specified collection. 
+ ) -> stac_types.Item | str | dict: + """Create an item or a feature collection of items in the specified collection. + + Acts as a traffic router, inspecting the payload type and delegating to the + appropriate single-item or bulk-item processing pipeline. Args: collection_id (str): The ID of the collection to add the item(s) to. - item (Item | ItemCollection): A single item or a collection of items to be added. - **kwargs: Additional keyword arguments, such as `request` and `refresh`. + item (Item | ItemCollection): A single item or a collection of items. + **kwargs: Additional keyword arguments, such as `request`. Returns: - stac_types.Item | str: The created item if a single item is added, or a summary string - indicating the number of items successfully added and errors if a collection of items is added. + stac_types.Item | str | dict: + - Single item (DB): The created `Item` object. + - Single item (Queue): A success string. + - FeatureCollection: A dictionary summarizing successes, failures, and duplicates. Raises: - NotFoundError: If the specified collection is not found in the database. - ConflictError: If an item with the same ID already exists in the collection. + HTTPException: If payload validation or bulk database insertion fails. """ request = kwargs.get("request") base_url = str(request.base_url) - - # Convert Pydantic model to dict for uniform processing item_dict = item.model_dump(mode="json") - - # Check if Redis queue is enabled for async item processing use_queue = get_bool_env("ENABLE_REDIS_QUEUE", default=False) - # Handle FeatureCollection (bulk insert) - if item_dict["type"] == "FeatureCollection": - bulk_client = BulkTransactionsClient( - database=self.database, settings=self.settings + # Route the request to the dedicated handler + if item_dict.get("type") == "FeatureCollection": + return await self._create_feature_collection( + collection_id, item_dict, base_url, use_queue, **kwargs + ) + else: + return await self._create_single_item( + collection_id, item_dict, base_url, use_queue, **kwargs ) - features = item_dict["features"] - processed_items = [ - bulk_client.preprocess_item(feature, base_url) for feature in features - ] - # Deduplicate items within the batch by ID (keep last occurrence) - seen_ids: dict = {} - for item in processed_items: - seen_ids[item["id"]] = item - unique_items = list(seen_ids.values()) - skipped_batch_duplicates = len(processed_items) - len(unique_items) - processed_items = unique_items + async def _create_single_item( + self, + collection_id: str, + item_dict: dict, + base_url: str, + use_queue: bool, + **kwargs, + ) -> stac_types.Item | str: + """Handle the ingestion pipeline for a single STAC Item. - attempted = len(processed_items) + Executes preprocessing, optional STAC schema validation, and routes the + item to either the Redis queue or the database. - if not processed_items: - return f"No items to insert. {skipped_batch_duplicates} items were skipped (duplicates)." + Args: + collection_id (str): The ID of the destination collection. + item_dict (dict): The raw dictionary representation of the item. + base_url (str): The base URL of the incoming request. + use_queue (bool): Whether to push to Redis instead of the database. + **kwargs: Additional arguments passed through the API. - if use_queue: - from stac_fastapi.core.redis_utils import AsyncRedisQueueManager + Returns: + stac_types.Item | str: The created STAC item (if DB insert) or a success string (if Queued). 
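+
+        Example (sketch; ``item_dict`` stands for a hypothetical STAC Item
+        payload and the collection ID is illustrative)::
+
+            created = await self._create_single_item(
+                "my-collection", item_dict, "http://localhost:8000/", use_queue=False
+            )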
- queue_manager = await AsyncRedisQueueManager.create() - try: - queue_len = await queue_manager.queue_items( - collection_id, processed_items - ) - logger.info( - f"Queued {len(processed_items)} items for collection '{collection_id}'. " - f"Queue length: {queue_len}" - ) - return f"Successfully queued {len(processed_items)} items for processing." - finally: - await queue_manager.close() + Raises: + HTTPException: If preprocessing fails or the item is a duplicate. + ValueError: If strict STAC validation fails. + """ + # 1. PREPROCESSING LAYER + bulk_client = BulkTransactionsClient( + database=self.database, settings=self.settings + ) + preprocessed_item = bulk_client.preprocess_item(item_dict, base_url) - success, errors = await self.database.bulk_async( - collection_id=collection_id, - processed_items=processed_items, - op_type="create", - **kwargs, + if preprocessed_item is None: + raise HTTPException( + status_code=400, detail="Item preprocessing failed or duplicate." ) - conflict_errors, other_errors = separate_bulk_conflict_errors(errors) - if conflict_errors and get_bool_env("RAISE_ON_BULK_ERROR"): - doc_id = next(iter(conflict_errors[0].values())).get("_id", "") - item_id = doc_id.split("|")[0] if "|" in doc_id else doc_id - raise ItemAlreadyExistsError( - item_id=item_id, collection_id=collection_id - ) - if other_errors: - logger.error( - f"Bulk async operation encountered errors for collection {collection_id}: {other_errors} (attempted {attempted})" - ) - if get_bool_env("RAISE_ON_BULK_ERROR"): - raise BulkIndexError( - errors=other_errors, collection_id=collection_id - ) - else: - logger.info( - f"Bulk async operation succeeded with {success} actions for collection {collection_id}." - ) - total_skipped = skipped_batch_duplicates + len(conflict_errors) - return f"Successfully added {success} Items. {total_skipped} skipped (duplicates). {len(other_errors)} errors occurred." + # 2. VALIDATION LAYER + if get_bool_env("ENABLE_STAC_VALIDATOR") and not use_queue: + await self._validate_single_item(preprocessed_item) + + # 3. ROUTING LAYER (Queue vs Database) if use_queue: - from stac_fastapi.core.redis_utils import AsyncRedisQueueManager + from stac_fastapi.core.utilities import queue_items_if_enabled - bulk_client = BulkTransactionsClient( - database=self.database, settings=self.settings + result = await queue_items_if_enabled( + collection_id, preprocessed_item, item_ids=item_dict.get("id") ) - processed_item = bulk_client.preprocess_item(item_dict, base_url) + if result: + return result + + # 4. DATABASE INSERTION LAYER + await self.database.create_item( + preprocessed_item, base_url=base_url, upsert=False, **kwargs + ) + return ItemSerializer.db_to_stac(preprocessed_item, base_url) + + async def _create_feature_collection( + self, + collection_id: str, + item_dict: dict, + base_url: str, + use_queue: bool, + **kwargs, + ) -> dict: + """Handle high-throughput ingestion pipeline for a bulk FeatureCollection. + + Executes payload deduplication, preprocessing, concurrent STAC schema validation, + and safely routes the batch to either the Redis queue or the database while + grouping and formatting any errors encountered. + + Args: + collection_id (str): The ID of the destination collection. + item_dict (dict): The FeatureCollection payload. + base_url (str): The base URL of the incoming request. + use_queue (bool): Whether to push to Redis instead of the database. + **kwargs: Additional arguments passed through the API. 
+ + Returns: + dict: A detailed response payload containing overall messages, successfully + added item IDs, validation errors, database errors, and conflict errors. + + Raises: + HTTPException: If no valid items remain to be inserted, or if the batch is + rejected under strict mode (RAISE_ON_BULK_ERROR=True). + """ + try: + await self.database.find_collection(collection_id=collection_id) + except Exception: + raise HTTPException( + status_code=404, + detail=f"Collection '{collection_id}' not found", + ) + + raw_features = item_dict.get("features", []) + logger.info( + f"Processing FeatureCollection with {len(raw_features)} features for collection {collection_id}" + ) + + # 1. DEDUPLICATION LAYER + COLLECTION INJECTION + seen_ids: dict = {} + for feature in raw_features: + feature_id = feature.get("id") + if feature_id is not None: + # Inject collection ID if not present + if "collection" not in feature: + feature["collection"] = collection_id + logger.debug( + f"Injected collection '{collection_id}' into item {feature_id}" + ) + seen_ids[feature_id] = feature + unique_features = list(seen_ids.values()) + skipped_batch_duplicates = len(raw_features) - len(unique_features) + + logger.info( + f"Input features: {len(raw_features)}, Unique features after dedup: {len(unique_features)}, Skipped: {skipped_batch_duplicates}" + ) + if skipped_batch_duplicates > 0: + logger.info(f"Unique feature IDs: {list(seen_ids.keys())}") + + raise_on_error = get_bool_env("RAISE_ON_BULK_ERROR", default=False) + + # 2. PREPROCESSING LAYER + bulk_client = BulkTransactionsClient( + database=self.database, settings=self.settings + ) + processed_items = [] + skipped_db_duplicates = 0 - queue_manager = await AsyncRedisQueueManager.create() + for feature in unique_features: try: - queue_len = await queue_manager.queue_items( - collection_id, processed_item + prepped = bulk_client.preprocess_item(feature, base_url) + if prepped is not None: + processed_items.append(prepped) + else: + skipped_db_duplicates += 1 + except Exception as e: + logger.warning( + f"Failed to preprocess item {feature.get('id', 'unknown')}: {e}" + ) + skipped_db_duplicates += 1 + + # 3. VALIDATION LAYER + if use_queue: + valid_items = processed_items + validation_errors: dict[str, list[str]] = {} + validation_error_count = 0 + else: + valid_items, validation_errors = await self._validate_feature_collection( + processed_items, skip_validation=False + ) + validation_error_count = sum( + len(item_ids) if isinstance(item_ids, list) else 1 + for item_ids in validation_errors.values() + ) + + # Strict mode rejections + if validation_errors and raise_on_error: + raise HTTPException( + status_code=400, + detail={ + "message": f"Batch rejected. {validation_error_count} items failed validation.", + "errors": validation_errors, + }, ) - logger.info( - f"Queued item '{item_dict.get('id')}' for collection '{collection_id}'. " - f"Queue length: {queue_len}" + if not valid_items and raise_on_error: + total_skipped = ( + skipped_batch_duplicates + + skipped_db_duplicates + + validation_error_count ) - return ( - f"Successfully queued item '{item_dict.get('id')}' for processing." + raise HTTPException( + status_code=400, + detail={ + "message": f"No valid items to insert. Skipped {total_skipped} items.", + "validation_errors": validation_errors, + }, ) - finally: - await queue_manager.close() - await self.database.create_item( - item_dict, base_url=base_url, upsert=False, **kwargs + # 4. 
ROUTING LAYER (Queue vs Database) + if use_queue: + from stac_fastapi.core.utilities import queue_items_if_enabled + + result = await queue_items_if_enabled(collection_id, valid_items) + if result: + if validation_error_count > 0 or skipped_batch_duplicates > 0: + result += f" (Skipped {validation_error_count} invalid, {skipped_batch_duplicates} duplicates)" + # Wrap the string response in a dict so the return type is consistent + return {"message": result} + + # 5. DATABASE INSERTION LAYER + success, errors = await self.database.bulk_async( + collection_id=collection_id, + processed_items=valid_items, + op_type="create", + **kwargs, ) - return ItemSerializer.db_to_stac(item_dict, base_url) + + # 6. RESPONSE FORMATTING + conflict_errors, other_errors = separate_bulk_conflict_errors(errors) + failed_item_ids = set() + + for error in errors: + doc_id = next(iter(error.values())).get("_id", "") + failed_item_ids.add(doc_id.split("|")[0] if "|" in doc_id else doc_id) + + successfully_added_ids = [ + item.get("id") + for item in valid_items + if item.get("id") not in failed_item_ids + ] + + if conflict_errors and raise_on_error: + doc_id = next(iter(conflict_errors[0].values())).get("_id", "") + item_id = doc_id.split("|")[0] if "|" in doc_id else doc_id + raise ItemAlreadyExistsError(item_id=item_id, collection_id=collection_id) + + if other_errors and raise_on_error: + raise BulkIndexError(errors=other_errors, collection_id=collection_id) + + def format_conflict_errors(conflicts): + conflict_details = {} + for c in conflicts: + doc_id = next(iter(c.values())).get("_id", "") + if "|" in doc_id: + item_id, coll_id = doc_id.split("|", 1) + conflict_details[ + item_id + ] = f"Item '{item_id}' already exists in collection '{coll_id}'" + else: + conflict_details[doc_id] = f"Item '{doc_id}' already exists" + return conflict_details + + total_skipped = ( + skipped_batch_duplicates + + skipped_db_duplicates + + validation_error_count + + len(conflict_errors) + ) + + if success == 0: + raise HTTPException( + status_code=400, + detail={ + "message": f"No items were added. {total_skipped} skipped. 
{len(other_errors)} errors.", + "validation_errors": validation_errors, + "conflict_errors": format_conflict_errors(conflict_errors) + if conflict_errors + else {}, + }, + ) + + message_parts = [f"Processed {len(raw_features)} items: {success} added"] + if skipped_batch_duplicates > 0: + message_parts.append(f"{skipped_batch_duplicates} input duplicates") + if validation_error_count > 0: + message_parts.append(f"{validation_error_count} validation errors") + if skipped_db_duplicates > 0: + message_parts.append(f"{skipped_db_duplicates} preprocessing skipped") + if len(conflict_errors) > 0: + message_parts.append(f"{len(conflict_errors)} conflicts") + if len(other_errors) > 0: + message_parts.append(f"{len(other_errors)} database errors") + + response: dict = {"message": " | ".join(message_parts)} + if successfully_added_ids: + response["successfully_added"] = successfully_added_ids + if validation_errors: + response["validation_errors"] = validation_errors + if conflict_errors: + response["conflict_errors"] = format_conflict_errors(conflict_errors) + if other_errors: + response["database_errors"] = other_errors + + return response @overrides async def update_item( @@ -1146,27 +1372,28 @@ async def update_item( use_queue = get_bool_env("ENABLE_REDIS_QUEUE", default=False) + # Handle inline imports once to keep code DRY if use_queue: - from stac_fastapi.core.redis_utils import AsyncRedisQueueManager + from stac_fastapi.core.utilities import queue_items_if_enabled + # PATH A: REDIS QUEUE ENABLED + # Skip validation, push raw item to Redis immediately + if use_queue: bulk_client = BulkTransactionsClient( database=self.database, settings=self.settings ) processed_item = bulk_client.preprocess_item(item_dict, base_url) - queue_manager = await AsyncRedisQueueManager.create() - try: - queue_len = await queue_manager.queue_items( - collection_id, processed_item - ) - logger.info( - f"Queued update for item '{item_id}' in collection '{collection_id}'. " - f"Queue length: {queue_len}" - ) - finally: - await queue_manager.close() + result = await queue_items_if_enabled( + collection_id, processed_item, item_ids=item_id + ) + if result: + return result - return ItemSerializer.db_to_stac(item_dict, base_url) + # PATH B: DIRECT DATABASE INSERTION (No Queue) + # Validate before database insertion + if get_bool_env("ENABLE_STAC_VALIDATOR"): + await self._validate_single_item(item_dict) await self.database.create_item( item_dict, base_url=base_url, upsert=True, **kwargs @@ -1262,6 +1489,13 @@ async def create_collection( Raises: ConflictError: If the collection already exists. """ + # Validate collection + if get_bool_env("ENABLE_STAC_VALIDATOR"): + try: + await async_validate_stac(collection, pydantic_model=Collection) + except (ValidationError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid collection: {e}") + collection = collection.model_dump(mode="json") request = kwargs["request"] @@ -1296,6 +1530,12 @@ async def update_collection( A STAC collection that has been updated in the database. 
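+
+        When ``ENABLE_STAC_VALIDATOR`` is enabled, the payload is additionally
+        checked against the STAC Collection schema before the update, i.e.::
+
+            await async_validate_stac(collection, pydantic_model=Collection)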
""" + # Validate collection + try: + await async_validate_stac(collection, pydantic_model=Collection) + except (ValidationError, ValueError) as e: + raise HTTPException(status_code=400, detail=f"Invalid collection: {e}") + collection = collection.model_dump(mode="json") request = kwargs["request"] @@ -1401,7 +1641,7 @@ def __attrs_post_init__(self): """Create es engine.""" self.client = self.settings.create_client - def preprocess_item(self, item: stac_types.Item, base_url) -> stac_types.Item: + def preprocess_item(self, item: stac_types.Item, base_url: str) -> stac_types.Item: """Preprocess an item to match the data model. Args: @@ -1426,6 +1666,7 @@ def bulk_item_insert( Returns: A string indicating the number of items successfully added. + On error, raises HTTPException with detailed error information. """ request = kwargs.get("request") @@ -1443,29 +1684,53 @@ def bulk_item_insert( # Determine op_type from bulk transaction method op_type = "index" if items.method == BulkTransactionMethod.UPSERT else "create" - processed_items = [] - for item in items.items.values(): - try: - validated = Item(**item) if not isinstance(item, Item) else item - prepped = self.preprocess_item( - validated.model_dump(mode="json"), base_url - ) - processed_items.append(prepped) - except ValidationError: - # Immediately raise on the first invalid item (strict mode) - raise + # Convert Pydantic models to raw dictionaries for uniform processing + raw_items = [ + item.model_dump(mode="json") if hasattr(item, "model_dump") else item + for item in items.items.values() + ] - # Deduplicate items within the batch by ID (keep last occurrence) + # 1. DEDUPLICATE FIRST + # Doing this before validation saves us from validating the exact same STAC item twice seen_ids: dict = {} - for item in processed_items: - seen_ids[item["id"]] = item + for item in raw_items: + item_id = item.get("id") + if item_id is not None: + seen_ids[item_id] = item + unique_items = list(seen_ids.values()) - skipped_batch_duplicates = len(processed_items) - len(unique_items) - processed_items = unique_items + skipped_batch_duplicates = len(raw_items) - len(unique_items) - if not processed_items: + if not unique_items: return f"No items to insert. {skipped_batch_duplicates} items were skipped (duplicates)." + # 2. VALIDATION LAYER (Use FastValidator for efficiency) + validation_errors: dict[str, list[str]] = {} + validation_error_count = 0 + if get_bool_env("ENABLE_STAC_VALIDATOR"): + from stac_fastapi.core.validate import validate_with_fast_validator + + valid_items, validation_errors = validate_with_fast_validator(unique_items) + + # Count total validation errors (validation_errors maps error_msg -> [item_ids]) + validation_error_count = sum( + len(item_ids) if isinstance(item_ids, list) else 1 + for item_ids in validation_errors.values() + ) + else: + valid_items = unique_items + + # 3. PREPROCESSING LAYER + processed_items = [] + for item in valid_items: + prepped = self.preprocess_item(item, base_url) + if prepped is not None: + processed_items.append(prepped) + + if not processed_items: + return f"No items to insert after preprocessing. Skipped {skipped_batch_duplicates} duplicates." + + # 4. 
DATABASE INSERTION LAYER collection_id = processed_items[0]["collection"] success, errors = self.database.bulk_sync( collection_id, @@ -1473,16 +1738,55 @@ def bulk_item_insert( op_type=op_type, **kwargs, ) + conflict_errors, other_errors = separate_bulk_conflict_errors(errors) + if conflict_errors and get_bool_env("RAISE_ON_BULK_ERROR"): doc_id = next(iter(conflict_errors[0].values())).get("_id", "") item_id = doc_id.split("|")[0] if "|" in doc_id else doc_id raise ItemAlreadyExistsError(item_id=item_id, collection_id=collection_id) + if other_errors: logger.error(f"Bulk sync operation encountered errors: {other_errors}") if get_bool_env("RAISE_ON_BULK_ERROR"): raise BulkIndexError(errors=other_errors, collection_id=collection_id) else: logger.info(f"Bulk sync operation succeeded with {success} actions.") - total_skipped = skipped_batch_duplicates + len(conflict_errors) - return f"Successfully added/updated {success} Items. {total_skipped} skipped (duplicates). {len(other_errors)} errors occurred." + + # Format conflict errors for response + def format_conflict_errors(conflicts): + conflict_details = {} + for c in conflicts: + doc_id = next(iter(c.values())).get("_id", "") + if "|" in doc_id: + item_id, coll_id = doc_id.split("|", 1) + conflict_details[ + item_id + ] = f"Item '{item_id}' already exists in collection '{coll_id}'" + else: + conflict_details[doc_id] = f"Item '{doc_id}' already exists" + return conflict_details + + total_skipped = ( + skipped_batch_duplicates + len(conflict_errors) + validation_error_count + ) + + # Build response with validation, conflict and error details + response: dict = { + "message": f"Successfully added/updated {success} Items. {total_skipped} skipped ({skipped_batch_duplicates} duplicates, {validation_error_count} validation errors, {len(conflict_errors)} conflicts). {len(other_errors)} errors occurred." + } + if validation_errors: + response["validation_errors"] = validation_errors + if conflict_errors: + response["conflict_errors"] = format_conflict_errors(conflict_errors) + if other_errors: + response["database_errors"] = other_errors + + # If no items were successfully added, return as error response + if success == 0: + raise HTTPException( + status_code=400, + detail=response, + ) + + return orjson.dumps(response).decode("utf-8") diff --git a/stac_fastapi/core/stac_fastapi/core/serializers.py b/stac_fastapi/core/stac_fastapi/core/serializers.py index 84764d3a8..8ef759499 100644 --- a/stac_fastapi/core/stac_fastapi/core/serializers.py +++ b/stac_fastapi/core/stac_fastapi/core/serializers.py @@ -155,7 +155,27 @@ def stac_to_db(cls, stac_data: stac_types.Item, base_url: str) -> dict: Returns: A dictionary representation of the item ready for database insertion. 
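+
+        Sketch of the ``collection`` link this method injects when one is
+        missing (``base_url`` shown for a hypothetical deployment)::
+
+            {
+                "rel": "collection",
+                "href": "http://localhost:8000/collections/<collection_id>",
+                "type": "application/json",
+            }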
""" + # Add required STAC v1.0.0 collection link if item has collection field item_links = resolve_links(stac_data.get("links", []), base_url) + + # Ensure collection link exists (required by STAC v1.0.0 spec) + if stac_data.get("collection"): + collection_id = stac_data["collection"] + + # Check if collection link already exists + has_collection_link = any( + link.get("rel") == "collection" for link in item_links + ) + + if not has_collection_link: + # Add collection link + collection_link = { + "rel": "collection", + "href": f"{base_url}collections/{collection_id}", + "type": "application/json", + } + item_links.append(collection_link) + stac_data["links"] = item_links if get_bool_env("STAC_INDEX_ASSETS"): diff --git a/stac_fastapi/core/stac_fastapi/core/utilities.py b/stac_fastapi/core/stac_fastapi/core/utilities.py index 28e0f62ab..ead31b1a0 100644 --- a/stac_fastapi/core/stac_fastapi/core/utilities.py +++ b/stac_fastapi/core/stac_fastapi/core/utilities.py @@ -14,6 +14,9 @@ MAX_LIMIT = 10000 +logger = logging.getLogger(__name__) + + def get_bool_env(name: str, default: bool | str = False) -> bool: """ Retrieve a boolean value from an environment variable. @@ -273,3 +276,48 @@ def get_excluded_from_items(obj: dict, field_path: str) -> None: return current.pop(final, None) + + +async def queue_items_if_enabled( + collection_id: str, + items: dict | list[dict], + item_ids: str | list[str] | None = None, +) -> str | None: + """Queue items to Redis if ENABLE_REDIS_QUEUE is set. + + Handles both single items and bulk items. Returns a status message if queuing + was performed, or None if queuing is disabled. + + Args: + collection_id: The collection ID to queue items for. + items: Single item dict or list of item dicts to queue. + item_ids: Optional item ID(s) for logging. If not provided, extracted from items. + + Returns: + Status message if items were queued, None if queuing is disabled. + + Raises: + Exception: Any exception from the queue manager is propagated. + """ + if not get_bool_env("ENABLE_REDIS_QUEUE", default=False): + return None + + from stac_fastapi.core.redis_utils import AsyncRedisQueueManager + + queue_manager = await AsyncRedisQueueManager.create() + try: + queue_len = await queue_manager.queue_items(collection_id, items) + + # Format logging message based on whether single or bulk items + if isinstance(items, list): + count = len(items) + return f"Successfully queued {count} items for processing." + else: + item_id = item_ids or items.get("id", "unknown") + logger.info( + f"Queued item '{item_id}' for collection '{collection_id}'. " + f"Queue length: {queue_len}" + ) + return f"Successfully queued item '{item_id}' for processing." + finally: + await queue_manager.close() diff --git a/stac_fastapi/core/stac_fastapi/core/validate.py b/stac_fastapi/core/stac_fastapi/core/validate.py new file mode 100644 index 000000000..390c3ced3 --- /dev/null +++ b/stac_fastapi/core/stac_fastapi/core/validate.py @@ -0,0 +1,346 @@ +"""STAC validation module. 
+ +Provides validation for STAC items and collections using multiple validation backends: +- Pydantic validation (always enabled) +- Python STAC Validator with FastValidator (fast JSON schema validation) +""" + +import asyncio +import json +import logging +import os +import tempfile +import threading + +from stac_pydantic import Collection, Item + +from stac_fastapi.core.utilities import get_bool_env + +logger = logging.getLogger(__name__) + +# Suppress verbose logging from stac_validator +logging.getLogger("stac_validator.utilities").setLevel(logging.WARNING) + +# Global instances to cache validators and avoid repeated initialization +_fast_validator_instance = None +_validator_lock = threading.Lock() + + +def _get_fast_validator(): + """Get or create the singleton FastValidator instance. + + Initializes and caches a single FastValidator instance for efficient + JSON schema validation using fastjsonschema. + Uses double-checked locking for thread safety. + + Returns: + The FastValidator instance. + + Raises: + ImportError: If stac-validator is not installed and ENABLE_STAC_VALIDATOR is true. + """ + # Only attempt import if validation is enabled + if not get_bool_env("ENABLE_STAC_VALIDATOR"): + return None + + global _fast_validator_instance + if _fast_validator_instance is None: + with _validator_lock: + if _fast_validator_instance is None: + try: + from stac_validator.fast_validator import FastValidator + + _fast_validator_instance = FastValidator + except ImportError as e: + logger.error("stac_validator FastValidator not available") + raise ImportError( + "STAC validator FastValidator is not installed. " + "Install it with: pip install stac-fastapi-core[validator] " + "or pip install stac-fastapi-elasticsearch[validator] " + "or pip install stac-fastapi-opensearch[validator]" + ) from e + return _fast_validator_instance + + +def validate_with_fast_validator( + items: list[dict], +) -> tuple[list[dict], dict[str, list[str]]]: + """Validate STAC items using FastValidator. + + Uses fastjsonschema for efficient JSON schema validation. + Validates items as a FeatureCollection for better performance, + then separates valid items from invalid ones. + + Args: + items: List of STAC item dictionaries to validate. + + Returns: + Tuple of (valid_items_list, invalid_items_dict) where invalid_items_dict + maps error messages to lists of affected item IDs. 
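+
+    Example (sketch; the two items and the error message are hypothetical)::
+
+        valid, errors = validate_with_fast_validator([good_item, bad_item])
+        # valid  -> [good_item]
+        # errors -> {"'datetime' is a required property": ["bad-item-id"]}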
+ """ + FastValidator = _get_fast_validator() + + if FastValidator is None: + # Validation disabled + return items, {} + + valid_items = [] + invalid_items = {} + errors_by_message: dict[str, list[str]] = {} + + try: + # Create a FeatureCollection from the items for batch validation + feature_collection = { + "type": "FeatureCollection", + "features": items, + } + + # Create a temporary file to write the FeatureCollection JSON + with tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False + ) as tmp_file: + json.dump(feature_collection, tmp_file) + tmp_file_path = tmp_file.name + + try: + # Validate the entire FeatureCollection using FastValidator + logger.info(f"Validating FeatureCollection with {len(items)} items") + validator = FastValidator(tmp_file_path, quiet=True, verbose=False) + validator.run() + + # Check if there are any errors in the message, even if validator.valid is True + # (FastValidator might report valid=True for the FeatureCollection structure + # but have errors for individual items) + has_errors = False + if hasattr(validator, "message") and validator.message: + message_data = validator.message[0] + errors_list = message_data.get("errors", []) + has_errors = len(errors_list) > 0 + + if validator.valid and not has_errors: + # All items are valid + logger.info(f"All {len(items)} items are valid") + valid_items = items + else: + # FeatureCollection validation failed - extract per-item errors + # FastValidator stores error information in self.message + logger.warning( + "FeatureCollection validation failed, extracting per-item errors" + ) + + # FastValidator stores errors in self.message[0]["errors"] + if hasattr(validator, "message") and validator.message: + message_data = validator.message[0] + errors_list = message_data.get("errors", []) + + # Process error breakdown from FastValidator + logger.info(f"Found {len(errors_list)} error types") + for error_entry in errors_list: + err_msg = error_entry.get( + "error_message", "STAC validation failed" + ) + affected_items = error_entry.get("affected_items", []) + + logger.warning( + f"Error: {err_msg} | Affected items: {len(affected_items)}" + ) + + # Group by error message + if err_msg not in errors_by_message: + errors_by_message[err_msg] = [] + errors_by_message[err_msg].extend(affected_items) + + for item_id in affected_items: + logger.error( + f"STAC validation failed for '{item_id}': {err_msg}" + ) + + # Identify valid items (those not in any error list) + all_invalid_ids = set() + for affected_ids in errors_by_message.values(): + all_invalid_ids.update(affected_ids) + + valid_items = [ + item for item in items if item.get("id") not in all_invalid_ids + ] + logger.info( + f"Valid items: {len(valid_items)}, Invalid items: {len(all_invalid_ids)}" + ) + else: + # Fallback: validate items individually if message not available + logger.warning( + "FastValidator message not available, validating items individually" + ) + for idx, item in enumerate(items): + item_id = item.get("id", f"unknown_id_{idx}") + + # Create a temporary file for individual item + with tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False + ) as item_tmp_file: + json.dump(item, item_tmp_file) + item_tmp_file_path = item_tmp_file.name + + try: + # Validate individual item + item_validator = FastValidator( + item_tmp_file_path, quiet=True, verbose=False + ) + item_validator.run() + + if item_validator.valid: + valid_items.append(item) + else: + # Extract error message from validator.message + err_msg = "STAC validation 
failed" + if ( + hasattr(item_validator, "message") + and item_validator.message + ): + msg_data = item_validator.message[0] + errors_list = msg_data.get("errors", []) + if errors_list: + err_msg = errors_list[0].get( + "error_message", "STAC validation failed" + ) + + # Group by error message + if err_msg not in errors_by_message: + errors_by_message[err_msg] = [] + errors_by_message[err_msg].append(item_id) + logger.error( + f"STAC validation failed for '{item_id}': {err_msg}" + ) + finally: + # Clean up temporary file + try: + os.unlink(item_tmp_file_path) + except OSError: + pass + finally: + # Clean up temporary file + try: + os.unlink(tmp_file_path) + except OSError: + pass + + invalid_items = errors_by_message + + except Exception as exc: + logger.error(f"Batch validation request failed: {exc}", exc_info=True) + error_msg = f"Batch validation failed: {str(exc)}" + item_ids = [ + item.get("id", f"unknown_id_{idx}") for idx, item in enumerate(items) + ] + invalid_items[error_msg] = item_ids + + logger.info( + f"Validation complete: {len(valid_items)} valid, {sum(len(v) for v in invalid_items.values())} invalid" + ) + return valid_items, invalid_items + + +def validate_stac( + stac_data: dict | Item | Collection, + pydantic_model: type[Item] | type[Collection] = Item, +) -> Item | Collection: + """Validate a single STAC item or collection using optional STAC validator. + + If stac_data is already a Pydantic model object, Pydantic validation is skipped + (assuming it was already validated by FastAPI). Only STAC validator is run if enabled. + + Args: + stac_data: STAC data as dict or Pydantic model object. + pydantic_model: The Pydantic model class to use for validation (Item or Collection). + + Returns: + Validated STAC object (Item or Collection). + + Raises: + ValueError: If STAC validation fails. + """ + # 1. Pydantic Parsing/Validation + # If already a Pydantic model object, skip Pydantic validation (FastAPI already validated it) + if isinstance(stac_data, (Item, Collection)): + stac_obj = stac_data + stac_dict = stac_data.model_dump(mode="json") + else: + # For dict input, validate with Pydantic first + stac_obj = pydantic_model(**stac_data) + stac_dict = stac_data + + # 2. STAC Validator (optional, enabled via ENABLE_STAC_VALIDATOR env var) + if get_bool_env("ENABLE_STAC_VALIDATOR"): + FastValidator = _get_fast_validator() + item_id = stac_dict.get("id", "unknown_id") + + # Create a temporary file to write the item JSON + with tempfile.NamedTemporaryFile( + mode="w", suffix=".json", delete=False + ) as tmp_file: + json.dump(stac_dict, tmp_file) + tmp_file_path = tmp_file.name + + try: + # Validate using FastValidator + validator = FastValidator(tmp_file_path, quiet=True, verbose=False) + validator.run() + + if not validator.valid: + err_msg = "STAC validation failed" + if hasattr(validator, "message") and validator.message: + err_msg = validator.message + elif hasattr(validator, "errors") and validator.errors: + err_msg = "; ".join( + str(e) if isinstance(e, str) else str(e) + for e in validator.errors + ) + raise ValueError(f"STAC validation failed for '{item_id}': {err_msg}") + finally: + # Clean up temporary file + try: + os.unlink(tmp_file_path) + except OSError: + pass + + return stac_obj + + +async def async_validate_stac( + stac_data: dict | Item | Collection, + pydantic_model: type[Item] | type[Collection] = Item, +) -> Item | Collection: + """Asynchronous wrapper for validate_stac. 
+ + Offloads the CPU-bound STAC validation to a separate thread to prevent + blocking the FastAPI asyncio event loop during API requests. + + Args: + stac_data: STAC data as dict or Pydantic model. + pydantic_model: The Pydantic model class to use for validation (Item or Collection). + + Returns: + Validated STAC object (Item or Collection). + + Raises: + ValueError: If validation fails. + """ + return await asyncio.to_thread(validate_stac, stac_data, pydantic_model) + + +async def async_validate_with_fast_validator( + items: list[dict], +) -> tuple[list[dict], dict[str, list[str]]]: + """Asynchronously validate STAC items using FastValidator. + + Offloads the CPU-bound validation to a separate thread to prevent + blocking the FastAPI asyncio event loop. + + Args: + items: List of STAC item dictionaries to validate. + + Returns: + Tuple of (valid_items_list, invalid_items_dict) where invalid_items_dict + maps error messages to lists of affected item IDs. + """ + return await asyncio.to_thread(validate_with_fast_validator, items) diff --git a/stac_fastapi/elasticsearch/pyproject.toml b/stac_fastapi/elasticsearch/pyproject.toml index 7fd09d463..1833f94d7 100644 --- a/stac_fastapi/elasticsearch/pyproject.toml +++ b/stac_fastapi/elasticsearch/pyproject.toml @@ -65,6 +65,9 @@ redis = [ server = [ "uvicorn[standard]>=0.23,<0.47", ] +validator = [ + "stac-fastapi-core[validator]==6.16.0", +] [project.scripts] stac-fastapi-elasticsearch = "stac_fastapi.elasticsearch.app:run" diff --git a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py index 1ce567148..b6aba5d2b 100644 --- a/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py +++ b/stac_fastapi/elasticsearch/stac_fastapi/elasticsearch/app.py @@ -77,12 +77,15 @@ "ENABLE_COLLECTIONS_SEARCH_ROUTE", default=False ) ENABLE_CATALOGS_ROUTE = get_bool_env("ENABLE_CATALOGS_ROUTE", default=False) +ENABLE_STAC_VALIDATOR = get_bool_env("ENABLE_STAC_VALIDATOR", default=False) logger.info("TRANSACTIONS_EXTENSIONS is set to %s", TRANSACTIONS_EXTENSIONS) logger.info("ENABLE_COLLECTIONS_SEARCH is set to %s", ENABLE_COLLECTIONS_SEARCH) logger.info( "ENABLE_COLLECTIONS_SEARCH_ROUTE is set to %s", ENABLE_COLLECTIONS_SEARCH_ROUTE ) logger.info("ENABLE_CATALOGS_ROUTE is set to %s", ENABLE_CATALOGS_ROUTE) +logger.info("ENABLE_STAC_VALIDATOR is set to %s", ENABLE_STAC_VALIDATOR) + settings = ElasticsearchSettings() session = Session.create_from_settings(settings) diff --git a/stac_fastapi/opensearch/pyproject.toml b/stac_fastapi/opensearch/pyproject.toml index 245f3f426..9dc048b30 100644 --- a/stac_fastapi/opensearch/pyproject.toml +++ b/stac_fastapi/opensearch/pyproject.toml @@ -64,6 +64,9 @@ redis = [ server = [ "uvicorn[standard]>=0.23,<0.47", ] +validator = [ + "stac-fastapi-core[validator]==6.16.0", +] [project.scripts] stac-fastapi-opensearch = "stac_fastapi.opensearch.app:run" diff --git a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py index 622ad32fe..f696f8c8c 100644 --- a/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py +++ b/stac_fastapi/opensearch/stac_fastapi/opensearch/app.py @@ -77,12 +77,14 @@ "ENABLE_COLLECTIONS_SEARCH_ROUTE", default=False ) ENABLE_CATALOGS_ROUTE = get_bool_env("ENABLE_CATALOGS_ROUTE", default=False) +ENABLE_STAC_VALIDATOR = get_bool_env("ENABLE_STAC_VALIDATOR", default=False) logger.info("TRANSACTIONS_EXTENSIONS is set to %s", TRANSACTIONS_EXTENSIONS) 
logger.info("ENABLE_COLLECTIONS_SEARCH is set to %s", ENABLE_COLLECTIONS_SEARCH) logger.info( "ENABLE_COLLECTIONS_SEARCH_ROUTE is set to %s", ENABLE_COLLECTIONS_SEARCH_ROUTE ) logger.info("ENABLE_CATALOGS_ROUTE is set to %s", ENABLE_CATALOGS_ROUTE) +logger.info("ENABLE_STAC_VALIDATOR is set to %s", ENABLE_STAC_VALIDATOR) settings = OpensearchSettings() session = Session.create_from_settings(settings) diff --git a/stac_fastapi/tests/api/test_api.py b/stac_fastapi/tests/api/test_api.py index 13980d4fa..64581d8ed 100644 --- a/stac_fastapi/tests/api/test_api.py +++ b/stac_fastapi/tests/api/test_api.py @@ -71,8 +71,12 @@ "DELETE /catalogs/{catalog_id}", "GET /catalogs/{catalog_id}/catalogs", "POST /catalogs/{catalog_id}/catalogs", + "GET /catalogs/{catalog_id}/catalogs/{sub_catalog_id}", + "PUT /catalogs/{catalog_id}/catalogs/{sub_catalog_id}", "DELETE /catalogs/{catalog_id}/catalogs/{sub_catalog_id}", "GET /catalogs/{catalog_id}/children", + "GET /catalogs/{catalog_id}/conformance", + "GET /catalogs/{catalog_id}/queryables", "GET /catalogs/{catalog_id}/collections", "POST /catalogs/{catalog_id}/collections", "GET /catalogs/{catalog_id}/collections/{collection_id}", @@ -893,58 +897,67 @@ async def test_search_line_string_intersects(app_client, ctx): async def test_big_int_eo_search( app_client, txn_client, test_item, test_collection, value, expected ): - random_str = "".join(random.choice("abcdef") for _ in range(5)) - collection_id = f"test-collection-eo-{random_str}" + # Disable STAC validator for this test as test data may have schema violations + original_validator_setting = os.getenv("ENABLE_STAC_VALIDATOR") + os.environ.pop("ENABLE_STAC_VALIDATOR", None) - test_collection["id"] = collection_id - test_collection["stac_extensions"] = [ - "https://stac-extensions.github.io/eo/v2.0.0/schema.json" - ] + try: + random_str = "".join(random.choice("abcdef") for _ in range(5)) + collection_id = f"test-collection-eo-{random_str}" - test_item["collection"] = collection_id - test_item["stac_extensions"] = test_collection["stac_extensions"] + test_collection["id"] = collection_id + test_collection["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" + ] - # Remove "eo:bands" to simplify the test - del test_item["properties"]["eo:bands"] + test_item["collection"] = collection_id + test_item["stac_extensions"] = test_collection["stac_extensions"] - # Attribute to test - attr = "eo:full_width_half_max" + # Remove "eo:bands" to simplify the test + del test_item["properties"]["eo:bands"] - try: - await create_collection(txn_client, test_collection) - except ConflictError: - pass + # Attribute to test + attr = "eo:full_width_half_max" - # Create items with deterministic offsets - for val in [value, value + 100, value - 100]: - item = deepcopy(test_item) - item["id"] = str(uuid.uuid4()) - item["properties"][attr] = val - await create_item(txn_client, item) + try: + await create_collection(txn_client, test_collection) + except ConflictError: + pass - # Search for the exact value - params = { - "collections": [collection_id], - "filter": { - "args": [ - { - "args": [ - {"property": f"properties.{attr}"}, - value, - ], - "op": "=", - } - ], - "op": "and", - }, - } - resp = await app_client.post("/search", json=params) - resp_json = resp.json() + # Create items with deterministic offsets + for val in [value, value + 100, value - 100]: + item = deepcopy(test_item) + item["id"] = str(uuid.uuid4()) + item["properties"][attr] = val + await create_item(txn_client, item) + + # Search for 
the exact value + params = { + "collections": [collection_id], + "filter": { + "args": [ + { + "args": [ + {"property": f"properties.{attr}"}, + value, + ], + "op": "=", + } + ], + "op": "and", + }, + } + resp = await app_client.post("/search", json=params) + resp_json = resp.json() - # Validate results - results = {x["properties"][attr] for x in resp_json["features"]} - assert len(results) == expected - assert results == {value} + # Validate results + results = {x["properties"][attr] for x in resp_json["features"]} + assert len(results) == expected + assert results == {value} + finally: + # Restore original STAC validator setting + if original_validator_setting: + os.environ["ENABLE_STAC_VALIDATOR"] = original_validator_setting @pytest.mark.asyncio diff --git a/stac_fastapi/tests/api/test_api_stac_validator.py b/stac_fastapi/tests/api/test_api_stac_validator.py new file mode 100644 index 000000000..8ffeee6df --- /dev/null +++ b/stac_fastapi/tests/api/test_api_stac_validator.py @@ -0,0 +1,334 @@ +import os +import uuid +from copy import deepcopy + +import pytest + +from ..conftest import create_collection, create_item + + +@pytest.mark.asyncio +async def test_stac_validator_allows_valid_datetime_range(txn_client, load_test_data): + """Test that STAC validator allows valid datetime range with null datetime.""" + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-dt-range-{uuid.uuid4()}" + await create_collection(txn_client, collection=test_collection) + + base_item = load_test_data("test_item.json") + base_item["collection"] = test_collection["id"] + + # Create item with null datetime but valid start/end_datetime (valid per STAC schema) + valid_item = deepcopy(base_item) + valid_item["id"] = "valid-datetime-range" + valid_item["properties"]["datetime"] = None + valid_item["properties"]["start_datetime"] = "2020-01-01T00:00:00Z" + valid_item["properties"]["end_datetime"] = "2020-01-02T00:00:00Z" + + # This should succeed - valid Pydantic and STAC item + await create_item(txn_client, valid_item) + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_stac_validator_catches_eo_bands_in_assets(txn_client, load_test_data): + """Test that STAC validator catches eo:bands in assets when using EO v2.0.0.""" + from fastapi import HTTPException + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-eo-{uuid.uuid4()}" + await create_collection(txn_client, collection=test_collection) + + base_item = load_test_data("test_item.json") + + # Create item with EO v2.0.0 extension which has stricter asset validation + invalid_item = deepcopy(base_item) + invalid_item["id"] = "invalid-eo-bands-in-assets" + invalid_item["collection"] = test_collection["id"] + invalid_item["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" + ] + + # EO v2.0.0 doesn't allow eo:bands in assets - should fail validation + with pytest.raises(HTTPException) as exc_info: + await create_item(txn_client, invalid_item) + + # Verify the error message mentions the validation failure + assert "STAC validation failed" in str(exc_info.value) + assert exc_info.value.status_code == 400 + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: + await 
txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_stac_validator_catches_invalid_cloud_cover(txn_client, load_test_data): + """Test that STAC validator catches invalid eo:cloud_cover values.""" + from fastapi import HTTPException + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-cloud-{uuid.uuid4()}" + await create_collection(txn_client, collection=test_collection) + + base_item = load_test_data("test_item.json") + + # Create item with invalid cloud_cover (must be 0-100) + invalid_item = deepcopy(base_item) + invalid_item["id"] = "invalid-cloud-cover" + invalid_item["collection"] = test_collection["id"] + invalid_item["properties"]["eo:cloud_cover"] = 150 # Invalid: > 100 + + # This should raise HTTPException due to STAC validation failure + with pytest.raises(HTTPException) as exc_info: + await create_item(txn_client, invalid_item) + + # Verify the error message mentions the validation failure + assert "STAC validation failed" in str(exc_info.value) + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_stac_validator_feature_collection_with_invalid_item_raise_on_error( + txn_client, load_test_data +): + """Test that STAC validator fails entire FeatureCollection when RAISE_ON_BULK_ERROR is true.""" + from fastapi import HTTPException + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + os.environ["RAISE_ON_BULK_ERROR"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-fc-{uuid.uuid4()}" + await create_collection(txn_client, collection=test_collection) + + base_item = load_test_data("test_item.json") + + # Create FeatureCollection with 2 valid items and 1 invalid item + features = [] + for i in range(2): + item = deepcopy(base_item) + item["id"] = f"valid-item-{i}" + item["collection"] = test_collection["id"] + features.append(item) + + # Add invalid item (invalid cloud_cover) + invalid_item = deepcopy(base_item) + invalid_item["id"] = "invalid-item-fc" + invalid_item["collection"] = test_collection["id"] + invalid_item["properties"]["eo:cloud_cover"] = 150 # Invalid: > 100 + + features.append(invalid_item) + + feature_collection = { + "type": "FeatureCollection", + "features": features, + } + + # With RAISE_ON_BULK_ERROR=true, should fail on first invalid item + with pytest.raises(HTTPException) as exc_info: + await create_item(txn_client, feature_collection) + + assert "Batch rejected" in str(exc_info.value) + assert exc_info.value.status_code == 400 + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + os.environ.pop("RAISE_ON_BULK_ERROR", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_stac_validator_feature_collection_with_invalid_item_skip_on_error( + txn_client, core_client, load_test_data +): + """Test that STAC validator skips invalid items when RAISE_ON_BULK_ERROR is false.""" + from ..conftest import MockRequest + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + os.environ["RAISE_ON_BULK_ERROR"] = "false" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-fc-skip-{uuid.uuid4()}" + await create_collection(txn_client, 
collection=test_collection)
+
+        base_item = load_test_data("test_item.json")
+
+        # Create FeatureCollection with 2 valid items and 1 invalid item
+        features = []
+        for i in range(2):
+            item = deepcopy(base_item)
+            item["id"] = f"valid-item-{i}"
+            item["collection"] = test_collection["id"]
+            # Remove eo:bands from properties (violates EO v1.0.0 spec - should only be in assets)
+            if "eo:bands" in item.get("properties", {}):
+                del item["properties"]["eo:bands"]
+            features.append(item)
+
+        # Add invalid item (invalid cloud_cover)
+        invalid_item = deepcopy(base_item)
+        invalid_item["id"] = "invalid-item-fc"
+        invalid_item["collection"] = test_collection["id"]
+        # Remove eo:bands from properties (violates EO v1.0.0 spec - should only be in assets)
+        if "eo:bands" in invalid_item.get("properties", {}):
+            del invalid_item["properties"]["eo:bands"]
+        invalid_item["properties"]["eo:cloud_cover"] = 150  # Invalid: > 100
+
+        features.append(invalid_item)
+        feature_collection = {
+            "type": "FeatureCollection",
+            "features": features,
+        }
+
+        # With RAISE_ON_BULK_ERROR=false, should skip invalid item and insert valid ones
+        await create_item(txn_client, feature_collection)
+
+        # Verify only 2 valid items exist in the collection
+        fc = await core_client.item_collection(
+            test_collection["id"], request=MockRequest()
+        )
+        assert len(fc["features"]) == 2
+        item_ids = {f["id"] for f in fc["features"]}
+        assert item_ids == {"valid-item-0", "valid-item-1"}
+    finally:
+        os.environ.pop("ENABLE_STAC_VALIDATOR", None)
+        os.environ.pop("RAISE_ON_BULK_ERROR", None)
+        try:
+            await txn_client.delete_collection(test_collection["id"])
+        except Exception:
+            pass
+
+
+@pytest.mark.asyncio
+async def test_stac_validator_catches_invalid_snow_cover(txn_client, load_test_data):
+    """Test that STAC validator catches invalid eo:snow_cover values."""
+    from fastapi import HTTPException
+
+    os.environ["ENABLE_STAC_VALIDATOR"] = "true"
+
+    try:
+        test_collection = load_test_data("test_collection.json")
+        test_collection["id"] = f"test-collection-snow-{uuid.uuid4()}"
+        await create_collection(txn_client, collection=test_collection)
+
+        base_item = load_test_data("test_item.json")
+
+        # Create item with invalid snow_cover (must be 0-100)
+        invalid_item = deepcopy(base_item)
+        invalid_item["id"] = "invalid-snow-cover"
+        invalid_item["collection"] = test_collection["id"]
+        invalid_item["properties"]["eo:snow_cover"] = -10  # Invalid: < 0
+
+        # This should raise HTTPException due to STAC validation failure
+        with pytest.raises(HTTPException) as exc_info:
+            await create_item(txn_client, invalid_item)
+
+        # Verify the error message mentions the validation failure
+        assert "STAC validation failed" in str(exc_info.value)
+    finally:
+        os.environ.pop("ENABLE_STAC_VALIDATOR", None)
+        try:
+            await txn_client.delete_collection(test_collection["id"])
+        except Exception:
+            pass
+
+
+@pytest.mark.asyncio
+async def test_stac_validator_allows_valid_item(txn_client, load_test_data):
+    """Test that STAC validator allows valid STAC items."""
+    os.environ["ENABLE_STAC_VALIDATOR"] = "true"
+
+    try:
+        test_collection = load_test_data("test_collection.json")
+        test_collection["id"] = f"test-collection-valid-{uuid.uuid4()}"
+        await create_collection(txn_client, collection=test_collection)
+
+        base_item = load_test_data("test_item.json")
+        valid_item = deepcopy(base_item)
+        valid_item["id"] = "valid-stac-item"
+        valid_item["collection"] = test_collection["id"]
+
+        # This should succeed - valid STAC item (create_item doesn't 
return the item) + await create_item(txn_client, valid_item) + # If no exception is raised, the test passes + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_stac_validator_returns_400_on_invalid_item(app_client, load_test_data): + """Test that invalid STAC items return 400 Bad Request response.""" + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + + try: + # Create a test collection first + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-400-{uuid.uuid4()}" + + resp = await app_client.post( + "/collections", + json=test_collection, + ) + assert resp.status_code == 201 + + # Create invalid item with EO v2.0.0 extension (eo:bands not allowed in assets) + base_item = load_test_data("test_item.json") + invalid_item = deepcopy(base_item) + invalid_item["id"] = "invalid-item-400" + invalid_item["collection"] = test_collection["id"] + invalid_item["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" + ] + # EO v2.0.0 doesn't allow eo:bands in assets - should fail validation + + # POST invalid item and verify 400 response + resp = await app_client.post( + f"/collections/{test_collection['id']}/items", + json=invalid_item, + ) + + # Should return 400 Bad Request, not 500 + assert ( + resp.status_code == 400 + ), f"Expected 400, got {resp.status_code}: {resp.text}" + + # Verify error message mentions validation failure + response_data = resp.json() + assert "detail" in response_data + assert "Invalid item" in response_data["detail"] + + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: + await app_client.delete(f"/collections/{test_collection['id']}") + except Exception: + pass diff --git a/stac_fastapi/tests/extensions/test_bulk_transactions.py b/stac_fastapi/tests/extensions/test_bulk_transactions.py index 8ef167e81..6e4286742 100644 --- a/stac_fastapi/tests/extensions/test_bulk_transactions.py +++ b/stac_fastapi/tests/extensions/test_bulk_transactions.py @@ -103,24 +103,33 @@ async def test_feature_collection_insert( @pytest.mark.asyncio async def test_bulk_item_insert_validation_error(ctx, core_client, bulk_txn_client): - items = {} - # Add 9 valid items - for _ in range(9): - _item = deepcopy(ctx.item) - _item["id"] = str(uuid.uuid4()) - items[_item["id"]] = _item - - # Add 1 invalid item (e.g., missing "datetime") - invalid_item = deepcopy(ctx.item) - invalid_item["id"] = str(uuid.uuid4()) - invalid_item["properties"].pop( - "datetime", None - ) # Remove datetime to make it invalid - items[invalid_item["id"]] = invalid_item - - # The bulk insert should raise a ValidationError due to the invalid item - with pytest.raises(ValidationError): - bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) + import os + + from fastapi import HTTPException + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + + try: + items = {} + # Add 9 valid items + for _ in range(9): + _item = deepcopy(ctx.item) + _item["id"] = str(uuid.uuid4()) + items[_item["id"]] = _item + + # Add 1 invalid item (e.g., missing "datetime") + invalid_item = deepcopy(ctx.item) + invalid_item["id"] = str(uuid.uuid4()) + invalid_item["properties"].pop( + "datetime", None + ) # Remove datetime to make it invalid + items[invalid_item["id"]] = invalid_item + + # The bulk insert should raise an HTTPException due to the invalid item + with pytest.raises(HTTPException): + 
bulk_txn_client.bulk_item_insert(Items(items=items), refresh=True) + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) @pytest.mark.asyncio @@ -354,9 +363,9 @@ async def test_feature_collection_insert_with_in_batch_duplicates( ) # Should report 1 item added and 2 skipped (in-batch duplicates) - # create_item (FeatureCollection) returns: "Successfully added {n} Items. {m} skipped (duplicates). {k} errors occurred." - assert "Successfully added 1 Items" in result - assert "2 skipped (duplicates)" in result + # New format: "Processed {n} items: {m} added | {k} input duplicates" + assert "1 added" in result["message"] + assert "2 input duplicates" in result["message"] # Verify only 1 item exists in the collection with this ID fc = await core_client.item_collection(ctx.collection["id"], request=MockRequest()) diff --git a/stac_fastapi/tests/redis/test_queue_worker_validation.py b/stac_fastapi/tests/redis/test_queue_worker_validation.py new file mode 100644 index 000000000..627f5d9dd --- /dev/null +++ b/stac_fastapi/tests/redis/test_queue_worker_validation.py @@ -0,0 +1,313 @@ +"""Tests for Redis queue worker validation. + +Tests that the background worker correctly validates items pulled from the queue, +sends invalid items to the DLQ, and only inserts valid items into the database. +""" + +import os +import uuid +from copy import deepcopy + +import pytest + +from scripts.item_queue_worker import ItemQueueWorker # noqa: E402 +from stac_fastapi.core.redis_utils import AsyncRedisQueueManager + + +@pytest.mark.asyncio +async def test_worker_validates_items_in_queue(txn_client, core_client, load_test_data): + """Test that worker validates items pulled from queue and sends invalid to DLQ.""" + from ..conftest import create_collection + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + os.environ["ENABLE_REDIS_QUEUE"] = "true" + + try: + # Create a test collection + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-worker-{uuid.uuid4()}" + await create_collection(txn_client, test_collection) + + # Create a base item + base_item = load_test_data("test_item.json") + base_item["collection"] = test_collection["id"] + if "datetime" not in base_item.get("properties", {}): + base_item["properties"]["datetime"] = "2020-01-01T00:00:00Z" + + # Valid item 1 + valid_item_1 = deepcopy(base_item) + valid_item_1["id"] = "valid-item-1" + + # Valid item 2 + valid_item_2 = deepcopy(base_item) + valid_item_2["id"] = "valid-item-2" + + # Invalid item (STAC validator error) + invalid_item = deepcopy(base_item) + invalid_item["id"] = "invalid-item-worker" + invalid_item["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json", + "https://stac-extensions.github.io/projection/v1.0.0/schema.json", + ] + + feature_collection = { + "type": "FeatureCollection", + "features": [valid_item_1, valid_item_2, invalid_item], + } + + # 1. Queue the items + from ..conftest import create_item + + result = await create_item(txn_client, feature_collection) + if result is not None: + assert "queued" in result.lower() + + # 2. Verify they actually made it to the queue + queue_manager = await AsyncRedisQueueManager.create() + try: + pending_items = await queue_manager.get_pending_items(test_collection["id"]) + assert len(pending_items) == 3, "Items were not successfully queued" + + # 3. 
RUN THE REAL WORKER + worker = ItemQueueWorker() + await worker._init_queue_manager() + try: + # Tell the worker to process the collection + await worker._flush_collection(test_collection["id"]) + finally: + await worker.queue_manager.close() + + # 4. Verify DLQ has the invalid item (Direct Redis query!) + failed_key = queue_manager._get_failed_set_key(test_collection["id"]) + failed_ids = await queue_manager.redis.smembers(failed_key) + + # Redis might return bytes, safely decode them + failed_ids_str = { + fid.decode("utf-8") if isinstance(fid, bytes) else fid + for fid in failed_ids + } + assert ( + "invalid-item-worker" in failed_ids_str + ), "Invalid item was not sent to DLQ" + assert len(failed_ids_str) == 1 + + # 5. Verify Database has ONLY the valid items + db_items, _, _ = await core_client.database.execute_search( + search=core_client.database.make_search(), + limit=10, + token=None, + sort=None, + collection_ids=[test_collection["id"]], + datetime_search="", + ) + db_item_ids = {item["id"] for item in list(db_items)} + + assert "valid-item-1" in db_item_ids + assert "valid-item-2" in db_item_ids + assert "invalid-item-worker" not in db_item_ids + + finally: + await queue_manager.close() + + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + os.environ.pop("ENABLE_REDIS_QUEUE", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_worker_only_inserts_valid_items(txn_client, core_client, load_test_data): + """Test a mixed batch with multiple valid and invalid items.""" + from ..conftest import create_collection + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + os.environ["ENABLE_REDIS_QUEUE"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-db-{uuid.uuid4()}" + await create_collection(txn_client, test_collection) + + base_item = load_test_data("test_item.json") + base_item["collection"] = test_collection["id"] + if "datetime" not in base_item.get("properties", {}): + base_item["properties"]["datetime"] = "2020-01-01T00:00:00Z" + + items_to_queue = [] + + # 3 Valid items + for i in range(3): + valid_item = deepcopy(base_item) + valid_item["id"] = f"valid-item-{i}" + items_to_queue.append(valid_item) + + # 2 Invalid items + for i in range(2): + invalid_item = deepcopy(base_item) + invalid_item["id"] = f"invalid-item-{i}" + invalid_item["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" + ] + items_to_queue.append(invalid_item) + + feature_collection = { + "type": "FeatureCollection", + "features": items_to_queue, + } + + # 1. Queue items + from ..conftest import create_item + + result = await create_item(txn_client, feature_collection) + if result is not None: + assert "queued" in result.lower() + + queue_manager = await AsyncRedisQueueManager.create() + try: + pending_items = await queue_manager.get_pending_items(test_collection["id"]) + assert len(pending_items) == 5 + + # 2. RUN THE REAL WORKER + worker = ItemQueueWorker() + await worker._init_queue_manager() + try: + await worker._flush_collection(test_collection["id"]) + finally: + await worker.queue_manager.close() + + # 3. 
Verify Database + db_items, _, _ = await core_client.database.execute_search( + search=core_client.database.make_search(), + limit=10, + token=None, + sort=None, + collection_ids=[test_collection["id"]], + datetime_search="", + ) + db_item_ids = {item["id"] for item in list(db_items)} + + assert len(db_item_ids) == 3 + assert {"valid-item-0", "valid-item-1", "valid-item-2"}.issubset( + db_item_ids + ) + + # 4. Verify DLQ (Direct Redis query!) + failed_key = queue_manager._get_failed_set_key(test_collection["id"]) + failed_ids = await queue_manager.redis.smembers(failed_key) + failed_ids_str = { + fid.decode("utf-8") if isinstance(fid, bytes) else fid + for fid in failed_ids + } + + assert len(failed_ids_str) == 2 + assert {"invalid-item-0", "invalid-item-1"}.issubset(failed_ids_str) + + finally: + await queue_manager.close() + + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + os.environ.pop("ENABLE_REDIS_QUEUE", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass + + +@pytest.mark.asyncio +async def test_worker_handles_all_invalid_batch( + txn_client, core_client, load_test_data +): + """Test that worker safely skips database insertion if every item is invalid.""" + from ..conftest import create_collection + + os.environ["ENABLE_STAC_VALIDATOR"] = "true" + os.environ["ENABLE_REDIS_QUEUE"] = "true" + + try: + test_collection = load_test_data("test_collection.json") + test_collection["id"] = f"test-collection-all-invalid-{uuid.uuid4()}" + await create_collection(txn_client, test_collection) + + base_item = load_test_data("test_item.json") + base_item["collection"] = test_collection["id"] + if "datetime" not in base_item.get("properties", {}): + base_item["properties"]["datetime"] = "2020-01-01T00:00:00Z" + + # 3 entirely invalid items + items_to_queue = [] + for i in range(3): + invalid_item = deepcopy(base_item) + invalid_item["id"] = f"completely-invalid-{i}" + invalid_item["stac_extensions"] = [ + "https://stac-extensions.github.io/eo/v2.0.0/schema.json" + ] + items_to_queue.append(invalid_item) + + feature_collection = { + "type": "FeatureCollection", + "features": items_to_queue, + } + + # 1. Queue items + from ..conftest import create_item + + result = await create_item(txn_client, feature_collection) + if result is not None: + assert "queued" in result.lower() + + queue_manager = await AsyncRedisQueueManager.create() + try: + pending_items = await queue_manager.get_pending_items(test_collection["id"]) + assert len(pending_items) == 3 + + # 2. RUN THE REAL WORKER + worker = ItemQueueWorker() + await worker._init_queue_manager() + try: + await worker._flush_collection(test_collection["id"]) + finally: + await worker.queue_manager.close() + + # 3. Verify Database is empty for this collection + db_items, _, _ = await core_client.database.execute_search( + search=core_client.database.make_search(), + limit=10, + token=None, + sort=None, + collection_ids=[test_collection["id"]], + datetime_search="", + ) + db_items_list = list(db_items) + assert ( + len(db_items_list) == 0 + ), "Database should be empty since all items were invalid" + + # 4. Verify DLQ has everything (Direct Redis query!) 
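+            # (The failed set is read through AsyncRedisQueueManager internals:
+            # _get_failed_set_key plus a Redis SMEMBERS call. Member IDs may come
+            # back as bytes depending on the Redis client config, hence the
+            # decode step below.)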
+ failed_key = queue_manager._get_failed_set_key(test_collection["id"]) + failed_ids = await queue_manager.redis.smembers(failed_key) + failed_ids_str = { + fid.decode("utf-8") if isinstance(fid, bytes) else fid + for fid in failed_ids + } + + assert len(failed_ids_str) == 3 + assert { + "completely-invalid-0", + "completely-invalid-1", + "completely-invalid-2", + }.issubset(failed_ids_str) + + finally: + await queue_manager.close() + + finally: + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + os.environ.pop("ENABLE_REDIS_QUEUE", None) + try: + await txn_client.delete_collection(test_collection["id"]) + except Exception: + pass diff --git a/stac_fastapi/tests/resources/test_item.py b/stac_fastapi/tests/resources/test_item.py index 8be391079..337f00f9c 100644 --- a/stac_fastapi/tests/resources/test_item.py +++ b/stac_fastapi/tests/resources/test_item.py @@ -1005,132 +1005,141 @@ async def test_search_datetime_with_null_datetime( if os.getenv("ENABLE_DATETIME_INDEX_FILTERING"): pytest.skip() - """Test datetime filtering when properties.datetime is null or set, ensuring start_datetime and end_datetime are set when datetime is null.""" - # Setup: Create test collection - test_collection = load_test_data("test_collection.json") + # Disable STAC validator for this test as test data may have schema violations + original_validator_setting = os.getenv("ENABLE_STAC_VALIDATOR") + os.environ.pop("ENABLE_STAC_VALIDATOR", None) + try: - await create_collection(txn_client, collection=test_collection) - except Exception as e: - logger.error(f"Failed to create collection: {e}") - pytest.fail(f"Collection creation failed: {e}") - - base_item = load_test_data("test_item.json") - collection_id = base_item["collection"] - - # Item 1: Null datetime, valid start/end datetimes - null_dt_item = deepcopy(base_item) - null_dt_item["id"] = "null-datetime-item" - null_dt_item["properties"]["datetime"] = None - null_dt_item["properties"]["start_datetime"] = "2020-01-01T00:00:00Z" - null_dt_item["properties"]["end_datetime"] = "2020-01-02T00:00:00Z" - - # Item 2: Valid datetime, no start/end datetimes - valid_dt_item = deepcopy(base_item) - valid_dt_item["id"] = "valid-datetime-item" - valid_dt_item["properties"]["datetime"] = "2020-01-01T11:00:00Z" - valid_dt_item["properties"]["start_datetime"] = None - valid_dt_item["properties"]["end_datetime"] = None - - # Item 3: Valid datetime outside range, valid start/end datetimes - range_item = deepcopy(base_item) - range_item["id"] = "range-item" - range_item["properties"]["datetime"] = "2020-01-03T00:00:00Z" - range_item["properties"]["start_datetime"] = "2020-01-01T00:00:00Z" - range_item["properties"]["end_datetime"] = "2020-01-02T00:00:00Z" - - # Create valid items - items = [null_dt_item, valid_dt_item, range_item] - for item in items: + """Test datetime filtering when properties.datetime is null or set, ensuring start_datetime and end_datetime are set when datetime is null.""" + # Setup: Create test collection + test_collection = load_test_data("test_collection.json") try: - await create_item(txn_client, item) + await create_collection(txn_client, collection=test_collection) except Exception as e: - logger.error(f"Failed to create item {item['id']}: {e}") - pytest.fail(f"Item creation failed: {e}") - - # Refresh indices once - try: - await refresh_indices(txn_client) - except Exception as e: - logger.error(f"Failed to refresh indices: {e}") - pytest.fail(f"Index refresh failed: {e}") + logger.error(f"Failed to create collection: {e}") + pytest.fail(f"Collection 
creation failed: {e}") + + base_item = load_test_data("test_item.json") + collection_id = base_item["collection"] + + # Item 1: Null datetime, valid start/end datetimes + null_dt_item = deepcopy(base_item) + null_dt_item["id"] = "null-datetime-item" + null_dt_item["properties"]["datetime"] = None + null_dt_item["properties"]["start_datetime"] = "2020-01-01T00:00:00Z" + null_dt_item["properties"]["end_datetime"] = "2020-01-02T00:00:00Z" + + # Item 2: Valid datetime, no start/end datetimes + valid_dt_item = deepcopy(base_item) + valid_dt_item["id"] = "valid-datetime-item" + valid_dt_item["properties"]["datetime"] = "2020-01-01T11:00:00Z" + valid_dt_item["properties"]["start_datetime"] = None + valid_dt_item["properties"]["end_datetime"] = None + + # Item 3: Valid datetime outside range, valid start/end datetimes + range_item = deepcopy(base_item) + range_item["id"] = "range-item" + range_item["properties"]["datetime"] = "2020-01-03T00:00:00Z" + range_item["properties"]["start_datetime"] = "2020-01-01T00:00:00Z" + range_item["properties"]["end_datetime"] = "2020-01-02T00:00:00Z" + + # Create valid items + items = [null_dt_item, valid_dt_item, range_item] + for item in items: + try: + await create_item(txn_client, item) + except Exception as e: + logger.error(f"Failed to create item {item['id']}: {e}") + pytest.fail(f"Item creation failed: {e}") + + # Refresh indices once + try: + await refresh_indices(txn_client) + except Exception as e: + logger.error(f"Failed to refresh indices: {e}") + pytest.fail(f"Index refresh failed: {e}") - # Refresh indices once - try: - await refresh_indices(txn_client) - except Exception as e: - logger.error(f"Failed to refresh indices: {e}") - pytest.fail(f"Index refresh failed: {e}") - - # Test 1: Exact datetime matching valid-datetime-item and null-datetime-item - feature_ids = await _search_and_get_ids( - app_client, - params={ - "datetime": "2020-01-01T11:00:00Z", - "collections": [collection_id], - }, - ) - assert feature_ids == { - "valid-datetime-item", # Matches properties__datetime - "null-datetime-item", # Matches start_datetime <= datetime <= end_datetime - }, "Exact datetime search failed" - - # Test 2: Range including valid-datetime-item, null-datetime-item, and range-item - feature_ids = await _search_and_get_ids( - app_client, - params={ - "datetime": "2020-01-01T00:00:00Z/2020-01-03T00:00:00Z", - "collections": [collection_id], - }, - ) - assert feature_ids == { - "valid-datetime-item", # Matches properties__datetime in range - "null-datetime-item", # Matches start_datetime <= lte, end_datetime >= gte - "range-item", # Matches properties__datetime in range - }, "Range search failed" - - # Test 3: POST request for range matching null-datetime-item and valid-datetime-item - feature_ids = await _search_and_get_ids( - app_client, - method="post", - json={ - "datetime": "2020-01-01T00:00:00Z/2020-01-02T00:00:00Z", - "collections": [collection_id], - }, - ) - assert feature_ids == { - "null-datetime-item", # Matches start_datetime <= lte, end_datetime >= gte - "valid-datetime-item", # Matches properties__datetime in range - }, "POST range search failed" - - # Test 4: Exact datetime matching only range-item's datetime - feature_ids = await _search_and_get_ids( - app_client, - params={ - "datetime": "2020-01-03T00:00:00Z", - "collections": [collection_id], - }, - ) - assert feature_ids == { - "range-item", # Matches properties__datetime - }, "Exact datetime for range-item failed" - - # Test 5: Range matching null-datetime-item but not range-item's datetime 
-    feature_ids = await _search_and_get_ids(
-        app_client,
-        params={
-            "datetime": "2020-01-01T12:00:00Z/2020-01-02T12:00:00Z",
-            "collections": [collection_id],
-        },
-    )
-    assert feature_ids == {
-        "null-datetime-item",  # Overlaps: search range [12:00-01-01 to 12:00-02-01] overlaps item range [00:00-01-01 to 00:00-02-01]
-    }, "Range search excluding range-item datetime failed"
+        # Test 1: Exact datetime matching valid-datetime-item and null-datetime-item
+        feature_ids = await _search_and_get_ids(
+            app_client,
+            params={
+                "datetime": "2020-01-01T11:00:00Z",
+                "collections": [collection_id],
+            },
+        )
+        assert feature_ids == {
+            "valid-datetime-item",  # Matches properties__datetime
+            "null-datetime-item",  # Matches start_datetime <= datetime <= end_datetime
+        }, "Exact datetime search failed"
+
+        # Test 2: Range including valid-datetime-item, null-datetime-item, and range-item
+        feature_ids = await _search_and_get_ids(
+            app_client,
+            params={
+                "datetime": "2020-01-01T00:00:00Z/2020-01-03T00:00:00Z",
+                "collections": [collection_id],
+            },
+        )
+        assert feature_ids == {
+            "valid-datetime-item",  # Matches properties__datetime in range
+            "null-datetime-item",  # Matches start_datetime <= lte, end_datetime >= gte
+            "range-item",  # Matches properties__datetime in range
+        }, "Range search failed"
+
+        # Test 3: POST request for range matching null-datetime-item and valid-datetime-item
+        feature_ids = await _search_and_get_ids(
+            app_client,
+            method="post",
+            json={
+                "datetime": "2020-01-01T00:00:00Z/2020-01-02T00:00:00Z",
+                "collections": [collection_id],
+            },
+        )
+        assert feature_ids == {
+            "null-datetime-item",  # Matches start_datetime <= lte, end_datetime >= gte
+            "valid-datetime-item",  # Matches properties__datetime in range
+        }, "POST range search failed"
+
+        # Test 4: Exact datetime matching only range-item's datetime
+        feature_ids = await _search_and_get_ids(
+            app_client,
+            params={
+                "datetime": "2020-01-03T00:00:00Z",
+                "collections": [collection_id],
+            },
+        )
+        assert feature_ids == {
+            "range-item",  # Matches properties__datetime
+        }, "Exact datetime for range-item failed"
+
+        # Test 5: Range matching null-datetime-item but not range-item's datetime
+        feature_ids = await _search_and_get_ids(
+            app_client,
+            params={
+                "datetime": "2020-01-01T12:00:00Z/2020-01-02T12:00:00Z",
+                "collections": [collection_id],
+            },
+        )
+        assert feature_ids == {
+            "null-datetime-item",  # Overlaps: search range [2020-01-01 12:00 to 2020-01-02 12:00] overlaps item range [2020-01-01 00:00 to 2020-01-02 00:00]
+        }, "Range search excluding range-item datetime failed"
 
-    # Cleanup
-    try:
-        await txn_client.delete_collection(test_collection["id"])
-    except Exception as e:
-        logger.warning(f"Failed to delete collection: {e}")
+        # Cleanup
+        try:
+            await txn_client.delete_collection(test_collection["id"])
+        except Exception as e:
+            logger.warning(f"Failed to delete collection: {e}")
+    finally:
+        # Restore original STAC validator setting
+        if original_validator_setting:
+            os.environ["ENABLE_STAC_VALIDATOR"] = original_validator_setting
 
 
 @pytest.mark.asyncio