Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions eodag/api/product/_product.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import base64
import json
import logging
import os
import re
Expand All @@ -27,11 +28,15 @@

import orjson
import requests
from requests import RequestException
from boto3 import Session
from boto3.resources.base import ServiceResource
from requests import PreparedRequest, RequestException
from requests.auth import AuthBase
from requests.structures import CaseInsensitiveDict
from shapely import geometry
from shapely.errors import ShapelyError

from eodag.plugins.authentication.aws_auth import AwsAuth
from eodag.types.queryables import CommonStacMetadata
from eodag.types.stac_metadata import create_stac_metadata_model

Expand Down Expand Up @@ -63,7 +68,12 @@
format_string,
get_geometry_from_various,
)
from eodag.utils.exceptions import DownloadError, MisconfiguredError, ValidationError
from eodag.utils.exceptions import (
DatasetCreationError,
DownloadError,
MisconfiguredError,
ValidationError,
)
from eodag.utils.repr import dict_to_html_table

if TYPE_CHECKING:
Expand Down Expand Up @@ -525,6 +535,88 @@ def stream_download(
**kwargs,
)

def get_storage_options(
self,
asset_key: Optional[str] = None,
) -> dict[str, Any]:
"""
Get fsspec storage_options keyword arguments
"""
auth = self.downloader_auth.authenticate() if self.downloader_auth else None
if self.downloader is None:
return {}

# default url and headers
try:
url = self.assets[asset_key]["href"] if asset_key else self.location
except KeyError as e:
raise DatasetCreationError(f"{asset_key} not found in {self} assets") from e
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are not creating an xarray object here, so this exception class is not accurate.

headers = {**USER_AGENT}

if isinstance(auth, ServiceResource) and isinstance(
self.downloader_auth, AwsAuth
):
auth_kwargs: dict[str, Any] = dict()
# AwsAuth
if s3_endpoint := getattr(self.downloader_auth.config, "s3_endpoint", None):
auth_kwargs["client_kwargs"] = {"endpoint_url": s3_endpoint}
if creds := cast(
Session, self.downloader_auth.s3_session
).get_credentials():
auth_kwargs["key"] = creds.access_key
auth_kwargs["secret"] = creds.secret_key
if creds.token:
auth_kwargs["token"] = creds.token
if requester_pays := getattr(
self.downloader_auth.config, "requester_pays", False
):
auth_kwargs["requester_pays"] = requester_pays
else:
auth_kwargs["anon"] = True
return {"path": url, **auth_kwargs}

if isinstance(auth, AuthBase):
# update url and headers with auth
req = PreparedRequest()
req.url = url
req.headers = CaseInsensitiveDict(headers)
if auth:
auth(req)
return {"path": req.url, "headers": dict(req.headers)}

return {"path": url}

def request_asset(
    self,
    url: str,
    timeout: Optional[float] = None,
) -> requests.Response:
    """Perform a streamed GET request to the given URL using product's authentication headers.

    The headers sent are the ones found under the ``"headers"`` key of
    :meth:`get_storage_options`; when that key is absent no extra header is sent.

    :param url: URL to request
    :param timeout: (optional) Value forwarded to :func:`requests.get` as ``timeout``,
                    in seconds. The default ``None`` keeps the historical behaviour of
                    waiting for the server without any limit; pass a number to make
                    the call fail instead of hanging on an unresponsive server
    :returns: The response object, requested with ``stream=True``
    """
    headers = self.get_storage_options().get("headers", {})
    # `timeout=None` is what requests uses when the argument is omitted, so existing
    # callers are unaffected by the new parameter.
    return requests.get(url, headers=headers, stream=True, timeout=timeout)
Comment on lines +589 to +595
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should use the stream download method from EODAG download plugins instead of creating a new method.


def list_zarr_files_from_metadata(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Zarr, we have keys, not files. Could you rename the method to something like `list_zarr_keys`?

self,
base_url: str,
) -> list[str]:
"""List file paths from a Zarr store metadata file."""
import fsspec # type: ignore[import-untyped]

headers = self.get_storage_options().get("headers", {})
mapper = fsspec.get_mapper(
base_url,
client_kwargs={"headers": headers, "trust_env": False},
)
Comment on lines +605 to +608
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about introducing fsspec into the EODAG library.
fsspec is a wrapper over multiple backends. To use it, you need to bring in a backend for every filesystem you want to support: s3fs for S3, gcsfs for Google Cloud Storage and adlfs for Azure Blob Storage.
Most of the time, users will store Zarr data on an S3-compatible store, but some may store it on Google or Azure stores.
Maybe we don't want to support Google / Azure stores? Or only through optional dependencies?

My opinion:

  1. We do not support Google and Azure yet. We may revisit this once boto3 is replaced by obstore.
  2. We do not bring fsspec into EODAG but instead call boto3 to get the list of Zarr keys. If a dedicated function is needed to get the list of keys with boto3, it should live in eodag/utils/s3.py.

@sbrunato what do you think?


if ".zmetadata" in mapper:
meta = json.loads(mapper[".zmetadata"])
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use orjson instead of json.

return [".zmetadata", *meta["metadata"].keys()]

# TODO: Support Zarr v3 when test data becomes available.
# Zarr v2 uses `.zmetadata`, while Zarr v3 exposes `zarr.json`.
# The implementation should be straightforward once we can validate it
# against real examples.
raise ValueError(f"No Zarr metadata file found at {base_url}")
Comment on lines +614 to +618
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With Destination Earth, we do have a zarr v3 store: DanubeHis data. I don't understand what is blocking here.


def _init_progress_bar(
self,
progress_callback: Optional[ProgressCallback],
Expand Down
36 changes: 30 additions & 6 deletions eodag/plugins/authentication/openid_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
# limitations under the License.
from __future__ import annotations

import base64
import logging
import re
import string
from datetime import datetime, timedelta, timezone
from random import SystemRandom
from threading import Lock
from typing import TYPE_CHECKING, Any, Optional
from urllib.parse import parse_qs, urlparse

Expand Down Expand Up @@ -76,6 +78,7 @@ class OIDCRefreshTokenBase(Authentication):

def __init__(self, provider: str, config: PluginConfig) -> None:
super(OIDCRefreshTokenBase, self).__init__(provider, config)
self._auth_lock = Lock()

self.access_token = ""
self.access_token_expiration = datetime.min.replace(tzinfo=timezone.utc)
Expand Down Expand Up @@ -252,8 +255,9 @@ class OIDCAuthorizationCodeFlowAuth(OIDCRefreshTokenBase):
* :attr:`~eodag.config.PluginConfig.token_key` (``str``): The key pointing
to the token in the json response to the POST request to the token server
* :attr:`~eodag.config.PluginConfig.token_provision` (``str``) (**mandatory**): One of
``qs`` or ``header``. This is how the token obtained will be used to authenticate the
user on protected requests. If ``qs`` is chosen, then ``token_qs_key`` is mandatory
``qs``, ``header`` or ``basic``. This is how the token obtained will be used to authenticate the
user on protected requests. If ``qs`` is chosen, then ``token_qs_key`` is mandatory. If ``basic`` is chosen,
the token is used as password with username "anonymous".
* :attr:`~eodag.config.PluginConfig.login_form_xpath` (``str``) (**mandatory**): The
xpath to the HTML form element representing the user login form
* :attr:`~eodag.config.PluginConfig.authentication_uri_source` (``str``) (**mandatory**): Where
Expand Down Expand Up @@ -301,9 +305,13 @@ def __init__(self, provider: str, config: PluginConfig) -> None:
def validate_config_credentials(self) -> None:
"""Validate configured credentials"""
super(OIDCAuthorizationCodeFlowAuth, self).validate_config_credentials()
if getattr(self.config, "token_provision", None) not in ("qs", "header"):
if getattr(self.config, "token_provision", None) not in (
"qs",
"header",
"basic",
):
raise MisconfiguredError(
'Provider config parameter "token_provision" must be one of "qs" or "header"'
'Provider config parameter "token_provision" must be one of "qs", "header", or "basic"'
)
if self.config.token_provision == "qs" and not getattr(
self.config, "token_qs_key", ""
Expand All @@ -315,12 +323,14 @@ def validate_config_credentials(self) -> None:

def authenticate(self) -> CodeAuthorizedAuth:
"""Authenticate"""
self._get_access_token()
with self._auth_lock:
self._get_access_token()

return CodeAuthorizedAuth(
self.access_token,
self.config.token_provision,
key=getattr(self.config, "token_qs_key", None),
refresh_token=self.refresh_token,
)

def _request_new_token(self) -> dict[str, str]:
Expand Down Expand Up @@ -583,10 +593,17 @@ def compute_state() -> str:
class CodeAuthorizedAuth(AuthBase):
"""CodeAuthorizedAuth custom authentication class to be used with requests module"""

def __init__(self, token: str, where: str, key: Optional[str] = None) -> None:
def __init__(
self,
token: str,
where: str,
key: Optional[str] = None,
refresh_token: Optional[str] = None,
) -> None:
self.token = token
self.where = where
self.key = key
self.refresh_token = refresh_token

def __call__(self, request: PreparedRequest) -> PreparedRequest:
"""Perform the actual authentication"""
Expand All @@ -601,6 +618,13 @@ def __call__(self, request: PreparedRequest) -> PreparedRequest:

elif self.where == "header":
request.headers["Authorization"] = "Bearer {}".format(self.token)

if self.where == "basic" and self.refresh_token is not None:
auth_str = base64.b64encode(
f"anonymous:{self.refresh_token}".encode()
).decode()
request.headers["Authorization"] = f"Basic {auth_str}"

logger.debug(
re.sub(
r"'Bearer [^']+'",
Expand Down
4 changes: 4 additions & 0 deletions eodag/utils/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,7 @@ def __init__(
f"Request timeout {timeout_msg} for URL {url}" if url else str(exception)
)
super().__init__(message)


class DatasetCreationError(EodagError):
    """Raised when a :class:`xarray.Dataset` or an :class:`eodag_cube.types.XarrayDict`
    could not be created.
    """
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ dependencies = [
"typing_extensions >= 4.8.0",
"urllib3",
"zipstream-ng",
"fsspec",
"aiohttp",
"requests"
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

requests is already a dependency in EODAG.

]
dynamic = ["version"]

Expand Down
3 changes: 3 additions & 0 deletions tests/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
from eodag.plugins.authentication.aws_auth import AwsAuth
from eodag.plugins.authentication.header import HeaderAuth
from eodag.plugins.authentication.openid_connect import CodeAuthorizedAuth
from eodag.plugins.authentication.header import HTTPHeaderAuth
from eodag.plugins.authentication.qsauth import HttpQueryStringAuth
from eodag.plugins.base import PluginTopic
from eodag.plugins.crunch.filter_date import FilterDate
from eodag.plugins.crunch.filter_latest_tpl_name import FilterLatestByName
Expand Down Expand Up @@ -136,6 +138,7 @@
UnsupportedProvider,
ValidationError,
InvalidDataError,
DatasetCreationError,
)
from eodag.utils.stac_reader import fetch_stac_items, _TextOpener
from tests import TEST_RESOURCES_PATH
Expand Down
50 changes: 45 additions & 5 deletions tests/units/test_auth_plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import pickle
import unittest
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from unittest import mock

import boto3
Expand Down Expand Up @@ -1996,12 +1996,18 @@ def get_auth_plugin(self, provider):
"jwks_uri": "http://foo.bar/auth/realms/myrealm/protocol/openid-connect/certs",
"id_token_signing_alg_values_supported": ["RS256", "HS512"],
}
mock_request.return_value.json.side_effect = [oidc_config, oidc_config]
mock_request.return_value.json.return_value = oidc_config
auth_plugin = super(
TestAuthPluginOIDCAuthorizationCodeFlowAuth, self
).get_auth_plugin(provider)
# reset token info
auth_plugin.token_info = {}
auth_plugin.access_token = ""
auth_plugin.refresh_token = ""
auth_plugin.access_token_expiration = datetime.min.replace(
tzinfo=timezone.utc
)
auth_plugin.refresh_token_expiration = datetime.min.replace(
tzinfo=timezone.utc
)
return auth_plugin

def test_plugins_auth_codeflowauth_validate_credentials(self):
Expand All @@ -2012,7 +2018,7 @@ def test_plugins_auth_codeflowauth_validate_credentials(self):
with self.assertRaises(MisconfiguredError) as context:
auth_plugin.validate_config_credentials()
self.assertTrue(
'"token_provision" must be one of "qs" or "header"'
'"token_provision" must be one of "qs", "header", or "basic"'
in str(context.exception)
)
# `token_provision=="qs"` but `token_qs_key` is missing
Expand Down Expand Up @@ -2248,6 +2254,40 @@ def test_plugins_auth_codeflowauth_authenticate_token_qs_key_ok(
self.assertEqual(auth.where, "qs")
self.assertEqual(auth.key, auth_plugin.config.token_qs_key)

@mock.patch(
    "eodag.plugins.authentication.openid_connect.OIDCRefreshTokenBase.decode_jwt_token",
    autospec=True,
)
@mock.patch(
    "eodag.plugins.authentication.openid_connect.OIDCAuthorizationCodeFlowAuth._request_new_token",
    autospec=True,
)
def test_plugins_auth_codeflowauth_authenticate_basic_ok(
    self,
    mock_request_new_token,
    mock_decode,
):
    """With ``token_provision`` set to ``basic``, OIDCAuthorizationCodeFlowAuth.authenticate
    must return a CodeAuthorizedAuth carrying the obtained refresh token."""
    auth_plugin = self.get_auth_plugin("provider_ok")

    # Canned answer of the token endpoint
    token_response = dict(
        access_token="obtained-access-token",
        expires_in="3600",
        refresh_expires_in="7200",
        refresh_token="obtained-refresh-token",
    )
    mock_request_new_token.return_value = token_response

    # Decoded JWT expiring one hour from now
    expiry = now_in_utc() + timedelta(seconds=3600)
    mock_decode.return_value = {"exp": expiry.timestamp()}

    auth_plugin.config.token_provision = "basic"

    auth = auth_plugin.authenticate()

    self.assertIsInstance(auth, CodeAuthorizedAuth)
    self.assertEqual(auth.where, "basic")
    self.assertEqual(auth.token, token_response["access_token"])
    self.assertEqual(auth.refresh_token, token_response["refresh_token"])

@mock.patch(
"eodag.plugins.authentication.openid_connect.OIDCAuthorizationCodeFlowAuth.authenticate_user",
autospec=True,
Expand Down
Loading
Loading