File: //snap/google-cloud-cli/396/platform/gsutil/third_party/urllib3/test/with_dummyserver/test_https.py
from __future__ import annotations
import contextlib
import datetime
import os.path
import shutil
import ssl
import tempfile
import warnings
from pathlib import Path
from test import (
LONG_TIMEOUT,
SHORT_TIMEOUT,
TARPIT_HOST,
requires_network,
resolvesLocalhostFQDN,
)
from test.conftest import ServerConfig
from unittest import mock
import pytest
import trustme
import urllib3.util as util
import urllib3.util.ssl_
from dummyserver.socketserver import (
DEFAULT_CA,
DEFAULT_CA_KEY,
DEFAULT_CERTS,
encrypt_key_pem,
)
from dummyserver.testcase import HTTPSHypercornDummyServerTestCase
from urllib3 import HTTPSConnectionPool
from urllib3.connection import RECENT_DATE, HTTPSConnection, VerifiedHTTPSConnection
from urllib3.exceptions import (
ConnectTimeoutError,
InsecureRequestWarning,
MaxRetryError,
ProtocolError,
SSLError,
SystemTimeWarning,
)
from urllib3.util.ssl_match_hostname import CertificateError
from urllib3.util.timeout import Timeout
from .. import has_alpn
TLSv1_CERTS = DEFAULT_CERTS.copy()
TLSv1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1", None)
TLSv1_1_CERTS = DEFAULT_CERTS.copy()
TLSv1_1_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_1", None)
TLSv1_2_CERTS = DEFAULT_CERTS.copy()
TLSv1_2_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLSv1_2", None)
TLSv1_3_CERTS = DEFAULT_CERTS.copy()
TLSv1_3_CERTS["ssl_version"] = getattr(ssl, "PROTOCOL_TLS", None)
CLIENT_INTERMEDIATE_PEM = "client_intermediate.pem"
CLIENT_NO_INTERMEDIATE_PEM = "client_no_intermediate.pem"
CLIENT_INTERMEDIATE_KEY = "client_intermediate.key"
PASSWORD_CLIENT_KEYFILE = "client_password.key"
CLIENT_CERT = CLIENT_INTERMEDIATE_PEM
class BaseTestHTTPS(HTTPSHypercornDummyServerTestCase):
tls_protocol_name: str | None = None
def tls_protocol_not_default(self) -> bool:
return self.tls_protocol_name in {"TLSv1", "TLSv1.1"}
def tls_version(self) -> ssl.TLSVersion:
if self.tls_protocol_name is None:
return pytest.skip("Skipping base test class")
try:
from ssl import TLSVersion
except ImportError:
return pytest.skip("ssl.TLSVersion isn't available")
return TLSVersion[self.tls_protocol_name.replace(".", "_")]
def ssl_version(self) -> int:
if self.tls_protocol_name is None:
return pytest.skip("Skipping base test class")
if self.tls_protocol_name == "TLSv1.3" and ssl.HAS_TLSv1_3:
return ssl.PROTOCOL_TLS_CLIENT
if self.tls_protocol_name == "TLSv1.2" and ssl.HAS_TLSv1_2:
return ssl.PROTOCOL_TLSv1_2
if self.tls_protocol_name == "TLSv1.1" and ssl.HAS_TLSv1_1:
return ssl.PROTOCOL_TLSv1_1
if self.tls_protocol_name == "TLSv1" and ssl.HAS_TLSv1:
return ssl.PROTOCOL_TLSv1
else:
return pytest.skip(f"{self.tls_protocol_name} isn't available")
@classmethod
def setup_class(cls) -> None:
super().setup_class()
cls.certs_dir = tempfile.mkdtemp()
# Start from existing root CA as we don't want to change the server certificate yet
with open(DEFAULT_CA, "rb") as crt, open(DEFAULT_CA_KEY, "rb") as key:
root_ca = trustme.CA.from_pem(crt.read(), key.read())
# Generate another CA to test verification failure
bad_ca = trustme.CA()
cls.bad_ca_path = os.path.join(cls.certs_dir, "ca_bad.pem")
bad_ca.cert_pem.write_to_path(cls.bad_ca_path)
# client cert chain
intermediate_ca = root_ca.create_child_ca()
cert = intermediate_ca.issue_cert("example.com")
encrypted_key = encrypt_key_pem(cert.private_key_pem, b"letmein")
cert.private_key_pem.write_to_path(
os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_KEY)
)
encrypted_key.write_to_path(
os.path.join(cls.certs_dir, PASSWORD_CLIENT_KEYFILE)
)
# Write the client cert and the intermediate CA
client_cert = os.path.join(cls.certs_dir, CLIENT_INTERMEDIATE_PEM)
cert.cert_chain_pems[0].write_to_path(client_cert)
cert.cert_chain_pems[1].write_to_path(client_cert, append=True)
# Write only the client cert
cert.cert_chain_pems[0].write_to_path(
os.path.join(cls.certs_dir, CLIENT_NO_INTERMEDIATE_PEM)
)
@classmethod
def teardown_class(cls) -> None:
super().teardown_class()
shutil.rmtree(cls.certs_dir)
def test_simple(self, http_version: str) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200, r.data
assert r.headers["server"] == f"hypercorn-{http_version}"
assert r.data == b"Dummy server!"
@resolvesLocalhostFQDN()
def test_dotted_fqdn(self) -> None:
with HTTPSConnectionPool(
self.host + ".",
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as pool:
r = pool.request("GET", "/")
assert r.status == 200, r.data
def test_client_intermediate(self) -> None:
"""Check that certificate chains work well with client certs
We generate an intermediate CA from the root CA, and issue a client certificate
from that intermediate CA. Since the server only knows about the root CA, we
need to send it the certificate *and* the intermediate CA, so that it can check
the whole chain.
"""
with HTTPSConnectionPool(
self.host,
self.port,
key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY),
cert_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_PEM),
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
r = https_pool.request("GET", "/certificate")
subject = r.json()
assert subject["organizationalUnitName"].startswith("Testing cert")
def test_client_no_intermediate(self) -> None:
"""Check that missing links in certificate chains indeed break
The only difference with test_client_intermediate is that we don't send the
intermediate CA to the server, only the client cert.
"""
with HTTPSConnectionPool(
self.host,
self.port,
cert_file=os.path.join(self.certs_dir, CLIENT_NO_INTERMEDIATE_PEM),
key_file=os.path.join(self.certs_dir, CLIENT_INTERMEDIATE_KEY),
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises((SSLError, ProtocolError)):
https_pool.request("GET", "/certificate", retries=False)
def test_client_key_password(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE),
cert_file=os.path.join(self.certs_dir, CLIENT_CERT),
key_password="letmein",
ssl_minimum_version=self.tls_version(),
) as https_pool:
r = https_pool.request("GET", "/certificate")
subject = r.json()
assert subject["organizationalUnitName"].startswith("Testing cert")
def test_client_encrypted_key_requires_password(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
key_file=os.path.join(self.certs_dir, PASSWORD_CLIENT_KEYFILE),
cert_file=os.path.join(self.certs_dir, CLIENT_CERT),
key_password=None,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises(MaxRetryError, match="password is required") as e:
https_pool.request("GET", "/certificate")
assert type(e.value.reason) is SSLError
def test_verified(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
assert conn.__class__ == VerifiedHTTPSConnection
with warnings.catch_warnings(record=True) as w:
r = https_pool.request("GET", "/")
assert r.status == 200
assert [str(wm) for wm in w] == []
def test_verified_with_context(self) -> None:
ctx = util.ssl_.create_urllib3_context(
cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version()
)
ctx.load_verify_locations(cafile=DEFAULT_CA)
with HTTPSConnectionPool(self.host, self.port, ssl_context=ctx) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
assert conn.__class__ == VerifiedHTTPSConnection
with mock.patch("warnings.warn") as warn:
r = https_pool.request("GET", "/")
assert r.status == 200
assert not warn.called, warn.call_args_list
def test_context_combines_with_ca_certs(self) -> None:
ctx = util.ssl_.create_urllib3_context(
cert_reqs=ssl.CERT_REQUIRED, ssl_minimum_version=self.tls_version()
)
with HTTPSConnectionPool(
self.host, self.port, ca_certs=DEFAULT_CA, ssl_context=ctx
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
assert conn.__class__ == VerifiedHTTPSConnection
with mock.patch("warnings.warn") as warn:
r = https_pool.request("GET", "/")
assert r.status == 200
assert not warn.called, warn.call_args_list
def test_ca_dir_verified(self, tmp_path: Path) -> None:
# OpenSSL looks up certificates by the hash for their name, see c_rehash
# TODO infer the bytes using `cryptography.x509.Name.public_bytes`.
# https://github.com/pyca/cryptography/pull/3236
shutil.copyfile(DEFAULT_CA, str(tmp_path / "81deb5f7.0"))
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_cert_dir=str(tmp_path),
ssl_minimum_version=self.tls_version(),
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
assert conn.__class__ == VerifiedHTTPSConnection
with warnings.catch_warnings(record=True) as w:
r = https_pool.request("GET", "/")
assert r.status == 200
assert [str(wm) for wm in w] == []
def test_invalid_common_name(self) -> None:
with HTTPSConnectionPool(
"127.0.0.1",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises(MaxRetryError) as e:
https_pool.request("GET", "/", retries=0)
assert type(e.value.reason) is SSLError
assert "doesn't match" in str(
e.value.reason
) or "certificate verify failed" in str(e.value.reason)
def test_verified_with_bad_ca_certs(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=self.bad_ca_path,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises(MaxRetryError) as e:
https_pool.request("GET", "/")
assert type(e.value.reason) is SSLError
assert (
"certificate verify failed" in str(e.value.reason)
# PyPy is more specific
or "self signed certificate in certificate chain" in str(e.value.reason)
), f"Expected 'certificate verify failed', instead got: {e.value.reason!r}"
def test_wrap_socket_failure_resource_leak(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=self.bad_ca_path,
ssl_minimum_version=self.tls_version(),
) as https_pool:
with contextlib.closing(https_pool._get_conn()) as conn:
with pytest.raises(ssl.SSLError):
conn.connect()
assert conn.sock is not None # type: ignore[attr-defined]
def test_verified_without_ca_certs(self) -> None:
# default is cert_reqs=None which is ssl.CERT_NONE
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises(MaxRetryError) as e:
https_pool.request("GET", "/")
assert type(e.value.reason) is SSLError
# there is a different error message depending on whether or
# not pyopenssl is injected
assert (
"No root certificates specified" in str(e.value.reason)
# PyPy is more specific
or "self signed certificate in certificate chain" in str(e.value.reason)
# PyPy sometimes uses all-caps here
or "certificate verify failed" in str(e.value.reason).lower()
or "invalid certificate chain" in str(e.value.reason)
), (
"Expected 'No root certificates specified', "
"'certificate verify failed', or "
"'invalid certificate chain', "
"instead got: %r" % e.value.reason
)
def test_no_ssl(self) -> None:
with HTTPSConnectionPool(self.host, self.port) as pool:
pool.ConnectionCls = None # type: ignore[assignment]
with pytest.raises(ImportError):
pool._new_conn()
with pytest.raises(ImportError):
pool.request("GET", "/", retries=0)
def test_unverified_ssl(self) -> None:
"""Test that bare HTTPSConnection can connect, make requests"""
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs=ssl.CERT_NONE,
ssl_minimum_version=self.tls_version(),
) as pool:
with mock.patch("warnings.warn") as warn:
r = pool.request("GET", "/")
assert r.status == 200
assert warn.called
# Modern versions of Python, or systems using PyOpenSSL, only emit
# the unverified warning. Older systems may also emit other
# warnings, which we want to ignore here.
calls = warn.call_args_list
assert InsecureRequestWarning in [x[0][1] for x in calls]
def test_ssl_unverified_with_ca_certs(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_NONE",
ca_certs=self.bad_ca_path,
ssl_minimum_version=self.tls_version(),
) as pool:
with mock.patch("warnings.warn") as warn:
r = pool.request("GET", "/")
assert r.status == 200
assert warn.called
# Modern versions of Python, or systems using PyOpenSSL, only emit
# the unverified warning. Older systems may also emit other
# warnings, which we want to ignore here.
calls = warn.call_args_list
category = calls[0][0][1]
assert category == InsecureRequestWarning
def test_assert_hostname_false(self) -> None:
with HTTPSConnectionPool(
"localhost",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.assert_hostname = False
https_pool.request("GET", "/")
def test_assert_specific_hostname(self) -> None:
with HTTPSConnectionPool(
"localhost",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.assert_hostname = "localhost"
https_pool.request("GET", "/")
def test_server_hostname(self) -> None:
with HTTPSConnectionPool(
"127.0.0.1",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
server_hostname="localhost",
ssl_minimum_version=self.tls_version(),
) as https_pool:
conn = https_pool._new_conn()
conn.request("GET", "/")
# Assert the wrapping socket is using the passed-through SNI name.
# pyopenssl doesn't let you pull the server_hostname back off the
# socket, so only add this assertion if the attribute is there (i.e.
# the python ssl module).
if hasattr(conn.sock, "server_hostname"): # type: ignore[attr-defined]
assert conn.sock.server_hostname == "localhost" # type: ignore[attr-defined]
conn.getresponse().close()
conn.close()
def test_assert_fingerprint_md5(self) -> None:
with HTTPSConnectionPool(
"localhost",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=("55:39:BF:70:05:12:43:FA:1F:D1:BF:4E:E8:1B:07:1D"),
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.request("GET", "/")
def test_assert_fingerprint_sha1(self) -> None:
with HTTPSConnectionPool(
"localhost",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=(
"72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
),
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.request("GET", "/")
def test_assert_fingerprint_sha256(self) -> None:
with HTTPSConnectionPool(
"localhost",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=(
"E3:59:8E:69:FF:C5:9F:C7:88:87:44:58:22:7F:90:8D:D9:BC:12:C4:90:79:D5:"
"DC:A8:5D:4F:60:40:1E:A6:D2"
),
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.request("GET", "/")
def test_assert_invalid_fingerprint(self) -> None:
def _test_request(pool: HTTPSConnectionPool) -> SSLError:
with pytest.raises(MaxRetryError) as cm:
pool.request("GET", "/", retries=0)
assert type(cm.value.reason) is SSLError
return cm.value.reason
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.assert_fingerprint = (
"AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA:AA"
)
e = _test_request(https_pool)
expected = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
got = "728b554c9afc1e88a11cad1bb2e7cc3edbc8f98a"
assert (
str(e)
== f'Fingerprints did not match. Expected "{expected}", got "{got}"'
)
# Uneven length
https_pool.assert_fingerprint = "AA:A"
e = _test_request(https_pool)
assert "Fingerprint of invalid length:" in str(e)
# Invalid length
https_pool.assert_fingerprint = "AA"
e = _test_request(https_pool)
assert "Fingerprint of invalid length:" in str(e)
def test_verify_none_and_bad_fingerprint(self) -> None:
with HTTPSConnectionPool(
"127.0.0.1",
self.port,
cert_reqs="CERT_NONE",
assert_hostname=False,
assert_fingerprint=(
"AA:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
),
) as https_pool:
with pytest.raises(MaxRetryError) as cm:
https_pool.request("GET", "/", retries=0)
assert type(cm.value.reason) is SSLError
def test_verify_none_and_good_fingerprint(self) -> None:
with HTTPSConnectionPool(
"127.0.0.1",
self.port,
cert_reqs="CERT_NONE",
assert_hostname=False,
assert_fingerprint=(
"72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
),
) as https_pool:
https_pool.request("GET", "/")
def test_good_fingerprint_and_hostname_mismatch(self) -> None:
with HTTPSConnectionPool(
"127.0.0.1",
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=(
"72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
),
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.request("GET", "/")
@requires_network()
def test_https_timeout(self) -> None:
timeout = Timeout(total=None, connect=SHORT_TIMEOUT)
with HTTPSConnectionPool(
TARPIT_HOST,
self.port,
timeout=timeout,
retries=False,
cert_reqs="CERT_REQUIRED",
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.raises(ConnectTimeoutError):
https_pool.request("GET", "/")
timeout = Timeout(read=0.01)
with HTTPSConnectionPool(
self.host,
self.port,
timeout=timeout,
retries=False,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=(
"72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
),
ssl_minimum_version=self.tls_version(),
) as https_pool:
# TODO This was removed in https://github.com/urllib3/urllib3/pull/703/files
# We need to put something back or remove this block.
pass
timeout = Timeout(total=None)
with HTTPSConnectionPool(
self.host,
self.port,
timeout=timeout,
cert_reqs="CERT_NONE",
ssl_minimum_version=self.tls_version(),
) as https_pool:
with pytest.warns(InsecureRequestWarning):
https_pool.request("GET", "/")
def test_tunnel(self) -> None:
"""test the _tunnel behavior"""
timeout = Timeout(total=None)
with HTTPSConnectionPool(
self.host,
self.port,
timeout=timeout,
cert_reqs="CERT_NONE",
ssl_minimum_version=self.tls_version(),
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
conn.set_tunnel(self.host, self.port)
with mock.patch.object(
conn, "_tunnel", create=True, return_value=None
) as conn_tunnel:
with pytest.warns(InsecureRequestWarning):
https_pool._make_request(conn, "GET", "/")
conn_tunnel.assert_called_once_with()
@requires_network()
def test_enhanced_timeout(self) -> None:
with HTTPSConnectionPool(
TARPIT_HOST,
self.port,
timeout=Timeout(connect=SHORT_TIMEOUT),
retries=False,
cert_reqs="CERT_REQUIRED",
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
with pytest.raises(ConnectTimeoutError):
https_pool.request("GET", "/")
with pytest.raises(ConnectTimeoutError):
https_pool._make_request(conn, "GET", "/")
with HTTPSConnectionPool(
TARPIT_HOST,
self.port,
timeout=Timeout(connect=LONG_TIMEOUT),
retries=False,
cert_reqs="CERT_REQUIRED",
) as https_pool:
with pytest.raises(ConnectTimeoutError):
https_pool.request("GET", "/", timeout=Timeout(connect=SHORT_TIMEOUT))
with HTTPSConnectionPool(
TARPIT_HOST,
self.port,
timeout=Timeout(total=None),
retries=False,
cert_reqs="CERT_REQUIRED",
) as https_pool:
with contextlib.closing(https_pool._new_conn()) as conn:
with pytest.raises(ConnectTimeoutError):
https_pool.request(
"GET", "/", timeout=Timeout(total=None, connect=SHORT_TIMEOUT)
)
def test_enhanced_ssl_connection(self) -> None:
fingerprint = "72:8B:55:4C:9A:FC:1E:88:A1:1C:AD:1B:B2:E7:CC:3E:DB:C8:F9:8A"
with HTTPSConnectionPool(
self.host,
self.port,
cert_reqs="CERT_REQUIRED",
ca_certs=DEFAULT_CA,
assert_fingerprint=fingerprint,
ssl_minimum_version=self.tls_version(),
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200
def test_ssl_correct_system_time(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.cert_reqs = "CERT_REQUIRED"
https_pool.ca_certs = DEFAULT_CA
w = self._request_without_resource_warnings("GET", "/")
assert [] == w
def test_ssl_wrong_system_time(self) -> None:
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.cert_reqs = "CERT_REQUIRED"
https_pool.ca_certs = DEFAULT_CA
with mock.patch("urllib3.connection.datetime") as mock_date:
mock_date.date.today.return_value = datetime.date(1970, 1, 1)
w = self._request_without_resource_warnings("GET", "/")
assert len(w) == 1
warning = w[0]
assert SystemTimeWarning == warning.category
assert isinstance(warning.message, Warning)
assert str(RECENT_DATE) in warning.message.args[0]
def _request_without_resource_warnings(
self, method: str, url: str
) -> list[warnings.WarningMessage]:
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
https_pool.request(method, url)
w = [x for x in w if not isinstance(x.message, ResourceWarning)]
return w
def test_set_ssl_version_to_tls_version(self) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
with HTTPSConnectionPool(
self.host, self.port, ca_certs=DEFAULT_CA
) as https_pool:
https_pool.ssl_version = ssl_version = self.certs["ssl_version"]
if ssl_version is getattr(ssl, "PROTOCOL_TLS", object()):
cmgr: contextlib.AbstractContextManager[
object
] = contextlib.nullcontext()
else:
cmgr = pytest.warns(
DeprecationWarning,
match=r"'ssl_version' option is deprecated and will be removed "
r"in urllib3 v2\.1\.0\. Instead use 'ssl_minimum_version'",
)
with cmgr:
r = https_pool.request("GET", "/")
assert r.status == 200, r.data
def test_set_cert_default_cert_required(self) -> None:
conn = VerifiedHTTPSConnection(self.host, self.port)
with pytest.warns(DeprecationWarning) as w:
conn.set_cert()
assert conn.cert_reqs == ssl.CERT_REQUIRED
assert len(w) == 1 and str(w[0].message) == (
"HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. "
"Instead provide the parameters to the HTTPSConnection constructor."
)
@pytest.mark.parametrize("verify_mode", [ssl.CERT_NONE, ssl.CERT_REQUIRED])
def test_set_cert_inherits_cert_reqs_from_ssl_context(
self, verify_mode: int
) -> None:
ssl_context = urllib3.util.ssl_.create_urllib3_context(cert_reqs=verify_mode)
assert ssl_context.verify_mode == verify_mode
conn = HTTPSConnection(self.host, self.port, ssl_context=ssl_context)
with pytest.warns(DeprecationWarning) as w:
conn.set_cert()
assert conn.cert_reqs == verify_mode
assert (
conn.ssl_context is not None and conn.ssl_context.verify_mode == verify_mode
)
assert len(w) == 1 and str(w[0].message) == (
"HTTPSConnection.set_cert() is deprecated and will be removed in urllib3 v2.1.0. "
"Instead provide the parameters to the HTTPSConnection constructor."
)
def test_tls_protocol_name_of_socket(self) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
ssl_maximum_version=self.tls_version(),
) as https_pool:
with contextlib.closing(https_pool._get_conn()) as conn:
conn.connect()
if not hasattr(conn.sock, "version"): # type: ignore[attr-defined]
pytest.skip("SSLSocket.version() not available")
assert conn.sock.version() == self.tls_protocol_name # type: ignore[attr-defined]
def test_ssl_version_is_deprecated(self) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT:
pytest.skip(
"Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated"
)
with HTTPSConnectionPool(
self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=self.ssl_version()
) as https_pool:
with contextlib.closing(https_pool._get_conn()) as conn:
with pytest.warns(DeprecationWarning) as w:
conn.connect()
assert len(w) >= 1
assert any(x.category == DeprecationWarning for x in w)
assert any(
str(x.message)
== (
"'ssl_version' option is deprecated and will be removed in "
"urllib3 v2.1.0. Instead use 'ssl_minimum_version'"
)
for x in w
)
@pytest.mark.parametrize(
"ssl_version", [None, ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_CLIENT]
)
def test_ssl_version_with_protocol_tls_or_client_not_deprecated(
self, ssl_version: int | None
) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
if self.tls_protocol_not_default():
pytest.skip(
f"Skipping because '{self.tls_protocol_name}' isn't set by default"
)
with HTTPSConnectionPool(
self.host, self.port, ca_certs=DEFAULT_CA, ssl_version=ssl_version
) as https_pool:
with contextlib.closing(https_pool._get_conn()) as conn:
with warnings.catch_warnings(record=True) as w:
conn.connect()
assert [str(wm) for wm in w if wm.category != ResourceWarning] == []
def test_no_tls_version_deprecation_with_ssl_context(self) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
ctx = util.ssl_.create_urllib3_context(ssl_minimum_version=self.tls_version())
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_context=ctx,
) as https_pool:
with contextlib.closing(https_pool._get_conn()) as conn:
with warnings.catch_warnings(record=True) as w:
conn.connect()
assert [str(wm) for wm in w if wm.category != ResourceWarning] == []
def test_tls_version_maximum_and_minimum(self) -> None:
if self.tls_protocol_name is None:
pytest.skip("Skipping base test class")
from ssl import TLSVersion
min_max_versions = [
(self.tls_version(), self.tls_version()),
(TLSVersion.MINIMUM_SUPPORTED, self.tls_version()),
(TLSVersion.MINIMUM_SUPPORTED, TLSVersion.MAXIMUM_SUPPORTED),
]
for minimum_version, maximum_version in min_max_versions:
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=minimum_version,
ssl_maximum_version=maximum_version,
) as https_pool:
conn = https_pool._get_conn()
try:
conn.connect()
if maximum_version == TLSVersion.MAXIMUM_SUPPORTED:
# A higher protocol than tls_protocol_name could be negotiated
assert conn.sock.version() >= self.tls_protocol_name # type: ignore[attr-defined]
else:
assert conn.sock.version() == self.tls_protocol_name # type: ignore[attr-defined]
finally:
conn.close()
def test_sslkeylogfile(
self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
if not hasattr(util.SSLContext, "keylog_filename"):
pytest.skip("requires OpenSSL 1.1.1+")
keylog_file = tmp_path / "keylogfile.txt"
monkeypatch.setenv("SSLKEYLOGFILE", str(keylog_file))
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200, r.data
assert keylog_file.is_file(), "keylogfile '%s' should exist" % str(
keylog_file
)
assert keylog_file.read_text().startswith(
"# TLS secrets log file"
), "keylogfile '%s' should start with '# TLS secrets log file'" % str(
keylog_file
)
@pytest.mark.parametrize("sslkeylogfile", [None, ""])
def test_sslkeylogfile_empty(
self, monkeypatch: pytest.MonkeyPatch, sslkeylogfile: str | None
) -> None:
# Assert that an HTTPS connection doesn't error out when given
# no SSLKEYLOGFILE or an empty value (ie 'SSLKEYLOGFILE=')
if sslkeylogfile is not None:
monkeypatch.setenv("SSLKEYLOGFILE", sslkeylogfile)
else:
monkeypatch.delenv("SSLKEYLOGFILE", raising=False)
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as pool:
r = pool.request("GET", "/")
assert r.status == 200, r.data
def test_alpn_default(self) -> None:
"""Default ALPN protocols are sent by default."""
if not has_alpn() or not has_alpn(ssl.SSLContext):
pytest.skip("ALPN-support not available")
with HTTPSConnectionPool(
self.host,
self.port,
ca_certs=DEFAULT_CA,
ssl_minimum_version=self.tls_version(),
) as pool:
r = pool.request("GET", "/alpn_protocol", retries=0)
assert r.status == 200
assert r.data.decode("utf-8") == util.ALPN_PROTOCOLS[0]
def test_default_ssl_context_ssl_min_max_versions(self) -> None:
ctx = urllib3.util.ssl_.create_urllib3_context()
assert ctx.minimum_version == ssl.TLSVersion.TLSv1_2
# urllib3 sets a default maximum version only when it is
# injected with PyOpenSSL SSL-support.
# Otherwise, the default maximum version is set by Python's
# `ssl.SSLContext`. The value respects OpenSSL configuration and
# can be different from `ssl.TLSVersion.MAXIMUM_SUPPORTED`.
# https://github.com/urllib3/urllib3/issues/2477#issuecomment-1151452150
if util.IS_PYOPENSSL:
expected_maximum_version = ssl.TLSVersion.MAXIMUM_SUPPORTED
else:
expected_maximum_version = ssl.SSLContext(
ssl.PROTOCOL_TLS_CLIENT
).maximum_version
assert ctx.maximum_version == expected_maximum_version
def test_ssl_context_ssl_version_uses_ssl_min_max_versions(self) -> None:
if self.ssl_version() == ssl.PROTOCOL_TLS_CLIENT:
pytest.skip(
"Skipping because ssl_version=ssl.PROTOCOL_TLS_CLIENT is not deprecated"
)
with pytest.warns(
DeprecationWarning,
match=r"'ssl_version' option is deprecated and will be removed in "
r"urllib3 v2\.1\.0\. Instead use 'ssl_minimum_version'",
):
ctx = urllib3.util.ssl_.create_urllib3_context(
ssl_version=self.ssl_version()
)
assert ctx.minimum_version == self.tls_version()
assert ctx.maximum_version == self.tls_version()
@pytest.mark.usefixtures("requires_tlsv1")
class TestHTTPS_TLSv1(BaseTestHTTPS):
tls_protocol_name = "TLSv1"
certs = TLSv1_CERTS
@pytest.mark.usefixtures("requires_tlsv1_1")
class TestHTTPS_TLSv1_1(BaseTestHTTPS):
tls_protocol_name = "TLSv1.1"
certs = TLSv1_1_CERTS
@pytest.mark.usefixtures("requires_tlsv1_2")
class TestHTTPS_TLSv1_2(BaseTestHTTPS):
tls_protocol_name = "TLSv1.2"
certs = TLSv1_2_CERTS
@pytest.mark.usefixtures("requires_tlsv1_3")
class TestHTTPS_TLSv1_3(BaseTestHTTPS):
tls_protocol_name = "TLSv1.3"
certs = TLSv1_3_CERTS
class TestHTTPS_Hostname:
def test_can_validate_san(self, san_server: ServerConfig) -> None:
"""Ensure that urllib3 can validate SANs with IP addresses in them."""
with HTTPSConnectionPool(
san_server.host,
san_server.port,
cert_reqs="CERT_REQUIRED",
ca_certs=san_server.ca_certs,
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200
def test_common_name_without_san_fails(self, no_san_server: ServerConfig) -> None:
with HTTPSConnectionPool(
no_san_server.host,
no_san_server.port,
cert_reqs="CERT_REQUIRED",
ca_certs=no_san_server.ca_certs,
) as https_pool:
with pytest.raises(
MaxRetryError,
) as e:
https_pool.request("GET", "/")
assert "mismatch, certificate is not valid" in str(
e.value
) or "no appropriate subjectAltName" in str(e.value)
def test_common_name_without_san_with_different_common_name(
self, no_san_server_with_different_commmon_name: ServerConfig
) -> None:
ctx = urllib3.util.ssl_.create_urllib3_context()
try:
ctx.hostname_checks_common_name = True
except AttributeError:
pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'")
with HTTPSConnectionPool(
no_san_server_with_different_commmon_name.host,
no_san_server_with_different_commmon_name.port,
cert_reqs="CERT_REQUIRED",
ca_certs=no_san_server_with_different_commmon_name.ca_certs,
ssl_context=ctx,
) as https_pool:
with pytest.raises(MaxRetryError) as e:
https_pool.request("GET", "/")
assert "mismatch, certificate is not valid for 'localhost'" in str(
e.value
) or "hostname 'localhost' doesn't match 'example.com'" in str(e.value)
@pytest.mark.parametrize("use_assert_hostname", [True, False])
def test_hostname_checks_common_name_respected(
self, no_san_server: ServerConfig, use_assert_hostname: bool
) -> None:
ctx = urllib3.util.ssl_.create_urllib3_context()
if not hasattr(ctx, "hostname_checks_common_name"):
pytest.skip("Test requires 'SSLContext.hostname_checks_common_name'")
ctx.load_verify_locations(no_san_server.ca_certs)
try:
ctx.hostname_checks_common_name = True
except AttributeError:
pytest.skip("Couldn't set 'SSLContext.hostname_checks_common_name'")
err: MaxRetryError | None
try:
with HTTPSConnectionPool(
no_san_server.host,
no_san_server.port,
cert_reqs="CERT_REQUIRED",
ssl_context=ctx,
assert_hostname=no_san_server.host if use_assert_hostname else None,
) as https_pool:
https_pool.request("GET", "/")
except MaxRetryError as e:
err = e
else:
err = None
# commonName is only valid for DNS names, not IP addresses.
if no_san_server.host == "localhost":
assert err is None
# IP addresses should fail for commonName.
else:
assert err is not None
assert type(err.reason) is SSLError
assert isinstance(
err.reason.args[0], (ssl.SSLCertVerificationError, CertificateError)
)
def test_assert_hostname_invalid_san(
self, no_localhost_san_server: ServerConfig
) -> None:
"""Ensure SAN errors are not raised while assert_hostname is false"""
with HTTPSConnectionPool(
no_localhost_san_server.host,
no_localhost_san_server.port,
cert_reqs="CERT_REQUIRED",
ca_certs=no_localhost_san_server.ca_certs,
assert_hostname=False,
) as https_pool:
https_pool.request("GET", "/")
def test_assert_hostname_invalid_cn(
self, no_san_server_with_different_commmon_name: ServerConfig
) -> None:
"""Ensure CN errors are not raised while assert_hostname is false"""
with HTTPSConnectionPool(
no_san_server_with_different_commmon_name.host,
no_san_server_with_different_commmon_name.port,
cert_reqs="CERT_REQUIRED",
ca_certs=no_san_server_with_different_commmon_name.ca_certs,
assert_hostname=False,
) as https_pool:
https_pool.request("GET", "/")
class TestHTTPS_IPV4SAN:
def test_can_validate_ip_san(self, ipv4_san_server: ServerConfig) -> None:
"""Ensure that urllib3 can validate SANs with IP addresses in them."""
with HTTPSConnectionPool(
ipv4_san_server.host,
ipv4_san_server.port,
cert_reqs="CERT_REQUIRED",
ca_certs=ipv4_san_server.ca_certs,
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200
class TestHTTPS_IPV6SAN:
@pytest.mark.parametrize("host", ["::1", "[::1]"])
def test_can_validate_ipv6_san(
self, ipv6_san_server: ServerConfig, host: str, http_version: str
) -> None:
"""Ensure that urllib3 can validate SANs with IPv6 addresses in them."""
with HTTPSConnectionPool(
host,
ipv6_san_server.port,
cert_reqs="CERT_REQUIRED",
ca_certs=ipv6_san_server.ca_certs,
) as https_pool:
r = https_pool.request("GET", "/")
assert r.status == 200
assert r.headers["server"] == f"hypercorn-{http_version}"