From 7cc0b8a2fe8953b7824af38d9e639df8448b847e Mon Sep 17 00:00:00 2001 From: Jan Bouwhuis Date: Sun, 4 Feb 2024 21:25:14 +0100 Subject: [PATCH] Fix imap message part decoding (#109523) --- homeassistant/components/imap/coordinator.py | 18 ++++++++++-------- tests/components/imap/test_init.py | 8 ++++++-- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/homeassistant/components/imap/coordinator.py b/homeassistant/components/imap/coordinator.py index 5591980b2f1..49938eaaa0a 100644 --- a/homeassistant/components/imap/coordinator.py +++ b/homeassistant/components/imap/coordinator.py @@ -9,7 +9,7 @@ from email.header import decode_header, make_header from email.message import Message from email.utils import parseaddr, parsedate_to_datetime import logging -from typing import Any +from typing import TYPE_CHECKING, Any from aioimaplib import AUTH, IMAP4_SSL, NONAUTH, SELECTED, AioImapException @@ -97,9 +97,8 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL: class ImapMessage: """Class to parse an RFC822 email message.""" - def __init__(self, raw_message: bytes, charset: str = "utf-8") -> None: + def __init__(self, raw_message: bytes) -> None: """Initialize IMAP message.""" - self._charset = charset self.email_message = email.message_from_bytes(raw_message) @property @@ -153,7 +152,7 @@ class ImapMessage: def text(self) -> str: """Get the message text from the email. - Will look for text/plain or use text/html if not found. + Will look for text/plain or use/ text/html if not found. """ message_text: str | None = None message_html: str | None = None @@ -166,8 +165,13 @@ class ImapMessage: Falls back to the raw content part if decoding fails. """ try: - return str(part.get_payload(decode=True).decode(self._charset)) + decoded_payload: Any = part.get_payload(decode=True) + if TYPE_CHECKING: + assert isinstance(decoded_payload, bytes) + content_charset = part.get_content_charset() or "utf-8" + return decoded_payload.decode(content_charset) except ValueError: + # return undecoded payload return str(part.get_payload()) part: Message @@ -237,9 +241,7 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]): """Send a event for the last message if the last message was changed.""" response = await self.imap_client.fetch(last_message_uid, "BODY.PEEK[]") if response.result == "OK": - message = ImapMessage( - response.lines[1], charset=self.config_entry.data[CONF_CHARSET] - ) + message = ImapMessage(response.lines[1]) # Set `initial` to `False` if the last message is triggered again initial: bool = True if (message_id := message.message_id) == self._last_message_id: diff --git a/tests/components/imap/test_init.py b/tests/components/imap/test_init.py index a00f9d9c25d..8a8ac88c8aa 100644 --- a/tests/components/imap/test_init.py +++ b/tests/components/imap/test_init.py @@ -8,6 +8,7 @@ from aioimaplib import AUTH, NONAUTH, SELECTED, AioImapException, Response import pytest from homeassistant.components.imap import DOMAIN +from homeassistant.components.imap.const import CONF_CHARSET from homeassistant.components.imap.errors import InvalidAuth, InvalidFolder from homeassistant.components.sensor.const import SensorStateClass from homeassistant.const import STATE_UNAVAILABLE @@ -131,13 +132,16 @@ async def test_entry_startup_fails( ], ) @pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"]) +@pytest.mark.parametrize("charset", ["utf-8", "us-ascii"], ids=["utf-8", "us-ascii"]) async def test_receiving_message_successfully( - hass: HomeAssistant, mock_imap_protocol: MagicMock, valid_date: bool + hass: HomeAssistant, mock_imap_protocol: MagicMock, valid_date: bool, charset: str ) -> None: """Test receiving a message successfully.""" event_called = async_capture_events(hass, "imap_content") - config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG) + config = MOCK_CONFIG.copy() + config[CONF_CHARSET] = charset + config_entry = MockConfigEntry(domain=DOMAIN, data=config) config_entry.add_to_hass(hass) assert await hass.config_entries.async_setup(config_entry.entry_id) await hass.async_block_till_done()