6 changes: 3 additions & 3 deletions .github/workflows/cicd.yml
@@ -99,10 +99,10 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip wheel pytest-timeout
pip install ./stac_fastapi/core[catalogs]
pip install ./stac_fastapi/core
pip install ./stac_fastapi/sfeos_helpers
pip install ./stac_fastapi/elasticsearch[dev,server]
pip install ./stac_fastapi/opensearch[dev,server]
pip install ./stac_fastapi/elasticsearch[catalogs,dev,server,validator]
pip install ./stac_fastapi/opensearch[catalogs,dev,server,validator]

- name: Run test suite
run: |
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,10 +9,14 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Added `ENABLE_STAC_VALIDATOR` environment variable to enable strict STAC schema validation on ingestion via the Python `stac-valid` package. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)
- Added `[validator]` installation extra to `stac-fastapi-core`, `elasticsearch`, and `opensearch` packages. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)
- Added test for conformance endpoint in catalogs extension. [#727](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/727)

### Changed

- Switched STAC validation from `batch_validator` to `FastValidator`, which uses `fastjsonschema`, for improved performance. [#732](https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch/pull/732)

### Fixed

### Removed
5 changes: 5 additions & 0 deletions Makefile
@@ -75,6 +75,11 @@ test-elasticsearch-catalogs: image-es-os
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest extensions/test_catalogs.py -v'
docker compose down

.PHONY: test-elasticsearch-validation
test-elasticsearch-validation: image-es-os
-$(run_es) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh elasticsearch:9200 && cd stac_fastapi/tests/ && pytest api/test_api_stac_validator.py -v'
docker compose down

.PHONY: test-opensearch
test-opensearch: image-es-os
-$(run_os) /bin/bash -c 'export && ./scripts/wait-for-it-es.sh opensearch:9202 && cd stac_fastapi/tests/ && pytest'
71 changes: 69 additions & 2 deletions README.md
@@ -31,13 +31,13 @@ The following organizations have contributed time and/or funding to support the
- **11/07/2025:** 🌍 The SFEOS STAC Viewer is now available at: https://healy-hyperspatial.github.io/sfeos-web. Use this site to examine your data and test your STAC API!
- **10/24/2025:** Added `previous_token` pagination using Redis for efficient navigation. This feature allows users to navigate backwards through large result sets by storing pagination state in Redis. To use this feature, ensure Redis is configured (see [Redis for navigation](#redis-for-navigation)) and set `REDIS_ENABLE=true` in your environment.
- **10/23/2025:** The `EXCLUDED_FROM_QUERYABLES` environment variable was added to exclude fields from the `queryables` endpoint. See [docs](#excluding-fields-from-queryables).
- **10/15/2025:** 🚀 SFEOS Tools v0.1.0 Released! - The new `sfeos-tools` CLI is now available on [PyPI](https://pypi.org/project/sfeos-tools/)
- **10/15/2025:** Added `reindex` command to **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** for zero-downtime index updates when changing mappings or settings. The new `reindex` command makes it easy to apply mapping changes, update index settings, or migrate to new index structures without any service interruption, ensuring high availability of your STAC API during maintenance operations.

<details style="border: 1px solid #eaecef; border-radius: 6px; padding: 10px; margin-bottom: 16px; background-color: #f9f9f9;">
<summary style="cursor: pointer; font-weight: bold; margin: -10px -10px 0; padding: 10px; background-color: #f0f0f0; border-bottom: 1px solid #eaecef; border-top-left-radius: 6px; border-top-right-radius: 6px;">View Older News (Click to Expand)</summary>

-------------
- **10/15/2025:** 🚀 SFEOS Tools v0.1.0 Released! - The new `sfeos-tools` CLI is now available on [PyPI](https://pypi.org/project/sfeos-tools/)
- **10/15/2025:** Added `reindex` command to **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** for zero-downtime index updates when changing mappings or settings. The new `reindex` command makes it easy to apply mapping changes, update index settings, or migrate to new index structures without any service interruption, ensuring high availability of your STAC API during maintenance operations.
- **10/12/2025:** Collections search **bbox** functionality added! The collections search extension now supports bbox queries. Collections will need to be updated via the API or with the new **[SFEOS-tools](https://github.com/Healy-Hyperspatial/sfeos-tools)** CLI package to support geospatial discoverability. 🙏 Thanks again to **CloudFerro** for their sponsorship of this work!
- **10/04/2025:** The **[CloudFerro](https://cloudferro.com/)** logo has been added to the sponsors and supporters list above. Their sponsorship of the ongoing collections search extension work has been invaluable. This is in addition to the many other important changes and updates their developers have added to the project.
- **09/25/2025:** v6.5.0 adds a new GET/POST /collections-search endpoint (disabled by default via ENABLE_COLLECTIONS_SEARCH_ROUTE) to avoid conflicts with the Transactions Extension, and enhances collections search with structured filtering (CQL2 JSON/text), query, and datetime filtering. These changes make collection discovery more powerful and configurable while preserving compatibility with transaction-enabled deployments.
@@ -106,6 +106,7 @@ This project is built on the following technologies: STAC, stac-fastapi, FastAPI
- [Using Pre-built Docker Images](#using-pre-built-docker-images)
- [Using Docker Compose](#using-docker-compose)
- [Configuration Reference](#configuration-reference)
- [STAC Validation](#stac-validation)
- [Free-Text Search (`q` parameter)](#free-text-search-q-parameter)
- [Queryables Endpoint](#queryables-endpoint)
- [Root Queryables Configuration](#root-queryables-configuration)
@@ -741,6 +742,7 @@ You can customize additional settings in your `.env` file:
| `ENABLE_COLLECTIONS_SEARCH_ROUTE` | Enable the custom `/collections-search` endpoint (both GET and POST methods). When disabled, the custom endpoint will not be available, but collection search extensions will still be available on the core `/collections` endpoint if `ENABLE_COLLECTIONS_SEARCH` is true. | `false` | Optional |
| `ENABLE_TRANSACTIONS_EXTENSIONS` | Enables or disables the Transactions and Bulk Transactions API extensions. This is useful for deployments where mutating the catalog via the API should be prevented. If set to `true`, the POST `/collections` route for search will be unavailable in the API. | `true` | Optional |
| `ENABLE_CATALOGS_ROUTE` | Enable the **/catalogs** endpoint for hierarchical catalog browsing and navigation. **Note:** Requires the catalogs extension to be installed via `stac-fastapi-elasticsearch[catalogs]`, `stac-fastapi-opensearch[catalogs]`, or `stac-fastapi-core[catalogs]`. See [Catalogs Route](#catalogs-route) for installation instructions. | `false` | Optional |
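| `ENABLE_STAC_VALIDATOR` | Enable [stac-validator](https://github.com/stac-utils/stac-validator) to validate STAC items and collections against the STAC JSON schemas on ingestion. This is especially useful for items or collections that use extensions. **Note:** Requires the validator extra via `stac-fastapi-elasticsearch[validator]`, `stac-fastapi-opensearch[validator]`, or `stac-fastapi-core[validator]`. See [STAC Validation](#stac-validation). | `false` | Optional |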
| `STAC_INDEX_ASSETS` | Controls if Assets are indexed when added to Elasticsearch/Opensearch. This allows asset fields to be included in search queries. | `false` | Optional |

### 5. Limits & Performance
@@ -794,6 +796,71 @@ You can customize additional settings in your `.env` file:
> [!NOTE]
> The variables `ES_HOST`, `ES_PORT`, `ES_USE_SSL`, `ES_VERIFY_CERTS` and `ES_TIMEOUT` apply to both Elasticsearch and OpenSearch backends, so there is no need to rename the key names to `OS_` even if you're using OpenSearch.

## STAC Validation

STAC FastAPI provides a flexible, 2-tier validation architecture for STAC items and collections on ingestion. This ensures data quality and compliance with the STAC specification while allowing you to balance strict schema enforcement with high-throughput ingestion performance.

### 1. Native Pydantic Validation (Always Enabled)

By default, all STAC items and collections are validated using **Pydantic** (via `stac-pydantic`) at the API routing layer. This validation:

- Enforces required STAC fields and correct data types.
- Validates spatial and temporal properties.
- Provides extremely fast, built-in validation without external dependencies.

This validation is always enabled and happens automatically before data reaches the database or the Redis queue.

### 2. Python STAC Validator

If you require strict validation beyond Pydantic's type checking, you can enable the Python-based `stac-valid` package, which uses a FastValidator backend for improved performance.

#### Enabling STAC Validator with FastValidator

1. **Install the validator**:
```bash
pip install stac-fastapi-core[validator]
# or
pip install stac-fastapi-elasticsearch[validator]
# or
pip install stac-fastapi-opensearch[validator]
```

2. **Enable validation via environment variable**:
```bash
export ENABLE_STAC_VALIDATOR=true
```
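The queue worker in this PR reads the flag through `get_bool_env`. As a rough illustration of the on/off semantics (unset means disabled, matching the `false` default in the configuration table), here is a hypothetical stand-in; `env_flag_enabled` and the set of accepted strings are assumptions, so check `get_bool_env` for the exact values it treats as true.

```python
import os

# Hypothetical helper for illustration only; it is NOT the real
# `stac_fastapi.core.utilities.get_bool_env`, whose accepted values may differ.
TRUTHY_VALUES = {"true", "1", "yes", "on"}


def env_flag_enabled(name: str, default: bool = False) -> bool:
    """Return True when environment variable `name` holds a truthy string."""
    raw = os.environ.get(name)
    if raw is None:
        # Unset variables fall back to the default (disabled for the validator).
        return default
    return raw.strip().lower() in TRUTHY_VALUES


# Mirrors `export ENABLE_STAC_VALIDATOR=true` from the step above.
os.environ["ENABLE_STAC_VALIDATOR"] = "true"
print(env_flag_enabled("ENABLE_STAC_VALIDATOR"))  # True
```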

When enabled, the STAC validator using FastValidator will:
- Validate items and collections against the official STAC JSON schemas, using `fastjsonschema` for performance.
- Check compliance with STAC extensions (e.g., EO, SAR, Projection).
- Catch schema violations that Pydantic doesn't enforce.
- Provide detailed error messages grouped by validation error type.
- Support batch validation of FeatureCollections with per-item error reporting.
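In the queue worker, `async_validate_with_fast_validator` is unpacked as `(valid_items, validation_errors)`, where `validation_errors` maps each error message to the IDs of the items that produced it. The sketch below reproduces only that grouping shape using a toy check; `validate_batch` and `toy_validator` are hypothetical, and the real validator checks the full STAC and extension JSON schemas.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Tuple

Item = Dict[str, Any]
# A validator returns None for a valid item, or an error message string.
Validator = Callable[[Item], Optional[str]]


def validate_batch(
    items: List[Item], validate: Validator
) -> Tuple[List[Item], Dict[str, List[str]]]:
    """Split items into valid ones and item IDs grouped by error message."""
    valid_items: List[Item] = []
    errors: Dict[str, List[str]] = defaultdict(list)
    for item in items:
        message = validate(item)
        if message is None:
            valid_items.append(item)
        else:
            # Items failing for the same reason share one entry.
            errors[message].append(item.get("id", "<missing id>"))
    return valid_items, dict(errors)


def toy_validator(item: Item) -> Optional[str]:
    """Stand-in for JSON-schema validation: checks two required keys only."""
    for key in ("stac_version", "geometry"):
        if key not in item:
            return f"'{key}' is a required property"
    return None


batch = [
    {"id": "a", "stac_version": "1.0.0", "geometry": None},
    {"id": "b", "geometry": None},
    {"id": "c", "stac_version": "1.0.0"},
    {"id": "d"},
]
valid, grouped = validate_batch(batch, toy_validator)
```

Grouping by message keeps the log and error output compact when many items in a FeatureCollection fail for the same reason.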

#### Example: Validation in Action

```bash
# Enable STAC validator
export ENABLE_STAC_VALIDATOR=true

# Now POST/PUT requests will validate against STAC schemas
curl -X POST http://localhost:8000/collections \
-H "Content-Type: application/json" \
-d @collection.json
```

If validation fails, you'll receive a detailed error response:
```json
{
"detail": "STAC validation failed: 'eo:bands' does not match any of the regexes: '^(?!eo:)'. Error is in assets -> SR_B2"
}
```
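Clients that want to show the failing location separately can split the `detail` string. This assumes the exact format of the example above (a `STAC validation failed: ` prefix and an optional `. Error is in a -> b` suffix), which may not be a stable contract; `parse_validation_detail` is hypothetical.

```python
from typing import List, Tuple

PREFIX = "STAC validation failed: "
PATH_MARKER = ". Error is in "


def parse_validation_detail(detail: str) -> Tuple[str, List[str]]:
    """Split a validation `detail` string into (message, path segments)."""
    body = detail[len(PREFIX):] if detail.startswith(PREFIX) else detail
    # rpartition so that periods inside the message itself are left alone.
    message, sep, path = body.rpartition(PATH_MARKER)
    if not sep:  # no location suffix present
        return body, []
    return message, path.split(" -> ")
```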

#### Performance Considerations

- **Pydantic validation**: Very fast and always enabled
- **STAC validator with FastValidator** (`ENABLE_STAC_VALIDATOR`): Uses `fastjsonschema` for efficient schema validation with batch processing support for FeatureCollections

## Free-Text Search (`q` parameter)

The free-text search feature allows users to discover items and collections using keywords or phrases. By default, the search targets core fields: `id`, `collection`, `properties.title`, `properties.description`, and `properties.keywords`.
14 changes: 10 additions & 4 deletions compose.yml
@@ -24,6 +24,7 @@ services:
- DATABASE_REFRESH=true
- ENABLE_COLLECTIONS_SEARCH_ROUTE=true
- ENABLE_CATALOGS_ROUTE=true
- ENABLE_STAC_VALIDATOR=true
- REDIS_ENABLE=true
- REDIS_HOST=redis
- REDIS_PORT=6379
@@ -34,8 +35,10 @@ services:
- ./scripts:/app/scripts
- ./esdata:/usr/share/elasticsearch/data
depends_on:
- elasticsearch
- redis
elasticsearch:
condition: service_started
redis:
condition: service_started
command:
bash -c "./scripts/wait-for-it-es.sh es-container:9200 && python -m stac_fastapi.elasticsearch.app"

@@ -65,6 +68,7 @@ services:
- STAC_FASTAPI_RATE_LIMIT=200/minute
- ENABLE_COLLECTIONS_SEARCH_ROUTE=true
- ENABLE_CATALOGS_ROUTE=true
- ENABLE_STAC_VALIDATOR=true
- REDIS_ENABLE=true
- REDIS_HOST=redis
- REDIS_PORT=6379
@@ -75,8 +79,10 @@ services:
- ./scripts:/app/scripts
- ./osdata:/usr/share/opensearch/data
depends_on:
- opensearch
- redis
opensearch:
condition: service_started
redis:
condition: service_started
command:
bash -c "./scripts/wait-for-it-es.sh os-container:9202 && python -m stac_fastapi.opensearch.app"

4 changes: 2 additions & 2 deletions dockerfiles/Dockerfile.dev.es-os
Expand Up @@ -14,5 +14,5 @@ COPY . /app

RUN pip install --no-cache-dir -e ./stac_fastapi/core
RUN pip install --no-cache-dir -e ./stac_fastapi/sfeos_helpers
RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,catalogs]
RUN pip install --no-cache-dir -e ./stac_fastapi/opensearch[dev,server,catalogs]
RUN pip install --no-cache-dir -e ./stac_fastapi/elasticsearch[dev,server,catalogs,validator]
RUN pip install --no-cache-dir -e ./stac_fastapi/opensearch[dev,server,catalogs,validator]
86 changes: 72 additions & 14 deletions scripts/item_queue_worker.py
@@ -21,6 +21,8 @@
from redis.exceptions import LockError

from stac_fastapi.core.redis_utils import AsyncRedisQueueManager, ItemQueueSettings
from stac_fastapi.core.utilities import get_bool_env
from stac_fastapi.core.validate import async_validate_with_fast_validator

logger = logging.getLogger(__name__)

@@ -167,6 +169,10 @@ async def _flush_collection(self, collection_id: str) -> None:

The lock TTL is periodically refreshed by a background task to prevent
expiration during long-running batch processing.

If strict validation is enabled via `ENABLE_STAC_VALIDATOR`, items are
validated using FastValidator before database insertion. Invalid items are
routed to the Dead Letter Queue (DLQ), and only valid items are inserted.
"""
state = self._get_state(collection_id)

@@ -206,54 +212,106 @@ async def _flush_collection(self, collection_id: str) -> None:
break

batch_num += 1
item_ids = [item["id"] for item in items]

logger.info(
f"Collection '{collection_id}' batch #{batch_num}: flushing {len(items)} items"
f"Collection '{collection_id}' batch #{batch_num}: pulled {len(items)} items from queue"
)

# VALIDATION LAYER: Use batch validation for efficiency (if enabled)
if get_bool_env("ENABLE_STAC_VALIDATOR"):
(
valid_items,
validation_errors,
) = await async_validate_with_fast_validator(items)

# Extract invalid item IDs from grouped validation errors
invalid_item_ids = set()
for error_msg, error_item_ids in validation_errors.items():
for item_id in error_item_ids:
invalid_item_ids.add(item_id)
logger.error(
f"Worker validation failed for '{item_id}' in collection '{collection_id}': {error_msg}"
)
else:
# Skip STAC validation when disabled
valid_items = items
invalid_item_ids = set()

# Handle invalid items (Dead Letter Queue)
if invalid_item_ids:
try:
await self.queue_manager.save_failed_items(
collection_id, list(invalid_item_ids)
)
await self.queue_manager.mark_items_processed(
collection_id, list(invalid_item_ids)
)
except Exception:
logger.exception(
f"Collection '{collection_id}': failed to save {len(invalid_item_ids)} invalid items to DLQ"
)

# If entire batch was invalid, skip database call
if not valid_items:
logger.warning(
f"Collection '{collection_id}' batch #{batch_num}: All {len(items)} items failed STAC validation. Skipping DB insert."
)
state.last_flush_time = time.monotonic()
if len(items) < batch_size:
break
continue

# DATABASE INSERTION: Only valid items reach the database
try:
success, errors = await self.db.bulk_async(
collection_id=collection_id,
processed_items=items,
processed_items=valid_items,
op_type="index",
)
except Exception:
logger.exception(
f"Collection '{collection_id}' batch #{batch_num}: bulk_async failed ({len(items)} items)"
f"Collection '{collection_id}' batch #{batch_num}: bulk_async failed ({len(valid_items)} valid items)"
)
break

failed_ids = self._extract_failed_item_ids(errors) if errors else set()
successful_ids = [iid for iid in item_ids if iid not in failed_ids]
# Handle database errors
failed_db_ids = (
self._extract_failed_item_ids(errors) if errors else set()
)
successful_db_ids = [
item["id"]
for item in valid_items
if item["id"] not in failed_db_ids
]

if errors:
logger.error(
f"Collection '{collection_id}' batch #{batch_num}: "
f"{len(failed_ids)} item(s) failed, saving to DLQ. "
f"{len(failed_db_ids)} DB insert(s) failed, saving to DLQ. "
f"Bulk errors: {errors}"
)

if successful_ids:
if successful_db_ids:
await self.queue_manager.mark_items_processed(
collection_id, successful_ids
collection_id, successful_db_ids
)

if failed_ids:
if failed_db_ids:
try:
await self.queue_manager.save_failed_items(
collection_id, list(failed_ids)
collection_id, list(failed_db_ids)
)
await self.queue_manager.mark_items_processed(
collection_id, list(failed_ids)
collection_id, list(failed_db_ids)
)
except Exception:
logger.exception(
f"Collection '{collection_id}': failed to save {len(failed_ids)} item(s) to DLQ; items remain in pending queue"
f"Collection '{collection_id}': failed to save {len(failed_db_ids)} DB failures to DLQ"
)

logger.info(
f"Collection '{collection_id}' batch #{batch_num}: {success} succeeded, {len(errors)} errors"
f"Collection '{collection_id}' batch #{batch_num}: {success} succeeded DB insert, "
f"{len(invalid_item_ids)} failed STAC validation, {len(failed_db_ids)} failed DB insert."
)

state.last_flush_time = time.monotonic()
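The updated `_flush_collection` now sorts every pulled item into one of three outcomes. A minimal pure-Python model of that routing, assuming the `validation_errors` shape used above (error message mapped to item IDs) and the set of IDs that failed the bulk insert; `route_batch` and `BatchOutcome` are hypothetical names, and the real worker performs these steps inline against Redis and the search backend.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Set


@dataclass
class BatchOutcome:
    """Where each item ID ends up after one flush of a collection batch."""

    inserted: List[str] = field(default_factory=list)  # marked processed
    dlq_validation: List[str] = field(default_factory=list)  # DLQ, then processed
    dlq_db: List[str] = field(default_factory=list)  # DLQ, then processed


def route_batch(
    item_ids: List[str],
    validation_errors: Dict[str, List[str]],
    failed_db_ids: Iterable[str],
) -> BatchOutcome:
    """Assign each ID to exactly one bucket, preserving batch order."""
    invalid: Set[str] = {iid for ids in validation_errors.values() for iid in ids}
    failed_db = set(failed_db_ids)
    outcome = BatchOutcome()
    for iid in item_ids:
        if iid in invalid:
            # Invalid items never reach bulk_async, so validation wins.
            outcome.dlq_validation.append(iid)
        elif iid in failed_db:
            outcome.dlq_db.append(iid)
        else:
            outcome.inserted.append(iid)
    return outcome
```

In the worker, the two DLQ buckets are only marked processed after `save_failed_items` succeeds, so items stay in the pending queue if a DLQ write fails or if `bulk_async` raises.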
3 changes: 3 additions & 0 deletions stac_fastapi/core/pyproject.toml
@@ -56,6 +56,9 @@ sentry = [
catalogs = [
"stac-fastapi-catalogs-extension==0.2.0",
]
validator = [
"stac-valid~=4.2.2"
]

[project.urls]
Homepage = "https://github.com/stac-utils/stac-fastapi-elasticsearch-opensearch"