Use storage helper in feedreader (#98754)

parent a42d975c49
commit 91df9434d0
@@ -1,22 +1,23 @@
 """Support for RSS/Atom feeds."""
 from __future__ import annotations

 from calendar import timegm
 from datetime import datetime, timedelta
 from logging import getLogger
-from os.path import exists
+import os
 import pickle
-from threading import Lock
-from time import struct_time
-from typing import cast
+from time import gmtime, struct_time

 import feedparser
 import voluptuous as vol

 from homeassistant.const import CONF_SCAN_INTERVAL, EVENT_HOMEASSISTANT_START
-from homeassistant.core import HomeAssistant
+from homeassistant.core import Event, HomeAssistant, callback
 import homeassistant.helpers.config_validation as cv
-from homeassistant.helpers.event import track_time_interval
+from homeassistant.helpers.event import async_track_time_interval
+from homeassistant.helpers.storage import Store
 from homeassistant.helpers.typing import ConfigType
+from homeassistant.util.dt import utc_from_timestamp

 _LOGGER = getLogger(__name__)
@@ -25,10 +26,12 @@ CONF_MAX_ENTRIES = "max_entries"

 DEFAULT_MAX_ENTRIES = 20
 DEFAULT_SCAN_INTERVAL = timedelta(hours=1)
+DELAY_SAVE = 30

 DOMAIN = "feedreader"

 EVENT_FEEDREADER = "feedreader"
+STORAGE_VERSION = 1

 CONFIG_SCHEMA = vol.Schema(
     {
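DELAY_SAVE is the debounce window, in seconds, that StoredData later hands to Store.async_delay_save, so a burst of feed updates collapses into a single write to the JSON store. A minimal, stdlib-only sketch of that delayed-save idea follows; DelayedSaver and its names are illustrative and not part of Home Assistant:

import asyncio


class DelayedSaver:
    """Coalesce rapid updates into one write after a quiet period (illustrative)."""

    def __init__(self, save, delay: float) -> None:
        self._save = save    # callable performing the actual write
        self._delay = delay  # quiet period in seconds (DELAY_SAVE is 30 above)
        self._data: dict[str, str] = {}
        self._task: asyncio.Task | None = None

    def put(self, key: str, value: str) -> None:
        """Record a change and (re)start the delay timer."""
        self._data[key] = value
        if self._task is not None and not self._task.done():
            self._task.cancel()
        self._task = asyncio.create_task(self._write_later())

    async def _write_later(self) -> None:
        await asyncio.sleep(self._delay)
        self._save(dict(self._data))


async def main() -> None:
    saver = DelayedSaver(save=lambda data: print("saved:", data), delay=0.1)
    saver.put("https://example.com/feed.xml", "2018-04-30T05:10:00+00:00")
    saver.put("https://example.com/feed.xml", "2018-05-01T00:00:00+00:00")
    await asyncio.sleep(0.3)  # only the final value is written, exactly once


asyncio.run(main())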
@@ -46,17 +49,25 @@ CONFIG_SCHEMA = vol.Schema(
 )


-def setup(hass: HomeAssistant, config: ConfigType) -> bool:
+async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
     """Set up the Feedreader component."""
     urls: list[str] = config[DOMAIN][CONF_URLS]
     if not urls:
         return False

     scan_interval: timedelta = config[DOMAIN][CONF_SCAN_INTERVAL]
     max_entries: int = config[DOMAIN][CONF_MAX_ENTRIES]
-    data_file = hass.config.path(f"{DOMAIN}.pickle")
-    storage = StoredData(data_file)
+    old_data_file = hass.config.path(f"{DOMAIN}.pickle")
+    storage = StoredData(hass, old_data_file)
+    await storage.async_setup()
     feeds = [
-        FeedManager(url, scan_interval, max_entries, hass, storage) for url in urls
+        FeedManager(hass, url, scan_interval, max_entries, storage) for url in urls
     ]
-    return len(feeds) > 0
+
+    for feed in feeds:
+        feed.async_setup()
+
+    return True


 class FeedManager:
@@ -64,50 +75,47 @@ class FeedManager:

     def __init__(
         self,
+        hass: HomeAssistant,
         url: str,
         scan_interval: timedelta,
         max_entries: int,
-        hass: HomeAssistant,
         storage: StoredData,
     ) -> None:
         """Initialize the FeedManager object, poll as per scan interval."""
+        self._hass = hass
         self._url = url
         self._scan_interval = scan_interval
         self._max_entries = max_entries
         self._feed: feedparser.FeedParserDict | None = None
-        self._hass = hass
         self._firstrun = True
         self._storage = storage
         self._last_entry_timestamp: struct_time | None = None
-        self._last_update_successful = False
         self._has_published_parsed = False
         self._has_updated_parsed = False
         self._event_type = EVENT_FEEDREADER
         self._feed_id = url
-        hass.bus.listen_once(EVENT_HOMEASSISTANT_START, lambda _: self._update())
-        self._init_regular_updates(hass)
+
+    @callback
+    def async_setup(self) -> None:
+        """Set up the feed manager."""
+        self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, self._async_update)
+        async_track_time_interval(
+            self._hass, self._async_update, self._scan_interval, cancel_on_shutdown=True
+        )

     def _log_no_entries(self) -> None:
         """Send no entries log at debug level."""
         _LOGGER.debug("No new entries to be published in feed %s", self._url)

-    def _init_regular_updates(self, hass: HomeAssistant) -> None:
-        """Schedule regular updates at the top of the clock."""
-        track_time_interval(
-            hass,
-            lambda now: self._update(),
-            self._scan_interval,
-            cancel_on_shutdown=True,
-        )
-
-    @property
-    def last_update_successful(self) -> bool:
-        """Return True if the last feed update was successful."""
-        return self._last_update_successful
-
-    def _update(self) -> None:
+    async def _async_update(self, _: datetime | Event) -> None:
         """Update the feed and publish new entries to the event bus."""
-        _LOGGER.info("Fetching new data from feed %s", self._url)
+        last_entry_timestamp = await self._hass.async_add_executor_job(self._update)
+        if last_entry_timestamp:
+            self._storage.async_put_timestamp(self._feed_id, last_entry_timestamp)
+
+    def _update(self) -> struct_time | None:
+        """Update the feed and publish new entries to the event bus."""
+        _LOGGER.debug("Fetching new data from feed %s", self._url)
         self._feed: feedparser.FeedParserDict = feedparser.parse(  # type: ignore[no-redef]
             self._url,
             etag=None if not self._feed else self._feed.get("etag"),
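The new _async_update keeps the event loop responsive by pushing the blocking feedparser.parse call, wrapped in _update, onto an executor via hass.async_add_executor_job, and only persisting the returned timestamp back in async context. A stdlib-only sketch of the same offload pattern, with asyncio.to_thread standing in for async_add_executor_job and illustrative function names:

import asyncio
import time


def fetch_feed_blocking(url: str) -> str:
    """Stand-in for the blocking network and parsing work done by feedparser.parse."""
    time.sleep(0.1)
    return f"parsed {url}"


async def update(url: str) -> None:
    # Run the blocking call in a worker thread, then continue on the event loop.
    result = await asyncio.to_thread(fetch_feed_blocking, url)
    print(result)  # the integration would persist a timestamp here instead


asyncio.run(update("https://example.com/feed.xml"))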
@@ -115,38 +123,41 @@ class FeedManager:
         )
         if not self._feed:
             _LOGGER.error("Error fetching feed data from %s", self._url)
-            self._last_update_successful = False
-        else:
-            # The 'bozo' flag really only indicates that there was an issue
-            # during the initial parsing of the XML, but it doesn't indicate
-            # whether this is an unrecoverable error. In this case the
-            # feedparser lib is trying a less strict parsing approach.
-            # If an error is detected here, log warning message but continue
-            # processing the feed entries if present.
-            if self._feed.bozo != 0:
-                _LOGGER.warning(
-                    "Possible issue parsing feed %s: %s",
-                    self._url,
-                    self._feed.bozo_exception,
-                )
-            # Using etag and modified, if there's no new data available,
-            # the entries list will be empty
-            if self._feed.entries:
-                _LOGGER.debug(
-                    "%s entri(es) available in feed %s",
-                    len(self._feed.entries),
-                    self._url,
-                )
-                self._filter_entries()
-                self._publish_new_entries()
-                if self._has_published_parsed or self._has_updated_parsed:
-                    self._storage.put_timestamp(
-                        self._feed_id, cast(struct_time, self._last_entry_timestamp)
-                    )
-            else:
-                self._log_no_entries()
-            self._last_update_successful = True
-        _LOGGER.info("Fetch from feed %s completed", self._url)
+            return None
+        # The 'bozo' flag really only indicates that there was an issue
+        # during the initial parsing of the XML, but it doesn't indicate
+        # whether this is an unrecoverable error. In this case the
+        # feedparser lib is trying a less strict parsing approach.
+        # If an error is detected here, log warning message but continue
+        # processing the feed entries if present.
+        if self._feed.bozo != 0:
+            _LOGGER.warning(
+                "Possible issue parsing feed %s: %s",
+                self._url,
+                self._feed.bozo_exception,
+            )
+        # Using etag and modified, if there's no new data available,
+        # the entries list will be empty
+        _LOGGER.debug(
+            "%s entri(es) available in feed %s",
+            len(self._feed.entries),
+            self._url,
+        )
+        if not self._feed.entries:
+            self._log_no_entries()
+            return None
+
+        self._filter_entries()
+        self._publish_new_entries()
+
+        _LOGGER.debug("Fetch from feed %s completed", self._url)
+
+        if (
+            self._has_published_parsed or self._has_updated_parsed
+        ) and self._last_entry_timestamp:
+            return self._last_entry_timestamp
+
+        return None

     def _filter_entries(self) -> None:
         """Filter the entries provided and return the ones to keep."""
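The comment about etag and modified refers to feedparser's conditional-fetch support: handing back the etag/modified values from the previous parse lets the server answer "not modified", in which case the parsed result carries an empty entries list, which is exactly the case the early return above handles. A hedged sketch of that usage; the URL is a placeholder and a live network fetch is assumed:

import feedparser

URL = "https://example.com/feed.xml"  # illustrative URL

first = feedparser.parse(URL)
# Re-fetch conditionally: if nothing changed server-side, second.entries is empty.
second = feedparser.parse(
    URL,
    etag=first.get("etag"),
    modified=first.get("modified"),
)
if not second.entries:
    print("No new entries since the last fetch")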
@@ -219,47 +230,62 @@ class FeedManager:


 class StoredData:
-    """Abstraction over pickle data storage."""
+    """Represent a data storage."""

-    def __init__(self, data_file: str) -> None:
-        """Initialize pickle data storage."""
-        self._data_file = data_file
-        self._lock = Lock()
-        self._cache_outdated = True
+    def __init__(self, hass: HomeAssistant, legacy_data_file: str) -> None:
+        """Initialize data storage."""
+        self._legacy_data_file = legacy_data_file
         self._data: dict[str, struct_time] = {}
-        self._fetch_data()
+        self._hass = hass
+        self._store: Store[dict[str, str]] = Store(hass, STORAGE_VERSION, DOMAIN)

-    def _fetch_data(self) -> None:
-        """Fetch data stored into pickle file."""
-        if self._cache_outdated and exists(self._data_file):
-            try:
-                _LOGGER.debug("Fetching data from file %s", self._data_file)
-                with self._lock, open(self._data_file, "rb") as myfile:
-                    self._data = pickle.load(myfile) or {}
-                    self._cache_outdated = False
-            except Exception:  # pylint: disable=broad-except
-                _LOGGER.error(
-                    "Error loading data from pickled file %s", self._data_file
-                )
+    async def async_setup(self) -> None:
+        """Set up storage."""
+        if not os.path.exists(self._store.path):
+            # Remove the legacy store loading after deprecation period.
+            data = await self._hass.async_add_executor_job(self._legacy_fetch_data)
+        else:
+            if (store_data := await self._store.async_load()) is None:
+                return
+            # Make sure that dst is set to 0, by using gmtime() on the timestamp.
+            data = {
+                feed_id: gmtime(datetime.fromisoformat(timestamp_string).timestamp())
+                for feed_id, timestamp_string in store_data.items()
+            }
+
+        self._data = data
+
+    def _legacy_fetch_data(self) -> dict[str, struct_time]:
+        """Fetch data stored in pickle file."""
+        _LOGGER.debug("Fetching data from legacy file %s", self._legacy_data_file)
+        try:
+            with open(self._legacy_data_file, "rb") as myfile:
+                return pickle.load(myfile) or {}
+        except FileNotFoundError:
+            pass
+        except (OSError, pickle.PickleError) as err:
+            _LOGGER.error(
+                "Error loading data from pickled file %s: %s",
+                self._legacy_data_file,
+                err,
+            )
+
+        return {}

     def get_timestamp(self, feed_id: str) -> struct_time | None:
-        """Return stored timestamp for given feed id (usually the url)."""
-        self._fetch_data()
+        """Return stored timestamp for given feed id."""
         return self._data.get(feed_id)

-    def put_timestamp(self, feed_id: str, timestamp: struct_time) -> None:
-        """Update timestamp for given feed id (usually the url)."""
-        self._fetch_data()
-        with self._lock, open(self._data_file, "wb") as myfile:
-            self._data.update({feed_id: timestamp})
-            _LOGGER.debug(
-                "Overwriting feed %s timestamp in storage file %s: %s",
-                feed_id,
-                self._data_file,
-                timestamp,
-            )
-            try:
-                pickle.dump(self._data, myfile)
-            except Exception:  # pylint: disable=broad-except
-                _LOGGER.error("Error saving pickled data to %s", self._data_file)
-            self._cache_outdated = True
+    @callback
+    def async_put_timestamp(self, feed_id: str, timestamp: struct_time) -> None:
+        """Update timestamp for given feed id."""
+        self._data[feed_id] = timestamp
+        self._store.async_delay_save(self._async_save_data, DELAY_SAVE)
+
+    @callback
+    def _async_save_data(self) -> dict[str, str]:
+        """Save feed data to storage."""
+        return {
+            feed_id: utc_from_timestamp(timegm(struct_utc)).isoformat()
+            for feed_id, struct_utc in self._data.items()
+        }
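The JSON store holds ISO 8601 strings while the in-memory cache holds UTC struct_time values, so StoredData converts with timegm/utc_from_timestamp when saving and datetime.fromisoformat/gmtime when loading; gmtime guarantees tm_isdst is 0, which is what the "Make sure that dst is set to 0" comment refers to. A stdlib-only sketch of that round trip, with datetime.fromtimestamp(..., tz=timezone.utc) standing in for Home Assistant's utc_from_timestamp:

from calendar import timegm
from datetime import datetime, timezone
from time import gmtime, struct_time

original: struct_time = gmtime(1525065000)  # 2018-04-30T05:10:00+00:00

# Saving: struct_time -> epoch seconds -> aware UTC datetime -> ISO string
iso_string = datetime.fromtimestamp(timegm(original), tz=timezone.utc).isoformat()

# Loading: ISO string -> datetime -> epoch seconds -> struct_time (tm_isdst == 0)
restored = gmtime(datetime.fromisoformat(iso_string).timestamp())

assert restored == original
print(iso_string)  # 2018-04-30T05:10:00+00:00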
@@ -1,7 +1,11 @@
 """The tests for the feedreader component."""
-from datetime import timedelta
+from collections.abc import Generator
+from datetime import datetime, timedelta
+import pickle
+from time import gmtime
+from typing import Any
 from unittest import mock
-from unittest.mock import mock_open, patch
+from unittest.mock import MagicMock, mock_open, patch

 import pytest
@@ -13,7 +17,7 @@ from homeassistant.components.feedreader import (
     EVENT_FEEDREADER,
 )
 from homeassistant.const import CONF_SCAN_INTERVAL, EVENT_HOMEASSISTANT_START
-from homeassistant.core import HomeAssistant
+from homeassistant.core import Event, HomeAssistant
 from homeassistant.setup import async_setup_component
 import homeassistant.util.dt as dt_util
@@ -27,7 +31,7 @@ VALID_CONFIG_4 = {feedreader.DOMAIN: {CONF_URLS: [URL], CONF_MAX_ENTRIES: 5}}
 VALID_CONFIG_5 = {feedreader.DOMAIN: {CONF_URLS: [URL], CONF_MAX_ENTRIES: 1}}


-def load_fixture_bytes(src):
+def load_fixture_bytes(src: str) -> bytes:
     """Return byte stream of fixture."""
     feed_data = load_fixture(src)
     raw = bytes(feed_data, "utf-8")
@@ -35,72 +39,198 @@


 @pytest.fixture(name="feed_one_event")
-def fixture_feed_one_event(hass):
+def fixture_feed_one_event(hass: HomeAssistant) -> bytes:
     """Load test feed data for one event."""
     return load_fixture_bytes("feedreader.xml")


 @pytest.fixture(name="feed_two_event")
-def fixture_feed_two_events(hass):
+def fixture_feed_two_events(hass: HomeAssistant) -> bytes:
     """Load test feed data for two event."""
     return load_fixture_bytes("feedreader1.xml")


 @pytest.fixture(name="feed_21_events")
-def fixture_feed_21_events(hass):
+def fixture_feed_21_events(hass: HomeAssistant) -> bytes:
     """Load test feed data for twenty one events."""
     return load_fixture_bytes("feedreader2.xml")


 @pytest.fixture(name="feed_three_events")
-def fixture_feed_three_events(hass):
+def fixture_feed_three_events(hass: HomeAssistant) -> bytes:
     """Load test feed data for three events."""
     return load_fixture_bytes("feedreader3.xml")


 @pytest.fixture(name="feed_atom_event")
-def fixture_feed_atom_event(hass):
+def fixture_feed_atom_event(hass: HomeAssistant) -> bytes:
     """Load test feed data for atom event."""
     return load_fixture_bytes("feedreader5.xml")


 @pytest.fixture(name="events")
-async def fixture_events(hass):
+async def fixture_events(hass: HomeAssistant) -> list[Event]:
     """Fixture that catches alexa events."""
     return async_capture_events(hass, EVENT_FEEDREADER)


-@pytest.fixture(name="feed_storage", autouse=True)
-def fixture_feed_storage():
+@pytest.fixture(name="storage")
+def fixture_storage(request: pytest.FixtureRequest) -> Generator[None, None, None]:
     """Set up the test storage environment."""
-    with patch("homeassistant.components.feedreader.open", mock_open(), create=True):
-        yield
+    if request.param == "legacy_storage":
+        with patch("os.path.exists", return_value=False):
+            yield
+    elif request.param == "json_storage":
+        with patch("os.path.exists", return_value=True):
+            yield
+    else:
+        raise RuntimeError("Invalid storage fixture")


-async def test_setup_one_feed(hass: HomeAssistant) -> None:
-    """Test the general setup of this component."""
+@pytest.fixture(name="legacy_storage_open")
+def fixture_legacy_storage_open() -> Generator[MagicMock, None, None]:
+    """Mock builtins.open for feedreader storage."""
     with patch(
-        "homeassistant.components.feedreader.track_time_interval"
+        "homeassistant.components.feedreader.open",
+        mock_open(),
+        create=True,
+    ) as open_mock:
+        yield open_mock
+
+
+@pytest.fixture(name="legacy_storage_load", autouse=True)
+def fixture_legacy_storage_load(
+    legacy_storage_open,
+) -> Generator[MagicMock, None, None]:
+    """Mock builtins.open for feedreader storage."""
+    with patch(
+        "homeassistant.components.feedreader.pickle.load", return_value={}
+    ) as pickle_load:
+        yield pickle_load
+
+
+async def test_setup_no_feeds(hass: HomeAssistant) -> None:
+    """Test config with no urls."""
+    assert not await async_setup_component(
+        hass, feedreader.DOMAIN, {feedreader.DOMAIN: {CONF_URLS: []}}
+    )
+
+
+@pytest.mark.parametrize(
+    ("open_error", "load_error"),
+    [
+        (FileNotFoundError("No file"), None),
+        (OSError("Boom"), None),
+        (None, pickle.PickleError("Bad data")),
+    ],
+)
+async def test_legacy_storage_error(
+    hass: HomeAssistant,
+    legacy_storage_open: MagicMock,
+    legacy_storage_load: MagicMock,
+    open_error: Exception | None,
+    load_error: Exception | None,
+) -> None:
+    """Test legacy storage error."""
+    legacy_storage_open.side_effect = open_error
+    legacy_storage_load.side_effect = load_error
+
+    with patch(
+        "homeassistant.components.feedreader.async_track_time_interval"
     ) as track_method:
         assert await async_setup_component(hass, feedreader.DOMAIN, VALID_CONFIG_1)
         await hass.async_block_till_done()

-        track_method.assert_called_once_with(
-            hass, mock.ANY, DEFAULT_SCAN_INTERVAL, cancel_on_shutdown=True
-        )
+        track_method.assert_called_once_with(
+            hass, mock.ANY, DEFAULT_SCAN_INTERVAL, cancel_on_shutdown=True
+        )
+
+
+@pytest.mark.parametrize("storage", ["legacy_storage", "json_storage"], indirect=True)
+async def test_storage_data_loading(
+    hass: HomeAssistant,
+    events: list[Event],
+    feed_one_event: bytes,
+    legacy_storage_load: MagicMock,
+    hass_storage: dict[str, Any],
+    storage: None,
+) -> None:
+    """Test loading existing storage data."""
+    storage_data: dict[str, str] = {URL: "2018-04-30T05:10:00+00:00"}
+    hass_storage[feedreader.DOMAIN] = {
+        "version": 1,
+        "minor_version": 1,
+        "key": feedreader.DOMAIN,
+        "data": storage_data,
+    }
+    legacy_storage_data = {
+        URL: gmtime(datetime.fromisoformat(storage_data[URL]).timestamp())
+    }
+    legacy_storage_load.return_value = legacy_storage_data
+
+    with patch(
+        "feedparser.http.get",
+        return_value=feed_one_event,
+    ):
+        assert await async_setup_component(hass, feedreader.DOMAIN, VALID_CONFIG_2)
+
+        hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
+        await hass.async_block_till_done()
+
+    # no new events
+    assert not events
+
+
+async def test_storage_data_writing(
+    hass: HomeAssistant,
+    events: list[Event],
+    feed_one_event: bytes,
+    hass_storage: dict[str, Any],
+) -> None:
+    """Test writing to storage."""
+    storage_data: dict[str, str] = {URL: "2018-04-30T05:10:00+00:00"}
+
+    with patch(
+        "feedparser.http.get",
+        return_value=feed_one_event,
+    ), patch("homeassistant.components.feedreader.DELAY_SAVE", new=0):
+        assert await async_setup_component(hass, feedreader.DOMAIN, VALID_CONFIG_2)
+
+        hass.bus.async_fire(EVENT_HOMEASSISTANT_START)
+        await hass.async_block_till_done()
+
+    # one new event
+    assert len(events) == 1
+
+    # storage data updated
+    assert hass_storage[feedreader.DOMAIN]["data"] == storage_data
+
+
+@pytest.mark.parametrize("storage", ["legacy_storage", "json_storage"], indirect=True)
+async def test_setup_one_feed(hass: HomeAssistant, storage: None) -> None:
+    """Test the general setup of this component."""
+    with patch(
+        "homeassistant.components.feedreader.async_track_time_interval"
+    ) as track_method:
+        assert await async_setup_component(hass, feedreader.DOMAIN, VALID_CONFIG_1)
+        await hass.async_block_till_done()
+
+        track_method.assert_called_once_with(
+            hass, mock.ANY, DEFAULT_SCAN_INTERVAL, cancel_on_shutdown=True
+        )


 async def test_setup_scan_interval(hass: HomeAssistant) -> None:
     """Test the setup of this component with scan interval."""
     with patch(
-        "homeassistant.components.feedreader.track_time_interval"
+        "homeassistant.components.feedreader.async_track_time_interval"
     ) as track_method:
         assert await async_setup_component(hass, feedreader.DOMAIN, VALID_CONFIG_2)
         await hass.async_block_till_done()

-        track_method.assert_called_once_with(
-            hass, mock.ANY, timedelta(seconds=60), cancel_on_shutdown=True
-        )
+        track_method.assert_called_once_with(
+            hass, mock.ANY, timedelta(seconds=60), cancel_on_shutdown=True
+        )


 async def test_setup_max_entries(hass: HomeAssistant) -> None:
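The new storage fixture relies on pytest's indirect parametrization: with indirect=True each parametrize value is delivered to the fixture as request.param, so a single test body runs once against the legacy pickle path and once against the JSON store path. A small self-contained sketch of the mechanism, with illustrative fixture and test names:

import pytest


@pytest.fixture(name="storage_mode")
def fixture_storage_mode(request: pytest.FixtureRequest) -> str:
    """Receive the parametrize value; the real fixture patches os.path.exists here."""
    if request.param not in ("legacy_storage", "json_storage"):
        raise RuntimeError("Invalid storage fixture")
    return request.param


@pytest.mark.parametrize(
    "storage_mode", ["legacy_storage", "json_storage"], indirect=True
)
def test_runs_once_per_backend(storage_mode: str) -> None:
    assert storage_mode in ("legacy_storage", "json_storage")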