Add config flow to NMBS (#121548)

Co-authored-by: Joostlek <joostlek@outlook.com>
This commit is contained in:
Simon Lamon 2025-01-11 10:31:47 +01:00 committed by GitHub
parent 22b84450e8
commit b9259b6f77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 835 additions and 32 deletions

1
CODEOWNERS generated
View File

@ -1025,6 +1025,7 @@ build.json @home-assistant/supervisor
/tests/components/nina/ @DeerMaximum
/homeassistant/components/nissan_leaf/ @filcole
/homeassistant/components/nmbs/ @thibmaek
/tests/components/nmbs/ @thibmaek
/homeassistant/components/noaa_tides/ @jdelaney72
/homeassistant/components/nobo_hub/ @echoromeo @oyvindwe
/tests/components/nobo_hub/ @echoromeo @oyvindwe

View File

@ -1 +1,45 @@
"""The nmbs component."""
"""The NMBS component."""
import logging
from pyrail import iRail
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.SENSOR]
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the NMBS component."""
api_client = iRail()
hass.data.setdefault(DOMAIN, {})
station_response = await hass.async_add_executor_job(api_client.get_stations)
if station_response == -1:
return False
hass.data[DOMAIN] = station_response["station"]
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up NMBS from a config entry."""
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@ -0,0 +1,180 @@
"""Config flow for NMBS integration."""
from typing import Any
from pyrail import iRail
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import Platform
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.selector import (
BooleanSelector,
SelectOptionDict,
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from .const import (
CONF_EXCLUDE_VIAS,
CONF_SHOW_ON_MAP,
CONF_STATION_FROM,
CONF_STATION_LIVE,
CONF_STATION_TO,
DOMAIN,
)
class NMBSConfigFlow(ConfigFlow, domain=DOMAIN):
"""NMBS config flow."""
def __init__(self) -> None:
"""Initialize."""
self.api_client = iRail()
self.stations: list[dict[str, Any]] = []
async def _fetch_stations(self) -> list[dict[str, Any]]:
"""Fetch the stations."""
stations_response = await self.hass.async_add_executor_job(
self.api_client.get_stations
)
if stations_response == -1:
raise CannotConnect("The API is currently unavailable.")
return stations_response["station"]
async def _fetch_stations_choices(self) -> list[SelectOptionDict]:
"""Fetch the stations options."""
if len(self.stations) == 0:
self.stations = await self._fetch_stations()
return [
SelectOptionDict(value=station["id"], label=station["standardname"])
for station in self.stations
]
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the step to setup a connection between 2 stations."""
try:
choices = await self._fetch_stations_choices()
except CannotConnect:
return self.async_abort(reason="api_unavailable")
errors: dict = {}
if user_input is not None:
if user_input[CONF_STATION_FROM] == user_input[CONF_STATION_TO]:
errors["base"] = "same_station"
else:
[station_from] = [
station
for station in self.stations
if station["id"] == user_input[CONF_STATION_FROM]
]
[station_to] = [
station
for station in self.stations
if station["id"] == user_input[CONF_STATION_TO]
]
await self.async_set_unique_id(
f"{user_input[CONF_STATION_FROM]}_{user_input[CONF_STATION_TO]}"
)
self._abort_if_unique_id_configured()
config_entry_name = f"Train from {station_from["standardname"]} to {station_to["standardname"]}"
return self.async_create_entry(
title=config_entry_name,
data=user_input,
)
schema = vol.Schema(
{
vol.Required(CONF_STATION_FROM): SelectSelector(
SelectSelectorConfig(
options=choices,
mode=SelectSelectorMode.DROPDOWN,
)
),
vol.Required(CONF_STATION_TO): SelectSelector(
SelectSelectorConfig(
options=choices,
mode=SelectSelectorMode.DROPDOWN,
)
),
vol.Optional(CONF_EXCLUDE_VIAS): BooleanSelector(),
vol.Optional(CONF_SHOW_ON_MAP): BooleanSelector(),
},
)
return self.async_show_form(
step_id="user",
data_schema=schema,
errors=errors,
)
async def async_step_import(self, user_input: dict[str, Any]) -> ConfigFlowResult:
"""Import configuration from yaml."""
try:
self.stations = await self._fetch_stations()
except CannotConnect:
return self.async_abort(reason="api_unavailable")
station_from = None
station_to = None
station_live = None
for station in self.stations:
if user_input[CONF_STATION_FROM] in (
station["standardname"],
station["name"],
):
station_from = station
if user_input[CONF_STATION_TO] in (
station["standardname"],
station["name"],
):
station_to = station
if CONF_STATION_LIVE in user_input and user_input[CONF_STATION_LIVE] in (
station["standardname"],
station["name"],
):
station_live = station
if station_from is None or station_to is None:
return self.async_abort(reason="invalid_station")
if station_from == station_to:
return self.async_abort(reason="same_station")
# config flow uses id and not the standard name
user_input[CONF_STATION_FROM] = station_from["id"]
user_input[CONF_STATION_TO] = station_to["id"]
if station_live:
user_input[CONF_STATION_LIVE] = station_live["id"]
entity_registry = er.async_get(self.hass)
prefix = "live"
if entity_id := entity_registry.async_get_entity_id(
Platform.SENSOR,
DOMAIN,
f"{prefix}_{station_live["standardname"]}_{station_from["standardname"]}_{station_to["standardname"]}",
):
new_unique_id = f"{DOMAIN}_{prefix}_{station_live["id"]}_{station_from["id"]}_{station_to["id"]}"
entity_registry.async_update_entity(
entity_id, new_unique_id=new_unique_id
)
if entity_id := entity_registry.async_get_entity_id(
Platform.SENSOR,
DOMAIN,
f"{prefix}_{station_live["name"]}_{station_from["name"]}_{station_to["name"]}",
):
new_unique_id = f"{DOMAIN}_{prefix}_{station_live["id"]}_{station_from["id"]}_{station_to["id"]}"
entity_registry.async_update_entity(
entity_id, new_unique_id=new_unique_id
)
return await self.async_step_user(user_input)
class CannotConnect(Exception):
"""Error to indicate we cannot connect to NMBS."""

View File

@ -0,0 +1,36 @@
"""The NMBS integration."""
from typing import Final
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
DOMAIN: Final = "nmbs"
PLATFORMS: Final = [Platform.SENSOR]
CONF_STATION_FROM = "station_from"
CONF_STATION_TO = "station_to"
CONF_STATION_LIVE = "station_live"
CONF_EXCLUDE_VIAS = "exclude_vias"
CONF_SHOW_ON_MAP = "show_on_map"
def find_station_by_name(hass: HomeAssistant, station_name: str):
"""Find given station_name in the station list."""
return next(
(
s
for s in hass.data[DOMAIN]
if station_name in (s["standardname"], s["name"])
),
None,
)
def find_station(hass: HomeAssistant, station_name: str):
"""Find given station_id in the station list."""
return next(
(s for s in hass.data[DOMAIN] if station_name in s["id"]),
None,
)

View File

@ -2,6 +2,7 @@
"domain": "nmbs",
"name": "NMBS",
"codeowners": ["@thibmaek"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/nmbs",
"iot_class": "cloud_polling",
"loggers": ["pyrail"],

View File

@ -11,19 +11,33 @@ from homeassistant.components.sensor import (
PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
SensorEntity,
)
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import (
ATTR_LATITUDE,
ATTR_LONGITUDE,
CONF_NAME,
CONF_PLATFORM,
CONF_SHOW_ON_MAP,
UnitOfTime,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
import homeassistant.util.dt as dt_util
from .const import ( # noqa: F401
CONF_EXCLUDE_VIAS,
CONF_STATION_FROM,
CONF_STATION_LIVE,
CONF_STATION_TO,
DOMAIN,
PLATFORMS,
find_station,
find_station_by_name,
)
_LOGGER = logging.getLogger(__name__)
API_FAILURE = -1
@ -33,11 +47,6 @@ DEFAULT_NAME = "NMBS"
DEFAULT_ICON = "mdi:train"
DEFAULT_ICON_ALERT = "mdi:alert-octagon"
CONF_STATION_FROM = "station_from"
CONF_STATION_TO = "station_to"
CONF_STATION_LIVE = "station_live"
CONF_EXCLUDE_VIAS = "exclude_vias"
PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_STATION_FROM): cv.string,
@ -73,33 +82,97 @@ def get_ride_duration(departure_time, arrival_time, delay=0):
return duration_time + get_delay_in_minutes(delay)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the NMBS sensor with iRail API."""
api_client = iRail()
if config[CONF_PLATFORM] == DOMAIN:
if CONF_SHOW_ON_MAP not in config:
config[CONF_SHOW_ON_MAP] = False
if CONF_EXCLUDE_VIAS not in config:
config[CONF_EXCLUDE_VIAS] = False
name = config[CONF_NAME]
show_on_map = config[CONF_SHOW_ON_MAP]
station_from = config[CONF_STATION_FROM]
station_to = config[CONF_STATION_TO]
station_live = config.get(CONF_STATION_LIVE)
excl_vias = config[CONF_EXCLUDE_VIAS]
station_types = [CONF_STATION_FROM, CONF_STATION_TO, CONF_STATION_LIVE]
sensors: list[SensorEntity] = [
NMBSSensor(api_client, name, show_on_map, station_from, station_to, excl_vias)
]
for station_type in station_types:
station = (
find_station_by_name(hass, config[station_type])
if station_type in config
else None
)
if station is None and station_type in config:
async_create_issue(
hass,
DOMAIN,
"deprecated_yaml_import_issue_station_not_found",
breaks_in_ha_version="2025.7.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml_import_issue_station_not_found",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "NMBS",
"station_name": config[station_type],
"url": "/config/integrations/dashboard/add?domain=nmbs",
},
)
return
if station_live is not None:
sensors.append(
NMBSLiveBoard(api_client, station_live, station_from, station_to)
hass.async_create_task(
hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config,
)
)
add_entities(sensors, True)
async_create_issue(
hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_yaml_{DOMAIN}",
breaks_in_ha_version="2025.7.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "NMBS",
},
)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up NMBS sensor entities based on a config entry."""
api_client = iRail()
name = config_entry.data.get(CONF_NAME, None)
show_on_map = config_entry.data.get(CONF_SHOW_ON_MAP, False)
excl_vias = config_entry.data.get(CONF_EXCLUDE_VIAS, False)
station_from = find_station(hass, config_entry.data[CONF_STATION_FROM])
station_to = find_station(hass, config_entry.data[CONF_STATION_TO])
# setup the connection from station to station
# setup a disabled liveboard for both from and to station
async_add_entities(
[
NMBSSensor(
api_client, name, show_on_map, station_from, station_to, excl_vias
),
NMBSLiveBoard(api_client, station_from, station_from, station_to),
NMBSLiveBoard(api_client, station_to, station_from, station_to),
]
)
class NMBSLiveBoard(SensorEntity):
@ -116,16 +189,18 @@ class NMBSLiveBoard(SensorEntity):
self._attrs = {}
self._state = None
self.entity_registry_enabled_default = False
@property
def name(self):
"""Return the sensor default name."""
return f"NMBS Live ({self._station})"
return f"Trains in {self._station["standardname"]}"
@property
def unique_id(self):
"""Return a unique ID."""
unique_id = f"{self._station}_{self._station_from}_{self._station_to}"
"""Return the unique ID."""
unique_id = f"{self._station}_{self._station_from}_{self._station_to}"
return f"nmbs_live_{unique_id}"
@property
@ -155,7 +230,7 @@ class NMBSLiveBoard(SensorEntity):
"departure_minutes": departure,
"extra_train": int(self._attrs["isExtra"]) > 0,
"vehicle_id": self._attrs["vehicle"],
"monitored_station": self._station,
"monitored_station": self._station["standardname"],
}
if delay > 0:
@ -166,7 +241,7 @@ class NMBSLiveBoard(SensorEntity):
def update(self) -> None:
"""Set the state equal to the next departure."""
liveboard = self._api_client.get_liveboard(self._station)
liveboard = self._api_client.get_liveboard(self._station["id"])
if liveboard == API_FAILURE:
_LOGGER.warning("API failed in NMBSLiveBoard")
@ -209,8 +284,17 @@ class NMBSSensor(SensorEntity):
self._state = None
@property
def name(self):
def unique_id(self) -> str:
"""Return the unique ID."""
unique_id = f"{self._station_from["id"]}_{self._station_to["id"]}"
return f"nmbs_connection_{unique_id}"
@property
def name(self) -> str:
"""Return the name of the sensor."""
if self._name is None:
return f"Train from {self._station_from["standardname"]} to {self._station_to["standardname"]}"
return self._name
@property
@ -234,7 +318,7 @@ class NMBSSensor(SensorEntity):
canceled = int(self._attrs["departure"]["canceled"])
attrs = {
"destination": self._station_to,
"destination": self._attrs["departure"]["station"],
"direction": self._attrs["departure"]["direction"]["name"],
"platform_arriving": self._attrs["arrival"]["platform"],
"platform_departing": self._attrs["departure"]["platform"],
@ -296,7 +380,7 @@ class NMBSSensor(SensorEntity):
def update(self) -> None:
"""Set the state to the duration of a connection."""
connections = self._api_client.get_connections(
self._station_from, self._station_to
self._station_from["id"], self._station_to["id"]
)
if connections == API_FAILURE:

View File

@ -0,0 +1,35 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_location%]",
"api_unavailable": "The API is currently unavailable.",
"same_station": "[%key:component::nmbs::config::error::same_station%]",
"invalid_station": "Invalid station."
},
"error": {
"same_station": "Departure and arrival station can not be the same."
},
"step": {
"user": {
"data": {
"station_from": "Departure station",
"station_to": "Arrival station",
"exclude_vias": "Direct connections only",
"show_on_map": "Display on map"
},
"data_description": {
"station_from": "Station where the train departs",
"station_to": "Station where the train arrives",
"exclude_vias": "Exclude connections with transfers",
"show_on_map": "Show the station on the map"
}
}
}
},
"issues": {
"deprecated_yaml_import_issue_station_not_found": {
"title": "The {integration_title} YAML configuration import failed",
"description": "Configuring {integration_title} using YAML is being removed but there was an problem importing your YAML configuration.\n\nThe used station \"{station_name}\" could not be found. Fix it or remove the {integration_title} YAML configuration from your configuration.yaml file and continue to [set up the integration]({url}) manually."
}
}
}

View File

@ -416,6 +416,7 @@ FLOWS = {
"niko_home_control",
"nina",
"nmap_tracker",
"nmbs",
"nobo_hub",
"nordpool",
"notion",

View File

@ -4224,7 +4224,7 @@
"nmbs": {
"name": "NMBS",
"integration_type": "hub",
"config_flow": false,
"config_flow": true,
"iot_class": "cloud_polling"
},
"no_ip": {

View File

@ -1814,6 +1814,9 @@ pyps4-2ndscreen==1.3.1
# homeassistant.components.qwikswitch
pyqwikswitch==0.93
# homeassistant.components.nmbs
pyrail==0.0.3
# homeassistant.components.rainbird
pyrainbird==6.0.1

View File

@ -0,0 +1,20 @@
"""Tests for the NMBS integration."""
import json
from typing import Any
from tests.common import load_fixture
def mock_api_unavailable() -> dict[str, Any]:
"""Mock for unavailable api."""
return -1
def mock_station_response() -> dict[str, Any]:
"""Mock for valid station response."""
dummy_stations_response: dict[str, Any] = json.loads(
load_fixture("stations.json", "nmbs")
)
return dummy_stations_response

View File

@ -0,0 +1,58 @@
"""NMBS tests configuration."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.nmbs.const import (
CONF_STATION_FROM,
CONF_STATION_TO,
DOMAIN,
)
from tests.common import MockConfigEntry, load_json_object_fixture
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.nmbs.async_setup_entry",
return_value=True,
) as mock_setup_entry:
yield mock_setup_entry
@pytest.fixture
def mock_nmbs_client() -> Generator[AsyncMock]:
"""Mock a NMBS client."""
with (
patch(
"homeassistant.components.nmbs.iRail",
autospec=True,
) as mock_client,
patch(
"homeassistant.components.nmbs.config_flow.iRail",
new=mock_client,
),
):
client = mock_client.return_value
client.get_stations.return_value = load_json_object_fixture(
"stations.json", DOMAIN
)
yield client
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Mock a config entry."""
return MockConfigEntry(
domain=DOMAIN,
title="Train from Brussel-Noord/Bruxelles-Nord to Brussel-Zuid/Bruxelles-Midi",
data={
CONF_STATION_FROM: "BE.NMBS.008812005",
CONF_STATION_TO: "BE.NMBS.008814001",
},
unique_id="BE.NMBS.008812005_BE.NMBS.008814001",
)

View File

@ -0,0 +1,30 @@
{
"version": "1.3",
"timestamp": "1720252400",
"station": [
{
"@id": "http://irail.be/stations/NMBS/008812005",
"id": "BE.NMBS.008812005",
"name": "Brussels-North",
"locationX": "4.360846",
"locationY": "50.859663",
"standardname": "Brussel-Noord/Bruxelles-Nord"
},
{
"@id": "http://irail.be/stations/NMBS/008813003",
"id": "BE.NMBS.008813003",
"name": "Brussels-Central",
"locationX": "4.356801",
"locationY": "50.845658",
"standardname": "Brussel-Centraal/Bruxelles-Central"
},
{
"@id": "http://irail.be/stations/NMBS/008814001",
"id": "BE.NMBS.008814001",
"name": "Brussels-South/Brussels-Midi",
"locationX": "4.336531",
"locationY": "50.835707",
"standardname": "Brussel-Zuid/Bruxelles-Midi"
}
]
}

View File

@ -0,0 +1,310 @@
"""Test the NMBS config flow."""
from typing import Any
from unittest.mock import AsyncMock
import pytest
from homeassistant import config_entries
from homeassistant.components.nmbs.const import (
CONF_STATION_FROM,
CONF_STATION_LIVE,
CONF_STATION_TO,
DOMAIN,
)
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import entity_registry as er
from tests.common import MockConfigEntry
DUMMY_DATA_IMPORT: dict[str, Any] = {
"STAT_BRUSSELS_NORTH": "Brussel-Noord/Bruxelles-Nord",
"STAT_BRUSSELS_CENTRAL": "Brussel-Centraal/Bruxelles-Central",
"STAT_BRUSSELS_SOUTH": "Brussel-Zuid/Bruxelles-Midi",
}
DUMMY_DATA_ALTERNATIVE_IMPORT: dict[str, Any] = {
"STAT_BRUSSELS_NORTH": "Brussels-North",
"STAT_BRUSSELS_CENTRAL": "Brussels-Central",
"STAT_BRUSSELS_SOUTH": "Brussels-South/Brussels-Midi",
}
DUMMY_DATA: dict[str, Any] = {
"STAT_BRUSSELS_NORTH": "BE.NMBS.008812005",
"STAT_BRUSSELS_CENTRAL": "BE.NMBS.008813003",
"STAT_BRUSSELS_SOUTH": "BE.NMBS.008814001",
}
async def test_full_flow(
hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
"""Test the full flow."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert (
result["title"]
== "Train from Brussel-Noord/Bruxelles-Nord to Brussel-Zuid/Bruxelles-Midi"
)
assert result["data"] == {
CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
}
assert (
result["result"].unique_id
== f"{DUMMY_DATA["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA["STAT_BRUSSELS_SOUTH"]}"
)
async def test_same_station(
hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
"""Test selecting the same station."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "same_station"}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
async def test_abort_if_exists(
hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_config_entry: MockConfigEntry
) -> None:
"""Test aborting the flow if the entry already exists."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
data={
CONF_STATION_FROM: DUMMY_DATA["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_unavailable_api(
hass: HomeAssistant, mock_nmbs_client: AsyncMock
) -> None:
"""Test starting a flow by user and api is unavailable."""
mock_nmbs_client.get_stations.return_value = -1
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "api_unavailable"
async def test_import(
hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_setup_entry: AsyncMock
) -> None:
"""Test starting a flow by user which filled in data for connection."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_LIVE: DUMMY_DATA_IMPORT["STAT_BRUSSELS_CENTRAL"],
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert (
result["title"]
== "Train from Brussel-Noord/Bruxelles-Nord to Brussel-Zuid/Bruxelles-Midi"
)
assert result["data"] == {
CONF_STATION_FROM: "BE.NMBS.008812005",
CONF_STATION_LIVE: "BE.NMBS.008813003",
CONF_STATION_TO: "BE.NMBS.008814001",
}
assert result["result"].unique_id == "BE.NMBS.008812005_BE.NMBS.008814001"
async def test_step_import_abort_if_already_setup(
hass: HomeAssistant, mock_nmbs_client: AsyncMock, mock_config_entry: MockConfigEntry
) -> None:
"""Test starting a flow by user which filled in data for connection for already existing connection."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_unavailable_api_import(
hass: HomeAssistant, mock_nmbs_client: AsyncMock
) -> None:
"""Test starting a flow by import and api is unavailable."""
mock_nmbs_client.get_stations.return_value = -1
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_LIVE: DUMMY_DATA_IMPORT["STAT_BRUSSELS_CENTRAL"],
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "api_unavailable"
@pytest.mark.parametrize(
("config", "reason"),
[
(
{
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: "Utrecht Centraal",
},
"invalid_station",
),
(
{
CONF_STATION_FROM: "Utrecht Centraal",
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
},
"invalid_station",
),
(
{
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
},
"same_station",
),
],
)
async def test_invalid_station_name(
hass: HomeAssistant,
mock_nmbs_client: AsyncMock,
config: dict[str, Any],
reason: str,
) -> None:
"""Test importing invalid YAML."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == reason
async def test_sensor_id_migration_standardname(
hass: HomeAssistant,
mock_nmbs_client: AsyncMock,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test migrating unique id."""
entity_registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
f"live_{DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"]}",
config_entry=mock_config_entry,
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_STATION_LIVE: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_FROM: DUMMY_DATA_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA_IMPORT["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
await hass.async_block_till_done()
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
assert len(entities) == 1
assert (
entities[0].unique_id
== f"nmbs_live_{DUMMY_DATA["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA["STAT_BRUSSELS_SOUTH"]}"
)
async def test_sensor_id_migration_localized_name(
hass: HomeAssistant,
mock_nmbs_client: AsyncMock,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test migrating unique id."""
entity_registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
f"live_{DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_SOUTH"]}",
config_entry=mock_config_entry,
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={
CONF_STATION_LIVE: DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_FROM: DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_NORTH"],
CONF_STATION_TO: DUMMY_DATA_ALTERNATIVE_IMPORT["STAT_BRUSSELS_SOUTH"],
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
await hass.async_block_till_done()
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
assert len(entities) == 1
assert (
entities[0].unique_id
== f"nmbs_live_{DUMMY_DATA["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA["STAT_BRUSSELS_NORTH"]}_{DUMMY_DATA["STAT_BRUSSELS_SOUTH"]}"
)