Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/ci-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,10 @@ jobs:

name: Cython coverage
needs: gen_llhttp
runs-on: ubuntu-latest
strategy:
matrix:
os: [ubuntu, windows]
runs-on: ${{ matrix.os }}-latest
steps:
- name: Checkout
uses: actions/checkout@v6
Expand Down
8 changes: 8 additions & 0 deletions CHANGES/11966.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Large overhaul of parser/decompression code.

The zip bomb security fix in 3.13 stopped highly compressed payloads
from being decompressed, regardless of their validity. Now aiohttp
decompresses such payloads in chunks of 256+ KiB, allowing them to be
decompressed safely.

-- by :user:`Dreamsorcerer`.
1 change: 1 addition & 0 deletions aiohttp/_cparser.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ cdef extern from "llhttp.h":

int llhttp_should_keep_alive(const llhttp_t* parser)

void llhttp_resume(llhttp_t* parser)
void llhttp_resume_after_upgrade(llhttp_t* parser)

llhttp_errno_t llhttp_get_errno(const llhttp_t* parser)
Expand Down
120 changes: 89 additions & 31 deletions aiohttp/_http_parser.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ include "_headers.pxi"

from aiohttp cimport _find_header

ALLOWED_UPGRADES = frozenset({"websocket"})

cdef frozenset ALLOWED_UPGRADES = frozenset({"websocket"})
DEF DEFAULT_FREELIST_SIZE = 250

cdef extern from "Python.h":
Expand All @@ -69,7 +70,7 @@ cdef object CONTENT_ENCODING = hdrs.CONTENT_ENCODING
cdef object EMPTY_PAYLOAD = _EMPTY_PAYLOAD
cdef object StreamReader = _StreamReader
cdef object DeflateBuffer = _DeflateBuffer
cdef bytes EMPTY_BYTES = b""
cdef tuple EMPTY_FEED_DATA_RESULT = ((), False, b"")

# RFC 9110 singleton headers — duplicates are rejected in strict mode.
# In lax mode (response parser default), the check is skipped entirely
Expand Down Expand Up @@ -298,7 +299,7 @@ cdef class HttpParser:
bint _has_value
int _header_name_size

object _protocol
readonly object protocol
object _loop
object _timer

Expand All @@ -309,6 +310,7 @@ cdef class HttpParser:
bint _read_until_eof
bint _lax

bytes _tail
bint _started
object _url
bytearray _buf
Expand All @@ -319,6 +321,9 @@ cdef class HttpParser:
list _raw_headers
bint _upgraded
list _messages
bint _more_data_available
bint _paused
bint _eof_pending
object _payload
bint _payload_error
object _payload_exception
Expand Down Expand Up @@ -359,18 +364,22 @@ cdef class HttpParser:
self._cparser.data = <void*>self
self._cparser.content_length = 0

self._protocol = protocol
self.protocol = protocol
self._loop = loop
self._timer = timer

self._buf = bytearray()
self._more_data_available = False
self._paused = False
self._eof_pending = False
self._payload = None
self._payload_error = 0
self._payload_exception = payload_exception
self._messages = []

self._raw_name = EMPTY_BYTES
self._raw_value = EMPTY_BYTES
self._raw_name = b""
self._raw_value = b""
self._tail = b""
self._has_value = False
self._header_name_size = 0

Expand Down Expand Up @@ -401,7 +410,7 @@ cdef class HttpParser:

cdef _process_header(self):
cdef str value
if self._raw_name is not EMPTY_BYTES:
if self._raw_name != b"":
name = find_header(self._raw_name)
value = self._raw_value.decode('utf-8', 'surrogateescape')

Expand All @@ -426,20 +435,20 @@ cdef class HttpParser:
self._has_value = False
self._header_name_size = 0
self._raw_headers.append((self._raw_name, self._raw_value))
self._raw_name = EMPTY_BYTES
self._raw_value = EMPTY_BYTES
self._raw_name = b""
self._raw_value = b""

cdef _on_header_field(self, char* at, size_t length):
if self._has_value:
self._process_header()

if self._raw_name is EMPTY_BYTES:
if self._raw_name == b"":
self._raw_name = at[:length]
else:
self._raw_name += at[:length]

cdef _on_header_value(self, char* at, size_t length):
if self._raw_value is EMPTY_BYTES:
if self._raw_value == b"":
self._raw_value = at[:length]
else:
self._raw_value += at[:length]
Expand Down Expand Up @@ -495,14 +504,14 @@ cdef class HttpParser:
self._read_until_eof)
):
payload = StreamReader(
self._protocol, timer=self._timer, loop=self._loop,
self.protocol, timer=self._timer, loop=self._loop,
limit=self._limit)
else:
payload = EMPTY_PAYLOAD

self._payload = payload
if encoding is not None and self._auto_decompress:
self._payload = DeflateBuffer(payload, encoding)
self._payload = DeflateBuffer(payload, encoding, max_decompress_size=self._limit)

if not self._response_with_body:
payload = EMPTY_PAYLOAD
Expand Down Expand Up @@ -535,6 +544,10 @@ cdef class HttpParser:

### Public API ###

def pause_reading(self):
assert self._payload is not None
self._paused = True

def feed_eof(self):
cdef bytes desc

Expand All @@ -549,18 +562,52 @@ cdef class HttpParser:
desc = cparser.llhttp_get_error_reason(self._cparser)
raise PayloadEncodingError(desc.decode('latin-1'))
else:
self._eof_pending = True
while self._more_data_available:
if self._paused:
self._paused = False
return # Will resume via feed_data(b"") later
self._more_data_available = self._payload.feed_data(b"")
self._payload.feed_eof()
self._payload = None
self._more_data_available = False
self._eof_pending = False
elif self._started:
self._on_headers_complete()
if self._messages:
return self._messages[-1][0]

def feed_data(self, data):
def feed_data(self, incoming_data):
cdef:
size_t data_len
size_t nb
char* base
cdef cparser.llhttp_errno_t errno
cdef bytes data

# Proactor loop sends bytearray.
# Ensure cython sees `data` as bytes
if type(incoming_data) is not bytes:
data = bytes(incoming_data)
else:
data = incoming_data

if self._tail:
data, self._tail = self._tail + data, b""

if self._more_data_available:
result = cb_on_body(self._cparser, b"", 0)
if result is cparser.HPE_PAUSED:
self._tail = data
return EMPTY_FEED_DATA_RESULT

if self._eof_pending:
self._payload.feed_eof()
self._payload = None
self._eof_pending = False
# We can't have new messages here, otherwise we wouldn't have
# received EOF.
return EMPTY_FEED_DATA_RESULT

PyObject_GetBuffer(data, &self.py_buf, PyBUF_SIMPLE)
# Cache buffer pointer before PyBuffer_Release to avoid use-after-release.
Expand All @@ -574,12 +621,15 @@ cdef class HttpParser:

if errno is cparser.HPE_PAUSED_UPGRADE:
cparser.llhttp_resume_after_upgrade(self._cparser)

nb = cparser.llhttp_get_error_pos(self._cparser) - base
elif errno is cparser.HPE_PAUSED:
cparser.llhttp_resume(self._cparser)
pos = cparser.llhttp_get_error_pos(self._cparser) - base
self._tail = data[pos:]

PyBuffer_Release(&self.py_buf)

if errno not in (cparser.HPE_OK, cparser.HPE_PAUSED_UPGRADE):
if errno not in (cparser.HPE_OK, cparser.HPE_PAUSED, cparser.HPE_PAUSED_UPGRADE):
if self._payload_error == 0:
if self._last_error is not None:
ex = self._last_error
Expand All @@ -603,8 +653,9 @@ cdef class HttpParser:

if self._upgraded:
return messages, True, data[nb:]
else:
return messages, False, b""
if not messages: # Shortcut to reduce Python overhead
return EMPTY_FEED_DATA_RESULT
return messages, False, b""

def set_upgraded(self, val):
self._upgraded = val
Expand Down Expand Up @@ -799,19 +850,26 @@ cdef int cb_on_body(cparser.llhttp_t* parser,
const char *at, size_t length) except -1:
cdef HttpParser pyparser = <HttpParser>parser.data
cdef bytes body = at[:length]
try:
pyparser._payload.feed_data(body)
except BaseException as underlying_exc:
reraised_exc = underlying_exc
if pyparser._payload_exception is not None:
reraised_exc = pyparser._payload_exception(str(underlying_exc))

set_exception(pyparser._payload, reraised_exc, underlying_exc)

pyparser._payload_error = 1
return -1
else:
return 0
while body or pyparser._more_data_available:
try:
pyparser._more_data_available = pyparser._payload.feed_data(body)
except BaseException as underlying_exc:
reraised_exc = underlying_exc
if pyparser._payload_exception is not None:
reraised_exc = pyparser._payload_exception(str(underlying_exc))

set_exception(pyparser._payload, reraised_exc, underlying_exc)

pyparser._payload_error = 1
pyparser._paused = False
return -1
body = b""

if pyparser._paused:
pyparser._paused = False
return cparser.HPE_PAUSED
pyparser._paused = False
return 0


cdef int cb_on_message_complete(cparser.llhttp_t* parser) except -1:
Expand Down
2 changes: 1 addition & 1 deletion aiohttp/_websocket/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
)
from .models import WS_DEFLATE_TRAILING, WSMsgType

DEFAULT_LIMIT: Final[int] = 2**16
DEFAULT_LIMIT: Final[int] = 2**18

# WebSocket opcode boundary: opcodes 0-7 are data frames, 8-15 are control frames
# Control frames (ping, pong, close) are never compressed
Expand Down
33 changes: 27 additions & 6 deletions aiohttp/base_protocol.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
import asyncio
from typing import cast
from typing import TYPE_CHECKING, Any, cast

from .client_exceptions import ClientConnectionResetError
from .helpers import set_exception
from .tcp_helpers import tcp_nodelay

if TYPE_CHECKING:
from .http_parser import HttpParser


class BaseProtocol(asyncio.Protocol):
__slots__ = (
"_loop",
"_paused",
"_parser",
"_drain_waiter",
"_connection_lost",
"_reading_paused",
"_upgraded",
"transport",
)

def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
def __init__(
self, loop: asyncio.AbstractEventLoop, parser: "HttpParser[Any] | None" = None
) -> None:
self._loop: asyncio.AbstractEventLoop = loop
self._paused = False
self._drain_waiter: asyncio.Future[None] | None = None
self._reading_paused = False
self._parser = parser
self._upgraded = False

self.transport: asyncio.Transport | None = None

Expand Down Expand Up @@ -48,15 +57,27 @@ def resume_writing(self) -> None:
waiter.set_result(None)

def pause_reading(self) -> None:
if not self._reading_paused and self.transport is not None:
self._reading_paused = True
# Parser shouldn't be paused on websockets.
if not self._upgraded:
assert self._parser is not None
self._parser.pause_reading()
if self.transport is not None:
try:
self.transport.pause_reading()
except (AttributeError, NotImplementedError, RuntimeError):
pass
self._reading_paused = True

def resume_reading(self) -> None:
if self._reading_paused and self.transport is not None:
def resume_reading(self, resume_parser: bool = True) -> None:
self._reading_paused = False

# This will resume parsing any unprocessed data from the last pause.
if not self._upgraded and resume_parser:
self.data_received(b"")

# Reading may have been paused again in the above call if there was a lot of
# compressed data still pending.
if not self._reading_paused and self.transport is not None:
try:
self.transport.resume_reading()
except (AttributeError, NotImplementedError, RuntimeError):
Expand Down
4 changes: 2 additions & 2 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ def __init__(
trust_env: bool = False,
requote_redirect_url: bool = True,
trace_configs: list[TraceConfig[object]] | None = None,
read_bufsize: int = 2**16,
read_bufsize: int = 2**18,
max_line_size: int = 8190,
max_field_size: int = 8190,
max_headers: int = 128,
Expand Down Expand Up @@ -1226,7 +1226,7 @@ async def _ws_connect(

transport = conn.transport
assert transport is not None
reader = WebSocketDataQueue(conn_proto, 2**16, loop=self._loop)
reader = WebSocketDataQueue(conn_proto, 2**18, loop=self._loop)
writer = WebSocketWriter(
conn_proto,
transport,
Expand Down
Loading
Loading