From c19571de34c47de3a766541b041637ba5f716ed7 Mon Sep 17 00:00:00 2001
From: Illia Volochii
Date: Fri, 5 Dec 2025 16:40:41 +0200
Subject: [PATCH] Merge commit from fork

* Prevent decompression bomb for zstd in Python 3.14
* Add experimental `decompress_iter` for Brotli
* Update changes for Brotli
* Add `GzipDecoder.decompress_iter`
* Test https://github.com/python-hyper/brotlicffi/pull/207
* Pin Brotli
* Add `decompress_iter` to all decoders and make tests pass
* Pin brotlicffi to an official release
* Revert changes to response.py
* Add `max_length` parameter to all `decompress` methods
* Fix the `test_brotlipy` session
* Unset `_data` on gzip error
* Add a test for memory usage
* Test more methods
* Fix the test for `stream`
* Cover more lines with tests
* Add more coverage
* Make `read1` a bit more efficient
* Fix PyPy tests for Brotli
* Revert an unnecessarily moved check
* Add some comments
* Leave just one `self._obj.decompress` call in `GzipDecoder`
* Refactor test params
* Test reads with all data already in the decompressor
* Prevent needless copying of data decoded with `max_length`
* Rename the changed test
* Note that responses of unknown length should be streamed too
* Add a changelog entry
* Avoid returning a memory view from `BytesQueueBuffer`
* Add one more note to the changelog entry
---
 CHANGES.rst             |  22 ++++
 docs/advanced-usage.rst |   3 +-
 docs/user-guide.rst     |   4 +-
 noxfile.py              |  16 ++-
 pyproject.toml          |   5 +-
 src/urllib3/response.py | 279 ++++++++++++++++++++++++++++++++++------
 test/test_response.py   | 269 +++++++++++++++++++++++++++++++++++++-
 uv.lock                 | 177 +++++++++++--------------
 8 files changed, 621 insertions(+), 154 deletions(-)

Index: urllib3-1.26.20/docs/advanced-usage.rst
===================================================================
--- urllib3-1.26.20.orig/docs/advanced-usage.rst
+++ urllib3-1.26.20/docs/advanced-usage.rst
@@ -57,7 +57,8 @@ When using ``preload_content=True`` (the
 response body will be read immediately into memory and the HTTP connection
 will be released back into the pool without manual intervention.

-However, when dealing with large responses it's often better to stream the response
+However, when dealing with responses of large or unknown length,
+it's often better to stream the response
 content using ``preload_content=False``. Setting ``preload_content`` to ``False`` means
 that urllib3 will only read from the socket when data is requested.

Index: urllib3-1.26.20/docs/user-guide.rst
===================================================================
--- urllib3-1.26.20.orig/docs/user-guide.rst
+++ urllib3-1.26.20/docs/user-guide.rst
@@ -99,8 +99,8 @@ to a byte string representing the respon
     >>> r.data
     b'\xaa\xa5H?\x95\xe9\x9b\x11'

-.. note:: For larger responses, it's sometimes better to :ref:`stream `
-   the response.
+.. note:: For responses of large or unknown length, it's sometimes better to
+   :ref:`stream ` the response.

 Using io Wrappers with Response Content
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Index: urllib3-1.26.20/src/urllib3/response.py
===================================================================
--- urllib3-1.26.20.orig/src/urllib3/response.py
+++ urllib3-1.26.20/src/urllib3/response.py
@@ -1,5 +1,6 @@
 from __future__ import absolute_import

+import collections
 import io
 import logging
 import sys
@@ -23,6 +24,7 @@ from .connection import BaseSSLError, HT
 from .exceptions import (
     BodyNotHttplibCompatible,
     DecodeError,
+    DependencyWarning,
     HTTPError,
     IncompleteRead,
     InvalidChunkLength,
@@ -41,33 +43,60 @@ log = logging.getLogger(__name__)
 class DeflateDecoder(object):
     def __init__(self):
         self._first_try = True
-        self._data = b""
+        self._first_try_data = b""
+        self._unfed_data = b""
         self._obj = zlib.decompressobj()

     def __getattr__(self, name):
         return getattr(self._obj, name)

-    def decompress(self, data):
-        if not data:
+    def decompress(self, data, max_length = -1):
+        data = self._unfed_data + data
+        self._unfed_data = b""
+        if not data and not self._obj.unconsumed_tail:
             return data
+        original_max_length = max_length
+        if original_max_length < 0:
+            max_length = 0
+        elif original_max_length == 0:
+            # We should not pass 0 to the zlib decompressor because 0 is
+            # the default value that will make zlib decompress without a
+            # length limit.
+            # Data should be stored for subsequent calls.
+            self._unfed_data = data
+            return b""

+        # Subsequent calls always reuse `self._obj`. zlib requires
+        # passing the unconsumed tail if decompression is to continue
         if not self._first_try:
-            return self._obj.decompress(data)
+            return self._obj.decompress(
+                self._obj.unconsumed_tail + data, max_length=max_length
+            )

-        self._data += data
+        # First call tries with RFC 1950 ZLIB format
+        self._first_try_data += data
         try:
-            decompressed = self._obj.decompress(data)
+            decompressed = self._obj.decompress(data, max_length=max_length)
             if decompressed:
                 self._first_try = False
-                self._data = None
+                self._first_try_data = b""
             return decompressed
+        # On failure, it falls back to RFC 1951 DEFLATE format.
         except zlib.error:
             self._first_try = False
             self._obj = zlib.decompressobj(-zlib.MAX_WBITS)
             try:
-                return self.decompress(self._data)
+                return self.decompress(
+                    self._first_try_data, max_length=original_max_length
+                )
             finally:
-                self._data = None
+                self._first_try_data = b""
+
+    @property
+    def has_unconsumed_tail(self):
+        return bool(self._unfed_data) or (
+            bool(self._obj.unconsumed_tail) and not self._first_try
+        )

 class GzipDecoderState(object):
@@ -81,30 +110,65 @@ class GzipDecoder(object):
     def __init__(self):
         self._obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
         self._state = GzipDecoderState.FIRST_MEMBER
+        self._unconsumed_tail = b""

     def __getattr__(self, name):
         return getattr(self._obj, name)

-    def decompress(self, data):
+    def decompress(self, data, max_length = -1):
         ret = bytearray()
-        if self._state == GzipDecoderState.SWALLOW_DATA or not data:
+        if self._state == GzipDecoderState.SWALLOW_DATA:
             return bytes(ret)
+
+        if max_length == 0:
+            # We should not pass 0 to the zlib decompressor because 0 is
+            # the default value that will make zlib decompress without a
+            # length limit.
+            # Data should be stored for subsequent calls.
+            self._unconsumed_tail += data
+            return b""
+
+        # zlib requires passing the unconsumed_tail to the subsequent
+        # call if the decompression is to continue.
+        data = self._unconsumed_tail + data
+        if not data and self._obj.eof:
+            return bytes(ret)
+
         while True:
             try:
-                ret += self._obj.decompress(data)
+                ret += self._obj.decompress(
+                    data, max_length=max(max_length - len(ret), 0)
+                )
             except zlib.error:
                 previous_state = self._state
                 # Ignore data after the first error
                 self._state = GzipDecoderState.SWALLOW_DATA
+                self._unconsumed_tail = b""
                 if previous_state == GzipDecoderState.OTHER_MEMBERS:
                     # Allow trailing garbage acceptable in other gzip clients
                     return bytes(ret)
                 raise
+
+            self._unconsumed_tail = data = (
+                self._obj.unconsumed_tail or self._obj.unused_data
+            )
+            if max_length > 0 and len(ret) >= max_length:
+                break
+
             data = self._obj.unused_data
             if not data:
                 return bytes(ret)
-            self._state = GzipDecoderState.OTHER_MEMBERS
-            self._obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
+            # When the end of a gzip member is reached, a new decompressor
+            # must be created for unused (possibly future) data.
+            if self._obj.eof:
+                self._state = GzipDecoderState.OTHER_MEMBERS
+                self._obj = zlib.decompressobj(16 + zlib.MAX_WBITS)
+
+        return bytes(ret)
+
+    @property
+    def has_unconsumed_tail(self):
+        return bool(self._unconsumed_tail)

 if brotli is not None:
@@ -116,9 +180,35 @@ if brotli is not None:
         def __init__(self):
             self._obj = brotli.Decompressor()
             if hasattr(self._obj, "decompress"):
-                self.decompress = self._obj.decompress
+                setattr(self, "_decompress", self._obj.decompress)
             else:
-                self.decompress = self._obj.process
+                setattr(self, "_decompress", self._obj.process)
+
+        # Requires Brotli >= 1.2.0 for `output_buffer_limit`.
+        def _decompress(self, data, output_buffer_limit = -1):
+            raise NotImplementedError()
+
+        def decompress(self, data, max_length = -1):
+            try:
+                if max_length > 0:
+                    return self._decompress(data, output_buffer_limit=max_length)
+                else:
+                    return self._decompress(data)
+            except TypeError:
+                # Fallback for Brotli/brotlicffi/brotlipy versions without
+                # the `output_buffer_limit` parameter.
+                warnings.warn(
+                    "Brotli >= 1.2.0 is required to prevent decompression bombs.",
+                    DependencyWarning,
+                )
+                return self._decompress(data)
+
+        @property
+        def has_unconsumed_tail(self):
+            try:
+                return not self._obj.can_accept_more_data()
+            except AttributeError:
+                return False

         def flush(self):
             if hasattr(self._obj, "flush"):
@@ -141,10 +231,30 @@ class MultiDecoder(object):
     def flush(self):
         return self._decoders[0].flush()

-    def decompress(self, data):
-        for d in reversed(self._decoders):
-            data = d.decompress(data)
-        return data
+    def decompress(self, data, max_length = -1):
+        if max_length <= 0:
+            for d in reversed(self._decoders):
+                data = d.decompress(data)
+            return data
+
+        ret = bytearray()
+
+        while True:
+            any_data = False
+            for d in reversed(self._decoders):
+                data = d.decompress(data, max_length=max_length - len(ret))
+                if data:
+                    any_data = True
+                # We should not break when no data is returned because
+                # next decoders may produce data even with empty input.
+                ret += data
+            if not any_data or len(ret) >= max_length:
+                return bytes(ret)
+            data = b""
+
+    @property
+    def has_unconsumed_tail(self):
+        return any(d.has_unconsumed_tail for d in self._decoders)

 def _get_decoder(mode):
@@ -160,6 +270,67 @@ def _get_decoder(mode):
     return DeflateDecoder()

+class BytesQueueBuffer:
+    """Memory-efficient bytes buffer
+
+    To return decoded data in read() and still follow the BufferedIOBase API, we need a
+    buffer to always return the correct amount of bytes.
+
+    This buffer should be filled using calls to put()
+
+    Our maximum memory usage is determined by the sum of the size of:
+
+     * self.buffer, which contains the full data
+     * the largest chunk that we will copy in get()
+    """
+
+    def __init__(self):
+        self.buffer = collections.deque()
+        self._size: int = 0
+
+    def __len__(self):
+        return self._size
+
+    def put(self, data):
+        self.buffer.append(data)
+        self._size += len(data)
+
+    def get(self, n: int):
+        if n == 0:
+            return b""
+        elif not self.buffer:
+            raise RuntimeError("buffer is empty")
+        elif n < 0:
+            raise ValueError("n should be > 0")
+
+        if len(self.buffer[0]) == n and isinstance(self.buffer[0], bytes):
+            self._size -= n
+            return self.buffer.popleft()
+
+        fetched = 0
+        ret = io.BytesIO()
+        while fetched < n:
+            remaining = n - fetched
+            chunk = self.buffer.popleft()
+            chunk_length = len(chunk)
+            if remaining < chunk_length:
+                chunk = memoryview(chunk)
+                left_chunk, right_chunk = chunk[:remaining], chunk[remaining:]
+                ret.write(left_chunk)
+                self.buffer.appendleft(right_chunk)
+                self._size -= remaining
+                break
+            else:
+                ret.write(chunk)
+                self._size -= chunk_length
+                fetched += chunk_length
+
+            if not self.buffer:
+                break
+
+        return ret.getvalue()
+
+
 class HTTPResponse(io.IOBase):
     """
     HTTP Response container.
@@ -228,6 +399,7 @@ class HTTPResponse(io.IOBase):
         self.reason = reason
         self.strict = strict
         self.decode_content = decode_content
+        self._has_decoded_content = False
         self.retries = retries
         self.enforce_content_length = enforce_content_length
         self.auto_close = auto_close
@@ -261,6 +433,9 @@ class HTTPResponse(io.IOBase):
         # Determine length of response
         self.length_remaining = self._init_length(request_method)

+        # Used to return the correct amount of bytes for partial read()s
+        self._decoded_buffer = BytesQueueBuffer()
+
         # If requested, preload the body.
         if preload_content and not self._body:
             self._body = self.read(decode_content=decode_content)
@@ -395,16 +570,19 @@ class HTTPResponse(io.IOBase):
     if brotli is not None:
         DECODER_ERROR_CLASSES += (brotli.error,)

-    def _decode(self, data, decode_content, flush_decoder):
+    def _decode(self, data, decode_content, flush_decoder, max_length = None):
         """
         Decode the data passed in and potentially flush the decoder.
         """
         if not decode_content:
             return data

+        if max_length is None or flush_decoder:
+            max_length = -1
+
         try:
             if self._decoder:
-                data = self._decoder.decompress(data)
+                data = self._decoder.decompress(data, max_length=max_length)
         except self.DECODER_ERROR_CLASSES as e:
             content_encoding = self.headers.get("content-encoding", "").lower()
             raise DecodeError(
@@ -532,6 +710,47 @@ class HTTPResponse(io.IOBase):
         # StringIO doesn't like amt=None
         return self._fp.read(amt) if amt is not None else self._fp.read()

+    def _raw_read(
+        self,
+        amt=None,
+    ):
+        """
+        Reads `amt` of bytes from the socket.
+        """
+        if self._fp is None:
+            return
+
+        fp_closed = getattr(self._fp, "closed", False)
+
+        with self._error_catcher():
+            data = self._fp_read(amt) if not fp_closed else b""
+            if amt is not None and amt != 0 and not data:
+                # Platform-specific: Buggy versions of Python.
+                # Close the connection when no data is returned
+                #
+                # This is redundant to what httplib/http.client _should_
+                # already do. However, versions of python released before
+                # December 15, 2012 (http://bugs.python.org/issue16298) do
+                # not properly close the connection in all cases. There is
+                # no harm in redundantly calling close.
+                self._fp.close()
+                if self.enforce_content_length and self.length_remaining not in (
+                    0,
+                    None,
+                ):
+                    # This is an edge case that httplib failed to cover due
+                    # to concerns of backward compatibility. We're
+                    # addressing it here to make sure IncompleteRead is
+                    # raised during streaming, so all calls with incorrect
+                    # Content-Length are caught.
+                    raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
+
+        if data:
+            self._fp_bytes_read += len(data)
+            if self.length_remaining is not None:
+                self.length_remaining -= len(data)
+        return data
+
     def read(self, amt=None, decode_content=None, cache_content=False):
         """
         Similar to :meth:`http.client.HTTPResponse.read`, but with two additional
@@ -557,53 +776,73 @@ class HTTPResponse(io.IOBase):
         if decode_content is None:
             decode_content = self.decode_content

-        if self._fp is None:
-            return
+        if amt and amt < 0:
+            # Negative numbers and `None` should be treated the same.
+            amt = None
+        elif amt is not None:
+            cache_content = False
+
+            if self._decoder and self._decoder.has_unconsumed_tail:
+                decoded_data = self._decode(
+                    b"",
+                    decode_content,
+                    flush_decoder=False,
+                    max_length=amt - len(self._decoded_buffer),
+                )
+                self._decoded_buffer.put(decoded_data)
+                if len(self._decoded_buffer) >= amt:
+                    return self._decoded_buffer.get(amt)

-        flush_decoder = False
-        fp_closed = getattr(self._fp, "closed", False)
+        data = self._raw_read(amt)

-        with self._error_catcher():
-            data = self._fp_read(amt) if not fp_closed else b""
-            if amt is None:
-                flush_decoder = True
-            else:
-                cache_content = False
-                if (
-                    amt != 0 and not data
-                ):  # Platform-specific: Buggy versions of Python.
-                    # Close the connection when no data is returned
-                    #
-                    # This is redundant to what httplib/http.client _should_
-                    # already do. However, versions of python released before
-                    # December 15, 2012 (http://bugs.python.org/issue16298) do
-                    # not properly close the connection in all cases. There is
-                    # no harm in redundantly calling close.
-                    self._fp.close()
-                    flush_decoder = True
-                    if self.enforce_content_length and self.length_remaining not in (
-                        0,
-                        None,
-                    ):
-                        # This is an edge case that httplib failed to cover due
-                        # to concerns of backward compatibility. We're
-                        # addressing it here to make sure IncompleteRead is
-                        # raised during streaming, so all calls with incorrect
-                        # Content-Length are caught.
-                        raise IncompleteRead(self._fp_bytes_read, self.length_remaining)
+        flush_decoder = amt is None or (amt != 0 and not data)

-        if data:
-            self._fp_bytes_read += len(data)
-            if self.length_remaining is not None:
-                self.length_remaining -= len(data)
+        if (
+            not data
+            and len(self._decoded_buffer) == 0
+            and not (self._decoder and self._decoder.has_unconsumed_tail)
+        ):
+            return data

+        if amt is None:
             data = self._decode(data, decode_content, flush_decoder)
-
             if cache_content:
                 self._body = data
+        else:
+            # do not waste memory on buffer when not decoding
+            if not decode_content:
+                if self._has_decoded_content:
+                    raise RuntimeError(
+                        "Calling read(decode_content=False) is not supported after "
+                        "read(decode_content=True) was called."
+                    )
+                return data
+
+            decoded_data = self._decode(
+                data,
+                decode_content,
+                flush_decoder,
+                max_length=amt - len(self._decoded_buffer),
+            )
+            self._decoded_buffer.put(decoded_data)
+
+            while len(self._decoded_buffer) < amt and data:
+                # TODO make sure to initially read enough data to get past the headers
+                # For example, the GZ file header takes 10 bytes, we don't want to read
+                # it one byte at a time
+                data = self._raw_read(amt)
+                decoded_data = self._decode(
+                    data,
+                    decode_content,
+                    flush_decoder,
+                    max_length=amt - len(self._decoded_buffer),
+                )
+                self._decoded_buffer.put(decoded_data)
+            data = self._decoded_buffer.get(amt)

         return data

+
     def stream(self, amt=2 ** 16, decode_content=None):
         """
         A generator wrapper for the read() method. A call will block until
@@ -624,7 +863,11 @@ class HTTPResponse(io.IOBase):
             for line in self.read_chunked(amt, decode_content=decode_content):
                 yield line
         else:
-            while not is_fp_closed(self._fp):
+            while (
+                not is_fp_closed(self._fp)
+                or len(self._decoded_buffer) > 0
+                or (self._decoder and self._decoder.has_unconsumed_tail)
+            ):
                 data = self.read(amt=amt, decode_content=decode_content)

                 if data:
@@ -830,7 +1073,10 @@ class HTTPResponse(io.IOBase):
                     break
                 chunk = self._handle_chunk(amt)
                 decoded = self._decode(
-                    chunk, decode_content=decode_content, flush_decoder=False
+                    chunk,
+                    decode_content=decode_content,
+                    flush_decoder=False,
+                    max_length=amt,
                 )
                 if decoded:
                     yield decoded
Index: urllib3-1.26.20/setup.py
===================================================================
--- urllib3-1.26.20.orig/setup.py
+++ urllib3-1.26.20/setup.py
@@ -88,10 +88,10 @@ setup(
     extras_require={
         "brotli": [
             # https://github.com/google/brotli/issues/1074
-            "brotli==1.0.9; os_name != 'nt' and python_version < '3' and platform_python_implementation == 'CPython'",
-            "brotli>=1.0.9; python_version >= '3' and platform_python_implementation == 'CPython'",
-            "brotlicffi>=0.8.0; (os_name != 'nt' or python_version >= '3') and platform_python_implementation != 'CPython'",
-            "brotlipy>=0.6.0; os_name == 'nt' and python_version < '3'",
+            "brotli==1.2.0; os_name != 'nt' and python_version < '3' and platform_python_implementation == 'CPython'",
+            "brotli>=1.2.0; python_version >= '3' and platform_python_implementation == 'CPython'",
+            "brotlicffi>=1.2.0; (os_name != 'nt' or python_version >= '3') and platform_python_implementation != 'CPython'",
+            "brotlipy>=1.2.0; os_name == 'nt' and python_version < '3'",
         ],
         "secure": [
             "pyOpenSSL>=0.14",
Index: urllib3-1.26.20/test/test_response.py
===================================================================
--- urllib3-1.26.20.orig/test/test_response.py
+++ urllib3-1.26.20/test/test_response.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-

 import contextlib
+import gzip
 import re
 import socket
 import ssl
@@ -28,7 +29,7 @@ from urllib3.exceptions import (
     httplib_IncompleteRead,
 )
 from urllib3.packages.six.moves import http_client as httplib
-from urllib3.response import HTTPResponse, brotli
+from urllib3.response import HTTPResponse, BytesQueueBuffer, brotli
 from urllib3.util.response import is_fp_closed
 from urllib3.util.retry import RequestHistory, Retry

@@ -56,6 +57,30 @@ def sock():
     yield s
     s.close()


+def deflate2_compress(data):
+    compressor = zlib.compressobj(6, zlib.DEFLATED, -zlib.MAX_WBITS)
+    return compressor.compress(data) + compressor.flush()
+
+
+if brotli:
+    try:
+        brotli.Decompressor().process(b"", output_buffer_limit=1024)
+        _brotli_gte_1_2_0_available = True
+    except (AttributeError, TypeError):
+        _brotli_gte_1_2_0_available = False
+else:
+    _brotli_gte_1_2_0_available = False
+
+
+class TestBytesQueueBuffer:
+    def test_memory_usage_single_chunk(
+        self
+    ):
+        buffer = BytesQueueBuffer()
+        chunk = bytes(10 * 2**20)  # 10 MiB
+        buffer.put(chunk)
+        assert buffer.get(len(buffer)) is chunk
+

 class TestLegacyResponse(object):
     def test_getheaders(self):
@@ -146,12 +171,7 @@ class TestResponse(object):
             fp, headers={"content-encoding": "deflate"}, preload_content=False
         )

-        assert r.read(3) == b""
-        # Buffer in case we need to switch to the raw stream
-        assert r._decoder._data is not None
         assert r.read(1) == b"f"
-        # Now that we've decoded data, we just stream through the decoder
-        assert r._decoder._data is None
         assert r.read(2) == b"oo"
         assert r.read() == b""
         assert r.read() == b""
@@ -166,10 +186,7 @@ class TestResponse(object):
             fp, headers={"content-encoding": "deflate"}, preload_content=False
         )

-        assert r.read(1) == b""
         assert r.read(1) == b"f"
-        # Once we've decoded data, we just stream to the decoder; no buffering
-        assert r._decoder._data is None
         assert r.read(2) == b"oo"
         assert r.read() == b""
         assert r.read() == b""
@@ -184,7 +201,6 @@ class TestResponse(object):
             fp, headers={"content-encoding": "gzip"}, preload_content=False
         )

-        assert r.read(11) == b""
         assert r.read(1) == b"f"
         assert r.read(2) == b"oo"
         assert r.read() == b""
@@ -266,6 +282,157 @@ class TestResponse(object):
         with pytest.raises(DecodeError):
             HTTPResponse(fp, headers={"content-encoding": "br"})

+    _test_compressor_params = [
+        ("deflate1", ("deflate", zlib.compress)),
+        ("deflate2", ("deflate", deflate2_compress)),
+        ("gzip", ("gzip", gzip.compress)),
+    ]
+    if _brotli_gte_1_2_0_available:
+        _test_compressor_params.append(("brotli", ("br", brotli.compress)))
+    else:
+        _test_compressor_params.append(("brotli", None))
+
+    @pytest.mark.parametrize(
+        "data",
+        [d[1] for d in _test_compressor_params],
+        ids=[d[0] for d in _test_compressor_params],
+    )
+    def test_read_with_all_data_already_in_decompressor(
+        self,
+        request,
+        data,
+    ):
+        if data is None:
+            pytest.skip(f"Proper {request.node.callspec.id} decoder is not available")
+        original_data = b"bar" * 1000
+        name, compress_func = data
+        compressed_data = compress_func(original_data)
+        fp = mock.Mock(read=mock.Mock(return_value=b""))
+        r = HTTPResponse(fp, headers={"content-encoding": name}, preload_content=False)
+        # Put all data in the decompressor's buffer.
+        r._init_decoder()
+        assert r._decoder is not None  # for mypy
+        decoded = r._decoder.decompress(compressed_data, max_length=0)
+        if name == "br":
+            # It's known that some Brotli libraries do not respect
+            # `max_length`.
+            r._decoded_buffer.put(decoded)
+        else:
+            assert decoded == b""
+        # Read the data via `HTTPResponse`.
+        read = getattr(r, "read")
+        assert read(0) == b""
+        assert read(2500) == original_data[:2500]
+        assert read(500) == original_data[2500:]
+        assert read(0) == b""
+        assert read() == b""
+
+    @pytest.mark.parametrize(
+        "delta",
+        (
+            0,  # First read from socket returns all compressed data.
+            -1,  # First read from socket returns all but one byte of compressed data.
+        ),
+    )
+    @pytest.mark.parametrize(
+        "data",
+        [d[1] for d in _test_compressor_params],
+        ids=[d[0] for d in _test_compressor_params],
+    )
+    def test_decode_with_max_length_close_to_compressed_data_size(
+        self,
+        request,
+        delta,
+        data,
+    ):
+        """
+        Test decoding when the first read from the socket returns all or
+        almost all the compressed data, but then it has to be
+        decompressed in a couple of read calls.
+ """ + if data is None: + pytest.skip(f"Proper {request.node.callspec.id} decoder is not available") + + original_data = b"foo" * 1000 + name, compress_func = data + compressed_data = compress_func(original_data) + fp = BytesIO(compressed_data) + r = HTTPResponse(fp, headers={"content-encoding": name}, preload_content=False) + initial_limit = len(compressed_data) + delta + read = getattr(r, "read") + initial_chunk = read(amt=initial_limit, decode_content=True) + assert len(initial_chunk) == initial_limit + assert ( + len(read(amt=len(original_data), decode_content=True)) + == len(original_data) - initial_limit + ) + + # Prepare 50 MB of compressed data outside of the test measuring + # memory usage. + _test_memory_usage_decode_with_max_length_params = [ + ( + params[0], + (params[1][0], params[1][1](b"A" * (50 * 2**20))) if params[1] else None, + ) + for params in _test_compressor_params + ] + + @pytest.mark.parametrize( + "data", + [d[1] for d in _test_memory_usage_decode_with_max_length_params], + ids=[d[0] for d in _test_memory_usage_decode_with_max_length_params], + ) + @pytest.mark.parametrize("read_method", ("read", "read_chunked", "stream")) + # Decoders consume different amounts of memory during decompression. + # We set the 10 MB limit to ensure that the whole decompressed data + # is not stored unnecessarily. + # + # FYI, the following consumption was observed for the test with + # `read` on CPython 3.14.0: + # - deflate: 2.3 MiB + # - deflate2: 2.1 MiB + # - gzip: 2.1 MiB + # - brotli: + # - brotli v1.2.0: 9 MiB + # - brotlicffi v1.2.0.0: 6 MiB + # - brotlipy v0.7.0: 105.8 MiB + @pytest.mark.limit_memory("10 MB", current_thread_only=True) + def test_memory_usage_decode_with_max_length( + self, + request, + read_method, + data, + ): + if data is None: + pytest.skip(f"Proper {request.node.callspec.id} decoder is not available") + + name, compressed_data = data + limit = 1024 * 1024 # 1 MiB + if read_method in ("read_chunked", "stream"): + httplib_r = httplib.HTTPResponse(MockSock) # type: ignore[arg-type] + httplib_r.fp = MockChunkedEncodingResponse([compressed_data]) # type: ignore[assignment] + r = HTTPResponse( + httplib_r, + preload_content=False, + headers={"transfer-encoding": "chunked", "content-encoding": name}, + ) + next(getattr(r, read_method)(amt=limit, decode_content=True)) + else: + fp = BytesIO(compressed_data) + r = HTTPResponse( + fp, headers={"content-encoding": name}, preload_content=False + ) + getattr(r, read_method)(amt=limit, decode_content=True) + + # Check that the internal decoded buffer is empty unless brotli + # is used. + # Google's brotli library does not fully respect the output + # buffer limit: https://github.com/google/brotli/issues/1396 + # And unmaintained brotlipy cannot limit the output buffer size. + if name != "br" or brotli.__name__ == "brotlicffi": + assert len(r._decoded_buffer) == 0 + + def test_multi_decoding_deflate_deflate(self): data = zlib.compress(zlib.compress(b"foo")) @@ -494,8 +661,8 @@ class TestResponse(object): ) stream = resp.stream(2) - assert next(stream) == b"f" - assert next(stream) == b"oo" + assert next(stream) == b"fo" + assert next(stream) == b"o" with pytest.raises(StopIteration): next(stream) @@ -524,6 +691,7 @@ class TestResponse(object): # Ensure that ``tell()`` returns the correct number of bytes when # part-way through streaming compressed content. 
         NUMBER_OF_READS = 10
+        PART_SIZE = 64

         class MockCompressedDataReading(BytesIO):
             """
@@ -552,7 +720,7 @@ class TestResponse(object):
         resp = HTTPResponse(
             fp, headers={"content-encoding": "deflate"}, preload_content=False
         )
-        stream = resp.stream()
+        stream = resp.stream(PART_SIZE)
         parts_positions = [(part, resp.tell()) for part in stream]
         end_of_stream = resp.tell()

@@ -567,12 +735,28 @@ class TestResponse(object):
         assert uncompressed_data == payload

         # Check that the positions in the stream are correct
-        expected = [(i + 1) * payload_part_size for i in range(NUMBER_OF_READS)]
-        assert expected == list(positions)
+        # It is difficult to determine programmatically what the positions
+        # returned by `tell` will be because the `HTTPResponse.read` method may
+        # call socket `read` a couple of times if it doesn't have enough data
+        # in the buffer or not call socket `read` at all if it has enough. All
+        # this depends on the message, how it was compressed, what is
+        # `PART_SIZE` and `payload_part_size`.
+        # So for simplicity the expected values are hardcoded.
+        expected = (92, 184, 230, 276, 322, 368, 414, 460)
+        assert expected == positions

         # Check that the end of the stream is in the correct place
         assert len(ZLIB_PAYLOAD) == end_of_stream

+        # Check that all parts have expected length
+        expected_last_part_size = len(uncompressed_data) % PART_SIZE
+        whole_parts = len(uncompressed_data) // PART_SIZE
+        if expected_last_part_size == 0:
+            expected_lengths = [PART_SIZE] * whole_parts
+        else:
+            expected_lengths = [PART_SIZE] * whole_parts + [expected_last_part_size]
+        assert expected_lengths == [len(part) for part in parts]
+
     def test_deflate_streaming(self):
         data = zlib.compress(b"foo")

@@ -582,8 +766,8 @@ class TestResponse(object):
         )

         stream = resp.stream(2)
-        assert next(stream) == b"f"
-        assert next(stream) == b"oo"
+        assert next(stream) == b"fo"
+        assert next(stream) == b"o"

         with pytest.raises(StopIteration):
             next(stream)
@@ -598,8 +782,8 @@ class TestResponse(object):
         )

         stream = resp.stream(2)
-        assert next(stream) == b"f"
-        assert next(stream) == b"oo"
+        assert next(stream) == b"fo"
+        assert next(stream) == b"o"

         with pytest.raises(StopIteration):
             next(stream)
Index: urllib3-1.26.20/test/with_dummyserver/test_socketlevel.py
===================================================================
--- urllib3-1.26.20.orig/test/with_dummyserver/test_socketlevel.py
+++ urllib3-1.26.20/test/with_dummyserver/test_socketlevel.py
@@ -1901,15 +1901,8 @@ class TestBadContentLength(SocketDummySe
             "GET", url="/", preload_content=False, enforce_content_length=True
         )
         data = get_response.stream(100)
-        # Read "good" data before we try to read again.
-        # This won't trigger till generator is exhausted.
-        next(data)
-        try:
+        with pytest.raises(ProtocolError, match="12 bytes read, 10 more expected"):
             next(data)
-            assert False
-        except ProtocolError as e:
-            assert "12 bytes read, 10 more expected" in str(e)
-
         done_event.set()

     def test_enforce_content_length_no_body(self):
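
The DeflateDecoder and GzipDecoder changes above lean on standard zlib behaviour: Decompress.decompress(data, max_length) caps how many bytes one call may produce, and input held back to honour that cap is exposed as Decompress.unconsumed_tail, which must be fed back on the next call. A standalone sketch of that mechanic, independent of urllib3 (the 64 KiB cap is an arbitrary choice):

    import zlib

    # A few KB of compressed input that expands to ~10 MB.
    payload = zlib.compress(b"A" * 10_000_000)

    decoder = zlib.decompressobj()
    produced = 0
    data = payload
    while True:
        # Never produce more than 64 KiB per call, however well the input compresses.
        chunk = decoder.decompress(data, 65536)
        produced += len(chunk)
        if not chunk:
            break
        # Input withheld to honour max_length must be passed back on the next call.
        data = decoder.unconsumed_tail
    print(produced)  # 10_000_000, reached 64 KiB at a time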
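
BytesQueueBuffer lets read(amt) hand back exactly amt decoded bytes while parking the remainder; the head chunk is split with a memoryview so the unread tail is not copied again. A simplified sketch of the same splitting idea (this is not the class from the patch, just an illustration of the technique):

    import collections

    chunks = collections.deque([b"hello ", b"world"])

    def take(n):
        # Return exactly n bytes, keeping any remainder at the head of the queue.
        out = bytearray()
        while n and chunks:
            head = chunks.popleft()
            if len(head) > n:
                view = memoryview(head)
                out += view[:n]
                chunks.appendleft(view[n:])  # the tail is kept without copying
                n = 0
            else:
                out += head
                n -= len(head)
        return bytes(out)

    print(take(3))   # b'hel'
    print(take(5))   # b'lo wo'
    print(take(10))  # b'rld' (queue exhausted)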
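
The documentation changes recommend streaming responses of large or unknown length, and the reworked read()/stream() paths cap how much decoded data a single call holds. A minimal usage sketch under those assumptions (the URL, the 64 KiB chunk size, and the byte-counting loop are illustrative only, not part of this patch):

    import urllib3

    http = urllib3.PoolManager()
    # preload_content=False defers socket reads until data is requested.
    resp = http.request("GET", "https://example.com/", preload_content=False)

    total = 0
    try:
        # Each yielded chunk holds at most 64 KiB of *decoded* data, so a highly
        # compressed body cannot balloon memory in a single read.
        for chunk in resp.stream(2 ** 16, decode_content=True):
            total += len(chunk)
    finally:
        resp.release_conn()
    print(total)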