Powerwall: Reuse authentication cookie (#136147)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Christian 2025-01-24 13:52:24 -08:00 committed by GitHub
parent f5fc46a7be
commit 9993a68a55
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 326 additions and 31 deletions

View File

@ -14,6 +14,7 @@ from tesla_powerwall import (
Powerwall, Powerwall,
PowerwallUnreachableError, PowerwallUnreachableError,
) )
from yarl import URL
from homeassistant.components import persistent_notification from homeassistant.components import persistent_notification
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
@ -25,7 +26,14 @@ from homeassistant.helpers.aiohttp_client import async_create_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.network import is_ip_address from homeassistant.util.network import is_ip_address
from .const import DOMAIN, POWERWALL_API_CHANGED, POWERWALL_COORDINATOR, UPDATE_INTERVAL from .const import (
AUTH_COOKIE_KEY,
CONFIG_ENTRY_COOKIE,
DOMAIN,
POWERWALL_API_CHANGED,
POWERWALL_COORDINATOR,
UPDATE_INTERVAL,
)
from .models import ( from .models import (
PowerwallBaseInfo, PowerwallBaseInfo,
PowerwallConfigEntry, PowerwallConfigEntry,
@ -52,6 +60,8 @@ class PowerwallDataManager:
self, self,
hass: HomeAssistant, hass: HomeAssistant,
power_wall: Powerwall, power_wall: Powerwall,
cookie_jar: CookieJar,
entry: PowerwallConfigEntry,
ip_address: str, ip_address: str,
password: str | None, password: str | None,
runtime_data: PowerwallRuntimeData, runtime_data: PowerwallRuntimeData,
@ -62,6 +72,8 @@ class PowerwallDataManager:
self.password = password self.password = password
self.runtime_data = runtime_data self.runtime_data = runtime_data
self.power_wall = power_wall self.power_wall = power_wall
self.cookie_jar = cookie_jar
self.entry = entry
@property @property
def api_changed(self) -> int: def api_changed(self) -> int:
@ -72,7 +84,9 @@ class PowerwallDataManager:
"""Recreate the login on auth failure.""" """Recreate the login on auth failure."""
if self.power_wall.is_authenticated(): if self.power_wall.is_authenticated():
await self.power_wall.logout() await self.power_wall.logout()
# Always use the password when recreating the login
await self.power_wall.login(self.password or "") await self.power_wall.login(self.password or "")
self.save_auth_cookie()
async def async_update_data(self) -> PowerwallData: async def async_update_data(self) -> PowerwallData:
"""Fetch data from API endpoint.""" """Fetch data from API endpoint."""
@ -116,27 +130,54 @@ class PowerwallDataManager:
return data return data
raise RuntimeError("unreachable") raise RuntimeError("unreachable")
@callback
def save_auth_cookie(self) -> None:
"""Save the auth cookie."""
for cookie in self.cookie_jar:
if cookie.key == AUTH_COOKIE_KEY:
self.hass.config_entries.async_update_entry(
self.entry,
data={**self.entry.data, CONFIG_ENTRY_COOKIE: cookie.value},
)
_LOGGER.debug("Saved auth cookie")
break
async def async_setup_entry(hass: HomeAssistant, entry: PowerwallConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: PowerwallConfigEntry) -> bool:
"""Set up Tesla Powerwall from a config entry.""" """Set up Tesla Powerwall from a config entry."""
ip_address: str = entry.data[CONF_IP_ADDRESS] ip_address: str = entry.data[CONF_IP_ADDRESS]
password: str | None = entry.data.get(CONF_PASSWORD) password: str | None = entry.data.get(CONF_PASSWORD)
cookie_jar: CookieJar = CookieJar(unsafe=True)
use_auth_cookie: bool = False
# Try to reuse the auth cookie
auth_cookie_value: str | None = entry.data.get(CONFIG_ENTRY_COOKIE)
if auth_cookie_value:
cookie_jar.update_cookies(
{AUTH_COOKIE_KEY: auth_cookie_value},
URL(f"http://{ip_address}"),
)
_LOGGER.debug("Using existing auth cookie")
use_auth_cookie = True
http_session = async_create_clientsession( http_session = async_create_clientsession(
hass, verify_ssl=False, cookie_jar=CookieJar(unsafe=True) hass, verify_ssl=False, cookie_jar=cookie_jar
) )
async with AsyncExitStack() as stack: async with AsyncExitStack() as stack:
power_wall = Powerwall(ip_address, http_session=http_session, verify_ssl=False) power_wall = Powerwall(ip_address, http_session=http_session, verify_ssl=False)
stack.push_async_callback(power_wall.close) stack.push_async_callback(power_wall.close)
for tries in range(2):
try: try:
base_info = await _login_and_fetch_base_info( base_info = await _login_and_fetch_base_info(
power_wall, ip_address, password power_wall, ip_address, password, use_auth_cookie
) )
# Cancel closing power_wall on success # Cancel closing power_wall on success
stack.pop_all() stack.pop_all()
break
except (TimeoutError, PowerwallUnreachableError) as err: except (TimeoutError, PowerwallUnreachableError) as err:
raise ConfigEntryNotReady from err raise ConfigEntryNotReady from err
except MissingAttributeError as err: except MissingAttributeError as err:
@ -147,6 +188,12 @@ async def async_setup_entry(hass: HomeAssistant, entry: PowerwallConfigEntry) ->
) )
return False return False
except AccessDeniedError as err: except AccessDeniedError as err:
if use_auth_cookie and tries == 0:
_LOGGER.debug(
"Authentication failed with cookie, retrying with password"
)
use_auth_cookie = False
continue
_LOGGER.debug("Authentication failed", exc_info=err) _LOGGER.debug("Authentication failed", exc_info=err)
raise ConfigEntryAuthFailed from err raise ConfigEntryAuthFailed from err
except ApiError as err: except ApiError as err:
@ -163,7 +210,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: PowerwallConfigEntry) ->
api_instance=power_wall, api_instance=power_wall,
) )
manager = PowerwallDataManager(hass, power_wall, ip_address, password, runtime_data) manager = PowerwallDataManager(
hass,
power_wall,
cookie_jar,
entry,
ip_address,
password,
runtime_data,
)
manager.save_auth_cookie()
coordinator = DataUpdateCoordinator( coordinator = DataUpdateCoordinator(
hass, hass,
@ -213,10 +269,11 @@ async def async_migrate_entity_unique_ids(
async def _login_and_fetch_base_info( async def _login_and_fetch_base_info(
power_wall: Powerwall, host: str, password: str | None power_wall: Powerwall, host: str, password: str | None, use_auth_cookie: bool
) -> PowerwallBaseInfo: ) -> PowerwallBaseInfo:
"""Login to the powerwall and fetch the base info.""" """Login to the powerwall and fetch the base info."""
if password is not None: # Login step is skipped if password is None or if we are using the auth cookie
if not (password is None or use_auth_cookie):
await power_wall.login(password) await power_wall.login(password)
return await _call_base_info(power_wall, host) return await _call_base_info(power_wall, host)

View File

@ -31,7 +31,7 @@ from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from homeassistant.util.network import is_ip_address from homeassistant.util.network import is_ip_address
from . import async_last_update_was_successful from . import async_last_update_was_successful
from .const import DOMAIN from .const import CONFIG_ENTRY_COOKIE, DOMAIN
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -257,8 +257,10 @@ class PowerwallConfigFlow(ConfigFlow, domain=DOMAIN):
{CONF_IP_ADDRESS: reauth_entry.data[CONF_IP_ADDRESS], **user_input} {CONF_IP_ADDRESS: reauth_entry.data[CONF_IP_ADDRESS], **user_input}
) )
if not errors: if not errors:
# We have a new valid connection, old cookie is no longer valid
user_input[CONFIG_ENTRY_COOKIE] = None
return self.async_update_reload_and_abort( return self.async_update_reload_and_abort(
reauth_entry, data_updates=user_input reauth_entry, data_updates={**user_input, CONFIG_ENTRY_COOKIE: None}
) )
self.context["title_placeholders"] = { self.context["title_placeholders"] = {

View File

@ -18,3 +18,6 @@ ATTR_IS_ACTIVE = "is_active"
MODEL = "PowerWall 2" MODEL = "PowerWall 2"
MANUFACTURER = "Tesla" MANUFACTURER = "Tesla"
CONFIG_ENTRY_COOKIE = "cookie"
AUTH_COOKIE_KEY = "AuthCookie"

View File

@ -1,17 +1,23 @@
"""Tests for the PowerwallDataManager.""" """Tests for the PowerwallDataManager."""
import datetime import datetime
from unittest.mock import patch from http.cookies import Morsel
from unittest.mock import MagicMock, patch
from aiohttp import CookieJar
from tesla_powerwall import AccessDeniedError, LoginResponse from tesla_powerwall import AccessDeniedError, LoginResponse
from homeassistant.components.powerwall.const import DOMAIN from homeassistant.components.powerwall.const import (
AUTH_COOKIE_KEY,
CONFIG_ENTRY_COOKIE,
DOMAIN,
)
from homeassistant.config_entries import ConfigEntryState from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.util.dt import utcnow from homeassistant.util.dt import utcnow
from .mocks import _mock_powerwall_with_fixtures from .mocks import MOCK_GATEWAY_DIN, _mock_powerwall_with_fixtures
from tests.common import MockConfigEntry, async_fire_time_changed from tests.common import MockConfigEntry, async_fire_time_changed
@ -37,7 +43,11 @@ async def test_update_data_reauthenticate_on_access_denied(hass: HomeAssistant)
mock_powerwall.is_authenticated.return_value = True mock_powerwall.is_authenticated.return_value = True
config_entry = MockConfigEntry( config_entry = MockConfigEntry(
domain=DOMAIN, data={CONF_IP_ADDRESS: "1.2.3.4", CONF_PASSWORD: "password"} domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "password",
},
) )
config_entry.add_to_hass(hass) config_entry.add_to_hass(hass)
with ( with (
@ -72,3 +82,226 @@ async def test_update_data_reauthenticate_on_access_denied(hass: HomeAssistant)
assert len(flows) == 1 assert len(flows) == 1
reauth_flow = flows[0] reauth_flow = flows[0]
assert reauth_flow["context"]["source"] == "reauth" assert reauth_flow["context"]["source"] == "reauth"
async def test_init_uses_cookie_if_present(hass: HomeAssistant) -> None:
"""Tests if the init will use the auth cookie if present.
If the cookie is present, the login step will be skipped and info will be fetched directly (see _login_and_fetch_base_info).
"""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
CONFIG_ENTRY_COOKIE: "somecookie",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert not mock_powerwall.login.called
assert mock_powerwall.get_gateway_din.called
async def test_init_uses_password_if_no_cookies(hass: HomeAssistant) -> None:
"""Tests if the init will use the password if no auth cookie present."""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
mock_powerwall.login.assert_called_with("somepassword")
assert mock_powerwall.get_charge.called
async def test_init_saves_the_cookie(hass: HomeAssistant) -> None:
"""Tests that the cookie is properly saved."""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
mock_jar = MagicMock(CookieJar)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
patch("homeassistant.components.powerwall.CookieJar", return_value=mock_jar),
):
auth_cookie = Morsel()
auth_cookie.set(AUTH_COOKIE_KEY, "somecookie", "somecookie")
mock_jar.__iter__.return_value = [auth_cookie]
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.data[CONFIG_ENTRY_COOKIE] == "somecookie"
async def test_retry_ignores_cookie(hass: HomeAssistant) -> None:
"""Tests that retrying uses the password instead."""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
CONFIG_ENTRY_COOKIE: "somecookie",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert not mock_powerwall.login.called
assert mock_powerwall.get_gateway_din.called
mock_powerwall.login.reset_mock()
mock_powerwall.get_charge.reset_mock()
mock_powerwall.get_charge.side_effect = [AccessDeniedError("test"), 90.0]
async_fire_time_changed(hass, utcnow() + datetime.timedelta(minutes=1))
await hass.async_block_till_done()
mock_powerwall.login.assert_called_with("somepassword")
assert mock_powerwall.get_charge.call_count == 2
async def test_reauth_ignores_and_clears_cookie(hass: HomeAssistant) -> None:
"""Tests that the reauth flow uses password and clears the cookie."""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
CONFIG_ENTRY_COOKIE: "somecookie",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
):
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
mock_powerwall.login.reset_mock()
mock_powerwall.get_charge.reset_mock()
mock_powerwall.get_charge.side_effect = [
AccessDeniedError("test"),
AccessDeniedError("test"),
]
async_fire_time_changed(hass, utcnow() + datetime.timedelta(minutes=1))
await hass.async_block_till_done()
mock_powerwall.login.assert_called_with("somepassword")
assert mock_powerwall.get_charge.call_count == 2
flows = hass.config_entries.flow.async_progress(DOMAIN)
assert len(flows) == 1
reauth_flow = flows[0]
assert reauth_flow["context"]["source"] == "reauth"
mock_powerwall.login.reset_mock()
assert config_entry.data[CONFIG_ENTRY_COOKIE] is not None
await hass.config_entries.flow.async_configure(
reauth_flow["flow_id"], {CONF_PASSWORD: "somepassword"}
)
mock_powerwall.login.assert_called_with("somepassword")
assert config_entry.data[CONFIG_ENTRY_COOKIE] is None
async def test_init_retries_with_password(hass: HomeAssistant) -> None:
"""Tests that the init retries with password if cookie fails."""
mock_powerwall = await _mock_powerwall_with_fixtures(hass)
config_entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_IP_ADDRESS: "1.2.3.4",
CONF_PASSWORD: "somepassword",
CONFIG_ENTRY_COOKIE: "somecookie",
},
)
config_entry.add_to_hass(hass)
with (
patch(
"homeassistant.components.powerwall.config_flow.Powerwall",
return_value=mock_powerwall,
),
patch(
"homeassistant.components.powerwall.Powerwall", return_value=mock_powerwall
),
):
mock_powerwall.get_gateway_din.side_effect = [
AccessDeniedError("get_gateway_din"),
MOCK_GATEWAY_DIN,
]
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
mock_powerwall.login.assert_called_with("somepassword")
assert mock_powerwall.get_gateway_din.call_count == 2