Update nest config flow to create pub/sub topics (#136609)

Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
This commit is contained in:
Allen Porter 2025-01-28 02:53:57 -08:00 committed by GitHub
parent cd9abacdb2
commit 3ac062453f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 333 additions and 132 deletions

View File

@ -15,6 +15,7 @@ import logging
from typing import TYPE_CHECKING, Any
from google_nest_sdm.admin_client import (
DEFAULT_TOPIC_IAM_POLICY,
AdminClient,
EligibleSubscriptions,
EligibleTopics,
@ -25,6 +26,11 @@ import voluptuous as vol
from homeassistant.config_entries import SOURCE_REAUTH, ConfigFlowResult
from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.helpers.selector import (
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from homeassistant.util import get_random_string
from . import api
@ -41,8 +47,9 @@ from .const import (
)
DATA_FLOW_IMPL = "nest_flow_implementation"
TOPIC_FORMAT = "projects/{cloud_project_id}/topics/home-assistant-{rnd}"
SUBSCRIPTION_FORMAT = "projects/{cloud_project_id}/subscriptions/home-assistant-{rnd}"
SUBSCRIPTION_RAND_LENGTH = 10
RAND_LENGTH = 10
MORE_INFO_URL = "https://www.home-assistant.io/integrations/nest/#configuration"
@ -59,6 +66,7 @@ DEVICE_ACCESS_CONSOLE_URL = "https://console.nest.google.com/device-access/"
DEVICE_ACCESS_CONSOLE_EDIT_URL = (
"https://console.nest.google.com/device-access/project/{project_id}/information"
)
CREATE_NEW_TOPIC_KEY = "create_new_topic"
CREATE_NEW_SUBSCRIPTION_KEY = "create_new_subscription"
_LOGGER = logging.getLogger(__name__)
@ -66,10 +74,16 @@ _LOGGER = logging.getLogger(__name__)
def _generate_subscription_id(cloud_project_id: str) -> str:
"""Create a new subscription id."""
rnd = get_random_string(SUBSCRIPTION_RAND_LENGTH)
rnd = get_random_string(RAND_LENGTH)
return SUBSCRIPTION_FORMAT.format(cloud_project_id=cloud_project_id, rnd=rnd)
def _generate_topic_id(cloud_project_id: str) -> str:
"""Create a new topic id."""
rnd = get_random_string(RAND_LENGTH)
return TOPIC_FORMAT.format(cloud_project_id=cloud_project_id, rnd=rnd)
def generate_config_title(structures: Iterable[Structure]) -> str | None:
"""Pick a user friendly config title based on the Google Home name(s)."""
names: list[str] = [
@ -130,7 +144,7 @@ class NestFlowHandler(
if self.source == SOURCE_REAUTH:
_LOGGER.debug("Skipping Pub/Sub configuration")
return await self._async_finish()
return await self.async_step_pubsub()
return await self.async_step_pubsub_topic()
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
@ -192,7 +206,9 @@ class NestFlowHandler(
) -> ConfigFlowResult:
"""Handle cloud project in user input."""
if user_input is not None:
self._data.update(user_input)
self._data[CONF_CLOUD_PROJECT_ID] = user_input[
CONF_CLOUD_PROJECT_ID
].strip()
return await self.async_step_device_project()
return self.async_show_form(
step_id="cloud_project",
@ -213,7 +229,7 @@ class NestFlowHandler(
"""Collect device access project from user input."""
errors = {}
if user_input is not None:
project_id = user_input[CONF_PROJECT_ID]
project_id = user_input[CONF_PROJECT_ID].strip()
if project_id == self._data[CONF_CLOUD_PROJECT_ID]:
_LOGGER.error(
"Device Access Project ID and Cloud Project ID must not be the"
@ -240,72 +256,83 @@ class NestFlowHandler(
errors=errors,
)
async def async_step_pubsub(
async def async_step_pubsub_topic(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Configure and the pre-requisites to configure Pub/Sub topics and subscriptions."""
data = {
**self._data,
**(user_input if user_input is not None else {}),
}
cloud_project_id = data.get(CONF_CLOUD_PROJECT_ID, "").strip()
device_access_project_id = data[CONF_PROJECT_ID]
errors: dict[str, str] = {}
if cloud_project_id:
"""Configure and create Pub/Sub topic."""
cloud_project_id = self._data[CONF_CLOUD_PROJECT_ID]
if self._admin_client is None:
access_token = self._data["token"]["access_token"]
self._admin_client = api.new_pubsub_admin_client(
self.hass, access_token=access_token, cloud_project_id=cloud_project_id
self.hass,
access_token=access_token,
cloud_project_id=cloud_project_id,
)
try:
eligible_topics = await self._admin_client.list_eligible_topics(
device_access_project_id=device_access_project_id
)
except ApiException as err:
_LOGGER.error("Error listing eligible Pub/Sub topics: %s", err)
errors["base"] = "pubsub_api_error"
else:
if not eligible_topics.topic_names:
errors["base"] = "no_pubsub_topics"
errors = {}
if user_input is not None:
topic_name = user_input[CONF_TOPIC_NAME]
if topic_name == CREATE_NEW_TOPIC_KEY:
topic_name = _generate_topic_id(cloud_project_id)
_LOGGER.debug("Creating topic %s", topic_name)
try:
await self._admin_client.create_topic(topic_name)
await self._admin_client.set_topic_iam_policy(
topic_name, DEFAULT_TOPIC_IAM_POLICY
)
except ApiException as err:
_LOGGER.error("Error creating Pub/Sub topic: %s", err)
errors["base"] = "pubsub_api_error"
if not errors:
self._data[CONF_CLOUD_PROJECT_ID] = cloud_project_id
self._eligible_topics = eligible_topics
return await self.async_step_pubsub_topic()
self._data[CONF_TOPIC_NAME] = topic_name
return await self.async_step_pubsub_topic_confirm()
device_access_project_id = self._data[CONF_PROJECT_ID]
try:
eligible_topics = await self._admin_client.list_eligible_topics(
device_access_project_id=device_access_project_id
)
except ApiException as err:
_LOGGER.error("Error listing eligible Pub/Sub topics: %s", err)
return self.async_abort(reason="pubsub_api_error")
topics = [
*eligible_topics.topic_names, # Untranslated topic paths
CREATE_NEW_TOPIC_KEY,
]
return self.async_show_form(
step_id="pubsub",
step_id="pubsub_topic",
data_schema=vol.Schema(
{
vol.Required(CONF_CLOUD_PROJECT_ID, default=cloud_project_id): str,
vol.Required(
CONF_TOPIC_NAME, default=next(iter(topics))
): SelectSelector(
SelectSelectorConfig(
translation_key="topic_name",
mode=SelectSelectorMode.LIST,
options=topics,
)
)
}
),
description_placeholders={
"url": CLOUD_CONSOLE_URL,
"device_access_console_url": DEVICE_ACCESS_CONSOLE_URL,
"more_info_url": MORE_INFO_URL,
},
errors=errors,
)
async def async_step_pubsub_topic(
self, user_input: dict[str, Any] | None = None
async def async_step_pubsub_topic_confirm(
self, user_input: dict | None = None
) -> ConfigFlowResult:
"""Configure and create Pub/Sub topic."""
if TYPE_CHECKING:
assert self._eligible_topics
"""Have the user confirm the Pub/Sub topic is set correctly in Device Access Console."""
if user_input is not None:
self._data.update(user_input)
return await self.async_step_pubsub_subscription()
topics = list(self._eligible_topics.topic_names)
return self.async_show_form(
step_id="pubsub_topic",
data_schema=vol.Schema(
{
vol.Optional(CONF_TOPIC_NAME, default=topics[0]): vol.In(topics),
}
),
step_id="pubsub_topic_confirm",
description_placeholders={
"device_access_console_url": DEVICE_ACCESS_CONSOLE_URL,
"device_access_console_url": DEVICE_ACCESS_CONSOLE_EDIT_URL.format(
project_id=self._data[CONF_PROJECT_ID]
),
"topic_name": self._data[CONF_TOPIC_NAME],
"more_info_url": MORE_INFO_URL,
},
)
@ -362,7 +389,7 @@ class NestFlowHandler(
)
return await self._async_finish()
subscriptions = {}
subscriptions = []
try:
eligible_subscriptions = (
await self._admin_client.list_eligible_subscriptions(
@ -375,10 +402,8 @@ class NestFlowHandler(
)
errors["base"] = "pubsub_api_error"
else:
subscriptions.update(
{name: name for name in eligible_subscriptions.subscription_names}
)
subscriptions[CREATE_NEW_SUBSCRIPTION_KEY] = "Create New"
subscriptions.extend(eligible_subscriptions.subscription_names)
subscriptions.append(CREATE_NEW_SUBSCRIPTION_KEY)
return self.async_show_form(
step_id="pubsub_subscription",
data_schema=vol.Schema(
@ -386,7 +411,13 @@ class NestFlowHandler(
vol.Optional(
CONF_SUBSCRIPTION_NAME,
default=next(iter(subscriptions)),
): vol.In(subscriptions),
): SelectSelector(
SelectSelectorConfig(
translation_key="subscription_name",
mode=SelectSelectorMode.LIST,
options=subscriptions,
)
)
}
),
description_placeholders={

View File

@ -17,7 +17,7 @@
},
"device_project": {
"title": "Nest: Create a Device Access Project",
"description": "Create a Nest Device Access project which **requires paying Google a US $5 fee** to set up.\n1. Go to the [Device Access Console]({device_access_console_url}), and through the payment flow.\n1. Select on **Create project**\n1. Give your Device Access project a name and select **Next**.\n1. Enter your OAuth Client ID\n1. Enable events by clicking **Enable** and **Create project**.\n\nEnter your Device Access Project ID below ([more info]({more_info_url})).",
"description": "Create a Nest Device Access project which **requires paying Google a US $5 fee** to set up.\n1. Go to the [Device Access Console]({device_access_console_url}), and through the payment flow.\n1. Select on **Create project**\n1. Give your Device Access project a name and select **Next**.\n1. Enter your OAuth Client ID\n1. Skip enabling events for now and select **Create project**.\n\nEnter your Device Access Project ID below ([more info]({more_info_url})).",
"data": {
"project_id": "Device Access Project ID"
}
@ -25,20 +25,18 @@
"pick_implementation": {
"title": "[%key:common::config_flow::title::oauth2_pick_implementation%]"
},
"pubsub": {
"title": "Configure Google Cloud Pub/Sub",
"description": "Home Assistant uses Cloud Pub/Sub receive realtime Nest device updates. Nest servers publish updates to a Pub/Sub topic and Home Assistant receives the updates through a Pub/Sub subscription.\n\n1. Visit the [Device Access Console]({device_access_console_url}) and ensure a Pub/Sub topic is configured.\n2. Visit the [Cloud Console]({url}) to find your Google Cloud Project ID and confirm it is correct below.\n3. The next step will attempt to auto-discover Pub/Sub topics and subscriptions.\n\nSee the integration documentation for [more info]({more_info_url}).",
"data": {
"cloud_project_id": "[%key:component::nest::config::step::cloud_project::data::cloud_project_id%]"
}
},
"pubsub_topic": {
"title": "Configure Cloud Pub/Sub topic",
"description": "Nest devices publish updates on a Cloud Pub/Sub topic. Select the Pub/Sub topic below that is the same as the [Device Access Console]({device_access_console_url}). See the integration documentation for [more info]({more_info_url}).",
"description": "Nest devices publish updates on a Cloud Pub/Sub topic. You can select an existing topic if one exists, or choose to create a new topic and the next step will create it for you with the necessary permissions. See the integration documentation for [more info]({more_info_url}).",
"data": {
"topic_name": "Pub/Sub topic Name"
}
},
"pubsub_topic_confirm": {
"title": "Enable events",
"description": "The Nest Device Access Console needs to be configured to publish device events to your Pub/Sub topic.\n\n1. Visit the [Device Access Console]({device_access_console_url}).\n2. Open the project.\n3. Enable *Events* and set the Pub/Sub topic name to `{topic_name}`\n4. Click *Add & Validate* to verify the topic is configured correctly.\n\nSee the integration documentation for [more info]({more_info_url}).",
"submit": "Confirm"
},
"pubsub_subscription": {
"title": "Configure Cloud Pub/Sub subscription",
"description": "Home Assistant receives realtime Nest device updates with a Cloud Pub/Sub subscription for topic `{topic}`.\n\nSelect an existing subscription below if one already exists, or the next step will create a new one for you. See the integration documentation for [more info]({more_info_url}).",
@ -70,7 +68,8 @@
"oauth_error": "[%key:common::config_flow::abort::oauth2_error%]",
"oauth_timeout": "[%key:common::config_flow::abort::oauth2_timeout%]",
"oauth_unauthorized": "[%key:common::config_flow::abort::oauth2_unauthorized%]",
"oauth_failed": "[%key:common::config_flow::abort::oauth2_failed%]"
"oauth_failed": "[%key:common::config_flow::abort::oauth2_failed%]",
"pubsub_api_error": "[%key:component::nest::config::error::pubsub_api_error%]"
},
"create_entry": {
"default": "[%key:common::config_flow::create_entry::authenticated%]"
@ -109,5 +108,17 @@
}
}
}
},
"selector": {
"topic_name": {
"options": {
"create_new_topic": "Create new topic"
}
},
"subscription_name": {
"options": {
"create_new_subscription": "Create new subscription"
}
}
}
}

View File

@ -74,20 +74,25 @@ class FakeAuth:
self.json = None
self.headers = None
self.captured_requests = []
self._project_id = project_id
self._aioclient_mock = aioclient_mock
self.register_mock_requests()
def register_mock_requests(self) -> None:
"""Register the mocks."""
# API makes a call to request structures to initiate pubsub feed, but the
# integration does not use this.
aioclient_mock.get(
f"{API_URL}/enterprises/{project_id}/structures",
self._aioclient_mock.get(
f"{API_URL}/enterprises/{self._project_id}/structures",
side_effect=self.request_structures,
)
aioclient_mock.get(
f"{API_URL}/enterprises/{project_id}/devices",
self._aioclient_mock.get(
f"{API_URL}/enterprises/{self._project_id}/devices",
side_effect=self.request_devices,
)
aioclient_mock.post(DEVICE_URL_MATCH, side_effect=self.request)
aioclient_mock.get(TEST_IMAGE_URL, side_effect=self.request)
aioclient_mock.get(TEST_CLIP_URL, side_effect=self.request)
self._aioclient_mock.post(DEVICE_URL_MATCH, side_effect=self.request)
self._aioclient_mock.get(TEST_IMAGE_URL, side_effect=self.request)
self._aioclient_mock.get(TEST_CLIP_URL, side_effect=self.request)
async def request_structures(
self, method: str, url: str, data: dict[str, Any]

View File

@ -34,7 +34,7 @@ from tests.typing import ClientSessionGenerator
WEB_REDIRECT_URL = "https://example.com/auth/external/callback"
APP_REDIRECT_URL = "urn:ietf:wg:oauth:2.0:oob"
RAND_SUBSCRIBER_SUFFIX = "ABCDEF"
RAND_SUFFIX = "ABCDEF"
FAKE_DHCP_DATA = DhcpServiceInfo(
ip="127.0.0.2", macaddress="001122334455", hostname="fake_hostname"
@ -52,7 +52,7 @@ def mock_rand_topic_name_fixture() -> None:
"""Set the topic name random string to a constant."""
with patch(
"homeassistant.components.nest.config_flow.get_random_string",
return_value=RAND_SUBSCRIBER_SUFFIX,
return_value=RAND_SUFFIX,
):
yield
@ -173,6 +173,7 @@ class OAuthFixture:
selected_topic: str,
selected_subscription: str = "create_new_subscription",
user_input: dict | None = None,
existing_errors: dict | None = None,
) -> ConfigEntry:
"""Fixture to walk through the Pub/Sub topic and subscription steps.
@ -193,6 +194,12 @@ class OAuthFixture:
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
# ACK the topic selection. User is instructed to do some manual
result = await self.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert not result.get("errors")
@ -267,6 +274,12 @@ def mock_cloud_project_id() -> str:
return CLOUD_PROJECT_ID
@pytest.fixture(name="create_topic_status")
def mock_create_topic_status() -> str:
"""Fixture to configure the return code when creating the topic."""
return HTTPStatus.OK
@pytest.fixture(name="create_subscription_status")
def mock_create_subscription_status() -> str:
"""Fixture to configure the return code when creating the subscription."""
@ -285,6 +298,64 @@ def mock_list_subscriptions_status() -> str:
return HTTPStatus.OK
def setup_mock_list_subscriptions_responses(
aioclient_mock: AiohttpClientMocker,
cloud_project_id: str,
subscriptions: list[tuple[str, str]],
list_subscriptions_status: HTTPStatus = HTTPStatus.OK,
) -> None:
"""Configure the mock responses for listing Pub/Sub subscriptions."""
aioclient_mock.get(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions",
json={
"subscriptions": [
{
"name": subscription_name,
"topic": topic,
"pushConfig": {},
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "604800s",
"expirationPolicy": {"ttl": "2678400s"},
"state": "ACTIVE",
}
for (subscription_name, topic) in subscriptions or ()
]
},
status=list_subscriptions_status,
)
def setup_mock_create_topic_responses(
aioclient_mock: AiohttpClientMocker,
cloud_project_id: str,
create_topic_status: HTTPStatus = HTTPStatus.OK,
) -> None:
"""Configure the mock responses for creating a Pub/Sub topic."""
aioclient_mock.put(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/topics/home-assistant-{RAND_SUFFIX}",
json={},
status=create_topic_status,
)
aioclient_mock.post(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/topics/home-assistant-{RAND_SUFFIX}:setIamPolicy",
json={},
status=create_topic_status,
)
def setup_mock_create_subscription_responses(
aioclient_mock: AiohttpClientMocker,
cloud_project_id: str,
create_subscription_status: HTTPStatus = HTTPStatus.OK,
) -> None:
"""Configure the mock responses for creating a Pub/Sub subscription."""
aioclient_mock.put(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions/home-assistant-{RAND_SUFFIX}",
json={},
status=create_subscription_status,
)
@pytest.fixture(autouse=True)
def mock_pubsub_api_responses(
aioclient_mock: AiohttpClientMocker,
@ -293,6 +364,7 @@ def mock_pubsub_api_responses(
subscriptions: list[tuple[str, str]],
device_access_project_id: str,
cloud_project_id: str,
create_topic_status: HTTPStatus,
create_subscription_status: HTTPStatus,
list_topics_status: HTTPStatus,
list_subscriptions_status: HTTPStatus,
@ -320,28 +392,14 @@ def mock_pubsub_api_responses(
)
# We check for a topic created by the SDM Device Access Console (but note we don't have permission to read it)
# or the user has created one themselves in the Google Cloud Project.
aioclient_mock.get(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions",
json={
"subscriptions": [
{
"name": subscription_name,
"topic": topic,
"pushConfig": {},
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "604800s",
"expirationPolicy": {"ttl": "2678400s"},
"state": "ACTIVE",
}
for (subscription_name, topic) in subscriptions or ()
]
},
status=list_subscriptions_status,
setup_mock_list_subscriptions_responses(
aioclient_mock, cloud_project_id, subscriptions, list_subscriptions_status
)
aioclient_mock.put(
f"https://pubsub.googleapis.com/v1/projects/{cloud_project_id}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
json={},
status=create_subscription_status,
setup_mock_create_topic_responses(
aioclient_mock, cloud_project_id, create_topic_status
)
setup_mock_create_subscription_responses(
aioclient_mock, cloud_project_id, create_subscription_status
)
@ -371,7 +429,7 @@ async def test_app_credentials(
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}",
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
"token": {
"refresh_token": "mock-refresh-token",
@ -520,6 +578,11 @@ async def test_config_flow_pubsub_configuration_error(
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
@ -565,6 +628,11 @@ async def test_config_flow_pubsub_subscriber_error(
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
@ -691,37 +759,6 @@ async def test_reauth_multiple_config_entries(
assert entry.data.get("extra_data")
@pytest.mark.parametrize(("sdm_managed_topic"), [(True)])
async def test_pubsub_subscription_strip_whitespace(
hass: HomeAssistant,
oauth: OAuthFixture,
) -> None:
"""Check that project id has whitespace stripped on entry."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(
result, cloud_project_id=" " + CLOUD_PROJECT_ID + " "
)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, {"code": "1234"})
entry = await oauth.async_complete_pubsub_flow(
result, selected_topic="projects/sdm-prod/topics/enterprise-some-project-id"
)
assert entry.title == "Import from configuration.yaml"
assert "token" in entry.data
entry.data["token"].pop("expires_at")
assert entry.unique_id == PROJECT_ID
assert entry.data["token"] == {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
"expires_in": 60,
}
assert "subscription_name" in entry.data
assert entry.data["cloud_project_id"] == CLOUD_PROJECT_ID
@pytest.mark.parametrize(
("sdm_managed_topic", "create_subscription_status"),
[(True, HTTPStatus.UNAUTHORIZED)],
@ -751,6 +788,11 @@ async def test_pubsub_subscription_auth_failure(
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("data_schema")({}) == {
"subscription_name": "create_new_subscription",
@ -833,7 +875,7 @@ async def test_config_entry_title_from_home(
assert entry.data.get("cloud_project_id") == CLOUD_PROJECT_ID
assert (
entry.data.get("subscription_name")
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}"
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}"
)
assert (
entry.data.get("topic_name")
@ -905,7 +947,7 @@ async def test_title_failure_fallback(
assert entry.data.get("cloud_project_id") == CLOUD_PROJECT_ID
assert (
entry.data.get("subscription_name")
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}"
== f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}"
)
assert (
entry.data.get("topic_name")
@ -997,7 +1039,7 @@ async def test_dhcp_discovery_with_creds(
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUBSCRIBER_SUFFIX}",
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}",
"topic_name": f"projects/sdm-prod/topics/enterprise-{PROJECT_ID}",
"token": {
"refresh_token": "mock-refresh-token",
@ -1092,7 +1134,7 @@ async def test_no_eligible_topics(
hass: HomeAssistant,
oauth: OAuthFixture,
) -> None:
"""Test the case where there are no eligible pub/sub topics."""
"""Test the case where there are no eligible pub/sub topics and the topic is created."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
@ -1101,8 +1143,36 @@ async def test_no_eligible_topics(
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub"
assert result.get("errors") == {"base": "no_pubsub_topics"}
assert result.get("step_id") == "pubsub_topic"
assert not result.get("errors")
# Option shown to create a new topic
assert result.get("data_schema")({}) == {
"topic_name": "create_new_topic",
}
entry = await oauth.async_complete_pubsub_flow(
result,
selected_topic="create_new_topic",
selected_subscription="create_new_subscription",
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}",
"topic_name": f"projects/{CLOUD_PROJECT_ID}/topics/home-assistant-{RAND_SUFFIX}",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(
@ -1122,11 +1192,90 @@ async def test_list_topics_failure(
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "pubsub_api_error"
@pytest.mark.parametrize(
("create_topic_status"),
[(HTTPStatus.INTERNAL_SERVER_ERROR)],
)
async def test_create_topic_failed(
hass: HomeAssistant,
oauth: OAuthFixture,
aioclient_mock: AiohttpClientMocker,
cloud_project_id: str,
subscriptions: list[tuple[str, str]],
auth: FakeAuth,
) -> None:
"""Test the case where there are no eligible pub/sub topics and the topic is created."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
await oauth.async_app_creds_flow(result)
oauth.async_mock_refresh()
result = await oauth.async_configure(result, None)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub"
assert result.get("step_id") == "pubsub_topic"
assert not result.get("errors")
# Option shown to create a new topic
assert result.get("data_schema")({}) == {
"topic_name": "create_new_topic",
}
result = await oauth.async_configure(result, {"topic_name": "create_new_topic"})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic"
assert result.get("errors") == {"base": "pubsub_api_error"}
# Re-register mock requests needed for the rest of the test. The topic
# request will now succeed.
aioclient_mock.clear_requests()
setup_mock_create_topic_responses(aioclient_mock, cloud_project_id)
# Fix up other mock responses cleared above
auth.register_mock_requests()
setup_mock_list_subscriptions_responses(
aioclient_mock,
cloud_project_id,
subscriptions,
)
setup_mock_create_subscription_responses(aioclient_mock, cloud_project_id)
result = await oauth.async_configure(result, {"topic_name": "create_new_topic"})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert not result.get("errors")
# Create a subscription for the topic and end the flow
entry = await oauth.async_finish_setup(
result,
{"subscription_name": "create_new_subscription"},
)
data = dict(entry.data)
assert "token" in data
data["token"].pop("expires_in")
data["token"].pop("expires_at")
assert data == {
"sdm": {},
"auth_implementation": "imported-cred",
"cloud_project_id": CLOUD_PROJECT_ID,
"project_id": PROJECT_ID,
"subscription_name": f"projects/{CLOUD_PROJECT_ID}/subscriptions/home-assistant-{RAND_SUFFIX}",
"topic_name": f"projects/{CLOUD_PROJECT_ID}/topics/home-assistant-{RAND_SUFFIX}",
"token": {
"refresh_token": "mock-refresh-token",
"access_token": "mock-access-token",
"type": "Bearer",
},
}
@pytest.mark.parametrize(
("sdm_managed_topic", "list_subscriptions_status"),
@ -1158,5 +1307,10 @@ async def test_list_subscriptions_failure(
},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_topic_confirm"
assert not result.get("errors")
result = await oauth.async_configure(result, {})
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "pubsub_subscription"
assert result.get("errors") == {"base": "pubsub_api_error"}