mirror of
https://github.com/home-assistant/core.git
synced 2025-07-17 10:17:09 +00:00
Fix imap message part decoding (#109523)
This commit is contained in:
parent
f37db94f23
commit
7cc0b8a2fe
@ -9,7 +9,7 @@ from email.header import decode_header, make_header
|
|||||||
from email.message import Message
|
from email.message import Message
|
||||||
from email.utils import parseaddr, parsedate_to_datetime
|
from email.utils import parseaddr, parsedate_to_datetime
|
||||||
import logging
|
import logging
|
||||||
from typing import Any
|
from typing import TYPE_CHECKING, Any
|
||||||
|
|
||||||
from aioimaplib import AUTH, IMAP4_SSL, NONAUTH, SELECTED, AioImapException
|
from aioimaplib import AUTH, IMAP4_SSL, NONAUTH, SELECTED, AioImapException
|
||||||
|
|
||||||
@ -97,9 +97,8 @@ async def connect_to_server(data: Mapping[str, Any]) -> IMAP4_SSL:
|
|||||||
class ImapMessage:
|
class ImapMessage:
|
||||||
"""Class to parse an RFC822 email message."""
|
"""Class to parse an RFC822 email message."""
|
||||||
|
|
||||||
def __init__(self, raw_message: bytes, charset: str = "utf-8") -> None:
|
def __init__(self, raw_message: bytes) -> None:
|
||||||
"""Initialize IMAP message."""
|
"""Initialize IMAP message."""
|
||||||
self._charset = charset
|
|
||||||
self.email_message = email.message_from_bytes(raw_message)
|
self.email_message = email.message_from_bytes(raw_message)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@ -153,7 +152,7 @@ class ImapMessage:
|
|||||||
def text(self) -> str:
|
def text(self) -> str:
|
||||||
"""Get the message text from the email.
|
"""Get the message text from the email.
|
||||||
|
|
||||||
Will look for text/plain or use text/html if not found.
|
Will look for text/plain or use/ text/html if not found.
|
||||||
"""
|
"""
|
||||||
message_text: str | None = None
|
message_text: str | None = None
|
||||||
message_html: str | None = None
|
message_html: str | None = None
|
||||||
@ -166,8 +165,13 @@ class ImapMessage:
|
|||||||
Falls back to the raw content part if decoding fails.
|
Falls back to the raw content part if decoding fails.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
return str(part.get_payload(decode=True).decode(self._charset))
|
decoded_payload: Any = part.get_payload(decode=True)
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
assert isinstance(decoded_payload, bytes)
|
||||||
|
content_charset = part.get_content_charset() or "utf-8"
|
||||||
|
return decoded_payload.decode(content_charset)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
|
# return undecoded payload
|
||||||
return str(part.get_payload())
|
return str(part.get_payload())
|
||||||
|
|
||||||
part: Message
|
part: Message
|
||||||
@ -237,9 +241,7 @@ class ImapDataUpdateCoordinator(DataUpdateCoordinator[int | None]):
|
|||||||
"""Send a event for the last message if the last message was changed."""
|
"""Send a event for the last message if the last message was changed."""
|
||||||
response = await self.imap_client.fetch(last_message_uid, "BODY.PEEK[]")
|
response = await self.imap_client.fetch(last_message_uid, "BODY.PEEK[]")
|
||||||
if response.result == "OK":
|
if response.result == "OK":
|
||||||
message = ImapMessage(
|
message = ImapMessage(response.lines[1])
|
||||||
response.lines[1], charset=self.config_entry.data[CONF_CHARSET]
|
|
||||||
)
|
|
||||||
# Set `initial` to `False` if the last message is triggered again
|
# Set `initial` to `False` if the last message is triggered again
|
||||||
initial: bool = True
|
initial: bool = True
|
||||||
if (message_id := message.message_id) == self._last_message_id:
|
if (message_id := message.message_id) == self._last_message_id:
|
||||||
|
@ -8,6 +8,7 @@ from aioimaplib import AUTH, NONAUTH, SELECTED, AioImapException, Response
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from homeassistant.components.imap import DOMAIN
|
from homeassistant.components.imap import DOMAIN
|
||||||
|
from homeassistant.components.imap.const import CONF_CHARSET
|
||||||
from homeassistant.components.imap.errors import InvalidAuth, InvalidFolder
|
from homeassistant.components.imap.errors import InvalidAuth, InvalidFolder
|
||||||
from homeassistant.components.sensor.const import SensorStateClass
|
from homeassistant.components.sensor.const import SensorStateClass
|
||||||
from homeassistant.const import STATE_UNAVAILABLE
|
from homeassistant.const import STATE_UNAVAILABLE
|
||||||
@ -131,13 +132,16 @@ async def test_entry_startup_fails(
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
|
@pytest.mark.parametrize("imap_has_capability", [True, False], ids=["push", "poll"])
|
||||||
|
@pytest.mark.parametrize("charset", ["utf-8", "us-ascii"], ids=["utf-8", "us-ascii"])
|
||||||
async def test_receiving_message_successfully(
|
async def test_receiving_message_successfully(
|
||||||
hass: HomeAssistant, mock_imap_protocol: MagicMock, valid_date: bool
|
hass: HomeAssistant, mock_imap_protocol: MagicMock, valid_date: bool, charset: str
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Test receiving a message successfully."""
|
"""Test receiving a message successfully."""
|
||||||
event_called = async_capture_events(hass, "imap_content")
|
event_called = async_capture_events(hass, "imap_content")
|
||||||
|
|
||||||
config_entry = MockConfigEntry(domain=DOMAIN, data=MOCK_CONFIG)
|
config = MOCK_CONFIG.copy()
|
||||||
|
config[CONF_CHARSET] = charset
|
||||||
|
config_entry = MockConfigEntry(domain=DOMAIN, data=config)
|
||||||
config_entry.add_to_hass(hass)
|
config_entry.add_to_hass(hass)
|
||||||
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
assert await hass.config_entries.async_setup(config_entry.entry_id)
|
||||||
await hass.async_block_till_done()
|
await hass.async_block_till_done()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user