|
|
|
|
@@ -1,214 +0,0 @@
|
|
|
|
|
From 3908f144229eed4df24bd569d16e5991ace44970 Mon Sep 17 00:00:00 2001
|
|
|
|
|
From: Simon Pichugin <simon.pichugin@gmail.com>
|
|
|
|
|
Date: Fri, 16 Jan 2026 08:57:23 -0800
|
|
|
|
|
Subject: [PATCH] Merge commit from fork
|
|
|
|
|
|
|
|
|
|
Add limit of 20 continuation octets per OID arc to prevent a potential memory
|
|
|
|
|
exhaustion from excessive continuation bytes input.
|
|
|
|
|
---
|
|
|
|
|
pyasn1/codec/ber/decoder.py | 20 ++++-
|
|
|
|
|
tests/codec/ber/test_decoder.py | 130 ++++++++++++++++++++++++++++++++
|
|
|
|
|
2 files changed, 149 insertions(+), 1 deletion(-)
|
|
|
|
|
|
|
|
|
|
diff --git a/pyasn1/codec/ber/decoder.py b/pyasn1/codec/ber/decoder.py
|
|
|
|
|
index 7e69ca15..853e837b 100644
|
|
|
|
|
--- a/pyasn1/codec/ber/decoder.py
|
|
|
|
|
+++ b/pyasn1/codec/ber/decoder.py
|
|
|
|
|
@@ -33,6 +33,10 @@
|
|
|
|
|
|
|
|
|
|
SubstrateUnderrunError = error.SubstrateUnderrunError
|
|
|
|
|
|
|
|
|
|
+# Maximum number of continuation octets (high-bit set) allowed per OID arc.
|
|
|
|
|
+# 20 octets allows up to 140-bit integers, supporting UUID-based OIDs
|
|
|
|
|
+MAX_OID_ARC_CONTINUATION_OCTETS = 20
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
class AbstractPayloadDecoder(object):
|
|
|
|
|
protoComponent = None
|
|
|
|
|
@@ -427,7 +431,14 @@ def valueDecoder(self, substrate, asn1Spec,
|
|
|
|
|
# Construct subid from a number of octets
|
|
|
|
|
nextSubId = subId
|
|
|
|
|
subId = 0
|
|
|
|
|
+ continuationOctetCount = 0
|
|
|
|
|
while nextSubId >= 128:
|
|
|
|
|
+ continuationOctetCount += 1
|
|
|
|
|
+ if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
|
|
|
|
|
+ raise error.PyAsn1Error(
|
|
|
|
|
+ 'OID arc exceeds maximum continuation octets limit (%d) '
|
|
|
|
|
+ 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
|
|
|
|
|
+ )
|
|
|
|
|
subId = (subId << 7) + (nextSubId & 0x7F)
|
|
|
|
|
if index >= substrateLen:
|
|
|
|
|
raise error.SubstrateUnderrunError(
|
|
|
|
|
@@ -485,7 +496,14 @@ def valueDecoder(self, substrate, asn1Spec,
|
|
|
|
|
# Construct subid from a number of octets
|
|
|
|
|
nextSubId = subId
|
|
|
|
|
subId = 0
|
|
|
|
|
+ continuationOctetCount = 0
|
|
|
|
|
while nextSubId >= 128:
|
|
|
|
|
+ continuationOctetCount += 1
|
|
|
|
|
+ if continuationOctetCount > MAX_OID_ARC_CONTINUATION_OCTETS:
|
|
|
|
|
+ raise error.PyAsn1Error(
|
|
|
|
|
+ 'RELATIVE-OID arc exceeds maximum continuation octets limit (%d) '
|
|
|
|
|
+ 'at position %d' % (MAX_OID_ARC_CONTINUATION_OCTETS, index)
|
|
|
|
|
+ )
|
|
|
|
|
subId = (subId << 7) + (nextSubId & 0x7F)
|
|
|
|
|
if index >= substrateLen:
|
|
|
|
|
raise error.SubstrateUnderrunError(
|
|
|
|
|
@@ -1915,7 +1933,7 @@ class StreamingDecoder(object):
|
|
|
|
|
:py:class:`~pyasn1.error.SubstrateUnderrunError` object indicating
|
|
|
|
|
insufficient BER/CER/DER serialization on input to fully recover ASN.1
|
|
|
|
|
objects from it.
|
|
|
|
|
-
|
|
|
|
|
+
|
|
|
|
|
In the latter case the caller is advised to ensure some more data in
|
|
|
|
|
the input stream, then call the iterator again. The decoder will resume
|
|
|
|
|
the decoding process using the newly arrived data.
|
|
|
|
|
diff --git a/tests/codec/ber/test_decoder.py b/tests/codec/ber/test_decoder.py
|
|
|
|
|
index f69da110..741605f3 100644
|
|
|
|
|
--- a/tests/codec/ber/test_decoder.py
|
|
|
|
|
+++ b/tests/codec/ber/test_decoder.py
|
|
|
|
|
@@ -449,6 +449,72 @@ def testLarge2(self):
|
|
|
|
|
bytes((0x06, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, 0xE2, 0xB6, 0x47))
|
|
|
|
|
) == ((2, 999, 18446744073709551535184467440737095), b'')
|
|
|
|
|
|
|
|
|
|
+ def testExcessiveContinuationOctets(self):
|
|
|
|
|
+ """Test that OID arcs with excessive continuation octets are rejected."""
|
|
|
|
|
+ # Create a payload with 25 continuation octets (exceeds 20 limit)
|
|
|
|
|
+ # 0x81 bytes are continuation octets, 0x01 terminates
|
|
|
|
|
+ malicious_payload = bytes([0x06, 26]) + bytes([0x81] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(malicious_payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive continuation octets tolerated'
|
|
|
|
|
+
|
|
|
|
|
+ def testMaxAllowedContinuationOctets(self):
|
|
|
|
|
+ """Test that OID arcs at the maximum continuation octets limit work."""
|
|
|
|
|
+ # Create a payload with exactly 20 continuation octets (at limit)
|
|
|
|
|
+ # This should succeed
|
|
|
|
|
+ payload = bytes([0x06, 21]) + bytes([0x81] * 20) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ assert 0, 'Valid OID with 20 continuation octets rejected'
|
|
|
|
|
+
|
|
|
|
|
+ def testOneOverContinuationLimit(self):
|
|
|
|
|
+ """Test boundary: 21 continuation octets (one over limit) is rejected."""
|
|
|
|
|
+ payload = bytes([0x06, 22]) + bytes([0x81] * 21) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, '21 continuation octets tolerated (should be rejected)'
|
|
|
|
|
+
|
|
|
|
|
+ def testExcessiveContinuationInSecondArc(self):
|
|
|
|
|
+ """Test that limit applies to subsequent arcs, not just the first."""
|
|
|
|
|
+ # First arc: valid simple byte (0x55 = 85, decodes to arc 2.5)
|
|
|
|
|
+ # Second arc: excessive continuation octets
|
|
|
|
|
+ payload = bytes([0x06, 27]) + bytes([0x55]) + bytes([0x81] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive continuation in second arc tolerated'
|
|
|
|
|
+
|
|
|
|
|
+ def testMultipleArcsAtLimit(self):
|
|
|
|
|
+ """Test multiple arcs each at the continuation limit work correctly."""
|
|
|
|
|
+ # Two arcs, each with 20 continuation octets (both at limit)
|
|
|
|
|
+ arc1 = bytes([0x81] * 20) + bytes([0x01]) # 21 bytes
|
|
|
|
|
+ arc2 = bytes([0x81] * 20) + bytes([0x01]) # 21 bytes
|
|
|
|
|
+ payload = bytes([0x06, 42]) + arc1 + arc2
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ assert 0, 'Multiple valid arcs at limit rejected'
|
|
|
|
|
+
|
|
|
|
|
+ def testExcessiveContinuationWithMaxBytes(self):
|
|
|
|
|
+ """Test with 0xFF continuation bytes (maximum value, not just 0x81)."""
|
|
|
|
|
+ # 0xFF bytes are also continuation octets (high bit set)
|
|
|
|
|
+ malicious_payload = bytes([0x06, 26]) + bytes([0xFF] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(malicious_payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive 0xFF continuation octets tolerated'
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
class RelativeOIDDecoderTestCase(BaseTestCase):
|
|
|
|
|
def testOne(self):
|
|
|
|
|
@@ -518,6 +584,70 @@ def testLarge(self):
|
|
|
|
|
bytes((0x0D, 0x13, 0x88, 0x37, 0x83, 0xC6, 0xDF, 0xD4, 0xCC, 0xB3, 0xFF, 0xFF, 0xFE, 0xF0, 0xB8, 0xD6, 0xB8, 0xCB, 0xE2, 0xB6, 0x47))
|
|
|
|
|
) == ((1079, 18446744073709551535184467440737095), b'')
|
|
|
|
|
|
|
|
|
|
+ def testExcessiveContinuationOctets(self):
|
|
|
|
|
+ """Test that RELATIVE-OID arcs with excessive continuation octets are rejected."""
|
|
|
|
|
+ # Create a payload with 25 continuation octets (exceeds 20 limit)
|
|
|
|
|
+ malicious_payload = bytes([0x0D, 26]) + bytes([0x81] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(malicious_payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive continuation octets tolerated'
|
|
|
|
|
+
|
|
|
|
|
+ def testMaxAllowedContinuationOctets(self):
|
|
|
|
|
+ """Test that RELATIVE-OID arcs at the maximum continuation octets limit work."""
|
|
|
|
|
+ # Create a payload with exactly 20 continuation octets (at limit)
|
|
|
|
|
+ payload = bytes([0x0D, 21]) + bytes([0x81] * 20) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ assert 0, 'Valid RELATIVE-OID with 20 continuation octets rejected'
|
|
|
|
|
+
|
|
|
|
|
+ def testOneOverContinuationLimit(self):
|
|
|
|
|
+ """Test boundary: 21 continuation octets (one over limit) is rejected."""
|
|
|
|
|
+ payload = bytes([0x0D, 22]) + bytes([0x81] * 21) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, '21 continuation octets tolerated (should be rejected)'
|
|
|
|
|
+
|
|
|
|
|
+ def testExcessiveContinuationInSecondArc(self):
|
|
|
|
|
+ """Test that limit applies to subsequent arcs, not just the first."""
|
|
|
|
|
+ # First arc: valid simple byte
|
|
|
|
|
+ # Second arc: excessive continuation octets
|
|
|
|
|
+ payload = bytes([0x0D, 27]) + bytes([0x55]) + bytes([0x81] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive continuation in second arc tolerated'
|
|
|
|
|
+
|
|
|
|
|
+ def testMultipleArcsAtLimit(self):
|
|
|
|
|
+ """Test multiple arcs each at the continuation limit work correctly."""
|
|
|
|
|
+ # Two arcs, each with 20 continuation octets (both at limit)
|
|
|
|
|
+ arc1 = bytes([0x81] * 20) + bytes([0x01]) # 21 bytes
|
|
|
|
|
+ arc2 = bytes([0x81] * 20) + bytes([0x01]) # 21 bytes
|
|
|
|
|
+ payload = bytes([0x0D, 42]) + arc1 + arc2
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ assert 0, 'Multiple valid arcs at limit rejected'
|
|
|
|
|
+
|
|
|
|
|
+ def testExcessiveContinuationWithMaxBytes(self):
|
|
|
|
|
+ """Test with 0xFF continuation bytes (maximum value, not just 0x81)."""
|
|
|
|
|
+ # 0xFF bytes are also continuation octets (high bit set)
|
|
|
|
|
+ malicious_payload = bytes([0x0D, 26]) + bytes([0xFF] * 25) + bytes([0x01])
|
|
|
|
|
+ try:
|
|
|
|
|
+ decoder.decode(malicious_payload)
|
|
|
|
|
+ except error.PyAsn1Error:
|
|
|
|
|
+ pass
|
|
|
|
|
+ else:
|
|
|
|
|
+ assert 0, 'Excessive 0xFF continuation octets tolerated'
|
|
|
|
|
+
|
|
|
|
|
|
|
|
|
|
class RealDecoderTestCase(BaseTestCase):
|
|
|
|
|
def testChar(self):
|