Add GDACS feed integration (#31235)

* initial version of gdacs integration

* updated translations

* generated files

* added abbreviation

* bumped library version

* small feed entry attribute fixes

* add unit tests

* need to use original mdi name

* bumped library version

* improved entity name for earthquakes

* round vulnerability number

* typo

* support for categories

* testing support for categories

* tie longitude and latitude together

* validating categories

* simplifying setup

* passing domain as parameter

* simplified test setup

* moved test code

* simplified test code

* removed superfluous code

* changed approach to unique identifier

* changed code structure

* simplified unit system handling

* made schema a constant

* comment added

* simplifying code

* added message if location already configured

* removed unnecessary code

* simplified test code

* avoid mocking __init__

* pylint

* simplified code

* fetch categories from integration library

* setting PARALLEL_UPDATES

* setting PARALLEL_UPDATES to zero/unlimited

* added quality scale
This commit is contained in:
Malte Franken 2020-02-06 21:32:30 +11:00 committed by GitHub
parent 05b3c1f17d
commit 8d429d7676
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1240 additions and 0 deletions

View File

@ -117,6 +117,7 @@ homeassistant/components/freebox/* @snoof85
homeassistant/components/fronius/* @nielstron
homeassistant/components/frontend/* @home-assistant/frontend
homeassistant/components/garmin_connect/* @cyberjunky
homeassistant/components/gdacs/* @exxamalte
homeassistant/components/gearbest/* @HerrHofrat
homeassistant/components/geniushub/* @zxdavb
homeassistant/components/geo_rss_events/* @exxamalte

View File

@ -0,0 +1,16 @@
{
"config": {
"abort": {
"already_configured": "Location is already configured."
},
"step": {
"user": {
"data": {
"radius": "Radius"
},
"title": "Fill in your filter details."
}
},
"title": "Global Disaster Alert and Coordination System (GDACS)"
}
}

View File

@ -0,0 +1,212 @@
"""The Global Disaster Alert and Coordination System (GDACS) integration."""
import asyncio
from datetime import timedelta
import logging
from aio_georss_gdacs import GdacsFeedManager
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import (
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
CONF_SCAN_INTERVAL,
CONF_UNIT_SYSTEM_IMPERIAL,
LENGTH_MILES,
)
from homeassistant.core import callback
from homeassistant.helpers import aiohttp_client, config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.util.unit_system import METRIC_SYSTEM
from .const import (
CONF_CATEGORIES,
DEFAULT_RADIUS,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
FEED,
PLATFORMS,
SIGNAL_DELETE_ENTITY,
SIGNAL_NEW_GEOLOCATION,
SIGNAL_STATUS,
SIGNAL_UPDATE_ENTITY,
VALID_CATEGORIES,
)
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
{
vol.Inclusive(CONF_LATITUDE, "coordinates"): cv.latitude,
vol.Inclusive(CONF_LONGITUDE, "coordinates"): cv.longitude,
vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS): vol.Coerce(float),
vol.Optional(
CONF_SCAN_INTERVAL, default=DEFAULT_SCAN_INTERVAL
): cv.time_period,
vol.Optional(CONF_CATEGORIES, default=[]): vol.All(
cv.ensure_list, [vol.In(VALID_CATEGORIES)]
),
}
)
},
extra=vol.ALLOW_EXTRA,
)
async def async_setup(hass, config):
"""Set up the GDACS component."""
if DOMAIN not in config:
return True
conf = config[DOMAIN]
latitude = conf.get(CONF_LATITUDE, hass.config.latitude)
longitude = conf.get(CONF_LONGITUDE, hass.config.longitude)
scan_interval = conf[CONF_SCAN_INTERVAL]
categories = conf[CONF_CATEGORIES]
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_LATITUDE: latitude,
CONF_LONGITUDE: longitude,
CONF_RADIUS: conf[CONF_RADIUS],
CONF_SCAN_INTERVAL: scan_interval,
CONF_CATEGORIES: categories,
},
)
)
return True
async def async_setup_entry(hass, config_entry):
"""Set up the GDACS component as config entry."""
hass.data.setdefault(DOMAIN, {})
feeds = hass.data[DOMAIN].setdefault(FEED, {})
radius = config_entry.data[CONF_RADIUS]
if hass.config.units.name == CONF_UNIT_SYSTEM_IMPERIAL:
radius = METRIC_SYSTEM.length(radius, LENGTH_MILES)
# Create feed entity manager for all platforms.
manager = GdacsFeedEntityManager(hass, config_entry, radius)
feeds[config_entry.entry_id] = manager
_LOGGER.debug("Feed entity manager added for %s", config_entry.entry_id)
await manager.async_init()
return True
async def async_unload_entry(hass, config_entry):
"""Unload an GDACS component config entry."""
manager = hass.data[DOMAIN][FEED].pop(config_entry.entry_id)
await manager.async_stop()
await asyncio.wait(
[
hass.config_entries.async_forward_entry_unload(config_entry, domain)
for domain in PLATFORMS
]
)
return True
class GdacsFeedEntityManager:
"""Feed Entity Manager for GDACS feed."""
def __init__(self, hass, config_entry, radius_in_km):
"""Initialize the Feed Entity Manager."""
self._hass = hass
self._config_entry = config_entry
coordinates = (
config_entry.data[CONF_LATITUDE],
config_entry.data[CONF_LONGITUDE],
)
categories = config_entry.data[CONF_CATEGORIES]
websession = aiohttp_client.async_get_clientsession(hass)
self._feed_manager = GdacsFeedManager(
websession,
self._generate_entity,
self._update_entity,
self._remove_entity,
coordinates,
filter_radius=radius_in_km,
filter_categories=categories,
status_async_callback=self._status_update,
)
self._config_entry_id = config_entry.entry_id
self._scan_interval = timedelta(seconds=config_entry.data[CONF_SCAN_INTERVAL])
self._track_time_remove_callback = None
self._status_info = None
self.listeners = []
async def async_init(self):
"""Schedule initial and regular updates based on configured time interval."""
for domain in PLATFORMS:
self._hass.async_create_task(
self._hass.config_entries.async_forward_entry_setup(
self._config_entry, domain
)
)
async def update(event_time):
"""Update."""
await self.async_update()
# Trigger updates at regular intervals.
self._track_time_remove_callback = async_track_time_interval(
self._hass, update, self._scan_interval
)
_LOGGER.debug("Feed entity manager initialized")
async def async_update(self):
"""Refresh data."""
await self._feed_manager.update()
_LOGGER.debug("Feed entity manager updated")
async def async_stop(self):
"""Stop this feed entity manager from refreshing."""
for unsub_dispatcher in self.listeners:
unsub_dispatcher()
self.listeners = []
if self._track_time_remove_callback:
self._track_time_remove_callback()
_LOGGER.debug("Feed entity manager stopped")
@callback
def async_event_new_entity(self):
"""Return manager specific event to signal new entity."""
return SIGNAL_NEW_GEOLOCATION.format(self._config_entry_id)
def get_entry(self, external_id):
"""Get feed entry by external id."""
return self._feed_manager.feed_entries.get(external_id)
def status_info(self):
"""Return latest status update info received."""
return self._status_info
async def _generate_entity(self, external_id):
"""Generate new entity."""
async_dispatcher_send(
self._hass, self.async_event_new_entity(), self, external_id
)
async def _update_entity(self, external_id):
"""Update entity."""
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
async def _remove_entity(self, external_id):
"""Remove entity."""
async_dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
async def _status_update(self, status_info):
"""Propagate status update."""
_LOGGER.debug("Status update received: %s", status_info)
self._status_info = status_info
async_dispatcher_send(self._hass, SIGNAL_STATUS.format(self._config_entry_id))

View File

@ -0,0 +1,66 @@
"""Config flow to configure the GDACS integration."""
import logging
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
CONF_SCAN_INTERVAL,
)
from homeassistant.helpers import config_validation as cv
from .const import ( # pylint: disable=unused-import
CONF_CATEGORIES,
DEFAULT_RADIUS,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
)
DATA_SCHEMA = vol.Schema(
{vol.Optional(CONF_RADIUS, default=DEFAULT_RADIUS): cv.positive_int}
)
_LOGGER = logging.getLogger(__name__)
class GdacsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a GDACS config flow."""
CONNECTION_CLASS = config_entries.CONN_CLASS_CLOUD_POLL
async def _show_form(self, errors=None):
"""Show the form to the user."""
return self.async_show_form(
step_id="user", data_schema=DATA_SCHEMA, errors=errors or {}
)
async def async_step_import(self, import_config):
"""Import a config entry from configuration.yaml."""
return await self.async_step_user(import_config)
async def async_step_user(self, user_input=None):
"""Handle the start of the config flow."""
_LOGGER.debug("User input: %s", user_input)
if not user_input:
return await self._show_form()
latitude = user_input.get(CONF_LATITUDE, self.hass.config.latitude)
user_input[CONF_LATITUDE] = latitude
longitude = user_input.get(CONF_LONGITUDE, self.hass.config.longitude)
user_input[CONF_LONGITUDE] = longitude
identifier = f"{user_input[CONF_LATITUDE]}, {user_input[CONF_LONGITUDE]}"
await self.async_set_unique_id(identifier)
self._abort_if_unique_id_configured()
scan_interval = user_input.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
user_input[CONF_SCAN_INTERVAL] = scan_interval.seconds
categories = user_input.get(CONF_CATEGORIES, [])
user_input[CONF_CATEGORIES] = categories
return self.async_create_entry(title=identifier, data=user_input)

View File

@ -0,0 +1,25 @@
"""Define constants for the GDACS integration."""
from datetime import timedelta
from aio_georss_gdacs.consts import EVENT_TYPE_MAP
DOMAIN = "gdacs"
PLATFORMS = ("sensor", "geo_location")
FEED = "feed"
CONF_CATEGORIES = "categories"
DEFAULT_ICON = "mdi:alert"
DEFAULT_RADIUS = 500.0
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
SIGNAL_DELETE_ENTITY = "gdacs_delete_{}"
SIGNAL_UPDATE_ENTITY = "gdacs_update_{}"
SIGNAL_STATUS = "gdacs_status_{}"
SIGNAL_NEW_GEOLOCATION = "gdacs_new_geolocation_{}"
# Fetch valid categories from integration library.
VALID_CATEGORIES = list(EVENT_TYPE_MAP.values())

View File

@ -0,0 +1,234 @@
"""Geolocation support for GDACS Feed."""
import logging
from typing import Optional
from homeassistant.components.geo_location import GeolocationEvent
from homeassistant.const import (
ATTR_ATTRIBUTION,
CONF_UNIT_SYSTEM_IMPERIAL,
LENGTH_KILOMETERS,
LENGTH_MILES,
)
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.util.unit_system import IMPERIAL_SYSTEM
from .const import (
DEFAULT_ICON,
DOMAIN,
FEED,
SIGNAL_DELETE_ENTITY,
SIGNAL_UPDATE_ENTITY,
)
_LOGGER = logging.getLogger(__name__)
ATTR_ALERT_LEVEL = "alert_level"
ATTR_COUNTRY = "country"
ATTR_DESCRIPTION = "description"
ATTR_DURATION_IN_WEEK = "duration_in_week"
ATTR_EVENT_TYPE = "event_type"
ATTR_EXTERNAL_ID = "external_id"
ATTR_FROM_DATE = "from_date"
ATTR_POPULATION = "population"
ATTR_SEVERITY = "severity"
ATTR_TO_DATE = "to_date"
ATTR_VULNERABILITY = "vulnerability"
ICONS = {
"DR": "mdi:water-off",
"EQ": "mdi:pulse",
"FL": "mdi:home-flood",
"TC": "mdi:weather-hurricane",
"TS": "mdi:waves",
"VO": "mdi:image-filter-hdr",
}
# An update of this entity is not making a web request, but uses internal data only.
PARALLEL_UPDATES = 0
SOURCE = "gdacs"
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the GDACS Feed platform."""
manager = hass.data[DOMAIN][FEED][entry.entry_id]
@callback
def async_add_geolocation(feed_manager, external_id):
"""Add gelocation entity from feed."""
new_entity = GdacsEvent(feed_manager, external_id)
_LOGGER.debug("Adding geolocation %s", new_entity)
async_add_entities([new_entity], True)
manager.listeners.append(
async_dispatcher_connect(
hass, manager.async_event_new_entity(), async_add_geolocation
)
)
# Do not wait for update here so that the setup can be completed and because an
# update will fetch data from the feed via HTTP and then process that data.
hass.async_create_task(manager.async_update())
_LOGGER.debug("Geolocation setup done")
class GdacsEvent(GeolocationEvent):
"""This represents an external event with GDACS feed data."""
def __init__(self, feed_manager, external_id):
"""Initialize entity with data from feed entry."""
self._feed_manager = feed_manager
self._external_id = external_id
self._title = None
self._distance = None
self._latitude = None
self._longitude = None
self._attribution = None
self._alert_level = None
self._country = None
self._description = None
self._duration_in_week = None
self._event_type_short = None
self._event_type = None
self._from_date = None
self._to_date = None
self._population = None
self._severity = None
self._vulnerability = None
self._version = None
self._remove_signal_delete = None
self._remove_signal_update = None
async def async_added_to_hass(self):
"""Call when entity is added to hass."""
self._remove_signal_delete = async_dispatcher_connect(
self.hass,
SIGNAL_DELETE_ENTITY.format(self._external_id),
self._delete_callback,
)
self._remove_signal_update = async_dispatcher_connect(
self.hass,
SIGNAL_UPDATE_ENTITY.format(self._external_id),
self._update_callback,
)
async def async_will_remove_from_hass(self) -> None:
"""Call when entity will be removed from hass."""
self._remove_signal_delete()
self._remove_signal_update()
@callback
def _delete_callback(self):
"""Remove this entity."""
self.hass.async_create_task(self.async_remove())
@callback
def _update_callback(self):
"""Call update method."""
self.async_schedule_update_ha_state(True)
@property
def should_poll(self):
"""No polling needed for GDACS feed location events."""
return False
async def async_update(self):
"""Update this entity from the data held in the feed manager."""
_LOGGER.debug("Updating %s", self._external_id)
feed_entry = self._feed_manager.get_entry(self._external_id)
if feed_entry:
self._update_from_feed(feed_entry)
def _update_from_feed(self, feed_entry):
"""Update the internal state from the provided feed entry."""
event_name = feed_entry.event_name
if not event_name:
# Earthquakes usually don't have an event name.
event_name = f"{feed_entry.country} ({feed_entry.event_id})"
self._title = f"{feed_entry.event_type}: {event_name}"
# Convert distance if not metric system.
if self.hass.config.units.name == CONF_UNIT_SYSTEM_IMPERIAL:
self._distance = IMPERIAL_SYSTEM.length(
feed_entry.distance_to_home, LENGTH_KILOMETERS
)
else:
self._distance = feed_entry.distance_to_home
self._latitude = feed_entry.coordinates[0]
self._longitude = feed_entry.coordinates[1]
self._attribution = feed_entry.attribution
self._alert_level = feed_entry.alert_level
self._country = feed_entry.country
self._description = feed_entry.title
self._duration_in_week = feed_entry.duration_in_week
self._event_type_short = feed_entry.event_type_short
self._event_type = feed_entry.event_type
self._from_date = feed_entry.from_date
self._to_date = feed_entry.to_date
self._population = feed_entry.population
self._severity = feed_entry.severity
self._vulnerability = feed_entry.vulnerability
# Round vulnerability value if presented as float.
if isinstance(self._vulnerability, float):
self._vulnerability = round(self._vulnerability, 1)
self._version = feed_entry.version
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
if self._event_type_short and self._event_type_short in ICONS:
return ICONS[self._event_type_short]
return DEFAULT_ICON
@property
def source(self) -> str:
"""Return source value of this external event."""
return SOURCE
@property
def name(self) -> Optional[str]:
"""Return the name of the entity."""
return self._title
@property
def distance(self) -> Optional[float]:
"""Return distance value of this external event."""
return self._distance
@property
def latitude(self) -> Optional[float]:
"""Return latitude value of this external event."""
return self._latitude
@property
def longitude(self) -> Optional[float]:
"""Return longitude value of this external event."""
return self._longitude
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
if self.hass.config.units.name == CONF_UNIT_SYSTEM_IMPERIAL:
return LENGTH_MILES
return LENGTH_KILOMETERS
@property
def device_state_attributes(self):
"""Return the device state attributes."""
attributes = {}
for key, value in (
(ATTR_EXTERNAL_ID, self._external_id),
(ATTR_DESCRIPTION, self._description),
(ATTR_ATTRIBUTION, self._attribution),
(ATTR_EVENT_TYPE, self._event_type),
(ATTR_ALERT_LEVEL, self._alert_level),
(ATTR_COUNTRY, self._country),
(ATTR_DURATION_IN_WEEK, self._duration_in_week),
(ATTR_FROM_DATE, self._from_date),
(ATTR_TO_DATE, self._to_date),
(ATTR_POPULATION, self._population),
(ATTR_SEVERITY, self._severity),
(ATTR_VULNERABILITY, self._vulnerability),
):
if value or isinstance(value, bool):
attributes[key] = value
return attributes

View File

@ -0,0 +1,14 @@
{
"domain": "gdacs",
"name": "Global Disaster Alert and Coordination System (GDACS)",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/gdacs",
"requirements": [
"aio_georss_gdacs==0.3"
],
"dependencies": [],
"codeowners": [
"@exxamalte"
],
"quality_scale": "platinum"
}

View File

@ -0,0 +1,140 @@
"""Feed Entity Manager Sensor support for GDACS Feed."""
import logging
from typing import Optional
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity
from homeassistant.util import dt
from .const import DEFAULT_ICON, DOMAIN, FEED, SIGNAL_STATUS
_LOGGER = logging.getLogger(__name__)
ATTR_STATUS = "status"
ATTR_LAST_UPDATE = "last_update"
ATTR_LAST_UPDATE_SUCCESSFUL = "last_update_successful"
ATTR_LAST_TIMESTAMP = "last_timestamp"
ATTR_CREATED = "created"
ATTR_UPDATED = "updated"
ATTR_REMOVED = "removed"
DEFAULT_UNIT_OF_MEASUREMENT = "alerts"
# An update of this entity is not making a web request, but uses internal data only.
PARALLEL_UPDATES = 0
async def async_setup_entry(hass, entry, async_add_entities):
"""Set up the GDACS Feed platform."""
manager = hass.data[DOMAIN][FEED][entry.entry_id]
sensor = GdacsSensor(entry.entry_id, entry.title, manager)
async_add_entities([sensor])
_LOGGER.debug("Sensor setup done")
class GdacsSensor(Entity):
"""This is a status sensor for the GDACS integration."""
def __init__(self, config_entry_id, config_title, manager):
"""Initialize entity."""
self._config_entry_id = config_entry_id
self._config_title = config_title
self._manager = manager
self._status = None
self._last_update = None
self._last_update_successful = None
self._last_timestamp = None
self._total = None
self._created = None
self._updated = None
self._removed = None
self._remove_signal_status = None
async def async_added_to_hass(self):
"""Call when entity is added to hass."""
self._remove_signal_status = async_dispatcher_connect(
self.hass,
SIGNAL_STATUS.format(self._config_entry_id),
self._update_status_callback,
)
_LOGGER.debug("Waiting for updates %s", self._config_entry_id)
# First update is manual because of how the feed entity manager is updated.
await self.async_update()
async def async_will_remove_from_hass(self) -> None:
"""Call when entity will be removed from hass."""
if self._remove_signal_status:
self._remove_signal_status()
@callback
def _update_status_callback(self):
"""Call status update method."""
_LOGGER.debug("Received status update for %s", self._config_entry_id)
self.async_schedule_update_ha_state(True)
@property
def should_poll(self):
"""No polling needed for GDACS status sensor."""
return False
async def async_update(self):
"""Update this entity from the data held in the feed manager."""
_LOGGER.debug("Updating %s", self._config_entry_id)
if self._manager:
status_info = self._manager.status_info()
if status_info:
self._update_from_status_info(status_info)
def _update_from_status_info(self, status_info):
"""Update the internal state from the provided information."""
self._status = status_info.status
self._last_update = (
dt.as_utc(status_info.last_update) if status_info.last_update else None
)
if status_info.last_update_successful:
self._last_update_successful = dt.as_utc(status_info.last_update_successful)
else:
self._last_update_successful = None
self._last_timestamp = status_info.last_timestamp
self._total = status_info.total
self._created = status_info.created
self._updated = status_info.updated
self._removed = status_info.removed
@property
def state(self):
"""Return the state of the sensor."""
return self._total
@property
def name(self) -> Optional[str]:
"""Return the name of the entity."""
return f"GDACS ({self._config_title})"
@property
def icon(self):
"""Return the icon to use in the frontend, if any."""
return DEFAULT_ICON
@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return DEFAULT_UNIT_OF_MEASUREMENT
@property
def device_state_attributes(self):
"""Return the device state attributes."""
attributes = {}
for key, value in (
(ATTR_STATUS, self._status),
(ATTR_LAST_UPDATE, self._last_update),
(ATTR_LAST_UPDATE_SUCCESSFUL, self._last_update_successful),
(ATTR_LAST_TIMESTAMP, self._last_timestamp),
(ATTR_CREATED, self._created),
(ATTR_UPDATED, self._updated),
(ATTR_REMOVED, self._removed),
):
if value or isinstance(value, bool):
attributes[key] = value
return attributes

View File

@ -0,0 +1,16 @@
{
"config": {
"title": "Global Disaster Alert and Coordination System (GDACS)",
"step": {
"user": {
"title": "Fill in your filter details.",
"data": {
"radius": "Radius"
}
}
},
"abort": {
"already_configured": "Location is already configured."
}
}
}

View File

@ -25,6 +25,7 @@ FLOWS = [
"emulated_roku",
"esphome",
"garmin_connect",
"gdacs",
"geofency",
"geonetnz_quakes",
"geonetnz_volcano",

View File

@ -131,6 +131,9 @@ aio_geojson_geonetnz_volcano==0.5
# homeassistant.components.nsw_rural_fire_service_feed
aio_geojson_nsw_rfs_incidents==0.1
# homeassistant.components.gdacs
aio_georss_gdacs==0.3
# homeassistant.components.ambient_station
aioambient==1.0.2

View File

@ -43,6 +43,9 @@ aio_geojson_geonetnz_volcano==0.5
# homeassistant.components.nsw_rural_fire_service_feed
aio_geojson_nsw_rfs_incidents==0.1
# homeassistant.components.gdacs
aio_georss_gdacs==0.3
# homeassistant.components.ambient_station
aioambient==1.0.2

View File

@ -0,0 +1,41 @@
"""Tests for the GDACS component."""
from unittest.mock import MagicMock
def _generate_mock_feed_entry(
external_id,
title,
distance_to_home,
coordinates,
attribution=None,
alert_level=None,
country=None,
duration_in_week=None,
event_name=None,
event_type_short=None,
event_type=None,
from_date=None,
to_date=None,
population=None,
severity=None,
vulnerability=None,
):
"""Construct a mock feed entry for testing purposes."""
feed_entry = MagicMock()
feed_entry.external_id = external_id
feed_entry.title = title
feed_entry.distance_to_home = distance_to_home
feed_entry.coordinates = coordinates
feed_entry.attribution = attribution
feed_entry.alert_level = alert_level
feed_entry.country = country
feed_entry.duration_in_week = duration_in_week
feed_entry.event_name = event_name
feed_entry.event_type_short = event_type_short
feed_entry.event_type = event_type
feed_entry.from_date = from_date
feed_entry.to_date = to_date
feed_entry.population = population
feed_entry.severity = severity
feed_entry.vulnerability = vulnerability
return feed_entry

View File

@ -0,0 +1,31 @@
"""Configuration for GDACS tests."""
import pytest
from homeassistant.components.gdacs import CONF_CATEGORIES, DOMAIN
from homeassistant.const import (
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
CONF_SCAN_INTERVAL,
CONF_UNIT_SYSTEM,
)
from tests.common import MockConfigEntry
@pytest.fixture
def config_entry():
"""Create a mock GDACS config entry."""
return MockConfigEntry(
domain=DOMAIN,
data={
CONF_LATITUDE: -41.2,
CONF_LONGITUDE: 174.7,
CONF_RADIUS: 25,
CONF_UNIT_SYSTEM: "metric",
CONF_SCAN_INTERVAL: 300.0,
CONF_CATEGORIES: [],
},
title="-41.2, 174.7",
unique_id="-41.2, 174.7",
)

View File

@ -0,0 +1,76 @@
"""Define tests for the GDACS config flow."""
from datetime import timedelta
from homeassistant import data_entry_flow
from homeassistant.components.gdacs import CONF_CATEGORIES, DOMAIN
from homeassistant.const import (
CONF_LATITUDE,
CONF_LONGITUDE,
CONF_RADIUS,
CONF_SCAN_INTERVAL,
)
async def test_duplicate_error(hass, config_entry):
"""Test that errors are shown when duplicates are added."""
conf = {CONF_LATITUDE: -41.2, CONF_LONGITUDE: 174.7, CONF_RADIUS: 25}
config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "user"}, data=conf
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
async def test_show_form(hass):
"""Test that the form is served with no input."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "user"}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "user"
async def test_step_import(hass):
"""Test that the import step works."""
conf = {
CONF_LATITUDE: -41.2,
CONF_LONGITUDE: 174.7,
CONF_RADIUS: 25,
CONF_SCAN_INTERVAL: timedelta(minutes=4),
CONF_CATEGORIES: ["Drought", "Earthquake"],
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "import"}, data=conf
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "-41.2, 174.7"
assert result["data"] == {
CONF_LATITUDE: -41.2,
CONF_LONGITUDE: 174.7,
CONF_RADIUS: 25,
CONF_SCAN_INTERVAL: 240.0,
CONF_CATEGORIES: ["Drought", "Earthquake"],
}
async def test_step_user(hass):
"""Test that the user step works."""
hass.config.latitude = -41.2
hass.config.longitude = 174.7
conf = {CONF_RADIUS: 25}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": "user"}, data=conf
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == "-41.2, 174.7"
assert result["data"] == {
CONF_LATITUDE: -41.2,
CONF_LONGITUDE: 174.7,
CONF_RADIUS: 25,
CONF_SCAN_INTERVAL: 300.0,
CONF_CATEGORIES: [],
}

View File

@ -0,0 +1,242 @@
"""The tests for the GDACS Feed integration."""
import datetime
from asynctest import patch
from homeassistant.components import gdacs
from homeassistant.components.gdacs import DEFAULT_SCAN_INTERVAL, DOMAIN, FEED
from homeassistant.components.gdacs.geo_location import (
ATTR_ALERT_LEVEL,
ATTR_COUNTRY,
ATTR_DESCRIPTION,
ATTR_DURATION_IN_WEEK,
ATTR_EVENT_TYPE,
ATTR_EXTERNAL_ID,
ATTR_FROM_DATE,
ATTR_POPULATION,
ATTR_SEVERITY,
ATTR_TO_DATE,
ATTR_VULNERABILITY,
)
from homeassistant.components.geo_location import ATTR_SOURCE
from homeassistant.const import (
ATTR_ATTRIBUTION,
ATTR_FRIENDLY_NAME,
ATTR_ICON,
ATTR_LATITUDE,
ATTR_LONGITUDE,
ATTR_UNIT_OF_MEASUREMENT,
CONF_RADIUS,
EVENT_HOMEASSISTANT_START,
)
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import IMPERIAL_SYSTEM
from tests.common import async_fire_time_changed
from tests.components.gdacs import _generate_mock_feed_entry
CONFIG = {gdacs.DOMAIN: {CONF_RADIUS: 200}}
async def test_setup(hass):
"""Test the general setup of the integration."""
# Set up some mock feed entries for this test.
mock_entry_1 = _generate_mock_feed_entry(
"1234",
"Description 1",
15.5,
(38.0, -3.0),
event_name="Name 1",
event_type_short="DR",
event_type="Drought",
alert_level="Alert Level 1",
country="Country 1",
attribution="Attribution 1",
from_date=datetime.datetime(2020, 1, 10, 8, 0, tzinfo=datetime.timezone.utc),
to_date=datetime.datetime(2020, 1, 20, 8, 0, tzinfo=datetime.timezone.utc),
duration_in_week=1,
population="Population 1",
severity="Severity 1",
vulnerability="Vulnerability 1",
)
mock_entry_2 = _generate_mock_feed_entry(
"2345",
"Description 2",
20.5,
(38.1, -3.1),
event_name="Name 2",
event_type_short="TC",
event_type="Tropical Cyclone",
)
mock_entry_3 = _generate_mock_feed_entry(
"3456",
"Description 3",
25.5,
(38.2, -3.2),
event_name="Name 3",
event_type_short="TC",
event_type="Tropical Cyclone",
country="Country 2",
)
mock_entry_4 = _generate_mock_feed_entry(
"4567", "Description 4", 12.5, (38.3, -3.3)
)
# Patching 'utcnow' to gain more control over the timed update.
utcnow = dt_util.utcnow()
with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch(
"aio_georss_client.feed.GeoRssFeed.update"
) as mock_feed_update:
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_2, mock_entry_3]
assert await async_setup_component(hass, gdacs.DOMAIN, CONFIG)
# Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
all_states = hass.states.async_all()
# 3 geolocation and 1 sensor entities
assert len(all_states) == 4
state = hass.states.get("geo_location.drought_name_1")
assert state is not None
assert state.name == "Drought: Name 1"
assert state.attributes == {
ATTR_EXTERNAL_ID: "1234",
ATTR_LATITUDE: 38.0,
ATTR_LONGITUDE: -3.0,
ATTR_FRIENDLY_NAME: "Drought: Name 1",
ATTR_DESCRIPTION: "Description 1",
ATTR_COUNTRY: "Country 1",
ATTR_ATTRIBUTION: "Attribution 1",
ATTR_FROM_DATE: datetime.datetime(
2020, 1, 10, 8, 0, tzinfo=datetime.timezone.utc
),
ATTR_TO_DATE: datetime.datetime(
2020, 1, 20, 8, 0, tzinfo=datetime.timezone.utc
),
ATTR_DURATION_IN_WEEK: 1,
ATTR_ALERT_LEVEL: "Alert Level 1",
ATTR_POPULATION: "Population 1",
ATTR_EVENT_TYPE: "Drought",
ATTR_SEVERITY: "Severity 1",
ATTR_VULNERABILITY: "Vulnerability 1",
ATTR_UNIT_OF_MEASUREMENT: "km",
ATTR_SOURCE: "gdacs",
ATTR_ICON: "mdi:water-off",
}
assert float(state.state) == 15.5
state = hass.states.get("geo_location.tropical_cyclone_name_2")
assert state is not None
assert state.name == "Tropical Cyclone: Name 2"
assert state.attributes == {
ATTR_EXTERNAL_ID: "2345",
ATTR_LATITUDE: 38.1,
ATTR_LONGITUDE: -3.1,
ATTR_FRIENDLY_NAME: "Tropical Cyclone: Name 2",
ATTR_DESCRIPTION: "Description 2",
ATTR_EVENT_TYPE: "Tropical Cyclone",
ATTR_UNIT_OF_MEASUREMENT: "km",
ATTR_SOURCE: "gdacs",
ATTR_ICON: "mdi:weather-hurricane",
}
assert float(state.state) == 20.5
state = hass.states.get("geo_location.tropical_cyclone_name_3")
assert state is not None
assert state.name == "Tropical Cyclone: Name 3"
assert state.attributes == {
ATTR_EXTERNAL_ID: "3456",
ATTR_LATITUDE: 38.2,
ATTR_LONGITUDE: -3.2,
ATTR_FRIENDLY_NAME: "Tropical Cyclone: Name 3",
ATTR_DESCRIPTION: "Description 3",
ATTR_EVENT_TYPE: "Tropical Cyclone",
ATTR_COUNTRY: "Country 2",
ATTR_UNIT_OF_MEASUREMENT: "km",
ATTR_SOURCE: "gdacs",
ATTR_ICON: "mdi:weather-hurricane",
}
assert float(state.state) == 25.5
# Simulate an update - two existing, one new entry, one outdated entry
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_4, mock_entry_3]
async_fire_time_changed(hass, utcnow + DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
# Simulate an update - empty data, but successful update,
# so no changes to entities.
mock_feed_update.return_value = "OK_NO_DATA", None
async_fire_time_changed(hass, utcnow + 2 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
# Simulate an update - empty data, removes all entities
mock_feed_update.return_value = "ERROR", None
async_fire_time_changed(hass, utcnow + 3 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 1
async def test_setup_imperial(hass):
"""Test the setup of the integration using imperial unit system."""
hass.config.units = IMPERIAL_SYSTEM
# Set up some mock feed entries for this test.
mock_entry_1 = _generate_mock_feed_entry(
"1234",
"Description 1",
15.5,
(38.0, -3.0),
event_name="Name 1",
event_type_short="DR",
event_type="Drought",
)
# Patching 'utcnow' to gain more control over the timed update.
utcnow = dt_util.utcnow()
with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch(
"aio_georss_client.feed.GeoRssFeed.update"
) as mock_feed_update, patch(
"aio_georss_client.feed.GeoRssFeed.last_timestamp", create=True
):
mock_feed_update.return_value = "OK", [mock_entry_1]
assert await async_setup_component(hass, gdacs.DOMAIN, CONFIG)
# Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 2
# Test conversion of 200 miles to kilometers.
feeds = hass.data[DOMAIN][FEED]
assert feeds is not None
assert len(feeds) == 1
manager = list(feeds.values())[0]
# Ensure that the filter value in km is correctly set.
assert manager._feed_manager._feed._filter_radius == 321.8688
state = hass.states.get("geo_location.drought_name_1")
assert state is not None
assert state.name == "Drought: Name 1"
assert state.attributes == {
ATTR_EXTERNAL_ID: "1234",
ATTR_LATITUDE: 38.0,
ATTR_LONGITUDE: -3.0,
ATTR_FRIENDLY_NAME: "Drought: Name 1",
ATTR_DESCRIPTION: "Description 1",
ATTR_EVENT_TYPE: "Drought",
ATTR_UNIT_OF_MEASUREMENT: "mi",
ATTR_SOURCE: "gdacs",
ATTR_ICON: "mdi:water-off",
}
# 15.5km (as defined in mock entry) has been converted to 9.6mi.
assert float(state.state) == 9.6

View File

@ -0,0 +1,19 @@
"""Define tests for the GDACS general setup."""
from asynctest import patch
from homeassistant.components.gdacs import DOMAIN, FEED
async def test_component_unload_config_entry(hass, config_entry):
"""Test that loading and unloading of a config entry works."""
config_entry.add_to_hass(hass)
with patch("aio_georss_gdacs.GdacsFeedManager.update") as mock_feed_manager_update:
# Load config entry.
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert mock_feed_manager_update.call_count == 1
assert hass.data[DOMAIN][FEED][config_entry.entry_id] is not None
# Unload config entry.
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.data[DOMAIN][FEED].get(config_entry.entry_id) is None

View File

@ -0,0 +1,100 @@
"""The tests for the GDACS Feed integration."""
from asynctest import patch
from homeassistant.components import gdacs
from homeassistant.components.gdacs import DEFAULT_SCAN_INTERVAL
from homeassistant.components.gdacs.sensor import (
ATTR_CREATED,
ATTR_LAST_UPDATE,
ATTR_LAST_UPDATE_SUCCESSFUL,
ATTR_REMOVED,
ATTR_STATUS,
ATTR_UPDATED,
)
from homeassistant.const import (
ATTR_ICON,
ATTR_UNIT_OF_MEASUREMENT,
CONF_RADIUS,
EVENT_HOMEASSISTANT_START,
)
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from tests.common import async_fire_time_changed
from tests.components.gdacs import _generate_mock_feed_entry
CONFIG = {gdacs.DOMAIN: {CONF_RADIUS: 200}}
async def test_setup(hass):
"""Test the general setup of the integration."""
# Set up some mock feed entries for this test.
mock_entry_1 = _generate_mock_feed_entry(
"1234", "Title 1", 15.5, (38.0, -3.0), attribution="Attribution 1",
)
mock_entry_2 = _generate_mock_feed_entry("2345", "Title 2", 20.5, (38.1, -3.1),)
mock_entry_3 = _generate_mock_feed_entry("3456", "Title 3", 25.5, (38.2, -3.2),)
mock_entry_4 = _generate_mock_feed_entry("4567", "Title 4", 12.5, (38.3, -3.3))
# Patching 'utcnow' to gain more control over the timed update.
utcnow = dt_util.utcnow()
with patch("homeassistant.util.dt.utcnow", return_value=utcnow), patch(
"aio_georss_client.feed.GeoRssFeed.update"
) as mock_feed_update:
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_2, mock_entry_3]
assert await async_setup_component(hass, gdacs.DOMAIN, CONFIG)
# Artificially trigger update and collect events.
hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
await hass.async_block_till_done()
all_states = hass.states.async_all()
# 3 geolocation and 1 sensor entities
assert len(all_states) == 4
state = hass.states.get("sensor.gdacs_32_87336_117_22743")
assert state is not None
assert int(state.state) == 3
assert state.name == "GDACS (32.87336, -117.22743)"
attributes = state.attributes
assert attributes[ATTR_STATUS] == "OK"
assert attributes[ATTR_CREATED] == 3
assert attributes[ATTR_LAST_UPDATE].tzinfo == dt_util.UTC
assert attributes[ATTR_LAST_UPDATE_SUCCESSFUL].tzinfo == dt_util.UTC
assert attributes[ATTR_LAST_UPDATE] == attributes[ATTR_LAST_UPDATE_SUCCESSFUL]
assert attributes[ATTR_UNIT_OF_MEASUREMENT] == "alerts"
assert attributes[ATTR_ICON] == "mdi:alert"
# Simulate an update - two existing, one new entry, one outdated entry
mock_feed_update.return_value = "OK", [mock_entry_1, mock_entry_4, mock_entry_3]
async_fire_time_changed(hass, utcnow + DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
state = hass.states.get("sensor.gdacs_32_87336_117_22743")
attributes = state.attributes
assert attributes[ATTR_CREATED] == 1
assert attributes[ATTR_UPDATED] == 2
assert attributes[ATTR_REMOVED] == 1
# Simulate an update - empty data, but successful update,
# so no changes to entities.
mock_feed_update.return_value = "OK_NO_DATA", None
async_fire_time_changed(hass, utcnow + 2 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 4
# Simulate an update - empty data, removes all entities
mock_feed_update.return_value = "ERROR", None
async_fire_time_changed(hass, utcnow + 3 * DEFAULT_SCAN_INTERVAL)
await hass.async_block_till_done()
all_states = hass.states.async_all()
assert len(all_states) == 1
state = hass.states.get("sensor.gdacs_32_87336_117_22743")
attributes = state.attributes
assert attributes[ATTR_REMOVED] == 3