From 8f8bba4ad7f74f6764e898ad9438d558aaf855a0 Mon Sep 17 00:00:00 2001 From: Willems Davy Date: Sat, 8 Oct 2016 20:38:58 +0200 Subject: [PATCH] Haveibeenpwned sensor platform (#3618) * Initial version of "haveibeenpwned" sensor component * 2 flake8 fixes * remove debugging error message * Increase scan_interval as well as throttle to make sure that during initial startup of hass the request happens with 5 seconds delays and after startup with 15 minutes delays. Scan_interval is increased also to not call update as often * update .coveragerc * remove (ssl) verify=False * - use dict to keep the request values with email as key - use track_point_in_time system to make sure data updates initially at 5 seconds between each call until all sensor's email have a result in the dict. * fix a pylint error that happend on the py35 tests --- .coveragerc | 1 + .../components/sensor/haveibeenpwned.py | 181 ++++++++++++++++++ 2 files changed, 182 insertions(+) create mode 100644 homeassistant/components/sensor/haveibeenpwned.py diff --git a/.coveragerc b/.coveragerc index 80bc6905c21..045e8f77588 100644 --- a/.coveragerc +++ b/.coveragerc @@ -235,6 +235,7 @@ omit = homeassistant/components/sensor/google_travel_time.py homeassistant/components/sensor/gpsd.py homeassistant/components/sensor/gtfs.py + homeassistant/components/sensor/haveibeenpwned.py homeassistant/components/sensor/hp_ilo.py homeassistant/components/sensor/imap.py homeassistant/components/sensor/imap_email_content.py diff --git a/homeassistant/components/sensor/haveibeenpwned.py b/homeassistant/components/sensor/haveibeenpwned.py new file mode 100644 index 00000000000..f317ef14565 --- /dev/null +++ b/homeassistant/components/sensor/haveibeenpwned.py @@ -0,0 +1,181 @@ +""" +Support for haveibeenpwned (email breaches) sensor. + +For more details about this platform, please refer to the documentation at +https://home-assistant.io/components/sensor.haveibeenpwned/ +""" +from datetime import timedelta +import logging + +import voluptuous as vol +import requests + +from homeassistant.components.sensor import PLATFORM_SCHEMA +from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL) +from homeassistant.helpers.entity import Entity +import homeassistant.helpers.config_validation as cv +from homeassistant.util import Throttle +import homeassistant.util.dt as dt_util +from homeassistant.helpers.event import track_point_in_time + +_LOGGER = logging.getLogger(__name__) + +DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S" +USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component" + +MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15) +MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5) + +PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ + vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]), +}) + + +# pylint: disable=unused-argument +def setup_platform(hass, config, add_devices, discovery_info=None): + """Setup the RESTful sensor.""" + emails = config.get(CONF_EMAIL) + data = HaveIBeenPwnedData(emails) + + devices = [] + for email in emails: + devices.append(HaveIBeenPwnedSensor(data, hass, email)) + + add_devices(devices) + + # To make sure we get initial data for the sensors + # ignoring the normal throttle of 15 minutes but using + # an update throttle of 5 seconds + for sensor in devices: + sensor.update_nothrottle() + + +class HaveIBeenPwnedSensor(Entity): + """Implementation of HaveIBeenPwnedSensor.""" + + def __init__(self, data, hass, email): + """Initialize the HaveIBeenPwnedSensor sensor.""" + self._state = STATE_UNKNOWN + self._data = data + self._hass = hass + self._email = email + self._unit_of_measurement = "Breaches" + + @property + def name(self): + """Return the name of the sensor.""" + return "Breaches {}".format(self._email) + + @property + def unit_of_measurement(self): + """Return the unit the value is expressed in.""" + return self._unit_of_measurement + + @property + def state(self): + """Return the state of the device.""" + return self._state + + @property + def state_attributes(self): + """Return the atrributes of the sensor.""" + val = {} + if self._email not in self._data.data: + return val + + for idx, value in enumerate(self._data.data[self._email]): + tmpname = "breach {}".format(idx+1) + tmpvalue = "{} {}".format( + value["Title"], + dt_util.as_local(dt_util.parse_datetime( + value["AddedDate"])).strftime(DATE_STR_FORMAT)) + val[tmpname] = tmpvalue + + return val + + def update_nothrottle(self, dummy=None): + """Update sensor without throttle.""" + self._data.update_no_throttle() + + # Schedule a forced update 5 seconds in the future if the + # update above returned no data for this sensors email. + # this is mainly to make sure that we don't + # get http error "too many requests" and to have initial + # data after hass startup once we have the data it will + # update as normal using update + if self._email not in self._data.data: + track_point_in_time(self._hass, + self.update_nothrottle, + dt_util.now() + + MIN_TIME_BETWEEN_FORCED_UPDATES) + return + + if self._email in self._data.data: + self._state = len(self._data.data[self._email]) + self.update_ha_state() + + def update(self): + """Update data and see if it contains data for our email.""" + self._data.update() + + if self._email in self._data.data: + self._state = len(self._data.data[self._email]) + + +class HaveIBeenPwnedData(object): + """Class for handling the data retrieval.""" + + def __init__(self, emails): + """Initialize the data object.""" + self._email_count = len(emails) + self._current_index = 0 + self.data = {} + self._email = emails[0] + self._emails = emails + + def set_next_email(self): + """Set the next email to be looked up.""" + self._current_index = (self._current_index + 1) % self._email_count + self._email = self._emails[self._current_index] + + def update_no_throttle(self): + """Get the data for a specific email.""" + self.update(no_throttle=True) + + @Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES) + def update(self, **kwargs): + """Get the latest data for current email from REST service.""" + try: + url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}". \ + format(self._email) + + _LOGGER.info("Checking for breaches for email %s", self._email) + + req = requests.get(url, headers={"User-agent": USER_AGENT}, + allow_redirects=True, timeout=5) + + except requests.exceptions.RequestException: + _LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'", + self._email) + return + + if req.status_code == 200: + self.data[self._email] = sorted(req.json(), + key=lambda k: k["AddedDate"], + reverse=True) + + # only goto next email if we had data so that + # the forced updates try this current email again + self.set_next_email() + + elif req.status_code == 404: + self.data[self._email] = [] + + # only goto next email if we had data so that + # the forced updates try this current email again + self.set_next_email() + + else: + _LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'" + "(HTTP Status_code = %d)", self._email, + req.status_code)