From 67c4de90f342e1dd64f6da5ebec42526ad6ae340 Mon Sep 17 00:00:00 2001 From: Michael <35783820+mib1185@users.noreply.github.com> Date: Sat, 15 Apr 2023 21:32:30 +0200 Subject: [PATCH] Add option to select list of accepted ssl ciphers in httpx client (#91389) --- homeassistant/helpers/httpx_client.py | 11 ++- homeassistant/util/ssl.py | 118 ++++++++++++++++---------- tests/util/test_ssl.py | 53 ++++++++++++ 3 files changed, 137 insertions(+), 45 deletions(-) create mode 100644 tests/util/test_ssl.py diff --git a/homeassistant/helpers/httpx_client.py b/homeassistant/helpers/httpx_client.py index 44ad81c73e9..beb084d8c1c 100644 --- a/homeassistant/helpers/httpx_client.py +++ b/homeassistant/helpers/httpx_client.py @@ -11,7 +11,11 @@ from typing_extensions import Self from homeassistant.const import APPLICATION_NAME, EVENT_HOMEASSISTANT_CLOSE, __version__ from homeassistant.core import Event, HomeAssistant, callback from homeassistant.loader import bind_hass -from homeassistant.util.ssl import get_default_context, get_default_no_verify_context +from homeassistant.util.ssl import ( + SSLCipherList, + client_context, + create_no_verify_ssl_context, +) from .frame import warn_use @@ -56,6 +60,7 @@ def create_async_httpx_client( hass: HomeAssistant, verify_ssl: bool = True, auto_cleanup: bool = True, + ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT, **kwargs: Any, ) -> httpx.AsyncClient: """Create a new httpx.AsyncClient with kwargs, i.e. for cookies. @@ -66,7 +71,9 @@ def create_async_httpx_client( This method must be run in the event loop. """ ssl_context = ( - get_default_context() if verify_ssl else get_default_no_verify_context() + client_context(ssl_cipher_list) + if verify_ssl + else create_no_verify_ssl_context(ssl_cipher_list) ) client = HassHttpXAsyncClient( verify=ssl_context, diff --git a/homeassistant/util/ssl.py b/homeassistant/util/ssl.py index 5b8830e6571..aa1b933e0ae 100644 --- a/homeassistant/util/ssl.py +++ b/homeassistant/util/ssl.py @@ -1,12 +1,70 @@ """Helper to create SSL contexts.""" import contextlib +from functools import cache from os import environ import ssl import certifi +from homeassistant.backports.enum import StrEnum -def create_no_verify_ssl_context() -> ssl.SSLContext: + +class SSLCipherList(StrEnum): + """SSL cipher lists.""" + + PYTHON_DEFAULT = "python_default" + INTERMEDIATE = "intermediate" + MODERN = "modern" + + +SSL_CIPHER_LISTS = { + SSLCipherList.INTERMEDIATE: ( + "ECDHE-ECDSA-CHACHA20-POLY1305:" + "ECDHE-RSA-CHACHA20-POLY1305:" + "ECDHE-ECDSA-AES128-GCM-SHA256:" + "ECDHE-RSA-AES128-GCM-SHA256:" + "ECDHE-ECDSA-AES256-GCM-SHA384:" + "ECDHE-RSA-AES256-GCM-SHA384:" + "DHE-RSA-AES128-GCM-SHA256:" + "DHE-RSA-AES256-GCM-SHA384:" + "ECDHE-ECDSA-AES128-SHA256:" + "ECDHE-RSA-AES128-SHA256:" + "ECDHE-ECDSA-AES128-SHA:" + "ECDHE-RSA-AES256-SHA384:" + "ECDHE-RSA-AES128-SHA:" + "ECDHE-ECDSA-AES256-SHA384:" + "ECDHE-ECDSA-AES256-SHA:" + "ECDHE-RSA-AES256-SHA:" + "DHE-RSA-AES128-SHA256:" + "DHE-RSA-AES128-SHA:" + "DHE-RSA-AES256-SHA256:" + "DHE-RSA-AES256-SHA:" + "ECDHE-ECDSA-DES-CBC3-SHA:" + "ECDHE-RSA-DES-CBC3-SHA:" + "EDH-RSA-DES-CBC3-SHA:" + "AES128-GCM-SHA256:" + "AES256-GCM-SHA384:" + "AES128-SHA256:" + "AES256-SHA256:" + "AES128-SHA:" + "AES256-SHA:" + "DES-CBC3-SHA:" + "!DSS" + ), + SSLCipherList.MODERN: ( + "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:" + "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:" + "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:" + "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:" + "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256" + ), +} + + +@cache +def create_no_verify_ssl_context( + ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT, +) -> ssl.SSLContext: """Return an SSL context that does not verify the server certificate. This is a copy of aiohttp's create_default_context() function, with the @@ -23,10 +81,16 @@ def create_no_verify_ssl_context() -> ssl.SSLContext: # This only works for OpenSSL >= 1.0.0 sslcontext.options |= ssl.OP_NO_COMPRESSION sslcontext.set_default_verify_paths() + if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT: + sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list]) + return sslcontext -def client_context() -> ssl.SSLContext: +@cache +def client_context( + ssl_cipher_list: SSLCipherList = SSLCipherList.PYTHON_DEFAULT, +) -> ssl.SSLContext: """Return an SSL context for making requests.""" # Reuse environment variable definition from requests, since it's already a @@ -34,7 +98,13 @@ def client_context() -> ssl.SSLContext: # certs from certifi package. cafile = environ.get("REQUESTS_CA_BUNDLE", certifi.where()) - return ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile) + sslcontext = ssl.create_default_context( + purpose=ssl.Purpose.SERVER_AUTH, cafile=cafile + ) + if ssl_cipher_list != SSLCipherList.PYTHON_DEFAULT: + sslcontext.set_ciphers(SSL_CIPHER_LISTS[ssl_cipher_list]) + + return sslcontext # Create this only once and reuse it @@ -71,13 +141,7 @@ def server_context_modern() -> ssl.SSLContext: if hasattr(ssl, "OP_NO_COMPRESSION"): context.options |= ssl.OP_NO_COMPRESSION - context.set_ciphers( - "ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:" - "ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:" - "ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:" - "ECDHE-ECDSA-AES256-SHA384:ECDHE-RSA-AES256-SHA384:" - "ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256" - ) + context.set_ciphers(SSL_CIPHER_LISTS[SSLCipherList.MODERN]) return context @@ -97,38 +161,6 @@ def server_context_intermediate() -> ssl.SSLContext: if hasattr(ssl, "OP_NO_COMPRESSION"): context.options |= ssl.OP_NO_COMPRESSION - context.set_ciphers( - "ECDHE-ECDSA-CHACHA20-POLY1305:" - "ECDHE-RSA-CHACHA20-POLY1305:" - "ECDHE-ECDSA-AES128-GCM-SHA256:" - "ECDHE-RSA-AES128-GCM-SHA256:" - "ECDHE-ECDSA-AES256-GCM-SHA384:" - "ECDHE-RSA-AES256-GCM-SHA384:" - "DHE-RSA-AES128-GCM-SHA256:" - "DHE-RSA-AES256-GCM-SHA384:" - "ECDHE-ECDSA-AES128-SHA256:" - "ECDHE-RSA-AES128-SHA256:" - "ECDHE-ECDSA-AES128-SHA:" - "ECDHE-RSA-AES256-SHA384:" - "ECDHE-RSA-AES128-SHA:" - "ECDHE-ECDSA-AES256-SHA384:" - "ECDHE-ECDSA-AES256-SHA:" - "ECDHE-RSA-AES256-SHA:" - "DHE-RSA-AES128-SHA256:" - "DHE-RSA-AES128-SHA:" - "DHE-RSA-AES256-SHA256:" - "DHE-RSA-AES256-SHA:" - "ECDHE-ECDSA-DES-CBC3-SHA:" - "ECDHE-RSA-DES-CBC3-SHA:" - "EDH-RSA-DES-CBC3-SHA:" - "AES128-GCM-SHA256:" - "AES256-GCM-SHA384:" - "AES128-SHA256:" - "AES256-SHA256:" - "AES128-SHA:" - "AES256-SHA:" - "DES-CBC3-SHA:" - "!DSS" - ) + context.set_ciphers(SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE]) return context diff --git a/tests/util/test_ssl.py b/tests/util/test_ssl.py new file mode 100644 index 00000000000..4d43859cc44 --- /dev/null +++ b/tests/util/test_ssl.py @@ -0,0 +1,53 @@ +"""Test Home Assistant ssl utility functions.""" + +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from homeassistant.util.ssl import ( + SSL_CIPHER_LISTS, + SSLCipherList, + client_context, + create_no_verify_ssl_context, +) + + +@pytest.fixture +def mock_sslcontext(): + """Mock the ssl lib.""" + ssl_mock = MagicMock(set_ciphers=Mock(return_value=True)) + return ssl_mock + + +def test_client_context(mock_sslcontext) -> None: + """Test client context.""" + with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext): + client_context() + mock_sslcontext.set_ciphers.assert_not_called() + + client_context(SSLCipherList.MODERN) + mock_sslcontext.set_ciphers.assert_called_with( + SSL_CIPHER_LISTS[SSLCipherList.MODERN] + ) + + client_context(SSLCipherList.INTERMEDIATE) + mock_sslcontext.set_ciphers.assert_called_with( + SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE] + ) + + +def test_no_verify_ssl_context(mock_sslcontext) -> None: + """Test no verify ssl context.""" + with patch("homeassistant.util.ssl.ssl.SSLContext", return_value=mock_sslcontext): + create_no_verify_ssl_context() + mock_sslcontext.set_ciphers.assert_not_called() + + create_no_verify_ssl_context(SSLCipherList.MODERN) + mock_sslcontext.set_ciphers.assert_called_with( + SSL_CIPHER_LISTS[SSLCipherList.MODERN] + ) + + create_no_verify_ssl_context(SSLCipherList.INTERMEDIATE) + mock_sslcontext.set_ciphers.assert_called_with( + SSL_CIPHER_LISTS[SSLCipherList.INTERMEDIATE] + )