Fix imap does not decode text body correctly (#104217)

This commit is contained in:
Jan Bouwhuis 2023-11-19 20:15:02 +01:00 committed by GitHub
parent 1ca95965b6
commit 9a38e23f28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 153 additions and 23 deletions

View File

@ -6,6 +6,7 @@ from collections.abc import Mapping
from datetime import datetime, timedelta
import email
from email.header import decode_header, make_header
from email.message import Message
from email.utils import parseaddr, parsedate_to_datetime
import logging
from typing import Any
@ -96,8 +97,9 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
class ImapMessage:
"""Class to parse an RFC822 email message."""
def __init__(self, raw_message: bytes) -> None:
def __init__(self, raw_message: bytes, charset: str = "utf-8") -> None:
"""Initialize IMAP message."""
self._charset = charset
self.email_message = email.message_from_bytes(raw_message)
@property
@ -157,18 +159,30 @@ class ImapMessage:
message_html: str | None = None
message_untyped_text: str | None = None
def _decode_payload(part: Message) -> str:
"""Try to decode text payloads.
Common text encodings are quoted-printable or base64.
Falls back to the raw content part if decoding fails.
"""
try:
return str(part.get_payload(decode=True).decode(self._charset))
except Exception: # pylint: disable=broad-except
return str(part.get_payload())
part: Message
for part in self.email_message.walk():
if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN:
if message_text is None:
message_text = part.get_payload()
message_text = _decode_payload(part)
elif part.get_content_type() == "text/html":
if message_html is None:
message_html = part.get_payload()
message_html = _decode_payload(part)
elif (
part.get_content_type().startswith("text")
and message_untyped_text is None
):
message_untyped_text = part.get_payload()
message_untyped_text = str(part.get_payload())
if message_text is not None:
return message_text
@ -223,7 +237,9 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
"""Send a event for the last message if the last message was changed."""
response = await self.imap_client.fetch(last_message_uid, "BODY.PEEK[]")
if response.result == "OK":
message = ImapMessage(response.lines[1])
message = ImapMessage(
response.lines[1], charset=self.config_entry.data[CONF_CHARSET]
)
# Set `initial` to `False` if the last message is triggered again
initial: bool = True
if (message_id := message.message_id) == self._last_message_id:

View File

@ -18,16 +18,25 @@ TEST_MESSAGE_HEADERS1 = (
b"for <notify@example.com>; Fri, 24 Mar 2023 13:52:01 +0100 (CET)\r\n"
)
TEST_MESSAGE_HEADERS2 = (
b"MIME-Version: 1.0\r\n"
b"To: notify@example.com\r\n"
b"From: John Doe <john.doe@example.com>\r\n"
b"Subject: Test subject\r\n"
b"Message-ID: <N753P9hLvLw3lYGan11ji9WggPjxtLSpKvFOYgdnE@example.com>"
b"Message-ID: <N753P9hLvLw3lYGan11ji9WggPjxtLSpKvFOYgdnE@example.com>\r\n"
b"MIME-Version: 1.0\r\n"
)
TEST_MULTIPART_HEADER = (
b'Content-Type: multipart/related;\r\n\tboundary="Mark=_100584970350292485166"'
)
TEST_MESSAGE_HEADERS3 = b""
TEST_MESSAGE = TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2
TEST_MESSAGE_MULTIPART = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS2 + TEST_MULTIPART_HEADER
)
TEST_MESSAGE_NO_SUBJECT_TO_FROM = (
TEST_MESSAGE_HEADERS1 + DATE_HEADER1 + TEST_MESSAGE_HEADERS3
)
@ -44,21 +53,27 @@ TEST_INVALID_DATE3 = (
TEST_CONTENT_TEXT_BARE = b"\r\nTest body\r\n\r\n"
TEST_CONTENT_BINARY = (
b"Content-Type: application/binary\r\n"
b"Content-Transfer-Encoding: base64\r\n"
b"\r\n"
b"VGVzdCBib2R5\r\n"
)
TEST_CONTENT_BINARY = b"Content-Type: application/binary\r\n\r\nTest body\r\n"
TEST_CONTENT_TEXT_PLAIN = (
b"Content-Type: text/plain; charset=UTF-8; format=flowed\r\n"
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n"
b'Content-Type: text/plain; charset="utf-8"\r\n'
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n"
)
# Plain text part, the body is the base64 encoding of "Test body"
TEST_CONTENT_TEXT_BASE64 = (
    b'Content-Type: text/plain; charset="utf-8"\r\n'
    b"Content-Transfer-Encoding: base64\r\n"
    b"\r\n"
    b"VGVzdCBib2R5\r\n"
)

# Same part, but with "invalid" appended to the base64 body
TEST_CONTENT_TEXT_BASE64_INVALID = (
    b'Content-Type: text/plain; charset="utf-8"\r\n'
    b"Content-Transfer-Encoding: base64\r\n"
    b"\r\n"
    b"VGVzdCBib2R5invalid\r\n"
)

# The body of the invalid part as text, exactly as it appears in the message
TEST_BADLY_ENCODED_CONTENT = "VGVzdCBib2R5invalid\r\n"
TEST_CONTENT_TEXT_OTHER = (
b"Content-Type: text/other; charset=UTF-8\r\n"
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n\r\n"
b"Content-Transfer-Encoding: 7bit\r\n\r\nTest body\r\n"
)
TEST_CONTENT_HTML = (
@ -76,14 +91,40 @@ TEST_CONTENT_HTML = (
b"</html>\r\n"
b"\r\n"
)
# HTML part with a base64 encoded body. The body decodes to a small HTML
# document that contains "<p>Test body<br></p>".
# The encoded body must keep a length that is a multiple of 4, dropping a
# single character shifts every following group and corrupts the content.
TEST_CONTENT_HTML_BASE64 = (
    b"Content-Type: text/html; charset=UTF-8\r\n"
    b"Content-Transfer-Encoding: base64\r\n\r\n"
    b"PGh0bWw+CiAgICA8aGVhZD48bWV0YSBodHRwLWVxdW"
    b"l2PSJjb250ZW50LXR5cGUiIGNvbnRlbnQ9InRleHQvaHRtbDsgY2hhcnNldD1VVEYtOCI+PC9oZWFkPgog"
    b"ICAgPGJvZHk+CiAgICAgIDxwPlRlc3QgYm9keTxicj48L3A+CiAgICA8L2JvZHk+CjwvaHRtbD4=\r\n"
)
TEST_CONTENT_MULTIPART = (
b"\r\nThis is a multi-part message in MIME format.\r\n"
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n"
+ b"\r\n--Mark=_100584970350292485166\r\n"
+ TEST_CONTENT_TEXT_PLAIN
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI\r\n"
+ b"\r\n--Mark=_100584970350292485166\r\n"
+ TEST_CONTENT_HTML
+ b"--------------McwBciN2C0o3rWeF1tmFo2oI--\r\n"
+ b"\r\n--Mark=_100584970350292485166--\r\n"
)
# Multipart body with a base64 encoded plain text part and a base64 encoded
# HTML part, separated by the "Mark=_100584970350292485166" boundary
TEST_CONTENT_MULTIPART_BASE64 = b"".join(
    (
        b"\r\nThis is a multi-part message in MIME format.\r\n",
        b"\r\n--Mark=_100584970350292485166\r\n",
        TEST_CONTENT_TEXT_BASE64,
        b"\r\n--Mark=_100584970350292485166\r\n",
        TEST_CONTENT_HTML_BASE64,
        b"\r\n--Mark=_100584970350292485166--\r\n",
    )
)

# Same layout, but the plain text part carries the invalid base64 body
TEST_CONTENT_MULTIPART_BASE64_INVALID = b"".join(
    (
        b"\r\nThis is a multi-part message in MIME format.\r\n",
        b"\r\n--Mark=_100584970350292485166\r\n",
        TEST_CONTENT_TEXT_BASE64_INVALID,
        b"\r\n--Mark=_100584970350292485166\r\n",
        TEST_CONTENT_HTML_BASE64,
        b"\r\n--Mark=_100584970350292485166--\r\n",
    )
)
EMPTY_SEARCH_RESPONSE = ("OK", [b"", b"Search completed (0.0001 + 0.000 secs)."])
@ -202,14 +243,40 @@ TEST_FETCH_RESPONSE_MULTIPART = (
"OK",
[
b"1 FETCH (BODY[] {"
+ str(len(TEST_MESSAGE + TEST_CONTENT_MULTIPART)).encode("utf-8")
+ str(len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART)).encode("utf-8")
+ b"}",
bytearray(TEST_MESSAGE + TEST_CONTENT_MULTIPART),
bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART),
b")",
b"Fetch completed (0.0001 + 0.000 secs).",
],
)
# Fetch response for the multipart message with base64 encoded parts.
# The first line announces the size of the message in bytes as a literal.
TEST_FETCH_RESPONSE_MULTIPART_BASE64 = (
    "OK",
    [
        b"1 FETCH (BODY[] {%d}"
        % len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64),
        bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64),
        b")",
        b"Fetch completed (0.0001 + 0.000 secs).",
    ],
)

# Fetch response for the multipart message with the invalid base64 text part
TEST_FETCH_RESPONSE_MULTIPART_BASE64_INVALID = (
    "OK",
    [
        b"1 FETCH (BODY[] {%d}"
        % len(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64_INVALID),
        bytearray(TEST_MESSAGE_MULTIPART + TEST_CONTENT_MULTIPART_BASE64_INVALID),
        b")",
        b"Fetch completed (0.0001 + 0.000 secs).",
    ],
)
TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM = (
"OK",

View File

@ -17,12 +17,15 @@ from homeassistant.util.dt import utcnow
from .const import (
BAD_RESPONSE,
EMPTY_SEARCH_RESPONSE,
TEST_BADLY_ENCODED_CONTENT,
TEST_FETCH_RESPONSE_BINARY,
TEST_FETCH_RESPONSE_HTML,
TEST_FETCH_RESPONSE_INVALID_DATE1,
TEST_FETCH_RESPONSE_INVALID_DATE2,
TEST_FETCH_RESPONSE_INVALID_DATE3,
TEST_FETCH_RESPONSE_MULTIPART,
TEST_FETCH_RESPONSE_MULTIPART_BASE64,
TEST_FETCH_RESPONSE_MULTIPART_BASE64_INVALID,
TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM,
TEST_FETCH_RESPONSE_TEXT_BARE,
TEST_FETCH_RESPONSE_TEXT_OTHER,
@ -110,6 +113,7 @@ async def test_entry_startup_fails(
(TEST_FETCH_RESPONSE_TEXT_OTHER, True),
(TEST_FETCH_RESPONSE_HTML, True),
(TEST_FETCH_RESPONSE_MULTIPART, True),
(TEST_FETCH_RESPONSE_MULTIPART_BASE64, True),
(TEST_FETCH_RESPONSE_BINARY, True),
],
ids=[
@ -122,6 +126,7 @@ async def test_entry_startup_fails(
"other",
"html",
"multipart",
"multipart_base64",
"binary",
],
)
@ -154,7 +159,7 @@ async def test_receiving_message_successfully(
assert data["folder"] == "INBOX"
assert data["sender"] == "john.doe@example.com"
assert data["subject"] == "Test subject"
assert data["text"]
assert "Test body" in data["text"]
assert (
valid_date
and isinstance(data["date"], datetime)
@ -163,6 +168,48 @@ async def test_receiving_message_successfully(
)
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize(
    "imap_fetch",
    [TEST_FETCH_RESPONSE_MULTIPART_BASE64_INVALID],
    ids=["multipart_base64_invalid"],
)
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
async def test_receiving_message_with_invalid_encoding(
    hass: HomeAssistant, mock_imap_protocol: MagicMock
) -> None:
    """Test receiving a message with a text part that cannot be decoded.

    The event should still be fired, with the raw (still encoded) content
    of the text part as the message text.
    """
    event_called = async_capture_events(hass, "imap_content")

    config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
    config_entry.add_to_hass(hass)
    assert await hass.config_entries.async_setup(config_entry.entry_id)
    await hass.async_block_till_done()
    # Make sure we have had one update (when polling)
    async_fire_time_changed(hass, utcnow() + timedelta(seconds=5))
    await hass.async_block_till_done()
    state = hass.states.get("sensor.imap_email_email_com")
    # we should have received one message
    assert state is not None
    assert state.state == "1"
    assert state.attributes["state_class"] == SensorStateClass.MEASUREMENT

    # we should have received one event
    assert len(event_called) == 1
    data: dict[str, Any] = event_called[0].data
    assert data["server"] == "imap.server.com"
    assert data["username"] == "email@email.com"
    assert data["search"] == "UnSeen UnDeleted"
    assert data["folder"] == "INBOX"
    assert data["sender"] == "john.doe@example.com"
    assert data["subject"] == "Test subject"
    # the undecodable text part is passed on as is
    assert data["text"] == TEST_BADLY_ENCODED_CONTENT
@pytest.mark.parametrize("imap_search", [TEST_SEARCH_RESPONSE])
@pytest.mark.parametrize("imap_fetch", [TEST_FETCH_RESPONSE_NO_SUBJECT_TO_FROM])
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
@ -196,7 +243,7 @@ async def test_receiving_message_no_subject_to_from(
assert data["date"] == datetime(
2023, 3, 24, 13, 52, tzinfo=timezone(timedelta(seconds=3600))
)
assert data["text"] == "Test body\r\n\r\n"
assert data["text"] == "Test body\r\n"
assert data["headers"]["Return-Path"] == ("<john.doe@example.com>",)
assert data["headers"]["Delivered-To"] == ("notify@example.com",)