mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 20:57:21 +00:00
Fix missing azure event hub instance name (#52049)
This commit is contained in:
parent
780d538bb0
commit
e21325b975
@ -23,6 +23,7 @@ import homeassistant.helpers.config_validation as cv
|
|||||||
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
|
from homeassistant.helpers.entityfilter import FILTER_SCHEMA
|
||||||
from homeassistant.helpers.event import async_call_later
|
from homeassistant.helpers.event import async_call_later
|
||||||
from homeassistant.helpers.json import JSONEncoder
|
from homeassistant.helpers.json import JSONEncoder
|
||||||
|
from homeassistant.helpers.typing import ConfigType
|
||||||
|
|
||||||
from .const import (
|
from .const import (
|
||||||
ADDITIONAL_ARGS,
|
ADDITIONAL_ARGS,
|
||||||
@ -43,9 +44,9 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
{
|
{
|
||||||
DOMAIN: vol.Schema(
|
DOMAIN: vol.Schema(
|
||||||
{
|
{
|
||||||
|
vol.Required(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
|
||||||
vol.Exclusive(CONF_EVENT_HUB_CON_STRING, "setup_methods"): cv.string,
|
vol.Exclusive(CONF_EVENT_HUB_CON_STRING, "setup_methods"): cv.string,
|
||||||
vol.Exclusive(CONF_EVENT_HUB_NAMESPACE, "setup_methods"): cv.string,
|
vol.Exclusive(CONF_EVENT_HUB_NAMESPACE, "setup_methods"): cv.string,
|
||||||
vol.Optional(CONF_EVENT_HUB_INSTANCE_NAME): cv.string,
|
|
||||||
vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string,
|
vol.Optional(CONF_EVENT_HUB_SAS_POLICY): cv.string,
|
||||||
vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string,
|
vol.Optional(CONF_EVENT_HUB_SAS_KEY): cv.string,
|
||||||
vol.Optional(CONF_SEND_INTERVAL, default=5): cv.positive_int,
|
vol.Optional(CONF_SEND_INTERVAL, default=5): cv.positive_int,
|
||||||
@ -61,20 +62,23 @@ CONFIG_SCHEMA = vol.Schema(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def async_setup(hass, yaml_config):
|
async def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
|
||||||
"""Activate Azure EH component."""
|
"""Activate Azure EH component."""
|
||||||
config = yaml_config[DOMAIN]
|
config = yaml_config[DOMAIN]
|
||||||
if config.get(CONF_EVENT_HUB_CON_STRING):
|
if config.get(CONF_EVENT_HUB_CON_STRING):
|
||||||
client_args = {"conn_str": config[CONF_EVENT_HUB_CON_STRING]}
|
client_args = {
|
||||||
|
"conn_str": config[CONF_EVENT_HUB_CON_STRING],
|
||||||
|
"eventhub_name": config[CONF_EVENT_HUB_INSTANCE_NAME],
|
||||||
|
}
|
||||||
conn_str_client = True
|
conn_str_client = True
|
||||||
else:
|
else:
|
||||||
client_args = {
|
client_args = {
|
||||||
"fully_qualified_namespace": f"{config[CONF_EVENT_HUB_NAMESPACE]}.servicebus.windows.net",
|
"fully_qualified_namespace": f"{config[CONF_EVENT_HUB_NAMESPACE]}.servicebus.windows.net",
|
||||||
|
"eventhub_name": config[CONF_EVENT_HUB_INSTANCE_NAME],
|
||||||
"credential": EventHubSharedKeyCredential(
|
"credential": EventHubSharedKeyCredential(
|
||||||
policy=config[CONF_EVENT_HUB_SAS_POLICY],
|
policy=config[CONF_EVENT_HUB_SAS_POLICY],
|
||||||
key=config[CONF_EVENT_HUB_SAS_KEY],
|
key=config[CONF_EVENT_HUB_SAS_KEY],
|
||||||
),
|
),
|
||||||
"eventhub_name": config[CONF_EVENT_HUB_INSTANCE_NAME],
|
|
||||||
}
|
}
|
||||||
conn_str_client = False
|
conn_str_client = False
|
||||||
|
|
||||||
@ -115,7 +119,7 @@ class AzureEventHub:
|
|||||||
self._next_send_remover = None
|
self._next_send_remover = None
|
||||||
self.shutdown = False
|
self.shutdown = False
|
||||||
|
|
||||||
async def async_start(self):
|
async def async_start(self) -> None:
|
||||||
"""Start the recorder, suppress logging and register the callbacks and do the first send after five seconds, to capture the startup events."""
|
"""Start the recorder, suppress logging and register the callbacks and do the first send after five seconds, to capture the startup events."""
|
||||||
# suppress the INFO and below logging on the underlying packages, they are very verbose, even at INFO
|
# suppress the INFO and below logging on the underlying packages, they are very verbose, even at INFO
|
||||||
logging.getLogger("uamqp").setLevel(logging.WARNING)
|
logging.getLogger("uamqp").setLevel(logging.WARNING)
|
||||||
@ -128,7 +132,7 @@ class AzureEventHub:
|
|||||||
# schedule the first send after 10 seconds to capture startup events, after that each send will schedule the next after the interval.
|
# schedule the first send after 10 seconds to capture startup events, after that each send will schedule the next after the interval.
|
||||||
self._next_send_remover = async_call_later(self.hass, 10, self.async_send)
|
self._next_send_remover = async_call_later(self.hass, 10, self.async_send)
|
||||||
|
|
||||||
async def async_shutdown(self, _: Event):
|
async def async_shutdown(self, _: Event) -> None:
|
||||||
"""Shut down the AEH by queueing None and calling send."""
|
"""Shut down the AEH by queueing None and calling send."""
|
||||||
if self._next_send_remover:
|
if self._next_send_remover:
|
||||||
self._next_send_remover()
|
self._next_send_remover()
|
||||||
@ -137,14 +141,13 @@ class AzureEventHub:
|
|||||||
await self.queue.put((3, (time.monotonic(), None)))
|
await self.queue.put((3, (time.monotonic(), None)))
|
||||||
await self.async_send(None)
|
await self.async_send(None)
|
||||||
|
|
||||||
async def async_listen(self, event: Event):
|
async def async_listen(self, event: Event) -> None:
|
||||||
"""Listen for new messages on the bus and queue them for AEH."""
|
"""Listen for new messages on the bus and queue them for AEH."""
|
||||||
await self.queue.put((2, (time.monotonic(), event)))
|
await self.queue.put((2, (time.monotonic(), event)))
|
||||||
|
|
||||||
async def async_send(self, _):
|
async def async_send(self, _) -> None:
|
||||||
"""Write preprocessed events to eventhub, with retry."""
|
"""Write preprocessed events to eventhub, with retry."""
|
||||||
client = self._get_client()
|
async with self._get_client() as client:
|
||||||
async with client:
|
|
||||||
while not self.queue.empty():
|
while not self.queue.empty():
|
||||||
data_batch, dequeue_count = await self.fill_batch(client)
|
data_batch, dequeue_count = await self.fill_batch(client)
|
||||||
_LOGGER.debug(
|
_LOGGER.debug(
|
||||||
@ -160,14 +163,13 @@ class AzureEventHub:
|
|||||||
finally:
|
finally:
|
||||||
for _ in range(dequeue_count):
|
for _ in range(dequeue_count):
|
||||||
self.queue.task_done()
|
self.queue.task_done()
|
||||||
await client.close()
|
|
||||||
|
|
||||||
if not self.shutdown:
|
if not self.shutdown:
|
||||||
self._next_send_remover = async_call_later(
|
self._next_send_remover = async_call_later(
|
||||||
self.hass, self._send_interval, self.async_send
|
self.hass, self._send_interval, self.async_send
|
||||||
)
|
)
|
||||||
|
|
||||||
async def fill_batch(self, client):
|
async def fill_batch(self, client) -> None:
|
||||||
"""Return a batch of events formatted for writing.
|
"""Return a batch of events formatted for writing.
|
||||||
|
|
||||||
Uses get_nowait instead of await get, because the functions batches and doesn't wait for each single event, the send function is called.
|
Uses get_nowait instead of await get, because the functions batches and doesn't wait for each single event, the send function is called.
|
||||||
@ -205,7 +207,7 @@ class AzureEventHub:
|
|||||||
|
|
||||||
return event_batch, dequeue_count
|
return event_batch, dequeue_count
|
||||||
|
|
||||||
def _event_to_filtered_event_data(self, event: Event):
|
def _event_to_filtered_event_data(self, event: Event) -> None:
|
||||||
"""Filter event states and create EventData object."""
|
"""Filter event states and create EventData object."""
|
||||||
state = event.data.get("new_state")
|
state = event.data.get("new_state")
|
||||||
if (
|
if (
|
||||||
@ -216,7 +218,7 @@ class AzureEventHub:
|
|||||||
return None
|
return None
|
||||||
return EventData(json.dumps(obj=state, cls=JSONEncoder).encode("utf-8"))
|
return EventData(json.dumps(obj=state, cls=JSONEncoder).encode("utf-8"))
|
||||||
|
|
||||||
def _get_client(self):
|
def _get_client(self) -> EventHubProducerClient:
|
||||||
"""Get a Event Producer Client."""
|
"""Get a Event Producer Client."""
|
||||||
if self._conn_str_client:
|
if self._conn_str_client:
|
||||||
return EventHubProducerClient.from_connection_string(
|
return EventHubProducerClient.from_connection_string(
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
"domain": "azure_event_hub",
|
"domain": "azure_event_hub",
|
||||||
"name": "Azure Event Hub",
|
"name": "Azure Event Hub",
|
||||||
"documentation": "https://www.home-assistant.io/integrations/azure_event_hub",
|
"documentation": "https://www.home-assistant.io/integrations/azure_event_hub",
|
||||||
"requirements": ["azure-eventhub==5.1.0"],
|
"requirements": ["azure-eventhub==5.5.0"],
|
||||||
"codeowners": ["@eavanvalkenburg"],
|
"codeowners": ["@eavanvalkenburg"],
|
||||||
"iot_class": "cloud_push"
|
"iot_class": "cloud_push"
|
||||||
}
|
}
|
||||||
|
@ -331,7 +331,7 @@ av==8.0.3
|
|||||||
axis==44
|
axis==44
|
||||||
|
|
||||||
# homeassistant.components.azure_event_hub
|
# homeassistant.components.azure_event_hub
|
||||||
azure-eventhub==5.1.0
|
azure-eventhub==5.5.0
|
||||||
|
|
||||||
# homeassistant.components.azure_service_bus
|
# homeassistant.components.azure_service_bus
|
||||||
azure-servicebus==0.50.3
|
azure-servicebus==0.50.3
|
||||||
|
@ -208,7 +208,7 @@ av==8.0.3
|
|||||||
axis==44
|
axis==44
|
||||||
|
|
||||||
# homeassistant.components.azure_event_hub
|
# homeassistant.components.azure_event_hub
|
||||||
azure-eventhub==5.1.0
|
azure-eventhub==5.5.0
|
||||||
|
|
||||||
# homeassistant.components.homekit
|
# homeassistant.components.homekit
|
||||||
base36==0.1.1
|
base36==0.1.1
|
||||||
|
Loading…
x
Reference in New Issue
Block a user