Small cleanups to process_success_login (#103282)

This commit is contained in:
J. Nick Koston 2023-11-07 03:48:34 -06:00 committed by GitHub
parent 3ca6cddc1f
commit da1780f9ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 27 additions and 12 deletions

View File

@ -235,7 +235,7 @@ class LoginFlowBaseView(HomeAssistantView):
f"Login blocked: {user_access_error}", HTTPStatus.FORBIDDEN
)
await process_success_login(request)
process_success_login(request)
result["result"] = self._store_result(client_id, result_obj)
return self.json(result)

View File

@ -162,27 +162,28 @@ async def process_wrong_login(request: Request) -> None:
)
async def process_success_login(request: Request) -> None:
@callback
def process_success_login(request: Request) -> None:
"""Process a success login attempt.
Reset failed login attempts counter for remote IP address.
No release IP address from banned list function, it can only be done by
manual modify ip bans config file.
"""
remote_addr = ip_address(request.remote) # type: ignore[arg-type]
app = request.app
# Check if ban middleware is loaded
if KEY_BAN_MANAGER not in request.app or request.app[KEY_LOGIN_THRESHOLD] < 1:
if KEY_BAN_MANAGER not in app or app[KEY_LOGIN_THRESHOLD] < 1:
return
if (
remote_addr in request.app[KEY_FAILED_LOGIN_ATTEMPTS]
and request.app[KEY_FAILED_LOGIN_ATTEMPTS][remote_addr] > 0
):
remote_addr = ip_address(request.remote) # type: ignore[arg-type]
login_attempt_history: defaultdict[IPv4Address | IPv6Address, int] = app[
KEY_FAILED_LOGIN_ATTEMPTS
]
if remote_addr in login_attempt_history and login_attempt_history[remote_addr] > 0:
_LOGGER.debug(
"Login success, reset failed login attempts counter from %s", remote_addr
)
request.app[KEY_FAILED_LOGIN_ATTEMPTS].pop(remote_addr)
login_attempt_history.pop(remote_addr)
class IpBan:

View File

@ -103,7 +103,7 @@ class AuthPhase:
) -> ActiveConnection:
"""Create an active connection."""
self._logger.debug("Auth OK")
await process_success_login(self._request)
process_success_login(self._request)
self._send_message(auth_ok_message())
return ActiveConnection(
self._logger, self._hass, self._send_message, user, refresh_token

View File

@ -16,6 +16,7 @@ from homeassistant.components.http.ban import (
KEY_BAN_MANAGER,
KEY_FAILED_LOGIN_ATTEMPTS,
IpBanManager,
process_success_login,
setup_bans,
)
from homeassistant.components.http.view import request_handler_factory
@ -332,9 +333,14 @@ async def test_failed_login_attempts_counter(
"""Return 200 status code."""
return None, 200
async def auth_true_handler(request):
"""Return 200 status code."""
process_success_login(request)
return None, 200
app.router.add_get(
"/auth_true",
request_handler_factory(hass, Mock(requires_auth=True), auth_handler),
request_handler_factory(hass, Mock(requires_auth=True), auth_true_handler),
)
app.router.add_get(
"/auth_false",
@ -377,4 +383,12 @@ async def test_failed_login_attempts_counter(
# We no longer support trusted networks.
resp = await client.get("/auth_true")
assert resp.status == HTTPStatus.OK
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 0
resp = await client.get("/auth_false")
assert resp.status == HTTPStatus.UNAUTHORIZED
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 1
resp = await client.get("/auth_false")
assert resp.status == HTTPStatus.UNAUTHORIZED
assert app[KEY_FAILED_LOGIN_ATTEMPTS][remote_ip] == 2