From 696a9611ab982c45ee2190ed79ca8e1d8e09398f Mon Sep 17 00:00:00 2001
From: Hsiaoming Yang
Date: Wed, 25 Feb 2026 08:57:51 +0800
Subject: [PATCH] fix(jwe): set max value for p2c

---
 src/joserfc/_rfc7518/jwe_algs.py | 19 ++++++++++++--
 tests/jwe/test_compact.py        | 45 ++++++++++++++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)

Index: joserfc-1.1.0/src/joserfc/rfc7518/jwe_algs.py
===================================================================
--- joserfc-1.1.0.orig/src/joserfc/rfc7518/jwe_algs.py
+++ joserfc-1.1.0/src/joserfc/rfc7518/jwe_algs.py
@@ -1,5 +1,7 @@
 from __future__ import annotations
 import secrets
+import warnings
+
 from cryptography.hazmat.primitives.asymmetric import padding
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.backends import default_backend
@@ -31,6 +33,7 @@ from ..registry import HeaderParameter
 from ..errors import (
     InvalidKeyLengthError,
     DecodeError,
+    SecurityWarning,
 )
 
 
@@ -225,16 +228,40 @@ class ECDHESAlgModel(JWEKeyAgreement):
         return derive_key_for_concat_kdf(shared_key, headers, enc.cek_size, self.key_size)
 
 
+def validate_p2c(value: int) -> None:
+    """Validate the PBES2 ``p2c`` (iteration count) header parameter.
+
+    :param value: the ``p2c`` value taken from the JWE header
+    :raises ValueError: if ``value`` is not an integer, is not positive,
+        or exceeds the maximum accepted iteration count
+    """
+    # bool is a subclass of int, but a JSON boolean is not a count.
+    if isinstance(value, bool) or not isinstance(value, int):
+        raise ValueError("must be an int")
+
+    # RFC 7518 section 4.8.1.2: the value MUST be a positive integer.
+    if value < 1:
+        raise ValueError("must be a positive integer")
+
+    # A minimum iteration count of 1000 is RECOMMENDED.
+    if value < 1000:
+        warnings.warn("A minimum iteration count of 1000 is RECOMMENDED", SecurityWarning)
+
+    # Upper bound on the iteration count a header may request.
+    max_value = 300000
+    if value > max_value:
+        raise ValueError(f"must be less than or equal to {max_value}")
+
+
 class PBES2HSAlgModel(JWEKeyEncryption):
     # https://www.rfc-editor.org/rfc/rfc7518#section-4.8
     key_size: int
     more_header_registry = {
         "p2s": HeaderParameter("PBES2 Salt Input", "str", True),
-        "p2c": HeaderParameter("PBES2 Count", "int", True),
+        "p2c": HeaderParameter("PBES2 Count", validate_p2c, True),
     }
     key_types = ["oct"]
 
-    # A minimum iteration count of 1000 is RECOMMENDED.
     DEFAULT_P2C = 2048
 
     def __init__(self, hash_size: int, key_wrapping: JWEKeyWrapping):
Index: joserfc-1.1.0/tests/jwe/test_compact.py
===================================================================
--- joserfc-1.1.0.orig/tests/jwe/test_compact.py
+++ joserfc-1.1.0/tests/jwe/test_compact.py
@@ -3,6 +3,7 @@ from joserfc.jwe import JWERegistry, enc
 from joserfc.jwk import RSAKey, ECKey, OctKey, OKPKey, KeySet
 from joserfc.rfc7518.jwe_encs import JWE_ENC_MODELS
 from joserfc.errors import (
+    SecurityWarning,
     InvalidKeyLengthError,
     MissingAlgorithmError,
     MissingEncryptionError,
@@ -174,6 +175,68 @@ class TestJWECompact(TestCase):
             key,
             registry=registry,
         )
+
+    def test_PBES2HS_with_small_p2c(self):
+        key = OctKey.generate_key(128)
+        protected = {
+            "alg": "PBES2-HS256+A128KW",
+            "enc": "A128CBC-HS256",
+            "p2s": "QoGrcBpns_cLWCQPEVuA-g",
+            "p2c": 500,
+        }
+        registry = JWERegistry(algorithms=["PBES2-HS256+A128KW", "A128CBC-HS256"])
+        self.assertWarns(
+            SecurityWarning,
+            encrypt_compact,
+            protected,
+            b"i",
+            key,
+            registry=registry,
+        )
+
+    def test_PBES2HS_with_non_positive_p2c(self):
+        # RFC 7518 section 4.8.1.2 requires p2c to be a positive integer.
+        key = OctKey.generate_key(128)
+        protected = {
+            "alg": "PBES2-HS256+A128KW",
+            "enc": "A128CBC-HS256",
+            "p2s": "QoGrcBpns_cLWCQPEVuA-g",
+            "p2c": 0,
+        }
+        registry = JWERegistry(algorithms=["PBES2-HS256+A128KW", "A128CBC-HS256"])
+        self.assertRaises(
+            InvalidHeaderValueError,
+            encrypt_compact,
+            protected,
+            b"i",
+            key,
+            registry=registry,
+        )
+
+    def test_PBES2HS_with_large_p2c(self):
+        key = OctKey.import_key({"k": "pyL42ncDFSYnenl-GiZjRw", "kty": "oct"})
+        protected = {
+            "alg": "PBES2-HS256+A128KW",
+            "enc": "A128CBC-HS256",
+            "p2s": "QoGrcBpns_cLWCQPEVuA-g",
+            "p2c": 500000,
+        }
+        registry = JWERegistry(algorithms=["PBES2-HS256+A128KW", "A128CBC-HS256"])
+        self.assertRaises(
+            InvalidHeaderValueError,
+            encrypt_compact,
+            protected,
+            b"i",
+            key,
+            registry=registry,
+        )
+        encrypted = "eyJhbGciOiJQQkVTMi1IUzI1NitBMTI4S1ciLCJlbmMiOiJBMTI4Q0JDLUhTMjU2IiwicDJzIjoiUW9HcmNCcG5zX2NMV0NRUEVWdUEtZyIsInAyYyI6NTAwMDAwfQ.qdtshVQlPM-fW57DRVUnmwMyvBVzUZCm58zn7j5W7IP9S2-cBVTh_w.mMUagTUTRi7fLQ3VUi6g4w.Hi0-8_MusxEwRtW6dkjXzw.Ktm1FmBA9rPe0Vv8w0kZ2g"
+        self.assertRaises(
+            InvalidHeaderValueError,
+            decrypt_compact,
+            encrypted,
+            key,
+            registry=registry,
+        )
 
     def test_with_zip_header(self):
         private_key: RSAKey = load_key("rsa-openssl-private.pem")
Index:
joserfc-1.1.0/src/joserfc/errors.py
===================================================================
--- joserfc-1.1.0.orig/src/joserfc/errors.py
+++ joserfc-1.1.0/src/joserfc/errors.py
@@ -1,6 +1,14 @@
 from __future__ import annotations
 
 
+class SecurityWarning(UserWarning):
+    """Warning category for usage that is permitted but weakens security.
+
+    Emitted through ``warnings.warn``, e.g. when a PBES2 ``p2c``
+    iteration count is below the recommended minimum of 1000.
+    """
+
+
 class JoseError(Exception):
     """Base Exception for all errors in joserfc."""
 