Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 99 additions & 48 deletions eodag/plugins/download/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import shutil
import tarfile
import zipfile
from copy import deepcopy
from email.message import Message
from itertools import chain
from json import JSONDecodeError
Expand Down Expand Up @@ -1075,6 +1076,24 @@ def _stream_download(
product.filename = filename
return product._stream.iter_content(chunk_size=64 * 1024)

def _handle_retry(
    self, asset_href: str, e: Exception, retries_sofar: int, max_retries: int = 2
) -> int:
    """Decide whether a failed asset download may be retried.

    Logs the failure and returns the incremented retry counter while the
    retry budget is not exhausted; otherwise logs an error and aborts the
    download.

    :param asset_href: href of the asset whose download was interrupted
    :param e: the exception that interrupted the download
    :param retries_sofar: number of retries already performed for this asset
    :param max_retries: maximum number of retries allowed before giving up
                        (default: 2, matching the previous hard-coded limit)
    :returns: the updated retry count (``retries_sofar + 1``)
    :raises DownloadError: when ``retries_sofar`` exceeds ``max_retries``
    """
    if retries_sofar > max_retries:
        logger.error(
            "Unexpected error at download of asset %s: %s",
            asset_href,
            e,
        )
        # chain the original exception so its traceback is preserved
        raise DownloadError(e) from e
    logger.warning(
        "Retry because of unexpected error at download of asset %s: %s",
        asset_href,
        e,
    )
    return retries_sofar + 1

def _stream_download_assets(
self,
product: EOProduct,
Expand Down Expand Up @@ -1146,58 +1165,90 @@ def get_chunks_generator(asset: Asset) -> Iterator[bytes]:
auth_object = None

# Make the request inside the generator
try:
with requests.get(
asset_href,
stream=True,
auth=auth_object,
params=params,
headers=USER_AGENT,
timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
verify=ssl_verify,
) as stream:
stream.raise_for_status()

# Process asset path
asset_rel_path = (
asset.rel_path.replace(assets_common_subdir, "").strip(os.sep)
if flatten_top_dirs
else asset.rel_path
)
asset_rel_dir = os.path.dirname(asset_rel_path)
partial_result = b""
continue_requests = True
retries = 0
while continue_requests:
continue_requests = False
headers = deepcopy(USER_AGENT)
if partial_result:
headers["Range"] = "bytes=%d-" % len(partial_result)
try:
with requests.get(
asset_href,
stream=True,
auth=auth_object,
params=params,
headers=headers,
timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT,
verify=ssl_verify,
) as stream:
stream.raise_for_status()

if not getattr(asset, "filename", None):
# try getting filename in GET header if was not found in HEAD result
asset_content_disposition = stream.headers.get(
"content-disposition"
)
if asset_content_disposition:
asset.filename = cast(
Optional[str],
parse_header(asset_content_disposition).get_param(
"filename", None
),
# Process asset path
asset_rel_path = (
asset.rel_path.replace(assets_common_subdir, "").strip(
os.sep
)
if flatten_top_dirs
else asset.rel_path
)
asset_rel_dir = os.path.dirname(asset_rel_path)

if not getattr(asset, "filename", None):
# default filename extracted from path
asset.filename = os.path.basename(asset.rel_path)

asset.rel_path = os.path.join(
asset_rel_dir, cast(str, asset.filename)
)

for chunk in stream.iter_content(chunk_size=64 * 1024):
if chunk:
progress_callback(len(chunk))
yield chunk
if not getattr(asset, "filename", None):
# try getting filename in GET header if was not found in HEAD result
asset_content_disposition = stream.headers.get(
"content-disposition"
)
if asset_content_disposition:
asset.filename = cast(
Optional[str],
parse_header(asset_content_disposition).get_param(
"filename", None
),
)

if not getattr(asset, "filename", None):
# default filename extracted from path
asset.filename = os.path.basename(asset.rel_path)

asset.rel_path = os.path.join(
asset_rel_dir, cast(str, asset.filename)
)

except requests.exceptions.Timeout as exc:
raise TimeOutError(
exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT
) from exc
except RequestException as e:
self._handle_asset_exception(e, asset)
for chunk in stream.iter_content(chunk_size=64 * 1024):
if chunk:
progress_callback(len(chunk))
partial_result += chunk
yield chunk

except requests.exceptions.Timeout as exc:
raise TimeOutError(
exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT
) from exc
except requests.exceptions.ChunkedEncodingError as e:
# if stream download gets interrupted, try to store point of failure and resume
# (if range requests are supported)
if (
"Accept-Ranges" in stream.headers
and stream.headers["Accept-Ranges"] == "bytes"
):
logger.warning(
"Interruption of stream of asset: %s: %s", asset["href"], e
)
continue_requests = True
else:
# if range requests are not supported retry twice
continue_requests = True
partial_result = b""
retries = self._handle_retry(asset["href"], e, retries)
except requests.exceptions.ConnectionError as ce:
# retry in case of connection problem
continue_requests = True
partial_result = b""
retries = self._handle_retry(asset["href"], ce, retries)
except RequestException as e:
self._handle_asset_exception(e, asset)

assets_stream_list = []

Expand Down
Loading