diff --git a/homeassistant/components/imap/coordinator.py b/homeassistant/components/imap/coordinator.py index 59c24b11e51..d77f7fb05bb 100644 --- a/homeassistant/components/imap/coordinator.py +++ b/homeassistant/components/imap/coordinator.py @@ -6,6 +6,7 @@ from collections.abc import Mapping from datetime import datetime, timedelta import email from email.header import decode_header, make_header +from email.message import Message from email.utils import parseaddr, parsedate_to_datetime import logging from typing import Any @@ -96,8 +97,9 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL: class ImapMessage: """Class to parse an RFC822 email message.""" - def __init__(self, raw_message: bytes) -> None: + def __init__(self, raw_message: bytes, charset: str = "utf-8") -> None: """Initialize IMAP message.""" + self._charset = charset self.email_message = email.message_from_bytes(raw_message) @property @@ -157,18 +159,30 @@ class ImapMessage: message_html: str | None = None message_untyped_text: str | None = None + def _decode_payload(part: Message) -> str: + """Try to decode text payloads. + + Common text encodings are quoted-printable or base64. + Falls back to the raw content part if decoding fails. 
+ """ + try: + return str(part.get_payload(decode=True).decode(self._charset)) + except Exception: # pylint: disable=broad-except + return str(part.get_payload()) + + part: Message for part in self.email_message.walk(): if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN: if message_text is None: - message_text = part.get_payload() + message_text = _decode_payload(part) elif part.get_content_type() == "text/html": if message_html is None: - message_html = part.get_payload() + message_html = _decode_payload(part) elif ( part.get_content_type().startswith("text") and message_untyped_text is None ): - message_untyped_text = part.get_payload() + message_untyped_text = str(part.get_payload()) if message_text is not None: return message_text @@ -223,7 +237,9 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]): """Send a event for the last message if the last message was changed.""" response = await self.imap_client.fetch(last_message_uid, "BODY.PEEK[]") if response.result == "OK": - message = ImapMessage(response.lines[1]) + message = ImapMessage( + response.lines[1], charset=self.config_entry.data[CONF_CHARSET] + ) # Set `initial` to `False` if the last message is triggered again initial: bool = True if (message_id := message.message_id) == self._last_message_id: diff --git a/tests/components/imap/const.py b/tests/components/imap/const.py index ec864fd4665..713261936c7 100644 --- a/tests/components/imap/const.py +++ b/tests/components/imap/const.py @@ -18,16 +18,25 @@ TEST_MESSAGE_HEADERS1 = ( b"for ; Fri, 24 Mar 2023 13:52:01 +0100 (CET)\r\n" ) TEST_MESSAGE_HEADERS2 = ( - b"MIME-Version: 1.0\r\n" b"To: notify@example.com\r\n" b"From: John Doe \r\n" b"Subject: Test subject\r\n" - b"Message-ID: " + b"Message-ID: \r\n" + b"MIME-Version: 1.0\r\n" +) + +TEST_MULTIPART_HEADER = ( + b'Content-Type: multipart/related;\r\n\tboundary="Mark=_100584970350292485166"' ) TEST_MESSAGE_HEADERS3 = b"" TEST_MESSAGE = TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + 
TEST_MESSAGE_HEADERS2 + +TEST_MESSAGE_MULTIPART = ( + TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2 + TEST_MULTIPART_HEADER +) + TEST_MESSAGE_NO_SUBJECT_TO_FROM = ( TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS3 ) @@ -44,21 +53,27 @@ TEST_INVALID_DATE3 = ( TEST_CONTENT_TEXT_BARE = b"\r\nTest body\r\n\r\n" -TEST_CONTENT_BINARY = ( - b"Content-Type: application/binary\r\n" - b"Content-Transfer-Encoding: base64\r\n" - b"\r\n" - b"VGVzdCBib2R5\r\n" -) +TEST_CONTENT_BINARY = b"Content-Type: application/binary\r\n\r\nTest body\r\n" TEST_CONTENT_TEXT_PLAIN = ( - b"Content-Type: text/plain; charset=UTF-8; format=flowed\r\n" - b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n" + b'Content-Type: text/plain; charset="utf-8"\r\n' + b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n" ) +TEST_CONTENT_TEXT_BASE64 = ( + b'Content-Type: text/plain; charset="utf-8"\r\n' + b"Content-Transfer-Encoding: base64\r\n\r\nVGVzdCBib2R5\r\n" +) + +TEST_CONTENT_TEXT_BASE64_INVALID = ( + b'Content-Type: text/plain; charset="utf-8"\r\n' + b"Content-Transfer-Encoding: base64\r\n\r\nVGVzdCBib2R5invalid\r\n" +) +TEST_BADLY_ENCODED_CONTENT = "VGVzdCBib2R5invalid\r\n" + TEST_CONTENT_TEXT_OTHER = ( b"Content-Type: text/other; charset=UTF-8\r\n" - b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n" + b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n" ) TEST_CONTENT_HTML = ( @@ -76,14 +91,40 @@ TEST_CONTENT_HTML = ( b"\r\n" b"\r\n" ) +TEST_CONTENT_HTML_BASE64 = ( + b"Content-Type: text/html; charset=UTF-8\r\n" + b"Content-Transfer-Encoding: base64\r\n\r\n" + b"PGh0bWw+CiAgICA8aGVhZD48bWV0YSBodHRwLWVxdW" + b"l2PSJjb250ZW50LXR5cGUiIGNvbnRlbnQ9InRleHQvaHRtbDsgY2hhcnNldD1VVEYtOCI+PC9oZWFkPgog" + b"CAgPGJvZHk+CiAgICAgIDxwPlRlc3QgYm9keTxicj48L3A+CiAgICA8L2JvZHk+CjwvaHRtbD4=\r\n" +) + TEST_CONTENT_MULTIPART = ( b"\r\nThis is a multi-part message in MIME format.\r\n" - + b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n" + + 
b"\r\n--Mark=_100584970350292485166\r\n" + TEST_CONTENT_TEXT_PLAIN - + b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n" + + b"\r\n--Mark=_100584970350292485166\r\n" + TEST_CONTENT_HTML - + b"--------------McwBciN2C0o3rWeF1tmFo2oI--\r\n" + + b"\r\n--Mark=_100584970350292485166--\r\n" +) + +TEST_CONTENT_MULTIPART_BASE64 = ( + b"\r\nThis is a multi-part message in MIME format.\r\n" + + b"\r\n--Mark=_100584970350292485166\r\n" + + TEST_CONTENT_TEXT_BASE64 + + b"\r\n--Mark=_100584970350292485166\r\n" + + TEST_CONTENT_HTML_BASE64 + + b"\r\n--Mark=_100584970350292485166--\r\n" +) + +TEST_CONTENT_MULTIPART_BASE64_INVALID = ( + b"\r\nThis is a multi-part message in MIME format.\r\n" + + b"\r\n--Mark=_100584970350292485166\r\n" + + TEST_CONTENT_TEXT_BASE64_INVALID + + b"\r\n--Mark=_100584970350292485166\r\n" + + TEST_CONTENT_HTML_BASE64 + + b"\r\n--Mark=_100584970350292485166--\r\n" ) EMPTY_SEARCH_RESPONSE = ("OK", [b"", b"Search completed (0.0001 + 0.000 secs)."]) @@ -202,14 +243,40 @@ TEST_FETCH_RESPONSE_MULTIPART = ( "OK", [ b"1 FETCH (BODY[] {" - + str(len(TEST_MESSAGE + TEST_CONTENT_MULTIPART)).encode("utf-8") + + str(len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART)).encode("utf-8") + b"}", - bytearray(TEST_MESSAGE + TEST_CONTENT_MULTIPART), + bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART), + b")", + b"Fetch completed (0.0001 + 0.000 secs).", + ], +) +TEST_FETCH_RESPONSE_MULTIPART_BASE64 = ( + "OK", + [ + b"1 FETCH (BODY[] {" + + str(len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64)).encode( + "utf-8" + ) + + b"}", + bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64), b")", b"Fetch completed (0.0001 + 0.000 secs).", ], ) +TEST_FETCH_RESPONSE_MULTIPART_BASE64_INVALID = ( + "OK", + [ + b"1 FETCH (BODY[] {" + + str( + len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64_INVALID) + ).encode("utf-8") + + b"}", + bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64_INVALID), + b")", + b"Fetch completed (0.0001 + 0.000 
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize(
    "imap_fetch",
    [TEST_FETCH_RESPONSE_MULTIPART_BASE64_INVALID],
    ids=["multipart_base64_invalid"],
)
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_receiving_message_with_invalid_encoding(
    hass: HomeAssistant, mock_imap_protocol: MagicMock
) -> None:
    """Test receiving a message whose text part cannot be decoded.

    The fetched multipart message carries a base64 text part with corrupted
    content. The event must still be fired, and its ``text`` must be the raw,
    undecoded payload of that part.
    """
    event_called = async_capture_events(hass, "imap_content")

    config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
    config_entry.add_to_hass(hass)
    assert await hass.config_entries.async_setup(config_entry.entry_id)
    await hass.async_block_till_done()
    # Make sure we have had one update (when polling)
    async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()
    state = hass.states.get("sensor.imap_email_email_com")
    # we should have received one message
    assert state is not None
    assert state.state == "1"
    assert state.attributes["state_class"] == SensorStateClass.MEASUREMENT

    # we should have received one event
    assert len(event_called) == 1
    data: dict[str, Any] = event_called[0].data
    assert data["server"] == "imap.server.com"
    assert data["username"] == "email@email.com"
    assert data["search"] == "UnSeen UnDeleted"
    assert data["folder"] == "INBOX"
    assert data["sender"] == "john.doe@example.com"
    assert data["subject"] == "Test subject"
    # Decoding failed, so the still base64-encoded content is passed through.
    assert data["text"] == TEST_BADLY_ENCODED_CONTENT