diff --git a/eodag/plugins/download/http.py b/eodag/plugins/download/http.py index d49f6a8615..916d14fe23 100644 --- a/eodag/plugins/download/http.py +++ b/eodag/plugins/download/http.py @@ -23,6 +23,7 @@ import shutil import tarfile import zipfile +from copy import deepcopy from email.message import Message from itertools import chain from json import JSONDecodeError @@ -1075,6 +1076,24 @@ def _stream_download( product.filename = filename return product._stream.iter_content(chunk_size=64 * 1024) + def _handle_retry(self, asset_href: str, e: Exception, retries_sofar: int) -> int: + retries = retries_sofar + if retries > 2: + logger.error( + "Unexpected error at download of asset %s: %s", + asset_href, + e, + ) + raise DownloadError(e) + else: + logger.warning( + "Retry because of unexpected error at download of asset %s: %s", + asset_href, + e, + ) + retries += 1 + return retries + def _stream_download_assets( self, product: EOProduct, @@ -1146,58 +1165,90 @@ def get_chunks_generator(asset: Asset) -> Iterator[bytes]: auth_object = None # Make the request inside the generator - try: - with requests.get( - asset_href, - stream=True, - auth=auth_object, - params=params, - headers=USER_AGENT, - timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, - verify=ssl_verify, - ) as stream: - stream.raise_for_status() - - # Process asset path - asset_rel_path = ( - asset.rel_path.replace(assets_common_subdir, "").strip(os.sep) - if flatten_top_dirs - else asset.rel_path - ) - asset_rel_dir = os.path.dirname(asset_rel_path) + partial_result = b"" + continue_requests = True + retries = 0 + while continue_requests: + continue_requests = False + headers = deepcopy(USER_AGENT) + if partial_result: + headers["Range"] = "bytes=%d-" % len(partial_result) + try: + with requests.get( + asset_href, + stream=True, + auth=auth_object, + params=params, + headers=headers, + timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT, + verify=ssl_verify, + ) as stream: + stream.raise_for_status() - if not getattr(asset, "filename", None): - # try getting filename in GET header if was not found in HEAD result - asset_content_disposition = stream.headers.get( - "content-disposition" - ) - if asset_content_disposition: - asset.filename = cast( - Optional[str], - parse_header(asset_content_disposition).get_param( - "filename", None - ), + # Process asset path + asset_rel_path = ( + asset.rel_path.replace(assets_common_subdir, "").strip( + os.sep ) + if flatten_top_dirs + else asset.rel_path + ) + asset_rel_dir = os.path.dirname(asset_rel_path) - if not getattr(asset, "filename", None): - # default filename extracted from path - asset.filename = os.path.basename(asset.rel_path) - - asset.rel_path = os.path.join( - asset_rel_dir, cast(str, asset.filename) - ) - - for chunk in stream.iter_content(chunk_size=64 * 1024): - if chunk: - progress_callback(len(chunk)) - yield chunk + if not getattr(asset, "filename", None): + # try getting filename in GET header if was not found in HEAD result + asset_content_disposition = stream.headers.get( + "content-disposition" + ) + if asset_content_disposition: + asset.filename = cast( + Optional[str], + parse_header(asset_content_disposition).get_param( + "filename", None + ), + ) + + if not getattr(asset, "filename", None): + # default filename extracted from path + asset.filename = os.path.basename(asset.rel_path) + + asset.rel_path = os.path.join( + asset_rel_dir, cast(str, asset.filename) + ) - except requests.exceptions.Timeout as exc: - raise TimeOutError( - exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT - ) from exc - except RequestException as e: - self._handle_asset_exception(e, asset) + for chunk in stream.iter_content(chunk_size=64 * 1024): + if chunk: + progress_callback(len(chunk)) + partial_result += chunk + yield chunk + + except requests.exceptions.Timeout as exc: + raise TimeOutError( + exc, timeout=DEFAULT_STREAM_REQUESTS_TIMEOUT + ) from exc + except requests.exceptions.ChunkedEncodingError as e: + # if stream download gets interrupted, try to store point of failure and resume + # (if range requests are supported) + if ( + "Accept-Ranges" in stream.headers + and stream.headers["Accept-Ranges"] == "bytes" + ): + logger.warning( + "Interruption of stream of asset: %s: %s", asset["href"], e + ) + continue_requests = True + else: + # if range requests are not supported retry twice + continue_requests = True + partial_result = b"" + retries = self._handle_retry(asset["href"], e, retries) + except requests.exceptions.ConnectionError as ce: + # retry in case of connection problem + continue_requests = True + partial_result = b"" + retries = self._handle_retry(asset["href"], ce, retries) + except RequestException as e: + self._handle_asset_exception(e, asset) assets_stream_list = []