Rewrite recorder unittest tests to pytest style test function (#41264)

This commit is contained in:
Ariana Hlavaty 2020-10-06 20:24:13 +01:00 committed by GitHub
parent d35e33610a
commit 1e9e40bf71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 346 additions and 405 deletions

View File

@ -0,0 +1,24 @@
"""Common test tools."""
import pytest
from homeassistant.components.recorder.const import DATA_INSTANCE
from tests.common import get_test_home_assistant, init_recorder_component
@pytest.fixture
def hass_recorder():
"""Home Assistant fixture with in-memory recorder."""
hass = get_test_home_assistant()
def setup_recorder(config=None):
"""Set up with params."""
init_recorder_component(hass, config)
hass.start()
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
return hass
yield setup_recorder
hass.stop()

View File

@ -1,9 +1,6 @@
"""The tests for the Recorder component."""
# pylint: disable=protected-access
from datetime import datetime, timedelta
import unittest
import pytest
from homeassistant.components.recorder import (
CONFIG_SCHEMA,
@ -24,99 +21,69 @@ from homeassistant.util import dt as dt_util
from .common import wait_recording_done
from tests.async_mock import patch
from tests.common import (
async_fire_time_changed,
get_test_home_assistant,
init_recorder_component,
)
from tests.common import async_fire_time_changed, get_test_home_assistant
class TestRecorder(unittest.TestCase):
"""Test the recorder module."""
def test_saving_state(hass, hass_recorder):
"""Test saving and restoring a state."""
hass = hass_recorder()
def setUp(self): # pylint: disable=invalid-name
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
init_recorder_component(self.hass)
self.hass.start()
self.addCleanup(self.tear_down_cleanup)
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
def tear_down_cleanup(self):
"""Stop everything that was started."""
self.hass.stop()
hass.states.set(entity_id, state, attributes)
def test_saving_state(self):
"""Test saving and restoring a state."""
entity_id = "test.recorder"
state = "restoring_from_db"
attributes = {"test_attr": 5, "test_attr_10": "nice"}
wait_recording_done(hass)
self.hass.states.set(entity_id, state, attributes)
with session_scope(hass=hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id > 0
state = db_states[0].to_native()
wait_recording_done(self.hass)
with session_scope(hass=self.hass) as session:
db_states = list(session.query(States))
assert len(db_states) == 1
assert db_states[0].event_id > 0
state = db_states[0].to_native()
assert state == _state_empty_context(self.hass, entity_id)
def test_saving_event(self):
"""Test saving and restoring an event."""
event_type = "EVENT_TEST"
event_data = {"test_attr": 5, "test_attr_10": "nice"}
events = []
@callback
def event_listener(event):
"""Record events from eventbus."""
if event.event_type == event_type:
events.append(event)
self.hass.bus.listen(MATCH_ALL, event_listener)
self.hass.bus.fire(event_type, event_data)
wait_recording_done(self.hass)
assert len(events) == 1
event = events[0]
self.hass.data[DATA_INSTANCE].block_till_done()
with session_scope(hass=self.hass) as session:
db_events = list(session.query(Events).filter_by(event_type=event_type))
assert len(db_events) == 1
db_event = db_events[0].to_native()
assert event.event_type == db_event.event_type
assert event.data == db_event.data
assert event.origin == db_event.origin
# Recorder uses SQLite and stores datetimes as integer unix timestamps
assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace(
microsecond=0
)
assert state == _state_empty_context(hass, entity_id)
@pytest.fixture
def hass_recorder():
"""Home Assistant fixture with in-memory recorder."""
hass = get_test_home_assistant()
def test_saving_event(hass, hass_recorder):
"""Test saving and restoring an event."""
hass = hass_recorder()
def setup_recorder(config=None):
"""Set up with params."""
init_recorder_component(hass, config)
hass.start()
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
return hass
event_type = "EVENT_TEST"
event_data = {"test_attr": 5, "test_attr_10": "nice"}
yield setup_recorder
hass.stop()
events = []
@callback
def event_listener(event):
"""Record events from eventbus."""
if event.event_type == event_type:
events.append(event)
hass.bus.listen(MATCH_ALL, event_listener)
hass.bus.fire(event_type, event_data)
wait_recording_done(hass)
assert len(events) == 1
event = events[0]
hass.data[DATA_INSTANCE].block_till_done()
with session_scope(hass=hass) as session:
db_events = list(session.query(Events).filter_by(event_type=event_type))
assert len(db_events) == 1
db_event = db_events[0].to_native()
assert event.event_type == db_event.event_type
assert event.data == db_event.data
assert event.origin == db_event.origin
# Recorder uses SQLite and stores datetimes as integer unix timestamps
assert event.time_fired.replace(microsecond=0) == db_event.time_fired.replace(
microsecond=0
)
def _add_entities(hass, entity_ids):

View File

@ -1,6 +1,5 @@
"""The tests for the Recorder component."""
from datetime import datetime
import unittest
import pytest
import pytz
@ -21,150 +20,112 @@ from homeassistant.exceptions import InvalidEntityFormatError
from homeassistant.util import dt
import homeassistant.util.dt as dt_util
ENGINE = None
SESSION = None
def test_from_event_to_db_event():
"""Test converting event to db event."""
event = ha.Event("test_event", {"some_data": 15})
assert event == Events.from_event(event).to_native()
def setUpModule(): # pylint: disable=invalid-name
"""Set up a database to use."""
global ENGINE
global SESSION
ENGINE = create_engine("sqlite://")
Base.metadata.create_all(ENGINE)
session_factory = sessionmaker(bind=ENGINE)
SESSION = scoped_session(session_factory)
def test_from_event_to_db_state():
"""Test converting event to db state."""
state = ha.State("sensor.temperature", "18")
event = ha.Event(
EVENT_STATE_CHANGED,
{"entity_id": "sensor.temperature", "old_state": None, "new_state": state},
context=state.context,
)
# We don't restore context unless we need it by joining the
# events table on the event_id for state_changed events
state.context = ha.Context(id=None)
assert state == States.from_event(event).to_native()
def tearDownModule(): # pylint: disable=invalid-name
"""Close database."""
global ENGINE
global SESSION
def test_from_event_to_delete_state():
"""Test converting deleting state event to db state."""
event = ha.Event(
EVENT_STATE_CHANGED,
{
"entity_id": "sensor.temperature",
"old_state": ha.State("sensor.temperature", "18"),
"new_state": None,
},
)
db_state = States.from_event(event)
ENGINE.dispose()
ENGINE = None
SESSION = None
assert db_state.entity_id == "sensor.temperature"
assert db_state.domain == "sensor"
assert db_state.state == ""
assert db_state.last_changed == event.time_fired
assert db_state.last_updated == event.time_fired
class TestEvents(unittest.TestCase):
"""Test Events model."""
def test_entity_ids():
"""Test if entity ids helper method works."""
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session_factory = sessionmaker(bind=engine)
# pylint: disable=no-self-use
def test_from_event(self):
"""Test converting event to db event."""
event = ha.Event("test_event", {"some_data": 15})
assert event == Events.from_event(event).to_native()
session = scoped_session(session_factory)
session.query(Events).delete()
session.query(States).delete()
session.query(RecorderRuns).delete()
run = RecorderRuns(
start=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC),
end=datetime(2016, 7, 9, 23, 0, 0, tzinfo=dt.UTC),
closed_incorrect=False,
created=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC),
)
class TestStates(unittest.TestCase):
"""Test States model."""
session.add(run)
session.commit()
# pylint: disable=no-self-use
before_run = datetime(2016, 7, 9, 8, 0, 0, tzinfo=dt.UTC)
in_run = datetime(2016, 7, 9, 13, 0, 0, tzinfo=dt.UTC)
in_run2 = datetime(2016, 7, 9, 15, 0, 0, tzinfo=dt.UTC)
in_run3 = datetime(2016, 7, 9, 18, 0, 0, tzinfo=dt.UTC)
after_run = datetime(2016, 7, 9, 23, 30, 0, tzinfo=dt.UTC)
def test_from_event(self):
"""Test converting event to db state."""
state = ha.State("sensor.temperature", "18")
event = ha.Event(
EVENT_STATE_CHANGED,
{"entity_id": "sensor.temperature", "old_state": None, "new_state": state},
context=state.context,
assert run.to_native() == run
assert run.entity_ids() == []
session.add(
States(
entity_id="sensor.temperature",
state="20",
last_changed=before_run,
last_updated=before_run,
)
# We don't restore context unless we need it by joining the
# events table on the event_id for state_changed events
state.context = ha.Context(id=None)
assert state == States.from_event(event).to_native()
def test_from_event_to_delete_state(self):
"""Test converting deleting state event to db state."""
event = ha.Event(
EVENT_STATE_CHANGED,
{
"entity_id": "sensor.temperature",
"old_state": ha.State("sensor.temperature", "18"),
"new_state": None,
},
)
session.add(
States(
entity_id="sensor.sound",
state="10",
last_changed=after_run,
last_updated=after_run,
)
db_state = States.from_event(event)
)
assert db_state.entity_id == "sensor.temperature"
assert db_state.domain == "sensor"
assert db_state.state == ""
assert db_state.last_changed == event.time_fired
assert db_state.last_updated == event.time_fired
class TestRecorderRuns(unittest.TestCase):
"""Test recorder run model."""
def setUp(self): # pylint: disable=invalid-name
"""Set up recorder runs."""
self.session = session = SESSION()
session.query(Events).delete()
session.query(States).delete()
session.query(RecorderRuns).delete()
self.addCleanup(self.tear_down_cleanup)
def tear_down_cleanup(self):
"""Clean up."""
self.session.rollback()
def test_entity_ids(self):
"""Test if entity ids helper method works."""
run = RecorderRuns(
start=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC),
end=datetime(2016, 7, 9, 23, 0, 0, tzinfo=dt.UTC),
closed_incorrect=False,
created=datetime(2016, 7, 9, 11, 0, 0, tzinfo=dt.UTC),
session.add(
States(
entity_id="sensor.humidity",
state="76",
last_changed=in_run,
last_updated=in_run,
)
self.session.add(run)
self.session.commit()
before_run = datetime(2016, 7, 9, 8, 0, 0, tzinfo=dt.UTC)
in_run = datetime(2016, 7, 9, 13, 0, 0, tzinfo=dt.UTC)
in_run2 = datetime(2016, 7, 9, 15, 0, 0, tzinfo=dt.UTC)
in_run3 = datetime(2016, 7, 9, 18, 0, 0, tzinfo=dt.UTC)
after_run = datetime(2016, 7, 9, 23, 30, 0, tzinfo=dt.UTC)
assert run.to_native() == run
assert run.entity_ids() == []
self.session.add(
States(
entity_id="sensor.temperature",
state="20",
last_changed=before_run,
last_updated=before_run,
)
)
self.session.add(
States(
entity_id="sensor.sound",
state="10",
last_changed=after_run,
last_updated=after_run,
)
)
session.add(
States(
entity_id="sensor.lux",
state="5",
last_changed=in_run3,
last_updated=in_run3,
)
)
self.session.add(
States(
entity_id="sensor.humidity",
state="76",
last_changed=in_run,
last_updated=in_run,
)
)
self.session.add(
States(
entity_id="sensor.lux",
state="5",
last_changed=in_run3,
last_updated=in_run3,
)
)
assert sorted(run.entity_ids()) == ["sensor.humidity", "sensor.lux"]
assert run.entity_ids(in_run2) == ["sensor.humidity"]
assert sorted(run.entity_ids()) == ["sensor.humidity", "sensor.lux"]
assert run.entity_ids(in_run2) == ["sensor.humidity"]
def test_states_from_native_invalid_entity_id():

View File

@ -1,7 +1,6 @@
"""Test data purging."""
from datetime import datetime, timedelta
import json
import unittest
from homeassistant.components import recorder
from homeassistant.components.recorder.const import DATA_INSTANCE
@ -13,226 +12,216 @@ from homeassistant.util import dt as dt_util
from .common import wait_recording_done
from tests.async_mock import patch
from tests.common import get_test_home_assistant, init_recorder_component
class TestRecorderPurge(unittest.TestCase):
"""Base class for common recorder tests."""
def test_purge_old_states(hass, hass_recorder):
"""Test deleting old states."""
hass = hass_recorder()
_add_test_states(hass)
def setUp(self): # pylint: disable=invalid-name
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
init_recorder_component(self.hass)
self.hass.start()
self.addCleanup(self.tear_down_cleanup)
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 6
def tear_down_cleanup(self):
"""Stop everything that was started."""
self.hass.stop()
# run purge_old_data()
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert states.count() == 4
def _add_test_states(self):
"""Add multiple states to the db for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
attributes = {"test_attr": 5, "test_attr_10": "nice"}
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert states.count() == 2
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert finished
assert states.count() == 2
with recorder.session_scope(hass=self.hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = "autopurgeme"
elif event_id < 4:
timestamp = five_days_ago
state = "purgeme"
else:
timestamp = now
state = "dontpurgeme"
session.add(
States(
entity_id="test.recorder2",
domain="sensor",
state=state,
attributes=json.dumps(attributes),
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=event_id + 1000,
)
)
def test_purge_old_events(hass, hass_recorder):
"""Test deleting old events."""
hass = hass_recorder()
_add_test_events(hass)
def _add_test_events(self):
"""Add a few events for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
with session_scope(hass=hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == 6
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# run purge_old_data()
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert events.count() == 4
with recorder.session_scope(hass=self.hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE"
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE"
else:
timestamp = now
event_type = "EVENT_TEST"
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert events.count() == 2
session.add(
Events(
event_type=event_type,
event_data=json.dumps(event_data),
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
# we should only have 2 events left
finished = purge_old_data(hass.data[DATA_INSTANCE], 4, repack=False)
assert finished
assert events.count() == 2
def _add_test_recorder_runs(self):
"""Add a few recorder_runs for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
def test_purge_method(hass, hass_recorder):
"""Test purge method."""
hass = hass_recorder()
service_data = {"keep_days": 4}
_add_test_events(hass)
_add_test_states(hass)
_add_test_recorder_runs(hass)
with recorder.session_scope(hass=self.hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago
elif rec_id < 4:
timestamp = five_days_ago
else:
timestamp = now
# make sure we start with 6 states
with session_scope(hass=hass) as session:
states = session.query(States)
assert states.count() == 6
session.add(
RecorderRuns(
start=timestamp,
created=dt_util.utcnow(),
end=timestamp + timedelta(days=1),
)
)
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == 6
def test_purge_old_states(self):
"""Test deleting old states."""
self._add_test_states()
# make sure we start with 6 states
with session_scope(hass=self.hass) as session:
states = session.query(States)
assert states.count() == 6
recorder_runs = session.query(RecorderRuns)
assert recorder_runs.count() == 7
# run purge_old_data()
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert states.count() == 4
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert states.count() == 2
# run purge method - no service data, use defaults
hass.services.call("recorder", "purge")
hass.block_till_done()
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert finished
assert states.count() == 2
# Small wait for recorder thread
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
def test_purge_old_events(self):
"""Test deleting old events."""
self._add_test_events()
# only purged old events
assert states.count() == 4
assert events.count() == 4
with session_scope(hass=self.hass) as session:
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == 6
# run purge method - correct service data
hass.services.call("recorder", "purge", service_data=service_data)
hass.block_till_done()
# run purge_old_data()
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert events.count() == 4
# Small wait for recorder thread
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert not finished
assert events.count() == 2
# we should only have 2 states left after purging
assert states.count() == 2
# we should only have 2 events left
finished = purge_old_data(self.hass.data[DATA_INSTANCE], 4, repack=False)
assert finished
assert events.count() == 2
# now we should only have 2 events left
assert events.count() == 2
def test_purge_method(self):
"""Test purge method."""
service_data = {"keep_days": 4}
self._add_test_events()
self._add_test_states()
self._add_test_recorder_runs()
# now we should only have 3 recorder runs left
assert recorder_runs.count() == 3
# make sure we start with 6 states
with session_scope(hass=self.hass) as session:
states = session.query(States)
assert states.count() == 6
assert not ("EVENT_TEST_PURGE" in (event.event_type for event in events.all()))
events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
assert events.count() == 6
recorder_runs = session.query(RecorderRuns)
assert recorder_runs.count() == 7
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# run purge method - no service data, use defaults
self.hass.services.call("recorder", "purge")
self.hass.block_till_done()
# Small wait for recorder thread
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# only purged old events
assert states.count() == 4
assert events.count() == 4
# run purge method - correct service data
self.hass.services.call("recorder", "purge", service_data=service_data)
self.hass.block_till_done()
# Small wait for recorder thread
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
# we should only have 2 states left after purging
assert states.count() == 2
# now we should only have 2 events left
assert events.count() == 2
# now we should only have 3 recorder runs left
assert recorder_runs.count() == 3
assert not (
"EVENT_TEST_PURGE" in (event.event_type for event in events.all())
# run purge method - correct service data, with repack
with patch("homeassistant.components.recorder.purge._LOGGER") as mock_logger:
service_data["repack"] = True
hass.services.call("recorder", "purge", service_data=service_data)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
assert (
mock_logger.debug.mock_calls[5][1][0]
== "Vacuuming SQL DB to free space"
)
# run purge method - correct service data, with repack
with patch(
"homeassistant.components.recorder.purge._LOGGER"
) as mock_logger:
service_data["repack"] = True
self.hass.services.call("recorder", "purge", service_data=service_data)
self.hass.block_till_done()
self.hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(self.hass)
assert (
mock_logger.debug.mock_calls[5][1][0]
== "Vacuuming SQL DB to free space"
def _add_test_states(hass):
"""Add multiple states to the db for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
attributes = {"test_attr": 5, "test_attr_10": "nice"}
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
state = "autopurgeme"
elif event_id < 4:
timestamp = five_days_ago
state = "purgeme"
else:
timestamp = now
state = "dontpurgeme"
session.add(
States(
entity_id="test.recorder2",
domain="sensor",
state=state,
attributes=json.dumps(attributes),
last_changed=timestamp,
last_updated=timestamp,
created=timestamp,
event_id=event_id + 1000,
)
)
def _add_test_events(hass):
"""Add a few events for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
event_data = {"test_attr": 5, "test_attr_10": "nice"}
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
for event_id in range(6):
if event_id < 2:
timestamp = eleven_days_ago
event_type = "EVENT_TEST_AUTOPURGE"
elif event_id < 4:
timestamp = five_days_ago
event_type = "EVENT_TEST_PURGE"
else:
timestamp = now
event_type = "EVENT_TEST"
session.add(
Events(
event_type=event_type,
event_data=json.dumps(event_data),
origin="LOCAL",
created=timestamp,
time_fired=timestamp,
)
)
def _add_test_recorder_runs(hass):
"""Add a few recorder_runs for testing."""
now = datetime.now()
five_days_ago = now - timedelta(days=5)
eleven_days_ago = now - timedelta(days=11)
hass.block_till_done()
hass.data[DATA_INSTANCE].block_till_done()
wait_recording_done(hass)
with recorder.session_scope(hass=hass) as session:
for rec_id in range(6):
if rec_id < 2:
timestamp = eleven_days_ago
elif rec_id < 4:
timestamp = five_days_ago
else:
timestamp = now
session.add(
RecorderRuns(
start=timestamp,
created=dt_util.utcnow(),
end=timestamp + timedelta(days=1),
)
)