Drop use of mock_mqtt_component (#37013)

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
Erik Montnemery 2020-06-23 19:17:22 +02:00 committed by GitHub
parent cf6480cda0
commit 4e77969f5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 2168 additions and 2375 deletions

View File

@ -23,7 +23,7 @@ from homeassistant.auth import (
providers as auth_providers, providers as auth_providers,
) )
from homeassistant.auth.permissions import system_policies from homeassistant.auth.permissions import system_policies
from homeassistant.components import mqtt, recorder from homeassistant.components import recorder
from homeassistant.components.device_automation import ( # noqa: F401 from homeassistant.components.device_automation import ( # noqa: F401
_async_get_device_automation_capabilities as async_get_device_automation_capabilities, _async_get_device_automation_capabilities as async_get_device_automation_capabilities,
_async_get_device_automations as async_get_device_automations, _async_get_device_automations as async_get_device_automations,
@ -53,13 +53,13 @@ from homeassistant.helpers import (
storage, storage,
) )
from homeassistant.helpers.json import JSONEncoder from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import async_setup_component, setup_component from homeassistant.setup import setup_component
from homeassistant.util.async_ import run_callback_threadsafe from homeassistant.util.async_ import run_callback_threadsafe
import homeassistant.util.dt as date_util import homeassistant.util.dt as date_util
from homeassistant.util.unit_system import METRIC_SYSTEM from homeassistant.util.unit_system import METRIC_SYSTEM
import homeassistant.util.yaml.loader as yaml_loader import homeassistant.util.yaml.loader as yaml_loader
from tests.async_mock import AsyncMock, MagicMock, Mock, patch from tests.async_mock import AsyncMock, Mock, patch
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
INSTANCES = [] INSTANCES = []
@ -324,39 +324,6 @@ def mock_state_change_event(hass, new_state, old_state=None):
hass.bus.fire(EVENT_STATE_CHANGED, event_data, context=new_state.context) hass.bus.fire(EVENT_STATE_CHANGED, event_data, context=new_state.context)
async def async_mock_mqtt_component(hass, config=None):
"""Mock the MQTT component."""
if config is None:
config = {mqtt.CONF_BROKER: "mock-broker"}
@ha.callback
def _async_fire_mqtt_message(topic, payload, qos, retain):
async_fire_mqtt_message(hass, topic, payload, qos, retain)
with patch("paho.mqtt.client.Client") as mock_client:
mock_client = mock_client.return_value
mock_client.connect.return_value = 0
mock_client.subscribe.return_value = (0, 0)
mock_client.unsubscribe.return_value = (0, 0)
mock_client.publish.side_effect = _async_fire_mqtt_message
result = await async_setup_component(hass, mqtt.DOMAIN, {mqtt.DOMAIN: config})
assert result
await hass.async_block_till_done()
mqtt_component_mock = MagicMock(
spec_set=hass.data["mqtt"], wraps=hass.data["mqtt"]
)
hass.data["mqtt"].connected = mqtt_component_mock.connected
mqtt_component_mock._mqttc = mock_client
hass.data["mqtt"] = mqtt_component_mock
return hass.data["mqtt"]
mock_mqtt_component = threadsafe_coroutine_factory(async_mock_mqtt_component)
@ha.callback @ha.callback
def mock_component(hass, component): def mock_component(hass, component):
"""Mock a component is setup.""" """Mock a component is setup."""

View File

@ -15,7 +15,6 @@ from homeassistant.const import (
SERVICE_ALARM_DISARM, SERVICE_ALARM_DISARM,
SERVICE_ALARM_TRIGGER, SERVICE_ALARM_TRIGGER,
) )
from homeassistant.loader import bind_hass
async def async_alarm_disarm(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_disarm(hass, code=None, entity_id=ENTITY_MATCH_ALL):
@ -29,18 +28,6 @@ async def async_alarm_disarm(hass, code=None, entity_id=ENTITY_MATCH_ALL):
await hass.services.async_call(DOMAIN, SERVICE_ALARM_DISARM, data, blocking=True) await hass.services.async_call(DOMAIN, SERVICE_ALARM_DISARM, data, blocking=True)
@bind_hass
def alarm_disarm(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_DISARM, data)
async def async_alarm_arm_home(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_arm_home(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm.""" """Send the alarm the command for disarm."""
data = {} data = {}
@ -52,18 +39,6 @@ async def async_alarm_arm_home(hass, code=None, entity_id=ENTITY_MATCH_ALL):
await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_HOME, data, blocking=True) await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_HOME, data, blocking=True)
@bind_hass
def alarm_arm_home(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for arm home."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_ARM_HOME, data)
async def async_alarm_arm_away(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_arm_away(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm.""" """Send the alarm the command for disarm."""
data = {} data = {}
@ -75,18 +50,6 @@ async def async_alarm_arm_away(hass, code=None, entity_id=ENTITY_MATCH_ALL):
await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_AWAY, data, blocking=True) await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_AWAY, data, blocking=True)
@bind_hass
def alarm_arm_away(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for arm away."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_ARM_AWAY, data)
async def async_alarm_arm_night(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_arm_night(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm.""" """Send the alarm the command for disarm."""
data = {} data = {}
@ -98,18 +61,6 @@ async def async_alarm_arm_night(hass, code=None, entity_id=ENTITY_MATCH_ALL):
await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_NIGHT, data, blocking=True) await hass.services.async_call(DOMAIN, SERVICE_ALARM_ARM_NIGHT, data, blocking=True)
@bind_hass
def alarm_arm_night(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for arm night."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_ARM_NIGHT, data)
async def async_alarm_trigger(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_trigger(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm.""" """Send the alarm the command for disarm."""
data = {} data = {}
@ -121,18 +72,6 @@ async def async_alarm_trigger(hass, code=None, entity_id=ENTITY_MATCH_ALL):
await hass.services.async_call(DOMAIN, SERVICE_ALARM_TRIGGER, data, blocking=True) await hass.services.async_call(DOMAIN, SERVICE_ALARM_TRIGGER, data, blocking=True)
@bind_hass
def alarm_trigger(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for trigger."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_TRIGGER, data)
async def async_alarm_arm_custom_bypass(hass, code=None, entity_id=ENTITY_MATCH_ALL): async def async_alarm_arm_custom_bypass(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for disarm.""" """Send the alarm the command for disarm."""
data = {} data = {}
@ -144,15 +83,3 @@ async def async_alarm_arm_custom_bypass(hass, code=None, entity_id=ENTITY_MATCH_
await hass.services.async_call( await hass.services.async_call(
DOMAIN, SERVICE_ALARM_ARM_CUSTOM_BYPASS, data, blocking=True DOMAIN, SERVICE_ALARM_ARM_CUSTOM_BYPASS, data, blocking=True
) )
@bind_hass
def alarm_arm_custom_bypass(hass, code=None, entity_id=ENTITY_MATCH_ALL):
"""Send the alarm the command for arm custom bypass."""
data = {}
if code:
data[ATTR_CODE] = code
if entity_id:
data[ATTR_ENTITY_ID] = entity_id
hass.services.call(DOMAIN, SERVICE_ALARM_ARM_CUSTOM_BYPASS, data)

File diff suppressed because it is too large Load Diff

View File

@ -17,6 +17,7 @@ from homeassistant.const import (
STATE_ALARM_TRIGGERED, STATE_ALARM_TRIGGERED,
STATE_UNKNOWN, STATE_UNKNOWN,
) )
from homeassistant.setup import async_setup_component
from .test_common import ( from .test_common import (
help_test_availability_when_connection_lost, help_test_availability_when_connection_lost,
@ -41,11 +42,7 @@ from .test_common import (
help_test_update_with_json_attrs_not_dict, help_test_update_with_json_attrs_not_dict,
) )
from tests.common import ( from tests.common import assert_setup_component, async_fire_mqtt_message
assert_setup_component,
async_fire_mqtt_message,
async_setup_component,
)
from tests.components.alarm_control_panel import common from tests.components.alarm_control_panel import common
CODE_NUMBER = "1234" CODE_NUMBER = "1234"

View File

@ -23,6 +23,7 @@ from homeassistant.components.climate.const import (
SUPPORT_TARGET_TEMPERATURE_RANGE, SUPPORT_TARGET_TEMPERATURE_RANGE,
) )
from homeassistant.const import STATE_OFF from homeassistant.const import STATE_OFF
from homeassistant.setup import async_setup_component
from .test_common import ( from .test_common import (
help_test_availability_when_connection_lost, help_test_availability_when_connection_lost,
@ -48,7 +49,7 @@ from .test_common import (
) )
from tests.async_mock import call from tests.async_mock import call
from tests.common import async_fire_mqtt_message, async_setup_component from tests.common import async_fire_mqtt_message
from tests.components.climate import common from tests.components.climate import common
ENTITY_CLIMATE = "climate.test" ENTITY_CLIMATE = "climate.test"

View File

@ -10,9 +10,10 @@ from homeassistant.components.mqtt.const import MQTT_DISCONNECTED
from homeassistant.components.mqtt.discovery import async_start from homeassistant.components.mqtt.discovery import async_start
from homeassistant.const import ATTR_ASSUMED_STATE, STATE_UNAVAILABLE from homeassistant.const import ATTR_ASSUMED_STATE, STATE_UNAVAILABLE
from homeassistant.helpers.dispatcher import async_dispatcher_send from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_setup_component
from tests.async_mock import ANY from tests.async_mock import ANY
from tests.common import async_fire_mqtt_message, async_setup_component, mock_registry from tests.common import async_fire_mqtt_message, mock_registry
DEFAULT_CONFIG_DEVICE_INFO_ID = { DEFAULT_CONFIG_DEVICE_INFO_ID = {
"identifiers": ["helloworld"], "identifiers": ["helloworld"],

View File

@ -7,15 +7,13 @@ import homeassistant.components.mqtt_eventstream as eventstream
from homeassistant.const import EVENT_STATE_CHANGED from homeassistant.const import EVENT_STATE_CHANGED
from homeassistant.core import State, callback from homeassistant.core import State, callback
from homeassistant.helpers.json import JSONEncoder from homeassistant.helpers.json import JSONEncoder
from homeassistant.setup import setup_component from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from tests.async_mock import ANY, patch from tests.async_mock import ANY, patch
from tests.common import ( from tests.common import (
fire_mqtt_message, async_fire_mqtt_message,
fire_time_changed, async_fire_time_changed,
get_test_home_assistant,
mock_mqtt_component,
mock_state_change_event, mock_state_change_event,
) )
@ -25,177 +23,170 @@ def mock_storage(hass_storage):
"""Autouse hass_storage for the TestCase tests.""" """Autouse hass_storage for the TestCase tests."""
class TestMqttEventStream: async def add_eventstream(hass, sub_topic=None, pub_topic=None, ignore_event=None):
"""Test the MQTT eventstream module.""" """Add a mqtt_eventstream component."""
config = {}
if sub_topic:
config["subscribe_topic"] = sub_topic
if pub_topic:
config["publish_topic"] = pub_topic
if ignore_event:
config["ignore_event"] = ignore_event
return await async_setup_component(
hass, eventstream.DOMAIN, {eventstream.DOMAIN: config}
)
def setup_method(self):
"""Set up things to be run when tests are started."""
self.hass = get_test_home_assistant()
self.mock_mqtt = mock_mqtt_component(self.hass)
def teardown_method(self): async def test_setup_succeeds(hass, mqtt_mock):
"""Stop everything that was started.""" """Test the success of the setup."""
self.hass.stop() assert await add_eventstream(hass)
def add_eventstream(self, sub_topic=None, pub_topic=None, ignore_event=None):
"""Add a mqtt_eventstream component."""
config = {}
if sub_topic:
config["subscribe_topic"] = sub_topic
if pub_topic:
config["publish_topic"] = pub_topic
if ignore_event:
config["ignore_event"] = ignore_event
return setup_component(
self.hass, eventstream.DOMAIN, {eventstream.DOMAIN: config}
)
def test_setup_succeeds(self): async def test_setup_with_pub(hass, mqtt_mock):
"""Test the success of the setup.""" """Test the setup with subscription."""
assert self.add_eventstream() # Should start off with no listeners for all events
assert hass.bus.async_listeners().get("*") is None
def test_setup_with_pub(self): assert await add_eventstream(hass, pub_topic="bar")
"""Test the setup with subscription.""" await hass.async_block_till_done()
# Should start off with no listeners for all events
assert self.hass.bus.listeners.get("*") is None
assert self.add_eventstream(pub_topic="bar") # Verify that the event handler has been added as a listener
self.hass.block_till_done() assert hass.bus.async_listeners().get("*") == 1
# Verify that the event handler has been added as a listener
assert self.hass.bus.listeners.get("*") == 1
@patch("homeassistant.components.mqtt.async_subscribe") async def test_subscribe(hass, mqtt_mock):
def test_subscribe(self, mock_sub): """Test the subscription."""
"""Test the subscription.""" sub_topic = "foo"
sub_topic = "foo" assert await add_eventstream(hass, sub_topic=sub_topic)
assert self.add_eventstream(sub_topic=sub_topic) await hass.async_block_till_done()
self.hass.block_till_done()
# Verify that the this entity was subscribed to the topic # Verify that the this entity was subscribed to the topic
mock_sub.assert_called_with(self.hass, sub_topic, ANY) mqtt_mock.async_subscribe.assert_called_with(sub_topic, ANY, 0, ANY)
@patch("homeassistant.components.mqtt.async_publish")
@patch("homeassistant.core.dt_util.utcnow")
def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub):
"""Test the sending of a new message if event changed."""
now = dt_util.as_utc(dt_util.now())
e_id = "fake.entity"
pub_topic = "bar"
mock_utcnow.return_value = now
async def test_state_changed_event_sends_message(hass, mqtt_mock):
"""Test the sending of a new message if event changed."""
now = dt_util.as_utc(dt_util.now())
e_id = "fake.entity"
pub_topic = "bar"
with patch(
("homeassistant.core.dt_util.utcnow"), return_value=now,
):
# Add the eventstream component for publishing events # Add the eventstream component for publishing events
assert self.add_eventstream(pub_topic=pub_topic) assert await add_eventstream(hass, pub_topic=pub_topic)
self.hass.block_till_done() await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the # Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc. # mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock() mqtt_mock.async_publish.reset_mock()
# Set a state of an entity # Set a state of an entity
mock_state_change_event(self.hass, State(e_id, "on")) mock_state_change_event(hass, State(e_id, "on"))
self.hass.block_till_done() await hass.async_block_till_done()
await hass.async_block_till_done()
# The order of the JSON is indeterminate, # The order of the JSON is indeterminate,
# so first just check that publish was called # so first just check that publish was called
mock_pub.assert_called_with(self.hass, pub_topic, ANY) mqtt_mock.async_publish.assert_called_with(pub_topic, ANY, 0, False)
assert mock_pub.called assert mqtt_mock.async_publish.called
# Get the actual call to publish and make sure it was the one # Get the actual call to publish and make sure it was the one
# we were looking for # we were looking for
msg = mock_pub.call_args[0][2] msg = mqtt_mock.async_publish.call_args[0][1]
event = {} event = {}
event["event_type"] = EVENT_STATE_CHANGED event["event_type"] = EVENT_STATE_CHANGED
new_state = { new_state = {
"last_updated": now.isoformat(), "last_updated": now.isoformat(),
"state": "on", "state": "on",
"entity_id": e_id, "entity_id": e_id,
"attributes": {}, "attributes": {},
"last_changed": now.isoformat(), "last_changed": now.isoformat(),
} }
event["event_data"] = {"new_state": new_state, "entity_id": e_id} event["event_data"] = {"new_state": new_state, "entity_id": e_id}
# Verify that the message received was that expected # Verify that the message received was that expected
result = json.loads(msg) result = json.loads(msg)
result["event_data"]["new_state"].pop("context") result["event_data"]["new_state"].pop("context")
assert result == event assert result == event
@patch("homeassistant.components.mqtt.async_publish")
def test_time_event_does_not_send_message(self, mock_pub):
"""Test the sending of a new message if time event."""
assert self.add_eventstream(pub_topic="bar")
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the async def test_time_event_does_not_send_message(hass, mqtt_mock):
# mqtt_eventstream state change on initialization, etc. """Test the sending of a new message if time event."""
mock_pub.reset_mock() assert await add_eventstream(hass, pub_topic="bar")
await hass.async_block_till_done()
fire_time_changed(self.hass, dt_util.utcnow()) # Reset the mock because it will have already gotten calls for the
assert not mock_pub.called # mqtt_eventstream state change on initialization, etc.
mqtt_mock.async_publish.reset_mock()
def test_receiving_remote_event_fires_hass_event(self): async_fire_time_changed(hass, dt_util.utcnow())
"""Test the receiving of the remotely fired event.""" assert not mqtt_mock.async_publish.called
sub_topic = "foo"
assert self.add_eventstream(sub_topic=sub_topic)
self.hass.block_till_done()
calls = []
@callback async def test_receiving_remote_event_fires_hass_event(hass, mqtt_mock):
def listener(_): """Test the receiving of the remotely fired event."""
calls.append(1) sub_topic = "foo"
assert await add_eventstream(hass, sub_topic=sub_topic)
await hass.async_block_till_done()
self.hass.bus.listen_once("test_event", listener) calls = []
self.hass.block_till_done()
payload = json.dumps( @callback
{"event_type": "test_event", "event_data": {}}, cls=JSONEncoder def listener(_):
) calls.append(1)
fire_mqtt_message(self.hass, sub_topic, payload)
self.hass.block_till_done()
assert 1 == len(calls) hass.bus.async_listen_once("test_event", listener)
await hass.async_block_till_done()
@patch("homeassistant.components.mqtt.async_publish") payload = json.dumps(
def test_ignored_event_doesnt_send_over_stream(self, mock_pub): {"event_type": "test_event", "event_data": {}}, cls=JSONEncoder
"""Test the ignoring of sending events if defined.""" )
assert self.add_eventstream(pub_topic="bar", ignore_event=["state_changed"]) async_fire_mqtt_message(hass, sub_topic, payload)
self.hass.block_till_done() await hass.async_block_till_done()
e_id = "entity.test_id" assert 1 == len(calls)
event = {}
event["event_type"] = EVENT_STATE_CHANGED
new_state = {"state": "on", "entity_id": e_id, "attributes": {}}
event["event_data"] = {"new_state": new_state, "entity_id": e_id}
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity async def test_ignored_event_doesnt_send_over_stream(hass, mqtt_mock):
mock_state_change_event(self.hass, State(e_id, "on")) """Test the ignoring of sending events if defined."""
self.hass.block_till_done() assert await add_eventstream(hass, pub_topic="bar", ignore_event=["state_changed"])
await hass.async_block_till_done()
assert not mock_pub.called e_id = "entity.test_id"
event = {}
event["event_type"] = EVENT_STATE_CHANGED
new_state = {"state": "on", "entity_id": e_id, "attributes": {}}
event["event_data"] = {"new_state": new_state, "entity_id": e_id}
@patch("homeassistant.components.mqtt.async_publish") # Reset the mock because it will have already gotten calls for the
def test_wrong_ignored_event_sends_over_stream(self, mock_pub): # mqtt_eventstream state change on initialization, etc.
"""Test the ignoring of sending events if defined.""" mqtt_mock.async_publish.reset_mock()
assert self.add_eventstream(pub_topic="bar", ignore_event=["statee_changed"])
self.hass.block_till_done()
e_id = "entity.test_id" # Set a state of an entity
event = {} mock_state_change_event(hass, State(e_id, "on"))
event["event_type"] = EVENT_STATE_CHANGED await hass.async_block_till_done()
new_state = {"state": "on", "entity_id": e_id, "attributes": {}}
event["event_data"] = {"new_state": new_state, "entity_id": e_id}
# Reset the mock because it will have already gotten calls for the assert not mqtt_mock.async_publish.called
# mqtt_eventstream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State(e_id, "on"))
self.hass.block_till_done()
assert mock_pub.called async def test_wrong_ignored_event_sends_over_stream(hass, mqtt_mock):
"""Test the ignoring of sending events if defined."""
assert await add_eventstream(hass, pub_topic="bar", ignore_event=["statee_changed"])
await hass.async_block_till_done()
e_id = "entity.test_id"
event = {}
event["event_type"] = EVENT_STATE_CHANGED
new_state = {"state": "on", "entity_id": e_id, "attributes": {}}
event["event_data"] = {"new_state": new_state, "entity_id": e_id}
# Reset the mock because it will have already gotten calls for the
# mqtt_eventstream state change on initialization, etc.
mqtt_mock.async_publish.reset_mock()
# Set a state of an entity
mock_state_change_event(hass, State(e_id, "on"))
await hass.async_block_till_done()
await hass.async_block_till_done()
assert mqtt_mock.async_publish.called

View File

@ -3,14 +3,10 @@ import pytest
import homeassistant.components.mqtt_statestream as statestream import homeassistant.components.mqtt_statestream as statestream
from homeassistant.core import State from homeassistant.core import State
from homeassistant.setup import setup_component from homeassistant.setup import async_setup_component
from tests.async_mock import ANY, call, patch from tests.async_mock import ANY, call
from tests.common import ( from tests.common import mock_state_change_event
get_test_home_assistant,
mock_mqtt_component,
mock_state_change_event,
)
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@ -18,360 +14,343 @@ def mock_storage(hass_storage):
"""Autouse hass_storage for the TestCase tests.""" """Autouse hass_storage for the TestCase tests."""
class TestMqttStateStream: async def add_statestream(
"""Test the MQTT statestream module.""" hass,
base_topic=None,
def setup_method(self): publish_attributes=None,
"""Set up things to be run when tests are started.""" publish_timestamps=None,
self.hass = get_test_home_assistant() publish_include=None,
self.mock_mqtt = mock_mqtt_component(self.hass) publish_exclude=None,
):
def teardown_method(self): """Add a mqtt_statestream component."""
"""Stop everything that was started.""" config = {}
self.hass.stop() if base_topic:
config["base_topic"] = base_topic
def add_statestream( if publish_attributes:
self, config["publish_attributes"] = publish_attributes
base_topic=None, if publish_timestamps:
publish_attributes=None, config["publish_timestamps"] = publish_timestamps
publish_timestamps=None, if publish_include:
publish_include=None, config["include"] = publish_include
publish_exclude=None, if publish_exclude:
): config["exclude"] = publish_exclude
"""Add a mqtt_statestream component.""" return await async_setup_component(
config = {} hass, statestream.DOMAIN, {statestream.DOMAIN: config}
if base_topic: )
config["base_topic"] = base_topic
if publish_attributes:
config["publish_attributes"] = publish_attributes async def test_fails_with_no_base(hass, mqtt_mock):
if publish_timestamps: """Setup should fail if no base_topic is set."""
config["publish_timestamps"] = publish_timestamps assert await add_statestream(hass) is False
if publish_include:
config["include"] = publish_include
if publish_exclude: async def test_setup_succeeds_without_attributes(hass, mqtt_mock):
config["exclude"] = publish_exclude """Test the success of the setup with a valid base_topic."""
return setup_component( assert await add_statestream(hass, base_topic="pub")
self.hass, statestream.DOMAIN, {statestream.DOMAIN: config}
)
async def test_setup_succeeds_with_attributes(hass, mqtt_mock):
def test_fails_with_no_base(self): """Test setup with a valid base_topic and publish_attributes."""
"""Setup should fail if no base_topic is set.""" assert await add_statestream(hass, base_topic="pub", publish_attributes=True)
assert self.add_statestream() is False
def test_setup_succeeds_without_attributes(self): async def test_state_changed_event_sends_message(hass, mqtt_mock):
"""Test the success of the setup with a valid base_topic.""" """Test the sending of a new message if event changed."""
assert self.add_statestream(base_topic="pub") e_id = "fake.entity"
base_topic = "pub"
def test_setup_succeeds_with_attributes(self):
"""Test setup with a valid base_topic and publish_attributes.""" # Add the statestream component for publishing state updates
assert self.add_statestream(base_topic="pub", publish_attributes=True) assert await add_statestream(hass, base_topic=base_topic)
await hass.async_block_till_done()
@patch("homeassistant.components.mqtt.async_publish")
@patch("homeassistant.core.dt_util.utcnow") # Reset the mock because it will have already gotten calls for the
def test_state_changed_event_sends_message(self, mock_utcnow, mock_pub): # mqtt_statestream state change on initialization, etc.
"""Test the sending of a new message if event changed.""" mqtt_mock.async_publish.reset_mock()
e_id = "fake.entity"
base_topic = "pub" # Set a state of an entity
mock_state_change_event(hass, State(e_id, "on"))
# Add the statestream component for publishing state updates await hass.async_block_till_done()
assert self.add_statestream(base_topic=base_topic) await hass.async_block_till_done()
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
# Reset the mock because it will have already gotten calls for the mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
# mqtt_statestream state change on initialization, etc. assert mqtt_mock.async_publish.called
mock_pub.reset_mock()
# Set a state of an entity async def test_state_changed_event_sends_message_and_timestamp(hass, mqtt_mock):
mock_state_change_event(self.hass, State(e_id, "on")) """Test the sending of a message and timestamps if event changed."""
self.hass.block_till_done() e_id = "another.entity"
base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True) # Add the statestream component for publishing state updates
assert mock_pub.called assert await add_statestream(
hass, base_topic=base_topic, publish_attributes=None, publish_timestamps=True
@patch("homeassistant.components.mqtt.async_publish") )
@patch("homeassistant.core.dt_util.utcnow") await hass.async_block_till_done()
def test_state_changed_event_sends_message_and_timestamp(
self, mock_utcnow, mock_pub # Reset the mock because it will have already gotten calls for the
): # mqtt_statestream state change on initialization, etc.
"""Test the sending of a message and timestamps if event changed.""" mqtt_mock.async_publish.reset_mock()
e_id = "another.entity"
base_topic = "pub" # Set a state of an entity
mock_state_change_event(hass, State(e_id, "on"))
# Add the statestream component for publishing state updates await hass.async_block_till_done()
assert self.add_statestream( await hass.async_block_till_done()
base_topic=base_topic, publish_attributes=None, publish_timestamps=True
) # Make sure 'on' was published to pub/fake/entity/state
self.hass.block_till_done() calls = [
call.async_publish("pub/another/entity/state", "on", 1, True),
# Reset the mock because it will have already gotten calls for the call.async_publish("pub/another/entity/last_changed", ANY, 1, True),
# mqtt_statestream state change on initialization, etc. call.async_publish("pub/another/entity/last_updated", ANY, 1, True),
mock_pub.reset_mock() ]
# Set a state of an entity mqtt_mock.async_publish.assert_has_calls(calls, any_order=True)
mock_state_change_event(self.hass, State(e_id, "on")) assert mqtt_mock.async_publish.called
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state async def test_state_changed_attr_sends_message(hass, mqtt_mock):
calls = [ """Test the sending of a new message if attribute changed."""
call.async_publish(self.hass, "pub/another/entity/state", "on", 1, True), e_id = "fake.entity"
call.async_publish( base_topic = "pub"
self.hass, "pub/another/entity/last_changed", ANY, 1, True
), # Add the statestream component for publishing state updates
call.async_publish( assert await add_statestream(hass, base_topic=base_topic, publish_attributes=True)
self.hass, "pub/another/entity/last_updated", ANY, 1, True await hass.async_block_till_done()
),
] # Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.assert_has_calls(calls, any_order=True) mqtt_mock.async_publish.reset_mock()
assert mock_pub.called
test_attributes = {"testing": "YES", "list": ["a", "b", "c"], "bool": False}
@patch("homeassistant.components.mqtt.async_publish")
@patch("homeassistant.core.dt_util.utcnow") # Set a state of an entity
def test_state_changed_attr_sends_message(self, mock_utcnow, mock_pub): mock_state_change_event(hass, State(e_id, "off", attributes=test_attributes))
"""Test the sending of a new message if attribute changed.""" await hass.async_block_till_done()
e_id = "fake.entity" await hass.async_block_till_done()
base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state
# Add the statestream component for publishing state updates calls = [
assert self.add_statestream(base_topic=base_topic, publish_attributes=True) call.async_publish("pub/fake/entity/state", "off", 1, True),
self.hass.block_till_done() call.async_publish("pub/fake/entity/testing", '"YES"', 1, True),
call.async_publish("pub/fake/entity/list", '["a", "b", "c"]', 1, True),
# Reset the mock because it will have already gotten calls for the call.async_publish("pub/fake/entity/bool", "false", 1, True),
# mqtt_statestream state change on initialization, etc. ]
mock_pub.reset_mock()
mqtt_mock.async_publish.assert_has_calls(calls, any_order=True)
test_attributes = {"testing": "YES", "list": ["a", "b", "c"], "bool": False} assert mqtt_mock.async_publish.called
# Set a state of an entity
mock_state_change_event( async def test_state_changed_event_include_domain(hass, mqtt_mock):
self.hass, State(e_id, "off", attributes=test_attributes) """Test that filtering on included domain works as expected."""
) base_topic = "pub"
self.hass.block_till_done()
incl = {"domains": ["fake"]}
# Make sure 'on' was published to pub/fake/entity/state excl = {}
calls = [
call.async_publish(self.hass, "pub/fake/entity/state", "off", 1, True), # Add the statestream component for publishing state updates
call.async_publish(self.hass, "pub/fake/entity/testing", '"YES"', 1, True), # Set the filter to allow fake.* items
call.async_publish( assert await add_statestream(
self.hass, "pub/fake/entity/list", '["a", "b", "c"]', 1, True hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
), )
call.async_publish(self.hass, "pub/fake/entity/bool", "false", 1, True), await hass.async_block_till_done()
]
# Reset the mock because it will have already gotten calls for the
mock_pub.assert_has_calls(calls, any_order=True) # mqtt_statestream state change on initialization, etc.
assert mock_pub.called mqtt_mock.async_publish.reset_mock()
@patch("homeassistant.components.mqtt.async_publish") # Set a state of an entity
@patch("homeassistant.core.dt_util.utcnow") mock_state_change_event(hass, State("fake.entity", "on"))
def test_state_changed_event_include_domain(self, mock_utcnow, mock_pub): await hass.async_block_till_done()
"""Test that filtering on included domain works as expected.""" await hass.async_block_till_done()
base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state
incl = {"domains": ["fake"]} mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
excl = {} assert mqtt_mock.async_publish.called
# Add the statestream component for publishing state updates mqtt_mock.async_publish.reset_mock()
# Set the filter to allow fake.* items # Set a state of an entity that shouldn't be included
assert self.add_statestream( mock_state_change_event(hass, State("fake2.entity", "on"))
base_topic=base_topic, publish_include=incl, publish_exclude=excl await hass.async_block_till_done()
) await hass.async_block_till_done()
self.hass.block_till_done()
assert not mqtt_mock.async_publish.called
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock() async def test_state_changed_event_include_entity(hass, mqtt_mock):
"""Test that filtering on included entity works as expected."""
# Set a state of an entity base_topic = "pub"
mock_state_change_event(self.hass, State("fake.entity", "on"))
self.hass.block_till_done() incl = {"entities": ["fake.entity"]}
excl = {}
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True) # Add the statestream component for publishing state updates
assert mock_pub.called # Set the filter to allow fake.* items
assert await add_statestream(
mock_pub.reset_mock() hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
# Set a state of an entity that shouldn't be included )
mock_state_change_event(self.hass, State("fake2.entity", "on")) await hass.async_block_till_done()
self.hass.block_till_done()
# Reset the mock because it will have already gotten calls for the
assert not mock_pub.called # mqtt_statestream state change on initialization, etc.
mqtt_mock.async_publish.reset_mock()
@patch("homeassistant.components.mqtt.async_publish")
@patch("homeassistant.core.dt_util.utcnow") # Set a state of an entity
def test_state_changed_event_include_entity(self, mock_utcnow, mock_pub): mock_state_change_event(hass, State("fake.entity", "on"))
"""Test that filtering on included entity works as expected.""" await hass.async_block_till_done()
base_topic = "pub" await hass.async_block_till_done()
incl = {"entities": ["fake.entity"]} # Make sure 'on' was published to pub/fake/entity/state
excl = {} mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
assert mqtt_mock.async_publish.called
# Add the statestream component for publishing state updates
# Set the filter to allow fake.* items mqtt_mock.async_publish.reset_mock()
assert self.add_statestream( # Set a state of an entity that shouldn't be included
base_topic=base_topic, publish_include=incl, publish_exclude=excl mock_state_change_event(hass, State("fake.entity2", "on"))
) await hass.async_block_till_done()
self.hass.block_till_done() await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the assert not mqtt_mock.async_publish.called
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
async def test_state_changed_event_exclude_domain(hass, mqtt_mock):
# Set a state of an entity """Test that filtering on excluded domain works as expected."""
mock_state_change_event(self.hass, State("fake.entity", "on")) base_topic = "pub"
self.hass.block_till_done()
incl = {}
# Make sure 'on' was published to pub/fake/entity/state excl = {"domains": ["fake2"]}
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True)
assert mock_pub.called # Add the statestream component for publishing state updates
# Set the filter to allow fake.* items
mock_pub.reset_mock() assert await add_statestream(
# Set a state of an entity that shouldn't be included hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
mock_state_change_event(self.hass, State("fake.entity2", "on")) )
self.hass.block_till_done() await hass.async_block_till_done()
assert not mock_pub.called # Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc.
@patch("homeassistant.components.mqtt.async_publish") mqtt_mock.async_publish.reset_mock()
@patch("homeassistant.core.dt_util.utcnow")
def test_state_changed_event_exclude_domain(self, mock_utcnow, mock_pub): # Set a state of an entity
"""Test that filtering on excluded domain works as expected.""" mock_state_change_event(hass, State("fake.entity", "on"))
base_topic = "pub" await hass.async_block_till_done()
await hass.async_block_till_done()
incl = {}
excl = {"domains": ["fake2"]} # Make sure 'on' was published to pub/fake/entity/state
mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
# Add the statestream component for publishing state updates assert mqtt_mock.async_publish.called
# Set the filter to allow fake.* items
assert self.add_statestream( mqtt_mock.async_publish.reset_mock()
base_topic=base_topic, publish_include=incl, publish_exclude=excl # Set a state of an entity that shouldn't be included
) mock_state_change_event(hass, State("fake2.entity", "on"))
self.hass.block_till_done() await hass.async_block_till_done()
await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc. assert not mqtt_mock.async_publish.called
mock_pub.reset_mock()
# Set a state of an entity async def test_state_changed_event_exclude_entity(hass, mqtt_mock):
mock_state_change_event(self.hass, State("fake.entity", "on")) """Test that filtering on excluded entity works as expected."""
self.hass.block_till_done() base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state incl = {}
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True) excl = {"entities": ["fake.entity2"]}
assert mock_pub.called
# Add the statestream component for publishing state updates
mock_pub.reset_mock() # Set the filter to allow fake.* items
# Set a state of an entity that shouldn't be included assert await add_statestream(
mock_state_change_event(self.hass, State("fake2.entity", "on")) hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
self.hass.block_till_done() )
await hass.async_block_till_done()
assert not mock_pub.called
# Reset the mock because it will have already gotten calls for the
@patch("homeassistant.components.mqtt.async_publish") # mqtt_statestream state change on initialization, etc.
@patch("homeassistant.core.dt_util.utcnow") mqtt_mock.async_publish.reset_mock()
def test_state_changed_event_exclude_entity(self, mock_utcnow, mock_pub):
"""Test that filtering on excluded entity works as expected.""" # Set a state of an entity
base_topic = "pub" mock_state_change_event(hass, State("fake.entity", "on"))
await hass.async_block_till_done()
incl = {} await hass.async_block_till_done()
excl = {"entities": ["fake.entity2"]}
# Make sure 'on' was published to pub/fake/entity/state
# Add the statestream component for publishing state updates mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
# Set the filter to allow fake.* items assert mqtt_mock.async_publish.called
assert self.add_statestream(
base_topic=base_topic, publish_include=incl, publish_exclude=excl mqtt_mock.async_publish.reset_mock()
) # Set a state of an entity that shouldn't be included
self.hass.block_till_done() mock_state_change_event(hass, State("fake.entity2", "on"))
await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the await hass.async_block_till_done()
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock() assert not mqtt_mock.async_publish.called
# Set a state of an entity
mock_state_change_event(self.hass, State("fake.entity", "on")) async def test_state_changed_event_exclude_domain_include_entity(hass, mqtt_mock):
self.hass.block_till_done() """Test filtering with excluded domain and included entity."""
base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True) incl = {"entities": ["fake.entity"]}
assert mock_pub.called excl = {"domains": ["fake"]}
mock_pub.reset_mock() # Add the statestream component for publishing state updates
# Set a state of an entity that shouldn't be included # Set the filter to allow fake.* items
mock_state_change_event(self.hass, State("fake.entity2", "on")) assert await add_statestream(
self.hass.block_till_done() hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
)
assert not mock_pub.called await hass.async_block_till_done()
@patch("homeassistant.components.mqtt.async_publish") # Reset the mock because it will have already gotten calls for the
@patch("homeassistant.core.dt_util.utcnow") # mqtt_statestream state change on initialization, etc.
def test_state_changed_event_exclude_domain_include_entity( mqtt_mock.async_publish.reset_mock()
self, mock_utcnow, mock_pub
): # Set a state of an entity
"""Test filtering with excluded domain and included entity.""" mock_state_change_event(hass, State("fake.entity", "on"))
base_topic = "pub" await hass.async_block_till_done()
await hass.async_block_till_done()
incl = {"entities": ["fake.entity"]}
excl = {"domains": ["fake"]} # Make sure 'on' was published to pub/fake/entity/state
mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
# Add the statestream component for publishing state updates assert mqtt_mock.async_publish.called
# Set the filter to allow fake.* items
assert self.add_statestream( mqtt_mock.async_publish.reset_mock()
base_topic=base_topic, publish_include=incl, publish_exclude=excl # Set a state of an entity that shouldn't be included
) mock_state_change_event(hass, State("fake.entity2", "on"))
self.hass.block_till_done() await hass.async_block_till_done()
await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the
# mqtt_statestream state change on initialization, etc. assert not mqtt_mock.async_publish.called
mock_pub.reset_mock()
# Set a state of an entity async def test_state_changed_event_include_domain_exclude_entity(hass, mqtt_mock):
mock_state_change_event(self.hass, State("fake.entity", "on")) """Test filtering with included domain and excluded entity."""
self.hass.block_till_done() base_topic = "pub"
# Make sure 'on' was published to pub/fake/entity/state incl = {"domains": ["fake"]}
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True) excl = {"entities": ["fake.entity2"]}
assert mock_pub.called
# Add the statestream component for publishing state updates
mock_pub.reset_mock() # Set the filter to allow fake.* items
# Set a state of an entity that shouldn't be included assert await add_statestream(
mock_state_change_event(self.hass, State("fake.entity2", "on")) hass, base_topic=base_topic, publish_include=incl, publish_exclude=excl
self.hass.block_till_done() )
await hass.async_block_till_done()
assert not mock_pub.called
# Reset the mock because it will have already gotten calls for the
@patch("homeassistant.components.mqtt.async_publish") # mqtt_statestream state change on initialization, etc.
@patch("homeassistant.core.dt_util.utcnow") mqtt_mock.async_publish.reset_mock()
def test_state_changed_event_include_domain_exclude_entity(
self, mock_utcnow, mock_pub # Set a state of an entity
): mock_state_change_event(hass, State("fake.entity", "on"))
"""Test filtering with included domain and excluded entity.""" await hass.async_block_till_done()
base_topic = "pub" await hass.async_block_till_done()
incl = {"domains": ["fake"]} # Make sure 'on' was published to pub/fake/entity/state
excl = {"entities": ["fake.entity2"]} mqtt_mock.async_publish.assert_called_with("pub/fake/entity/state", "on", 1, True)
assert mqtt_mock.async_publish.called
# Add the statestream component for publishing state updates
# Set the filter to allow fake.* items mqtt_mock.async_publish.reset_mock()
assert self.add_statestream( # Set a state of an entity that shouldn't be included
base_topic=base_topic, publish_include=incl, publish_exclude=excl mock_state_change_event(hass, State("fake.entity2", "on"))
) await hass.async_block_till_done()
self.hass.block_till_done() await hass.async_block_till_done()
# Reset the mock because it will have already gotten calls for the assert not mqtt_mock.async_publish.called
# mqtt_statestream state change on initialization, etc.
mock_pub.reset_mock()
# Set a state of an entity
mock_state_change_event(self.hass, State("fake.entity", "on"))
self.hass.block_till_done()
# Make sure 'on' was published to pub/fake/entity/state
mock_pub.assert_called_with(self.hass, "pub/fake/entity/state", "on", 1, True)
assert mock_pub.called
mock_pub.reset_mock()
# Set a state of an entity that shouldn't be included
mock_state_change_event(self.hass, State("fake.entity2", "on"))
self.hass.block_till_done()
assert not mock_pub.called

View File

@ -5,9 +5,10 @@ import pytest
import homeassistant.components.nextbus.sensor as nextbus import homeassistant.components.nextbus.sensor as nextbus
import homeassistant.components.sensor as sensor import homeassistant.components.sensor as sensor
from homeassistant.setup import async_setup_component
from tests.async_mock import patch from tests.async_mock import patch
from tests.common import assert_setup_component, async_setup_component from tests.common import assert_setup_component
VALID_AGENCY = "sf-muni" VALID_AGENCY = "sf-muni"
VALID_ROUTE = "F" VALID_ROUTE = "F"

View File

@ -2,12 +2,13 @@
from homeassistant.components import pi_hole from homeassistant.components import pi_hole
from homeassistant.components.pi_hole.const import MIN_TIME_BETWEEN_UPDATES from homeassistant.components.pi_hole.const import MIN_TIME_BETWEEN_UPDATES
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util from homeassistant.util import dt as dt_util
from . import _create_mocked_hole, _patch_config_flow_hole from . import _create_mocked_hole, _patch_config_flow_hole
from tests.async_mock import patch from tests.async_mock import patch
from tests.common import async_fire_time_changed, async_setup_component from tests.common import async_fire_time_changed
def _patch_init_hole(mocked_hole): def _patch_init_hole(mocked_hole):

View File

@ -7,11 +7,12 @@ from pytz import timezone
from homeassistant.components.pvpc_hourly_pricing import ATTR_TARIFF, DOMAIN from homeassistant.components.pvpc_hourly_pricing import ATTR_TARIFF, DOMAIN
from homeassistant.const import CONF_NAME from homeassistant.const import CONF_NAME
from homeassistant.core import ATTR_NOW, EVENT_TIME_CHANGED from homeassistant.core import ATTR_NOW, EVENT_TIME_CHANGED
from homeassistant.setup import async_setup_component
from .conftest import check_valid_state from .conftest import check_valid_state
from tests.async_mock import patch from tests.async_mock import patch
from tests.common import async_setup_component, date_util from tests.common import date_util
from tests.test_util.aiohttp import AiohttpClientMocker from tests.test_util.aiohttp import AiohttpClientMocker

View File

@ -25,7 +25,8 @@ from .common import (
new_profile_config, new_profile_config,
) )
from tests.common import MagicMock, MockConfigEntry, patch from tests.async_mock import MagicMock, patch
from tests.common import MockConfigEntry
def config_schema_validate(withings_config) -> dict: def config_schema_validate(withings_config) -> dict:

View File

@ -6,10 +6,11 @@ import pytest
import homeassistant.components.sensor as sensor import homeassistant.components.sensor as sensor
from homeassistant.const import CONF_NAME from homeassistant.const import CONF_NAME
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from tests.async_mock import patch from tests.async_mock import patch
from tests.common import assert_setup_component, async_setup_component, load_fixture from tests.common import assert_setup_component, load_fixture
REPLY = json.loads(load_fixture("yandex_transport_reply.json")) REPLY = json.loads(load_fixture("yandex_transport_reply.json"))