diff --git a/.coveragerc b/.coveragerc index f2231ea31c2..d28878d8861 100644 --- a/.coveragerc +++ b/.coveragerc @@ -547,6 +547,7 @@ omit = homeassistant/components/ifttt/alarm_control_panel.py homeassistant/components/iglo/light.py homeassistant/components/ihc/* + homeassistant/components/imap_email_content/sensor.py homeassistant/components/incomfort/* homeassistant/components/insteon/binary_sensor.py homeassistant/components/insteon/climate.py diff --git a/homeassistant/components/imap/config_flow.py b/homeassistant/components/imap/config_flow.py index 70594d5fd7c..4c4a2e2a35c 100644 --- a/homeassistant/components/imap/config_flow.py +++ b/homeassistant/components/imap/config_flow.py @@ -10,7 +10,13 @@ from aioimaplib import AioImapException import voluptuous as vol from homeassistant import config_entries -from homeassistant.const import CONF_PASSWORD, CONF_PORT, CONF_USERNAME, CONF_VERIFY_SSL +from homeassistant.const import ( + CONF_NAME, + CONF_PASSWORD, + CONF_PORT, + CONF_USERNAME, + CONF_VERIFY_SSL, +) from homeassistant.core import HomeAssistant, callback from homeassistant.data_entry_flow import AbortFlow, FlowResult from homeassistant.helpers import config_validation as cv @@ -126,6 +132,28 @@ class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): VERSION = 1 _reauth_entry: config_entries.ConfigEntry | None + async def async_step_import(self, user_input: dict[str, Any]) -> FlowResult: + """Handle the import from imap_email_content integration.""" + data = CONFIG_SCHEMA( + { + CONF_SERVER: user_input[CONF_SERVER], + CONF_PORT: user_input[CONF_PORT], + CONF_USERNAME: user_input[CONF_USERNAME], + CONF_PASSWORD: user_input[CONF_PASSWORD], + CONF_FOLDER: user_input[CONF_FOLDER], + } + ) + self._async_abort_entries_match( + { + key: data[key] + for key in (CONF_USERNAME, CONF_SERVER, CONF_FOLDER, CONF_SEARCH) + } + ) + title = user_input[CONF_NAME] + if await validate_input(self.hass, data): + raise AbortFlow("cannot_connect") + return self.async_create_entry(title=title, data=data) + async def async_step_user( self, user_input: dict[str, Any] | None = None ) -> FlowResult: diff --git a/homeassistant/components/imap_email_content/__init__.py b/homeassistant/components/imap_email_content/__init__.py new file mode 100644 index 00000000000..f2041b947df --- /dev/null +++ b/homeassistant/components/imap_email_content/__init__.py @@ -0,0 +1,17 @@ +"""The imap_email_content component.""" + +from homeassistant.const import Platform +from homeassistant.core import HomeAssistant +from homeassistant.helpers import config_validation as cv +from homeassistant.helpers.typing import ConfigType + +from .const import DOMAIN + +PLATFORMS = [Platform.SENSOR] + +CONFIG_SCHEMA = cv.deprecated(DOMAIN, raise_if_present=False) + + +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: + """Set up imap_email_content.""" + return True diff --git a/homeassistant/components/imap_email_content/const.py b/homeassistant/components/imap_email_content/const.py new file mode 100644 index 00000000000..5f1c653030e --- /dev/null +++ b/homeassistant/components/imap_email_content/const.py @@ -0,0 +1,13 @@ +"""Constants for the imap email content integration.""" + +DOMAIN = "imap_email_content" + +CONF_SERVER = "server" +CONF_SENDERS = "senders" +CONF_FOLDER = "folder" + +ATTR_FROM = "from" +ATTR_BODY = "body" +ATTR_SUBJECT = "subject" + +DEFAULT_PORT = 993 diff --git a/homeassistant/components/imap_email_content/manifest.json b/homeassistant/components/imap_email_content/manifest.json new file mode 100644 index 00000000000..b7d0589b83f --- /dev/null +++ b/homeassistant/components/imap_email_content/manifest.json @@ -0,0 +1,8 @@ +{ + "domain": "imap_email_content", + "name": "IMAP Email Content", + "codeowners": [], + "dependencies": ["imap"], + "documentation": "https://www.home-assistant.io/integrations/imap_email_content", + "iot_class": "cloud_push" +} diff --git a/homeassistant/components/imap_email_content/repairs.py b/homeassistant/components/imap_email_content/repairs.py new file mode 100644 index 00000000000..f19b0499040 --- /dev/null +++ b/homeassistant/components/imap_email_content/repairs.py @@ -0,0 +1,173 @@ +"""Repair flow for imap email content integration.""" + +from typing import Any + +import voluptuous as vol +import yaml + +from homeassistant import data_entry_flow +from homeassistant.components.imap import DOMAIN as IMAP_DOMAIN +from homeassistant.components.repairs import RepairsFlow +from homeassistant.config_entries import SOURCE_IMPORT +from homeassistant.const import ( + CONF_NAME, + CONF_PASSWORD, + CONF_PORT, + CONF_USERNAME, + CONF_VALUE_TEMPLATE, +) +from homeassistant.core import HomeAssistant, callback +from homeassistant.data_entry_flow import FlowResultType +from homeassistant.helpers import issue_registry as ir +from homeassistant.helpers.typing import ConfigType + +from .const import CONF_FOLDER, CONF_SENDERS, CONF_SERVER, DOMAIN + + +async def async_process_issue(hass: HomeAssistant, config: ConfigType) -> None: + """Register an issue and suggest new config.""" + + name: str = config.get(CONF_NAME) or config[CONF_USERNAME] + + issue_id = ( + f"{name}_{config[CONF_USERNAME]}_{config[CONF_SERVER]}_{config[CONF_FOLDER]}" + ) + + if CONF_VALUE_TEMPLATE in config: + template: str = config[CONF_VALUE_TEMPLATE].template + template = template.replace("subject", 'trigger.event.data["subject"]') + template = template.replace("from", 'trigger.event.data["sender"]') + template = template.replace("date", 'trigger.event.data["date"]') + template = template.replace("body", 'trigger.event.data["text"]') + else: + template = '{{ trigger.event.data["subject"] }}' + + template_sensor_config: ConfigType = { + "template": [ + { + "trigger": [ + { + "id": "custom_event", + "platform": "event", + "event_type": "imap_content", + "event_data": {"sender": config[CONF_SENDERS][0]}, + } + ], + "sensor": [ + { + "state": template, + "name": name, + } + ], + } + ] + } + + data = { + CONF_SERVER: config[CONF_SERVER], + CONF_PORT: config[CONF_PORT], + CONF_USERNAME: config[CONF_USERNAME], + CONF_PASSWORD: config[CONF_PASSWORD], + CONF_FOLDER: config[CONF_FOLDER], + } + data[CONF_VALUE_TEMPLATE] = template + data[CONF_NAME] = name + placeholders = {"yaml_example": yaml.dump(template_sensor_config)} + placeholders.update(data) + + ir.async_create_issue( + hass, + DOMAIN, + issue_id, + breaks_in_ha_version="2023.10.0", + is_fixable=True, + severity=ir.IssueSeverity.WARNING, + translation_key="migration", + translation_placeholders=placeholders, + data=data, + ) + + +class DeprecationRepairFlow(RepairsFlow): + """Handler for an issue fixing flow.""" + + def __init__(self, issue_id: str, config: ConfigType) -> None: + """Create flow.""" + self._name: str = config[CONF_NAME] + self._config: dict[str, Any] = config + self._issue_id = issue_id + super().__init__() + + async def async_step_init( + self, user_input: dict[str, str] | None = None + ) -> data_entry_flow.FlowResult: + """Handle the first step of a fix flow.""" + return await self.async_step_start() + + @callback + def _async_get_placeholders(self) -> dict[str, str] | None: + issue_registry = ir.async_get(self.hass) + description_placeholders = None + if issue := issue_registry.async_get_issue(self.handler, self.issue_id): + description_placeholders = issue.translation_placeholders + + return description_placeholders + + async def async_step_start( + self, user_input: dict[str, str] | None = None + ) -> data_entry_flow.FlowResult: + """Wait for the user to start the config migration.""" + placeholders = self._async_get_placeholders() + if user_input is None: + return self.async_show_form( + step_id="start", + data_schema=vol.Schema({}), + description_placeholders=placeholders, + ) + + return await self.async_step_confirm() + + async def async_step_confirm( + self, user_input: dict[str, str] | None = None + ) -> data_entry_flow.FlowResult: + """Handle the confirm step of a fix flow.""" + placeholders = self._async_get_placeholders() + if user_input is not None: + user_input[CONF_NAME] = self._name + result = await self.hass.config_entries.flow.async_init( + IMAP_DOMAIN, context={"source": SOURCE_IMPORT}, data=self._config + ) + if result["type"] == FlowResultType.ABORT: + ir.async_delete_issue(self.hass, DOMAIN, self._issue_id) + ir.async_create_issue( + self.hass, + DOMAIN, + self._issue_id, + breaks_in_ha_version="2023.10.0", + is_fixable=False, + severity=ir.IssueSeverity.WARNING, + translation_key="deprecation", + translation_placeholders=placeholders, + data=self._config, + learn_more_url="https://www.home-assistant.io/integrations/imap/#using-events", + ) + return self.async_abort(reason=result["reason"]) + return self.async_create_entry( + title="", + data={}, + ) + + return self.async_show_form( + step_id="confirm", + data_schema=vol.Schema({}), + description_placeholders=placeholders, + ) + + +async def async_create_fix_flow( + hass: HomeAssistant, + issue_id: str, + data: dict[str, str | int | float | None], +) -> RepairsFlow: + """Create flow.""" + return DeprecationRepairFlow(issue_id, data) diff --git a/homeassistant/components/imap_email_content/sensor.py b/homeassistant/components/imap_email_content/sensor.py new file mode 100644 index 00000000000..1df207e2968 --- /dev/null +++ b/homeassistant/components/imap_email_content/sensor.py @@ -0,0 +1,302 @@ +"""Email sensor support.""" +from __future__ import annotations + +from collections import deque +import datetime +import email +import imaplib +import logging + +import voluptuous as vol + +from homeassistant.components.sensor import PLATFORM_SCHEMA, SensorEntity +from homeassistant.const import ( + ATTR_DATE, + CONF_NAME, + CONF_PASSWORD, + CONF_PORT, + CONF_USERNAME, + CONF_VALUE_TEMPLATE, + CONF_VERIFY_SSL, + CONTENT_TYPE_TEXT_PLAIN, +) +from homeassistant.core import HomeAssistant +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType +from homeassistant.util.ssl import client_context + +from .const import ( + ATTR_BODY, + ATTR_FROM, + ATTR_SUBJECT, + CONF_FOLDER, + CONF_SENDERS, + CONF_SERVER, + DEFAULT_PORT, +) +from .repairs import async_process_issue + +_LOGGER = logging.getLogger(__name__) + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend( + { + vol.Optional(CONF_NAME): cv.string, + vol.Required(CONF_USERNAME): cv.string, + vol.Required(CONF_PASSWORD): cv.string, + vol.Required(CONF_SERVER): cv.string, + vol.Required(CONF_SENDERS): [cv.string], + vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port, + vol.Optional(CONF_VALUE_TEMPLATE): cv.template, + vol.Optional(CONF_FOLDER, default="INBOX"): cv.string, + vol.Optional(CONF_VERIFY_SSL, default=True): cv.boolean, + } +) + + +def setup_platform( + hass: HomeAssistant, + config: ConfigType, + add_entities: AddEntitiesCallback, + discovery_info: DiscoveryInfoType | None = None, +) -> None: + """Set up the Email sensor platform.""" + reader = EmailReader( + config[CONF_USERNAME], + config[CONF_PASSWORD], + config[CONF_SERVER], + config[CONF_PORT], + config[CONF_FOLDER], + config[CONF_VERIFY_SSL], + ) + + if (value_template := config.get(CONF_VALUE_TEMPLATE)) is not None: + value_template.hass = hass + sensor = EmailContentSensor( + hass, + reader, + config.get(CONF_NAME) or config[CONF_USERNAME], + config[CONF_SENDERS], + value_template, + ) + + hass.add_job(async_process_issue, hass, config) + + if sensor.connected: + add_entities([sensor], True) + + +class EmailReader: + """A class to read emails from an IMAP server.""" + + def __init__(self, user, password, server, port, folder, verify_ssl): + """Initialize the Email Reader.""" + self._user = user + self._password = password + self._server = server + self._port = port + self._folder = folder + self._verify_ssl = verify_ssl + self._last_id = None + self._last_message = None + self._unread_ids = deque([]) + self.connection = None + + @property + def last_id(self) -> int | None: + """Return last email uid that was processed.""" + return self._last_id + + @property + def last_unread_id(self) -> int | None: + """Return last email uid received.""" + # We assume the last id in the list is the last unread id + # We cannot know if that is the newest one, because it could arrive later + # https://stackoverflow.com/questions/12409862/python-imap-the-order-of-uids + if self._unread_ids: + return int(self._unread_ids[-1]) + return self._last_id + + def connect(self): + """Login and setup the connection.""" + ssl_context = client_context() if self._verify_ssl else None + try: + self.connection = imaplib.IMAP4_SSL( + self._server, self._port, ssl_context=ssl_context + ) + self.connection.login(self._user, self._password) + return True + except imaplib.IMAP4.error: + _LOGGER.error("Failed to login to %s", self._server) + return False + + def _fetch_message(self, message_uid): + """Get an email message from a message id.""" + _, message_data = self.connection.uid("fetch", message_uid, "(RFC822)") + + if message_data is None: + return None + if message_data[0] is None: + return None + raw_email = message_data[0][1] + email_message = email.message_from_bytes(raw_email) + return email_message + + def read_next(self): + """Read the next email from the email server.""" + try: + self.connection.select(self._folder, readonly=True) + + if self._last_id is None: + # search for today and yesterday + time_from = datetime.datetime.now() - datetime.timedelta(days=1) + search = f"SINCE {time_from:%d-%b-%Y}" + else: + search = f"UID {self._last_id}:*" + + _, data = self.connection.uid("search", None, search) + self._unread_ids = deque(data[0].split()) + while self._unread_ids: + message_uid = self._unread_ids.popleft() + if self._last_id is None or int(message_uid) > self._last_id: + self._last_id = int(message_uid) + self._last_message = self._fetch_message(message_uid) + return self._last_message + + except imaplib.IMAP4.error: + _LOGGER.info("Connection to %s lost, attempting to reconnect", self._server) + try: + self.connect() + _LOGGER.info( + "Reconnect to %s succeeded, trying last message", self._server + ) + if self._last_id is not None: + return self._fetch_message(str(self._last_id)) + except imaplib.IMAP4.error: + _LOGGER.error("Failed to reconnect") + + return None + + +class EmailContentSensor(SensorEntity): + """Representation of an EMail sensor.""" + + def __init__(self, hass, email_reader, name, allowed_senders, value_template): + """Initialize the sensor.""" + self.hass = hass + self._email_reader = email_reader + self._name = name + self._allowed_senders = [sender.upper() for sender in allowed_senders] + self._value_template = value_template + self._last_id = None + self._message = None + self._state_attributes = None + self.connected = self._email_reader.connect() + + @property + def name(self): + """Return the name of the sensor.""" + return self._name + + @property + def native_value(self): + """Return the current email state.""" + return self._message + + @property + def extra_state_attributes(self): + """Return other state attributes for the message.""" + return self._state_attributes + + def render_template(self, email_message): + """Render the message template.""" + variables = { + ATTR_FROM: EmailContentSensor.get_msg_sender(email_message), + ATTR_SUBJECT: EmailContentSensor.get_msg_subject(email_message), + ATTR_DATE: email_message["Date"], + ATTR_BODY: EmailContentSensor.get_msg_text(email_message), + } + return self._value_template.render(variables, parse_result=False) + + def sender_allowed(self, email_message): + """Check if the sender is in the allowed senders list.""" + return EmailContentSensor.get_msg_sender(email_message).upper() in ( + sender for sender in self._allowed_senders + ) + + @staticmethod + def get_msg_sender(email_message): + """Get the parsed message sender from the email.""" + return str(email.utils.parseaddr(email_message["From"])[1]) + + @staticmethod + def get_msg_subject(email_message): + """Decode the message subject.""" + decoded_header = email.header.decode_header(email_message["Subject"]) + header = email.header.make_header(decoded_header) + return str(header) + + @staticmethod + def get_msg_text(email_message): + """Get the message text from the email. + + Will look for text/plain or use text/html if not found. + """ + message_text = None + message_html = None + message_untyped_text = None + + for part in email_message.walk(): + if part.get_content_type() == CONTENT_TYPE_TEXT_PLAIN: + if message_text is None: + message_text = part.get_payload() + elif part.get_content_type() == "text/html": + if message_html is None: + message_html = part.get_payload() + elif ( + part.get_content_type().startswith("text") + and message_untyped_text is None + ): + message_untyped_text = part.get_payload() + + if message_text is not None: + return message_text + + if message_html is not None: + return message_html + + if message_untyped_text is not None: + return message_untyped_text + + return email_message.get_payload() + + def update(self) -> None: + """Read emails and publish state change.""" + email_message = self._email_reader.read_next() + while ( + self._last_id is None or self._last_id != self._email_reader.last_unread_id + ): + if email_message is None: + self._message = None + self._state_attributes = {} + return + + self._last_id = self._email_reader.last_id + + if self.sender_allowed(email_message): + message = EmailContentSensor.get_msg_subject(email_message) + + if self._value_template is not None: + message = self.render_template(email_message) + + self._message = message + self._state_attributes = { + ATTR_FROM: EmailContentSensor.get_msg_sender(email_message), + ATTR_SUBJECT: EmailContentSensor.get_msg_subject(email_message), + ATTR_DATE: email_message["Date"], + ATTR_BODY: EmailContentSensor.get_msg_text(email_message), + } + + if self._last_id == self._email_reader.last_unread_id: + break + email_message = self._email_reader.read_next() diff --git a/homeassistant/components/imap_email_content/strings.json b/homeassistant/components/imap_email_content/strings.json new file mode 100644 index 00000000000..b7b987b1212 --- /dev/null +++ b/homeassistant/components/imap_email_content/strings.json @@ -0,0 +1,27 @@ +{ + "issues": { + "deprecation": { + "title": "The IMAP email content integration is deprecated", + "description": "The IMAP email content integration is deprecated. Your IMAP server configuration was already migrated to the [imap integration](https://my.home-assistant.io/redirect/config_flow_start?domain=imap). To set up a sensor for the IMAP email content, set up a template sensor with the config:\n\n```yaml\n{yaml_example}```\n\nPlease remove the deprecated `imap_email_plaform` sensor configuration from your `configuration.yaml`.\n\nNote that the event filter only filters on the first of the configured allowed senders, customize the filter if needed.\n\nYou can skip this part if you have already set up a template sensor." + }, + "migration": { + "title": "The IMAP email content integration needs attention", + "fix_flow": { + "step": { + "start": { + "title": "Migrate your IMAP email configuration", + "description": "The IMAP email content integration is deprecated. Your IMAP server configuration can be migrated automatically to the [imap integration](https://my.home-assistant.io/redirect/config_flow_start?domain=imap), this will enable using a custom `imap` event trigger. To set up a sensor that has an IMAP content state, a template sensor can be used. Remove the `imap_email_plaform` sensor configuration from your `configuration.yaml` after migration.\n\nSubmit to start migration of your IMAP server configuration to the `imap` integration." + }, + "confirm": { + "title": "Your IMAP server settings will be migrated", + "description": "In this step an `imap` config entry will be set up with the following configuration:\n\n```text\nServer\t{server}\nPort\t{port}\nUsername\t{username}\nPassword\t*****\nFolder\t{folder}\n```\n\nSee also: (https://www.home-assistant.io/integrations/imap/)\n\nFitering configuration on allowed `sender` is part of the template sensor config that can copied and placed in your `configuration.yaml.\n\nNote that the event filter only filters on the first of the configured allowed senders, customize the filter if needed.\n\n```yaml\n{yaml_example}```\nDo not forget to cleanup the your `configuration.yaml` after migration.\n\nSubmit to migrate your IMAP server configuration to an `imap` configuration entry." + } + }, + "abort": { + "already_configured": "The IMAP server config was already migrated to the imap integration. Remove the `imap_email_plaform` sensor configuration from your `configuration.yaml`.", + "cannot_connect": "Migration failed. Failed to connect to the IMAP server. Perform a manual migration." + } + } + } + } +} diff --git a/homeassistant/generated/integrations.json b/homeassistant/generated/integrations.json index 39c7a82ce55..379dd112672 100644 --- a/homeassistant/generated/integrations.json +++ b/homeassistant/generated/integrations.json @@ -2581,6 +2581,12 @@ "config_flow": true, "iot_class": "cloud_push" }, + "imap_email_content": { + "name": "IMAP Email Content", + "integration_type": "hub", + "config_flow": false, + "iot_class": "cloud_push" + }, "incomfort": { "name": "Intergas InComfort/Intouch Lan2RF gateway", "integration_type": "hub", diff --git a/tests/components/imap/test_config_flow.py b/tests/components/imap/test_config_flow.py index d36cffbce06..efb505cda77 100644 --- a/tests/components/imap/test_config_flow.py +++ b/tests/components/imap/test_config_flow.py @@ -469,6 +469,73 @@ async def test_advanced_options_form( assert assert_result == data_entry_flow.FlowResultType.FORM +async def test_import_flow_success(hass: HomeAssistant) -> None: + """Test a successful import of yaml.""" + with patch( + "homeassistant.components.imap.config_flow.connect_to_server" + ) as mock_client, patch( + "homeassistant.components.imap.async_setup_entry", + return_value=True, + ) as mock_setup_entry: + mock_client.return_value.search.return_value = ( + "OK", + [b""], + ) + result2 = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={ + "name": "IMAP", + "username": "email@email.com", + "password": "password", + "server": "imap.server.com", + "port": 993, + "folder": "INBOX", + }, + ) + await hass.async_block_till_done() + + assert result2["type"] == FlowResultType.CREATE_ENTRY + assert result2["title"] == "IMAP" + assert result2["data"] == { + "username": "email@email.com", + "password": "password", + "server": "imap.server.com", + "port": 993, + "charset": "utf-8", + "folder": "INBOX", + "search": "UnSeen UnDeleted", + } + assert len(mock_setup_entry.mock_calls) == 1 + + +async def test_import_flow_connection_error(hass: HomeAssistant) -> None: + """Test a successful import of yaml.""" + with patch( + "homeassistant.components.imap.config_flow.connect_to_server", + side_effect=AioImapException("Unexpected error"), + ), patch( + "homeassistant.components.imap.async_setup_entry", + return_value=True, + ): + result = await hass.config_entries.flow.async_init( + DOMAIN, + context={"source": config_entries.SOURCE_IMPORT}, + data={ + "name": "IMAP", + "username": "email@email.com", + "password": "password", + "server": "imap.server.com", + "port": 993, + "folder": "INBOX", + }, + ) + await hass.async_block_till_done() + + assert result["type"] == FlowResultType.ABORT + assert result["reason"] == "cannot_connect" + + @pytest.mark.parametrize("cipher_list", ["python_default", "modern", "intermediate"]) @pytest.mark.parametrize("verify_ssl", [False, True]) async def test_config_flow_with_cipherlist_and_ssl_verify( diff --git a/tests/components/imap_email_content/__init__.py b/tests/components/imap_email_content/__init__.py new file mode 100644 index 00000000000..2c7e5692366 --- /dev/null +++ b/tests/components/imap_email_content/__init__.py @@ -0,0 +1 @@ +"""Tests for the imap_email_content component.""" diff --git a/tests/components/imap_email_content/test_repairs.py b/tests/components/imap_email_content/test_repairs.py new file mode 100644 index 00000000000..6323dcde377 --- /dev/null +++ b/tests/components/imap_email_content/test_repairs.py @@ -0,0 +1,296 @@ +"""Test repairs for imap_email_content.""" + +from collections.abc import Generator +from http import HTTPStatus +from unittest.mock import MagicMock, patch + +import pytest + +from homeassistant.components.repairs.websocket_api import ( + RepairsFlowIndexView, + RepairsFlowResourceView, +) +from homeassistant.core import HomeAssistant +from homeassistant.setup import async_setup_component + +from tests.common import MockConfigEntry +from tests.typing import ClientSessionGenerator, WebSocketGenerator + + +@pytest.fixture +def mock_client() -> Generator[MagicMock, None, None]: + """Mock the imap client.""" + with patch( + "homeassistant.components.imap_email_content.sensor.EmailReader.read_next", + return_value=None, + ), patch("imaplib.IMAP4_SSL") as mock_imap_client: + yield mock_imap_client + + +CONFIG = { + "platform": "imap_email_content", + "name": "Notifications", + "server": "imap.example.com", + "port": 993, + "username": "john.doe@example.com", + "password": "**SECRET**", + "folder": "INBOX.Notifications", + "value_template": "{{ body }}", + "senders": ["company@example.com"], +} +DESCRIPTION_PLACEHOLDERS = { + "yaml_example": "" + "template:\n" + "- sensor:\n" + " - name: Notifications\n" + " state: '{{ trigger.event.data[\"text\"] }}'\n" + " trigger:\n - event_data:\n" + " sender: company@example.com\n" + " event_type: imap_content\n" + " id: custom_event\n" + " platform: event\n", + "server": "imap.example.com", + "port": 993, + "username": "john.doe@example.com", + "password": "**SECRET**", + "folder": "INBOX.Notifications", + "value_template": '{{ trigger.event.data["text"] }}', + "name": "Notifications", +} + +CONFIG_DEFAULT = { + "platform": "imap_email_content", + "name": "Notifications", + "server": "imap.example.com", + "port": 993, + "username": "john.doe@example.com", + "password": "**SECRET**", + "folder": "INBOX.Notifications", + "senders": ["company@example.com"], +} +DESCRIPTION_PLACEHOLDERS_DEFAULT = { + "yaml_example": "" + "template:\n" + "- sensor:\n" + " - name: Notifications\n" + " state: '{{ trigger.event.data[\"subject\"] }}'\n" + " trigger:\n - event_data:\n" + " sender: company@example.com\n" + " event_type: imap_content\n" + " id: custom_event\n" + " platform: event\n", + "server": "imap.example.com", + "port": 993, + "username": "john.doe@example.com", + "password": "**SECRET**", + "folder": "INBOX.Notifications", + "value_template": '{{ trigger.event.data["subject"] }}', + "name": "Notifications", +} + + +@pytest.mark.parametrize( + ("config", "description_placeholders"), + [ + (CONFIG, DESCRIPTION_PLACEHOLDERS), + (CONFIG_DEFAULT, DESCRIPTION_PLACEHOLDERS_DEFAULT), + ], + ids=["with_value_template", "default_subject"], +) +async def test_deprecation_repair_flow( + hass: HomeAssistant, + mock_client: MagicMock, + hass_client: ClientSessionGenerator, + hass_ws_client: WebSocketGenerator, + config: str | None, + description_placeholders: str, +) -> None: + """Test the deprecation repair flow.""" + # setup config + await async_setup_component(hass, "sensor", {"sensor": config}) + await hass.async_block_till_done() + + state = hass.states.get("sensor.notifications") + assert state is not None + + ws_client = await hass_ws_client(hass) + client = await hass_client() + + await ws_client.send_json({"id": 1, "type": "repairs/list_issues"}) + + msg = await ws_client.receive_json() + + assert msg["success"] + assert len(msg["result"]["issues"]) > 0 + issue = None + for i in msg["result"]["issues"]: + if i["domain"] == "imap_email_content": + issue = i + assert issue is not None + assert ( + issue["issue_id"] + == "Notifications_john.doe@example.com_imap.example.com_INBOX.Notifications" + ) + assert issue["is_fixable"] + url = RepairsFlowIndexView.url + resp = await client.post( + url, json={"handler": "imap_email_content", "issue_id": issue["issue_id"]} + ) + assert resp.status == HTTPStatus.OK + data = await resp.json() + + flow_id = data["flow_id"] + assert data["description_placeholders"] == description_placeholders + assert data["step_id"] == "start" + + # Apply fix + url = RepairsFlowResourceView.url.format(flow_id=flow_id) + resp = await client.post(url) + assert resp.status == HTTPStatus.OK + data = await resp.json() + + flow_id = data["flow_id"] + assert data["description_placeholders"] == description_placeholders + assert data["step_id"] == "confirm" + + url = RepairsFlowResourceView.url.format(flow_id=flow_id) + + with patch( + "homeassistant.components.imap.config_flow.connect_to_server" + ) as mock_client, patch( + "homeassistant.components.imap.async_setup_entry", + return_value=True, + ): + mock_client.return_value.search.return_value = ( + "OK", + [b""], + ) + resp = await client.post(url) + + assert resp.status == HTTPStatus.OK + data = await resp.json() + + assert data["type"] == "create_entry" + + # Assert the issue is resolved + await ws_client.send_json({"id": 2, "type": "repairs/list_issues"}) + msg = await ws_client.receive_json() + assert msg["success"] + assert len(msg["result"]["issues"]) == 0 + + +@pytest.mark.parametrize( + ("config", "description_placeholders"), + [ + (CONFIG, DESCRIPTION_PLACEHOLDERS), + (CONFIG_DEFAULT, DESCRIPTION_PLACEHOLDERS_DEFAULT), + ], + ids=["with_value_template", "default_subject"], +) +async def test_repair_flow_where_entry_already_exists( + hass: HomeAssistant, + mock_client: MagicMock, + hass_client: ClientSessionGenerator, + hass_ws_client: WebSocketGenerator, + config: str | None, + description_placeholders: str, +) -> None: + """Test the deprecation repair flow and an entry already exists.""" + + await async_setup_component(hass, "sensor", {"sensor": config}) + await hass.async_block_till_done() + state = hass.states.get("sensor.notifications") + assert state is not None + + existing_imap_entry_config = { + "username": "john.doe@example.com", + "password": "password", + "server": "imap.example.com", + "port": 993, + "charset": "utf-8", + "folder": "INBOX.Notifications", + "search": "UnSeen UnDeleted", + } + + with patch("homeassistant.components.imap.async_setup_entry", return_value=True): + imap_entry = MockConfigEntry(domain="imap", data=existing_imap_entry_config) + imap_entry.add_to_hass(hass) + await hass.config_entries.async_setup(imap_entry.entry_id) + ws_client = await hass_ws_client(hass) + client = await hass_client() + + await ws_client.send_json({"id": 1, "type": "repairs/list_issues"}) + + msg = await ws_client.receive_json() + + assert msg["success"] + assert len(msg["result"]["issues"]) > 0 + issue = None + for i in msg["result"]["issues"]: + if i["domain"] == "imap_email_content": + issue = i + assert issue is not None + assert ( + issue["issue_id"] + == "Notifications_john.doe@example.com_imap.example.com_INBOX.Notifications" + ) + assert issue["is_fixable"] + assert issue["translation_key"] == "migration" + + url = RepairsFlowIndexView.url + resp = await client.post( + url, json={"handler": "imap_email_content", "issue_id": issue["issue_id"]} + ) + assert resp.status == HTTPStatus.OK + data = await resp.json() + + flow_id = data["flow_id"] + assert data["description_placeholders"] == description_placeholders + assert data["step_id"] == "start" + + url = RepairsFlowResourceView.url.format(flow_id=flow_id) + resp = await client.post(url) + assert resp.status == HTTPStatus.OK + data = await resp.json() + + flow_id = data["flow_id"] + assert data["description_placeholders"] == description_placeholders + assert data["step_id"] == "confirm" + + url = RepairsFlowResourceView.url.format(flow_id=flow_id) + + with patch( + "homeassistant.components.imap.config_flow.connect_to_server" + ) as mock_client, patch( + "homeassistant.components.imap.async_setup_entry", + return_value=True, + ): + mock_client.return_value.search.return_value = ( + "OK", + [b""], + ) + resp = await client.post(url) + + assert resp.status == HTTPStatus.OK + data = await resp.json() + + assert data["type"] == "abort" + assert data["reason"] == "already_configured" + + # We should now have a non_fixable issue left since there is still + # a config in configuration.yaml + await ws_client.send_json({"id": 2, "type": "repairs/list_issues"}) + msg = await ws_client.receive_json() + assert msg["success"] + assert len(msg["result"]["issues"]) > 0 + issue = None + for i in msg["result"]["issues"]: + if i["domain"] == "imap_email_content": + issue = i + assert issue is not None + assert ( + issue["issue_id"] + == "Notifications_john.doe@example.com_imap.example.com_INBOX.Notifications" + ) + assert not issue["is_fixable"] + assert issue["translation_key"] == "deprecation" diff --git a/tests/components/imap_email_content/test_sensor.py b/tests/components/imap_email_content/test_sensor.py new file mode 100644 index 00000000000..3e8a6c1e282 --- /dev/null +++ b/tests/components/imap_email_content/test_sensor.py @@ -0,0 +1,253 @@ +"""The tests for the IMAP email content sensor platform.""" +from collections import deque +import datetime +import email +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +from homeassistant.components.imap_email_content import sensor as imap_email_content +from homeassistant.core import HomeAssistant +from homeassistant.helpers.event import async_track_state_change +from homeassistant.helpers.template import Template +from homeassistant.setup import async_setup_component + + +class FakeEMailReader: + """A test class for sending test emails.""" + + def __init__(self, messages) -> None: + """Set up the fake email reader.""" + self._messages = messages + self.last_id = 0 + self.last_unread_id = len(messages) + + def add_test_message(self, message): + """Add a new message.""" + self.last_unread_id += 1 + self._messages.append(message) + + def connect(self): + """Stay always Connected.""" + return True + + def read_next(self): + """Get the next email.""" + if len(self._messages) == 0: + return None + self.last_id += 1 + return self._messages.popleft() + + +async def test_integration_setup_(hass: HomeAssistant) -> None: + """Test the integration component setup is successful.""" + assert await async_setup_component(hass, "imap_email_content", {}) + + +async def test_allowed_sender(hass: HomeAssistant) -> None: + """Test emails from allowed sender.""" + test_message = email.message.Message() + test_message["From"] = "sender@test.com" + test_message["Subject"] = "Test" + test_message["Date"] = datetime.datetime(2016, 1, 1, 12, 44, 57) + test_message.set_payload("Test Message") + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([test_message])), + "test_emails_sensor", + ["sender@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state == "Test" + assert sensor.extra_state_attributes["body"] == "Test Message" + assert sensor.extra_state_attributes["from"] == "sender@test.com" + assert sensor.extra_state_attributes["subject"] == "Test" + assert ( + datetime.datetime(2016, 1, 1, 12, 44, 57) + == sensor.extra_state_attributes["date"] + ) + + +async def test_multi_part_with_text(hass: HomeAssistant) -> None: + """Test multi part emails.""" + msg = MIMEMultipart("alternative") + msg["Subject"] = "Link" + msg["From"] = "sender@test.com" + + text = "Test Message" + html = "Test Message" + + textPart = MIMEText(text, "plain") + htmlPart = MIMEText(html, "html") + + msg.attach(textPart) + msg.attach(htmlPart) + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([msg])), + "test_emails_sensor", + ["sender@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state == "Link" + assert sensor.extra_state_attributes["body"] == "Test Message" + + +async def test_multi_part_only_html(hass: HomeAssistant) -> None: + """Test multi part emails with only HTML.""" + msg = MIMEMultipart("alternative") + msg["Subject"] = "Link" + msg["From"] = "sender@test.com" + + html = "Test Message" + + htmlPart = MIMEText(html, "html") + + msg.attach(htmlPart) + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([msg])), + "test_emails_sensor", + ["sender@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state == "Link" + assert ( + sensor.extra_state_attributes["body"] + == "Test Message" + ) + + +async def test_multi_part_only_other_text(hass: HomeAssistant) -> None: + """Test multi part emails with only other text.""" + msg = MIMEMultipart("alternative") + msg["Subject"] = "Link" + msg["From"] = "sender@test.com" + + other = "Test Message" + + htmlPart = MIMEText(other, "other") + + msg.attach(htmlPart) + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([msg])), + "test_emails_sensor", + ["sender@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state == "Link" + assert sensor.extra_state_attributes["body"] == "Test Message" + + +async def test_multiple_emails(hass: HomeAssistant) -> None: + """Test multiple emails, discarding stale states.""" + states = [] + + test_message1 = email.message.Message() + test_message1["From"] = "sender@test.com" + test_message1["Subject"] = "Test" + test_message1["Date"] = datetime.datetime(2016, 1, 1, 12, 44, 57) + test_message1.set_payload("Test Message") + + test_message2 = email.message.Message() + test_message2["From"] = "sender@test.com" + test_message2["Subject"] = "Test 2" + test_message2["Date"] = datetime.datetime(2016, 1, 1, 12, 44, 58) + test_message2.set_payload("Test Message 2") + + test_message3 = email.message.Message() + test_message3["From"] = "sender@test.com" + test_message3["Subject"] = "Test 3" + test_message3["Date"] = datetime.datetime(2016, 1, 1, 12, 50, 1) + test_message3.set_payload("Test Message 2") + + def state_changed_listener(entity_id, from_s, to_s): + states.append(to_s) + + async_track_state_change(hass, ["sensor.emailtest"], state_changed_listener) + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([test_message1, test_message2])), + "test_emails_sensor", + ["sender@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + # Fake a new received message + sensor._email_reader.add_test_message(test_message3) + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + + assert states[0].state == "Test 2" + assert states[1].state == "Test 3" + + assert sensor.extra_state_attributes["body"] == "Test Message 2" + + +async def test_sender_not_allowed(hass: HomeAssistant) -> None: + """Test not whitelisted emails.""" + test_message = email.message.Message() + test_message["From"] = "sender@test.com" + test_message["Subject"] = "Test" + test_message["Date"] = datetime.datetime(2016, 1, 1, 12, 44, 57) + test_message.set_payload("Test Message") + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([test_message])), + "test_emails_sensor", + ["other@test.com"], + None, + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state is None + + +async def test_template(hass: HomeAssistant) -> None: + """Test value template.""" + test_message = email.message.Message() + test_message["From"] = "sender@test.com" + test_message["Subject"] = "Test" + test_message["Date"] = datetime.datetime(2016, 1, 1, 12, 44, 57) + test_message.set_payload("Test Message") + + sensor = imap_email_content.EmailContentSensor( + hass, + FakeEMailReader(deque([test_message])), + "test_emails_sensor", + ["sender@test.com"], + Template("{{ subject }} from {{ from }} with message {{ body }}", hass), + ) + + sensor.entity_id = "sensor.emailtest" + sensor.async_schedule_update_ha_state(True) + await hass.async_block_till_done() + assert sensor.state == "Test from sender@test.com with message Test Message"