mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 13:17:32 +00:00
Email (#3421)
* Added email component * added email sensor component * added doc string to test class * fixed lint error * fixed lint error * rename of email component * added another block as test fails on CI * added retry to multi email test * added delay to retry * added to .coveragerc * removed sleep from tests and fixed up stale comments
This commit is contained in:
parent
54248863b3
commit
de5bd26050
@ -228,6 +228,7 @@ omit =
|
|||||||
homeassistant/components/sensor/gtfs.py
|
homeassistant/components/sensor/gtfs.py
|
||||||
homeassistant/components/sensor/hp_ilo.py
|
homeassistant/components/sensor/hp_ilo.py
|
||||||
homeassistant/components/sensor/imap.py
|
homeassistant/components/sensor/imap.py
|
||||||
|
homeassistant/components/sensor/imap_email_content.py
|
||||||
homeassistant/components/sensor/lastfm.py
|
homeassistant/components/sensor/lastfm.py
|
||||||
homeassistant/components/sensor/linux_battery.py
|
homeassistant/components/sensor/linux_battery.py
|
||||||
homeassistant/components/sensor/loopenergy.py
|
homeassistant/components/sensor/loopenergy.py
|
||||||
|
250
homeassistant/components/sensor/imap_email_content.py
Normal file
250
homeassistant/components/sensor/imap_email_content.py
Normal file
@ -0,0 +1,250 @@
|
|||||||
|
"""
|
||||||
|
EMail sensor support.
|
||||||
|
|
||||||
|
For more details about this platform, please refer to the documentation at
|
||||||
|
https://home-assistant.io/components/sensor.email/
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
import datetime
|
||||||
|
import email
|
||||||
|
|
||||||
|
from collections import deque
|
||||||
|
from homeassistant.helpers.entity import Entity
|
||||||
|
from homeassistant.components.sensor import PLATFORM_SCHEMA
|
||||||
|
from homeassistant.const import (
|
||||||
|
CONF_NAME, CONF_PORT, CONF_USERNAME, CONF_PASSWORD)
|
||||||
|
import homeassistant.helpers.config_validation as cv
|
||||||
|
from homeassistant.helpers import template
|
||||||
|
import voluptuous as vol
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
CONF_SERVER = "server"
|
||||||
|
CONF_SENDERS = "senders"
|
||||||
|
CONF_VALUE_TEMPLATE = "value_template"
|
||||||
|
|
||||||
|
ATTR_FROM = "from"
|
||||||
|
ATTR_BODY = "body"
|
||||||
|
ATTR_DATE = "date"
|
||||||
|
ATTR_SUBJECT = "subject"
|
||||||
|
|
||||||
|
DEFAULT_PORT = 993
|
||||||
|
|
||||||
|
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
|
||||||
|
vol.Optional(CONF_NAME): cv.string,
|
||||||
|
vol.Required(CONF_USERNAME): cv.string,
|
||||||
|
vol.Required(CONF_PASSWORD): cv.string,
|
||||||
|
vol.Required(CONF_SERVER): cv.string,
|
||||||
|
vol.Optional(CONF_PORT, default=DEFAULT_PORT): cv.port,
|
||||||
|
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def setup_platform(hass, config, add_devices, discovery_info=None):
|
||||||
|
"""Setup the EMail platform."""
|
||||||
|
reader = EmailReader(
|
||||||
|
config.get(CONF_USERNAME),
|
||||||
|
config.get(CONF_PASSWORD),
|
||||||
|
config.get(CONF_SERVER),
|
||||||
|
config.get(CONF_PORT))
|
||||||
|
|
||||||
|
sensor = EmailContentSensor(
|
||||||
|
hass,
|
||||||
|
reader,
|
||||||
|
config.get(CONF_NAME, None) or config.get(CONF_USERNAME),
|
||||||
|
config.get(CONF_SENDERS),
|
||||||
|
config.get(CONF_VALUE_TEMPLATE))
|
||||||
|
|
||||||
|
if sensor.connected:
|
||||||
|
add_devices([sensor])
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
class EmailReader:
|
||||||
|
"""A class to read emails from an IMAP server."""
|
||||||
|
|
||||||
|
def __init__(self, user, password, server, port):
|
||||||
|
"""Initialize the Email Reader."""
|
||||||
|
self._user = user
|
||||||
|
self._password = password
|
||||||
|
self._server = server
|
||||||
|
self._port = port
|
||||||
|
self._last_id = None
|
||||||
|
self._unread_ids = deque([])
|
||||||
|
self.connection = None
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""Login and setup the connection."""
|
||||||
|
import imaplib
|
||||||
|
try:
|
||||||
|
self.connection = imaplib.IMAP4_SSL(self._server, self._port)
|
||||||
|
self.connection.login(self._user, self._password)
|
||||||
|
return True
|
||||||
|
except imaplib.IMAP4.error:
|
||||||
|
_LOGGER.error("Failed to login to %s.", self._server)
|
||||||
|
return False
|
||||||
|
|
||||||
|
def _fetch_message(self, message_uid):
|
||||||
|
"""Get an email message from a message id."""
|
||||||
|
_, message_data = self.connection.uid(
|
||||||
|
'fetch',
|
||||||
|
message_uid,
|
||||||
|
'(RFC822)')
|
||||||
|
|
||||||
|
raw_email = message_data[0][1]
|
||||||
|
email_message = email.message_from_bytes(raw_email)
|
||||||
|
return email_message
|
||||||
|
|
||||||
|
def read_next(self):
|
||||||
|
"""Read the next email from the email server."""
|
||||||
|
import imaplib
|
||||||
|
try:
|
||||||
|
self.connection.select()
|
||||||
|
|
||||||
|
if len(self._unread_ids) == 0:
|
||||||
|
search = "SINCE {0:%d-%b-%y}".format(datetime.date.today())
|
||||||
|
if self._last_id is not None:
|
||||||
|
search = "UID {}:*".format(self._last_id)
|
||||||
|
|
||||||
|
_, data = self.connection.uid("search", None, search)
|
||||||
|
self._unread_ids = deque(data[0].split())
|
||||||
|
|
||||||
|
while len(self._unread_ids) > 0:
|
||||||
|
message_uid = self._unread_ids.popleft()
|
||||||
|
if self._last_id is None or int(message_uid) > self._last_id:
|
||||||
|
self._last_id = int(message_uid)
|
||||||
|
return self._fetch_message(message_uid)
|
||||||
|
|
||||||
|
except imaplib.IMAP4.error:
|
||||||
|
_LOGGER.info(
|
||||||
|
"Connection to %s lost, attempting to reconnect",
|
||||||
|
self._server)
|
||||||
|
try:
|
||||||
|
self.connect()
|
||||||
|
except imaplib.IMAP4.error:
|
||||||
|
_LOGGER.error("Failed to reconnect.")
|
||||||
|
|
||||||
|
|
||||||
|
class EmailContentSensor(Entity):
|
||||||
|
"""Representation of an EMail sensor."""
|
||||||
|
|
||||||
|
# pylint: disable=too-many-arguments
|
||||||
|
# pylint: disable=too-many-instance-attributes
|
||||||
|
def __init__(self,
|
||||||
|
hass,
|
||||||
|
email_reader,
|
||||||
|
name,
|
||||||
|
allowed_senders,
|
||||||
|
value_template):
|
||||||
|
"""Initialize the sensor."""
|
||||||
|
self.hass = hass
|
||||||
|
self._email_reader = email_reader
|
||||||
|
self._name = name
|
||||||
|
self._allowed_senders = \
|
||||||
|
[sender.upper() for sender in allowed_senders]
|
||||||
|
self._value_template = value_template
|
||||||
|
self._last_id = None
|
||||||
|
self._message = None
|
||||||
|
self._state_attributes = None
|
||||||
|
self.connected = self._email_reader.connect()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
"""Return the name of the sensor."""
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self):
|
||||||
|
"""Return the current email state."""
|
||||||
|
return self._message
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state_attributes(self):
|
||||||
|
"""Return other state attributes for the message."""
|
||||||
|
return self._state_attributes
|
||||||
|
|
||||||
|
def render_template(self, email_message):
|
||||||
|
"""Render the message template."""
|
||||||
|
variables = {
|
||||||
|
ATTR_FROM: EmailContentSensor.get_msg_sender(email_message),
|
||||||
|
ATTR_SUBJECT: EmailContentSensor.get_msg_subject(email_message),
|
||||||
|
ATTR_DATE: email_message['Date'],
|
||||||
|
ATTR_BODY: EmailContentSensor.get_msg_text(email_message)
|
||||||
|
}
|
||||||
|
return template.render(self.hass, self._value_template, variables)
|
||||||
|
|
||||||
|
def sender_allowed(self, email_message):
|
||||||
|
"""Check if the sender is in the allowed senders list."""
|
||||||
|
return EmailContentSensor.get_msg_sender(email_message).upper() in (
|
||||||
|
sender for sender in self._allowed_senders)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_msg_sender(email_message):
|
||||||
|
"""Get the parsed message sender from the email."""
|
||||||
|
return str(email.utils.parseaddr(email_message['From'])[1])
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_msg_subject(email_message):
|
||||||
|
"""Decode the message subject."""
|
||||||
|
decoded_header = email.header.decode_header(email_message['Subject'])
|
||||||
|
header = email.header.make_header(decoded_header)
|
||||||
|
return str(header)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_msg_text(email_message):
|
||||||
|
"""
|
||||||
|
Get the message text from the email.
|
||||||
|
|
||||||
|
Will look for text/plain or use text/html if not found.
|
||||||
|
"""
|
||||||
|
message_text = None
|
||||||
|
message_html = None
|
||||||
|
message_untyped_text = None
|
||||||
|
|
||||||
|
for part in email_message.walk():
|
||||||
|
if part.get_content_type() == 'text/plain':
|
||||||
|
if message_text is None:
|
||||||
|
message_text = part.get_payload()
|
||||||
|
elif part.get_content_type() == 'text/html':
|
||||||
|
if message_html is None:
|
||||||
|
message_html = part.get_payload()
|
||||||
|
elif part.get_content_type().startswith('text'):
|
||||||
|
if message_untyped_text is None:
|
||||||
|
message_untyped_text = part.get_payload()
|
||||||
|
|
||||||
|
if message_text is not None:
|
||||||
|
return message_text
|
||||||
|
|
||||||
|
if message_html is not None:
|
||||||
|
return message_html
|
||||||
|
|
||||||
|
if message_untyped_text is not None:
|
||||||
|
return message_untyped_text
|
||||||
|
|
||||||
|
return email_message.get_payload()
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
"""Read emails and publish state change."""
|
||||||
|
while True:
|
||||||
|
email_message = self._email_reader.read_next()
|
||||||
|
|
||||||
|
if email_message is None:
|
||||||
|
break
|
||||||
|
|
||||||
|
if self.sender_allowed(email_message):
|
||||||
|
message_body = EmailContentSensor.get_msg_text(email_message)
|
||||||
|
|
||||||
|
if self._value_template is not None:
|
||||||
|
message_body = self.render_template(email_message)
|
||||||
|
|
||||||
|
self._message = message_body
|
||||||
|
self._state_attributes = {
|
||||||
|
ATTR_FROM:
|
||||||
|
EmailContentSensor.get_msg_sender(email_message),
|
||||||
|
ATTR_SUBJECT:
|
||||||
|
EmailContentSensor.get_msg_subject(email_message),
|
||||||
|
ATTR_DATE:
|
||||||
|
email_message['Date']
|
||||||
|
}
|
||||||
|
self.update_ha_state()
|
227
tests/components/sensor/test_email_content_sensor.py
Normal file
227
tests/components/sensor/test_email_content_sensor.py
Normal file
@ -0,0 +1,227 @@
|
|||||||
|
"""The tests for the Command line sensor platform."""
|
||||||
|
import unittest
|
||||||
|
import email
|
||||||
|
import datetime
|
||||||
|
|
||||||
|
from email.mime.multipart import MIMEMultipart
|
||||||
|
from email.mime.text import MIMEText
|
||||||
|
from threading import Event
|
||||||
|
|
||||||
|
from homeassistant.helpers.event import track_state_change
|
||||||
|
from collections import deque
|
||||||
|
|
||||||
|
from homeassistant.components.sensor import imap_email_content
|
||||||
|
from tests.common import get_test_home_assistant
|
||||||
|
|
||||||
|
|
||||||
|
class FakeEMailReader:
|
||||||
|
"""A test class for sending test emails"""
|
||||||
|
|
||||||
|
def __init__(self, messages):
|
||||||
|
"""Setup the fake email reader"""
|
||||||
|
self._messages = messages
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
"""Always connected."""
|
||||||
|
return True
|
||||||
|
|
||||||
|
def read_next(self):
|
||||||
|
"""Get the next email."""
|
||||||
|
if len(self._messages) == 0:
|
||||||
|
return None
|
||||||
|
return self._messages.popleft()
|
||||||
|
|
||||||
|
|
||||||
|
class EmailContentSensor(unittest.TestCase):
|
||||||
|
"""Test the Command line sensor."""
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
"""Setup things to be run when tests are started."""
|
||||||
|
self.hass = get_test_home_assistant()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
"""Stop everything that was started."""
|
||||||
|
self.hass.stop()
|
||||||
|
|
||||||
|
def test_allowed_sender(self):
|
||||||
|
"""Test emails from allowed sender."""
|
||||||
|
test_message = email.message.Message()
|
||||||
|
test_message['From'] = "sender@test.com"
|
||||||
|
test_message['Subject'] = "Test"
|
||||||
|
test_message['Date'] = datetime.datetime(2016, 1, 1, 12, 44, 57)
|
||||||
|
test_message.set_payload("Test Message")
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([test_message])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual("Test Message", sensor.state)
|
||||||
|
self.assertEqual("sender@test.com", sensor.state_attributes["from"])
|
||||||
|
self.assertEqual("Test", sensor.state_attributes["subject"])
|
||||||
|
self.assertEqual(datetime.datetime(2016, 1, 1, 12, 44, 57),
|
||||||
|
sensor.state_attributes["date"])
|
||||||
|
|
||||||
|
def test_multi_part_with_text(self):
|
||||||
|
"""Test multi part emails."""
|
||||||
|
msg = MIMEMultipart('alternative')
|
||||||
|
msg['Subject'] = "Link"
|
||||||
|
msg['From'] = "sender@test.com"
|
||||||
|
|
||||||
|
text = "Test Message"
|
||||||
|
html = "<html><head></head><body>Test Message</body></html>"
|
||||||
|
|
||||||
|
textPart = MIMEText(text, 'plain')
|
||||||
|
htmlPart = MIMEText(html, 'html')
|
||||||
|
|
||||||
|
msg.attach(textPart)
|
||||||
|
msg.attach(htmlPart)
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([msg])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual("Test Message", sensor.state)
|
||||||
|
|
||||||
|
def test_multi_part_only_html(self):
|
||||||
|
"""Test multi part emails with only html."""
|
||||||
|
msg = MIMEMultipart('alternative')
|
||||||
|
msg['Subject'] = "Link"
|
||||||
|
msg['From'] = "sender@test.com"
|
||||||
|
|
||||||
|
html = "<html><head></head><body>Test Message</body></html>"
|
||||||
|
|
||||||
|
htmlPart = MIMEText(html, 'html')
|
||||||
|
|
||||||
|
msg.attach(htmlPart)
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([msg])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual(
|
||||||
|
"<html><head></head><body>Test Message</body></html>",
|
||||||
|
sensor.state)
|
||||||
|
|
||||||
|
def test_multi_part_only_other_text(self):
|
||||||
|
"""Test multi part emails with only other text."""
|
||||||
|
msg = MIMEMultipart('alternative')
|
||||||
|
msg['Subject'] = "Link"
|
||||||
|
msg['From'] = "sender@test.com"
|
||||||
|
|
||||||
|
other = "Test Message"
|
||||||
|
|
||||||
|
htmlPart = MIMEText(other, 'other')
|
||||||
|
|
||||||
|
msg.attach(htmlPart)
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([msg])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual("Test Message", sensor.state)
|
||||||
|
|
||||||
|
def test_multiple_emails(self):
|
||||||
|
"""Test multiple emails."""
|
||||||
|
states = []
|
||||||
|
|
||||||
|
test_message1 = email.message.Message()
|
||||||
|
test_message1['From'] = "sender@test.com"
|
||||||
|
test_message1['Subject'] = "Test"
|
||||||
|
test_message1['Date'] = datetime.datetime(2016, 1, 1, 12, 44, 57)
|
||||||
|
test_message1.set_payload("Test Message")
|
||||||
|
|
||||||
|
test_message2 = email.message.Message()
|
||||||
|
test_message2['From'] = "sender@test.com"
|
||||||
|
test_message2['Subject'] = "Test 2"
|
||||||
|
test_message2['Date'] = datetime.datetime(2016, 1, 1, 12, 44, 57)
|
||||||
|
test_message2.set_payload("Test Message 2")
|
||||||
|
|
||||||
|
states_received = Event()
|
||||||
|
|
||||||
|
def state_changed_listener(entity_id, from_s, to_s):
|
||||||
|
states.append(to_s)
|
||||||
|
if len(states) == 2:
|
||||||
|
states_received.set()
|
||||||
|
|
||||||
|
track_state_change(
|
||||||
|
self.hass,
|
||||||
|
["sensor.emailtest"],
|
||||||
|
state_changed_listener)
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([test_message1, test_message2])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
|
||||||
|
self.hass.pool.block_till_done()
|
||||||
|
states_received.wait(5)
|
||||||
|
|
||||||
|
self.assertEqual("Test Message", states[0].state)
|
||||||
|
self.assertEqual("Test Message 2", states[1].state)
|
||||||
|
|
||||||
|
self.assertEqual("Test Message 2", sensor.state)
|
||||||
|
|
||||||
|
def test_sender_not_allowed(self):
|
||||||
|
"""Test not whitelisted emails."""
|
||||||
|
test_message = email.message.Message()
|
||||||
|
test_message['From'] = "sender@test.com"
|
||||||
|
test_message['Subject'] = "Test"
|
||||||
|
test_message['Date'] = datetime.datetime(2016, 1, 1, 12, 44, 57)
|
||||||
|
test_message.set_payload("Test Message")
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([test_message])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["other@test.com"],
|
||||||
|
None)
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual(None, sensor.state)
|
||||||
|
|
||||||
|
def test_template(self):
|
||||||
|
"""Test value template."""
|
||||||
|
test_message = email.message.Message()
|
||||||
|
test_message['From'] = "sender@test.com"
|
||||||
|
test_message['Subject'] = "Test"
|
||||||
|
test_message['Date'] = datetime.datetime(2016, 1, 1, 12, 44, 57)
|
||||||
|
test_message.set_payload("Test Message")
|
||||||
|
|
||||||
|
sensor = imap_email_content.EmailContentSensor(
|
||||||
|
self.hass,
|
||||||
|
FakeEMailReader(deque([test_message])),
|
||||||
|
"test_emails_sensor",
|
||||||
|
["sender@test.com"],
|
||||||
|
"{{ subject }} from {{ from }} with message {{ body }}")
|
||||||
|
|
||||||
|
sensor.entity_id = "sensor.emailtest"
|
||||||
|
sensor.update()
|
||||||
|
self.assertEqual(
|
||||||
|
"Test from sender@test.com with message Test Message",
|
||||||
|
sensor.state)
|
Loading…
x
Reference in New Issue
Block a user