diff --git a/google/resumable_media/_helpers.py b/google/resumable_media/_helpers.py index 2043d19d..08f651aa 100644 --- a/google/resumable_media/_helpers.py +++ b/google/resumable_media/_helpers.py @@ -32,6 +32,7 @@ RANGE_HEADER = "range" CONTENT_RANGE_HEADER = "content-range" +CONTENT_ENCODING_HEADER = "content-encoding" _SLOW_CRC32C_WARNING = ( "Currently using crcmod in pure python form. This is a slow " @@ -40,6 +41,8 @@ ) _GENERATION_HEADER = "x-goog-generation" _HASH_HEADER = "x-goog-hash" +_STORED_CONTENT_ENCODING_HEADER = "x-goog-stored-content-encoding" + _MISSING_CHECKSUM = """\ No {checksum_type} checksum was returned from the service while downloading {} (which happens for composite objects), so client-side content integrity @@ -369,6 +372,23 @@ def add_query_parameters(media_url, query_params): return urlunsplit((scheme, netloc, path, query, frag)) +def _is_decompressive_transcoding(response, get_headers): + """Returns True if the object was served decompressed. This happens when the + "x-goog-stored-content-encoding" header is "gzip" and "content-encoding" header + is not "gzip". See more at: https://cloud.google.com/storage/docs/transcoding#transcoding_and_gzip + Args: + response (~requests.Response): The HTTP response object. + get_headers (callable: response->dict): returns response headers. + Returns: + bool: Returns True if decompressive transcoding has occurred; otherwise, False. + """ + headers = get_headers(response) + return ( + headers.get(_STORED_CONTENT_ENCODING_HEADER) == "gzip" + and headers.get(CONTENT_ENCODING_HEADER) != "gzip" + ) + + class _DoNothingHash(object): """Do-nothing hash object. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 58de0100..88de223f 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -36,6 +36,13 @@ {} """ +_STREAM_SEEK_ERROR = """\ +Incomplete download for: +{} +Error writing to stream while handling a gzip-compressed file download. +Please restart the download. +""" + class Download(_request_helpers.RequestsMixin, _download.Download): """Helper to manage downloading a resource from a Google API. @@ -206,7 +213,18 @@ def retriable_request(): self._process_response(result) + # With decompressive transcoding, GCS serves back the whole file regardless of the range request, + # thus we reset the stream position to the start of the stream. + # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: + if _helpers._is_decompressive_transcoding(result, self._get_headers): + try: + self._stream.seek(0) + except Exception as exc: + msg = _STREAM_SEEK_ERROR.format(url) + raise Exception(msg) from exc + self._bytes_downloaded = 0 + self._write_to_stream(result) return result @@ -379,7 +397,18 @@ def retriable_request(): self._process_response(result) + # With decompressive transcoding, GCS serves back the whole file regardless of the range request, + # thus we reset the stream position to the start of the stream. + # See: https://cloud.google.com/storage/docs/transcoding#range if self._stream is not None: + if _helpers._is_decompressive_transcoding(result, self._get_headers): + try: + self._stream.seek(0) + except Exception as exc: + msg = _STREAM_SEEK_ERROR.format(url) + raise Exception(msg) from exc + self._bytes_downloaded = 0 + self._write_to_stream(result) return result diff --git a/tests/system/requests/test_download.py b/tests/system/requests/test_download.py index d2d69f06..347e19b5 100644 --- a/tests/system/requests/test_download.py +++ b/tests/system/requests/test_download.py @@ -294,6 +294,26 @@ def test_download_to_stream(self, add_files, authorized_transport): assert stream.getvalue() == actual_contents check_tombstoned(download, authorized_transport) + def test_download_gzip_w_stored_content_headers( + self, add_files, authorized_transport + ): + # Retrieve the gzip compressed file + info = ALL_FILES[-1] + actual_contents = self._get_contents(info) + blob_name = get_blob_name(info) + + # Create the actual download object. + media_url = utils.DOWNLOAD_URL_TEMPLATE.format(blob_name=blob_name) + stream = io.BytesIO() + download = self._make_one(media_url, stream=stream) + # Consume the resource. + response = download.consume(authorized_transport) + assert response.status_code == http.client.OK + assert response.headers.get(_helpers._STORED_CONTENT_ENCODING_HEADER) == "gzip" + assert response.headers.get("X-Goog-Stored-Content-Length") is not None + assert stream.getvalue() == actual_contents + check_tombstoned(download, authorized_transport) + def test_extra_headers(self, authorized_transport, secret_file): blob_name, data, headers = secret_file # Create the actual download object. diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index ace0df7c..8da7ef27 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -395,6 +395,45 @@ def test_consume_w_bytes_downloaded(self): range_bytes = "bytes={:d}-{:d}".format(offset, end) assert download._headers["range"] == range_bytes + def test_consume_gzip_reset_stream_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_response(chunks=chunks, headers=headers) + offset = 16 + download._bytes_downloaded = offset + download.consume(transport) + + assert stream.getvalue() == b"".join(chunks) + assert download._bytes_downloaded == len(b"".join(chunks)) + + def test_consume_gzip_reset_stream_error(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.Download( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a stream seek error while resuming a decompressive transcoding download + stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_response(chunks=chunks, headers=headers) + offset = 16 + download._bytes_downloaded = offset + with pytest.raises(Exception): + download.consume(transport) + class TestRawDownload(object): def test__write_to_stream_no_hash_check(self): @@ -772,6 +811,49 @@ def test_consume_w_bytes_downloaded(self): range_bytes = "bytes={:d}-{:d}".format(offset, end) assert download._headers["range"] == range_bytes + def test_consume_gzip_reset_stream_w_bytes_downloaded(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a decompressive transcoding retry operation with bytes already downloaded in the stream + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_raw_response( + chunks=chunks, headers=headers + ) + offset = 16 + download._bytes_downloaded = offset + download.consume(transport) + + assert stream.getvalue() == b"".join(chunks) + assert download._bytes_downloaded == len(b"".join(chunks)) + + def test_consume_gzip_reset_stream_error(self): + stream = io.BytesIO() + chunks = (b"up down ", b"charlie ", b"brown") + end = 65536 + + download = download_mod.RawDownload( + EXAMPLE_URL, stream=stream, end=end, headers=None, checksum="md5" + ) + transport = mock.Mock(spec=["request"]) + + # Mock a stream seek error while resuming a decompressive transcoding download + stream.seek = mock.Mock(side_effect=OSError("mock stream seek error")) + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"} + transport.request.return_value = _mock_raw_response( + chunks=chunks, headers=headers + ) + offset = 16 + download._bytes_downloaded = offset + with pytest.raises(Exception): + download.consume(transport) + class TestChunkedDownload(object): @staticmethod diff --git a/tests/unit/test__helpers.py b/tests/unit/test__helpers.py index 0bfbae42..b357f79e 100644 --- a/tests/unit/test__helpers.py +++ b/tests/unit/test__helpers.py @@ -425,6 +425,31 @@ def test_header_value(self): assert generation_header == self.GENERATION_VALUE +class Test__is_decompressive_transcoding(object): + def test_empty_value(self): + headers = {} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is False + + def test_gzip_in_headers(self): + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "gzip"} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is True + + def test_gzip_not_in_headers(self): + headers = {_helpers._STORED_CONTENT_ENCODING_HEADER: "identity"} + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is False + + def test_gzip_w_content_encoding_in_headers(self): + headers = { + _helpers._STORED_CONTENT_ENCODING_HEADER: "gzip", + _helpers.CONTENT_ENCODING_HEADER: "gzip", + } + response = _mock_response(headers=headers) + assert _helpers._is_decompressive_transcoding(response, _get_headers) is False + + class Test__get_generation_from_url(object): GENERATION_VALUE = 1641590104888641