From d3386907a4db22be84cac5c83cde3908042d15a0 Mon Sep 17 00:00:00 2001 From: Otto Winter Date: Fri, 2 Mar 2018 00:06:26 +0100 Subject: [PATCH] MQTT Python 3.5 Async Await Syntax (#12815) * MQTT Async Await * Remove unused decorator. --- homeassistant/components/mqtt/__init__.py | 92 ++++++++++------------- tests/components/mqtt/test_init.py | 1 + 2 files changed, 41 insertions(+), 52 deletions(-) diff --git a/homeassistant/components/mqtt/__init__.py b/homeassistant/components/mqtt/__init__.py index 63662d2072d..27e7c0358ad 100644 --- a/homeassistant/components/mqtt/__init__.py +++ b/homeassistant/components/mqtt/__init__.py @@ -228,17 +228,16 @@ def publish_template(hass: HomeAssistantType, topic, payload_template, hass.services.call(DOMAIN, SERVICE_PUBLISH, data) -@asyncio.coroutine @bind_hass -def async_subscribe(hass: HomeAssistantType, topic: str, - msg_callback: MessageCallbackType, - qos: int = DEFAULT_QOS, - encoding: str = 'utf-8'): +async def async_subscribe(hass: HomeAssistantType, topic: str, + msg_callback: MessageCallbackType, + qos: int = DEFAULT_QOS, + encoding: str = 'utf-8'): """Subscribe to an MQTT topic. Call the return value to unsubscribe. """ - async_remove = yield from hass.data[DATA_MQTT].async_subscribe( + async_remove = await hass.data[DATA_MQTT].async_subscribe( topic, msg_callback, qos, encoding) return async_remove @@ -259,16 +258,15 @@ def subscribe(hass: HomeAssistantType, topic: str, return remove -@asyncio.coroutine -def _async_setup_server(hass: HomeAssistantType, - config: ConfigType): +async def _async_setup_server(hass: HomeAssistantType, + config: ConfigType): """Try to start embedded MQTT broker. This method is a coroutine. """ conf = config.get(DOMAIN, {}) # type: ConfigType - server = yield from async_prepare_setup_platform( + server = await async_prepare_setup_platform( hass, config, DOMAIN, 'server') if server is None: @@ -276,37 +274,35 @@ def _async_setup_server(hass: HomeAssistantType, return None success, broker_config = \ - yield from server.async_start(hass, conf.get(CONF_EMBEDDED)) + await server.async_start(hass, conf.get(CONF_EMBEDDED)) if not success: return None return broker_config -@asyncio.coroutine -def _async_setup_discovery(hass: HomeAssistantType, - config: ConfigType): +async def _async_setup_discovery(hass: HomeAssistantType, + config: ConfigType) -> bool: """Try to start the discovery of MQTT devices. This method is a coroutine. """ conf = config.get(DOMAIN, {}) # type: ConfigType - discovery = yield from async_prepare_setup_platform( + discovery = await async_prepare_setup_platform( hass, config, DOMAIN, 'discovery') if discovery is None: _LOGGER.error("Unable to load MQTT discovery") return False - success = yield from discovery.async_start( + success = await discovery.async_start( hass, conf[CONF_DISCOVERY_PREFIX], config) # type: bool return success -@asyncio.coroutine -def async_setup(hass: HomeAssistantType, config: ConfigType): +async def async_setup(hass: HomeAssistantType, config: ConfigType) -> bool: """Start the MQTT protocol service.""" conf = config.get(DOMAIN) # type: Optional[ConfigType] @@ -321,7 +317,7 @@ def async_setup(hass: HomeAssistantType, config: ConfigType): if CONF_EMBEDDED not in conf and CONF_BROKER in conf: broker_config = None else: - broker_config = yield from _async_setup_server(hass, config) + broker_config = await _async_setup_server(hass, config) if CONF_BROKER in conf: broker = conf[CONF_BROKER] # type: str @@ -392,19 +388,17 @@ def async_setup(hass: HomeAssistantType, config: ConfigType): "Please check your settings and the broker itself") return False - @asyncio.coroutine - def async_stop_mqtt(event: Event): + async def async_stop_mqtt(event: Event): """Stop MQTT component.""" - yield from hass.data[DATA_MQTT].async_disconnect() + await hass.data[DATA_MQTT].async_disconnect() hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, async_stop_mqtt) - success = yield from hass.data[DATA_MQTT].async_connect() # type: bool + success = await hass.data[DATA_MQTT].async_connect() # type: bool if not success: return False - @asyncio.coroutine - def async_publish_service(call: ServiceCall): + async def async_publish_service(call: ServiceCall): """Handle MQTT publish service calls.""" msg_topic = call.data[ATTR_TOPIC] # type: str payload = call.data.get(ATTR_PAYLOAD) @@ -422,7 +416,7 @@ def async_setup(hass: HomeAssistantType, config: ConfigType): msg_topic, payload_template, exc) return - yield from hass.data[DATA_MQTT].async_publish( + await hass.data[DATA_MQTT].async_publish( msg_topic, payload, qos, retain) hass.services.async_register( @@ -430,7 +424,7 @@ def async_setup(hass: HomeAssistantType, config: ConfigType): schema=MQTT_PUBLISH_SCHEMA) if conf.get(CONF_DISCOVERY): - yield from _async_setup_discovery(hass, config) + await _async_setup_discovery(hass, config) return True @@ -505,25 +499,23 @@ class MQTT(object): if will_message is not None: self._mqttc.will_set(*attr.astuple(will_message)) - @asyncio.coroutine - def async_publish(self, topic: str, payload: PublishPayloadType, qos: int, - retain: bool): + async def async_publish(self, topic: str, payload: PublishPayloadType, + qos: int, retain: bool) -> None: """Publish a MQTT message. This method must be run in the event loop and returns a coroutine. """ - with (yield from self._paho_lock): - yield from self.hass.async_add_job( + async with self._paho_lock: + await self.hass.async_add_job( self._mqttc.publish, topic, payload, qos, retain) - @asyncio.coroutine - def async_connect(self): + async def async_connect(self) -> bool: """Connect to the host. Does process messages yet. This method is a coroutine. """ result = None # type: int - result = yield from self.hass.async_add_job( + result = await self.hass.async_add_job( self._mqttc.connect, self.broker, self.port, self.keepalive) if result != 0: @@ -547,9 +539,9 @@ class MQTT(object): return self.hass.async_add_job(stop) - @asyncio.coroutine - def async_subscribe(self, topic: str, msg_callback: MessageCallbackType, - qos: int, encoding: str): + async def async_subscribe(self, topic: str, + msg_callback: MessageCallbackType, + qos: int, encoding: str) -> Callable[[], None]: """Set up a subscription to a topic with the provided qos. This method is a coroutine. @@ -560,10 +552,10 @@ class MQTT(object): subscription = Subscription(topic, msg_callback, qos, encoding) self.subscriptions.append(subscription) - yield from self._async_perform_subscription(topic, qos) + await self._async_perform_subscription(topic, qos) @callback - def async_remove(): + def async_remove() -> None: """Remove subscription.""" if subscription not in self.subscriptions: raise HomeAssistantError("Can't remove subscription twice") @@ -576,27 +568,24 @@ class MQTT(object): return async_remove - @asyncio.coroutine - def _async_unsubscribe(self, topic: str): + async def _async_unsubscribe(self, topic: str) -> None: """Unsubscribe from a topic. This method is a coroutine. """ - with (yield from self._paho_lock): + async with self._paho_lock: result = None # type: int - result, _ = yield from self.hass.async_add_job( + result, _ = await self.hass.async_add_job( self._mqttc.unsubscribe, topic) _raise_on_error(result) - @asyncio.coroutine - def _async_perform_subscription(self, topic: str, - qos: int): + async def _async_perform_subscription(self, topic: str, qos: int) -> None: """Perform a paho-mqtt subscription.""" _LOGGER.debug("Subscribing to %s", topic) - with (yield from self._paho_lock): + async with self._paho_lock: result = None # type: int - result, _ = yield from self.hass.async_add_job( + result, _ = await self.hass.async_add_job( self._mqttc.subscribe, topic, qos) _raise_on_error(result) @@ -721,8 +710,7 @@ class MqttAvailability(Entity): self._payload_available = payload_available self._payload_not_available = payload_not_available - @asyncio.coroutine - def async_added_to_hass(self): + async def async_added_to_hass(self) -> None: """Subscribe mqtt events. This method must be run in the event loop and returns a coroutine. @@ -740,7 +728,7 @@ class MqttAvailability(Entity): self.async_schedule_update_ha_state() if self._availability_topic is not None: - yield from async_subscribe( + await async_subscribe( self.hass, self._availability_topic, availability_message_received, self._availability_qos) diff --git a/tests/components/mqtt/test_init.py b/tests/components/mqtt/test_init.py index 24308bc9a7e..1dd89a92f04 100644 --- a/tests/components/mqtt/test_init.py +++ b/tests/components/mqtt/test_init.py @@ -28,6 +28,7 @@ def async_mock_mqtt_client(hass, config=None): with mock.patch('paho.mqtt.client.Client') as mock_client: mock_client().connect.return_value = 0 mock_client().subscribe.return_value = (0, 0) + mock_client().unsubscribe.return_value = (0, 0) mock_client().publish.return_value = (0, 0) result = yield from async_setup_component(hass, mqtt.DOMAIN, { mqtt.DOMAIN: config