Enable strict typing for aprs (#106824)

This commit is contained in:
Marc Mueller 2024-01-01 20:14:00 +01:00 committed by GitHub
parent 3433e1d349
commit c37e268030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 20 additions and 8 deletions

View File

@ -69,6 +69,7 @@ homeassistant.components.anova.*
homeassistant.components.anthemav.* homeassistant.components.anthemav.*
homeassistant.components.apcupsd.* homeassistant.components.apcupsd.*
homeassistant.components.apprise.* homeassistant.components.apprise.*
homeassistant.components.aprs.*
homeassistant.components.aqualogic.* homeassistant.components.aqualogic.*
homeassistant.components.aranet.* homeassistant.components.aranet.*
homeassistant.components.aseko_pool_live.* homeassistant.components.aseko_pool_live.*

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import logging import logging
import threading import threading
from typing import Any
import aprslib import aprslib
from aprslib import ConnectionError as AprsConnectionError, LoginError from aprslib import ConnectionError as AprsConnectionError, LoginError
@ -23,7 +24,7 @@ from homeassistant.const import (
CONF_USERNAME, CONF_USERNAME,
EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_STOP,
) )
from homeassistant.core import HomeAssistant from homeassistant.core import Event, HomeAssistant
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import slugify from homeassistant.util import slugify
@ -66,7 +67,7 @@ def make_filter(callsigns: list) -> str:
return " ".join(f"b/{sign.upper()}" for sign in callsigns) return " ".join(f"b/{sign.upper()}" for sign in callsigns)
def gps_accuracy(gps, posambiguity: int) -> int: def gps_accuracy(gps: tuple[float, float], posambiguity: int) -> int:
"""Calculate the GPS accuracy based on APRS posambiguity.""" """Calculate the GPS accuracy based on APRS posambiguity."""
pos_a_map = {0: 0, 1: 1 / 600, 2: 1 / 60, 3: 1 / 6, 4: 1} pos_a_map = {0: 0, 1: 1 / 600, 2: 1 / 60, 3: 1 / 6, 4: 1}
@ -74,7 +75,7 @@ def gps_accuracy(gps, posambiguity: int) -> int:
degrees = pos_a_map[posambiguity] degrees = pos_a_map[posambiguity]
gps2 = (gps[0], gps[1] + degrees) gps2 = (gps[0], gps[1] + degrees)
dist_m = geopy.distance.distance(gps, gps2).m dist_m: float = geopy.distance.distance(gps, gps2).m
accuracy = round(dist_m) accuracy = round(dist_m)
else: else:
@ -100,7 +101,7 @@ def setup_scanner(
timeout = config[CONF_TIMEOUT] timeout = config[CONF_TIMEOUT]
aprs_listener = AprsListenerThread(callsign, password, host, server_filter, see) aprs_listener = AprsListenerThread(callsign, password, host, server_filter, see)
def aprs_disconnect(event): def aprs_disconnect(event: Event) -> None:
"""Stop the APRS connection.""" """Stop the APRS connection."""
aprs_listener.stop() aprs_listener.stop()
@ -145,13 +146,13 @@ class AprsListenerThread(threading.Thread):
self.callsign, passwd=password, host=self.host, port=FILTER_PORT self.callsign, passwd=password, host=self.host, port=FILTER_PORT
) )
def start_complete(self, success: bool, message: str): def start_complete(self, success: bool, message: str) -> None:
"""Complete startup process.""" """Complete startup process."""
self.start_message = message self.start_message = message
self.start_success = success self.start_success = success
self.start_event.set() self.start_event.set()
def run(self): def run(self) -> None:
"""Connect to APRS and listen for data.""" """Connect to APRS and listen for data."""
self.ais.set_filter(self.server_filter) self.ais.set_filter(self.server_filter)
@ -171,11 +172,11 @@ class AprsListenerThread(threading.Thread):
"Closing connection to %s with callsign %s", self.host, self.callsign "Closing connection to %s with callsign %s", self.host, self.callsign
) )
def stop(self): def stop(self) -> None:
"""Close the connection to the APRS network.""" """Close the connection to the APRS network."""
self.ais.close() self.ais.close()
def rx_msg(self, msg: dict): def rx_msg(self, msg: dict[str, Any]) -> None:
"""Receive message and process if position.""" """Receive message and process if position."""
_LOGGER.debug("APRS message received: %s", str(msg)) _LOGGER.debug("APRS message received: %s", str(msg))
if msg[ATTR_FORMAT] in MSG_FORMATS: if msg[ATTR_FORMAT] in MSG_FORMATS:

View File

@ -450,6 +450,16 @@ disallow_untyped_defs = true
warn_return_any = true warn_return_any = true
warn_unreachable = true warn_unreachable = true
[mypy-homeassistant.components.aprs.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.aqualogic.*] [mypy-homeassistant.components.aqualogic.*]
check_untyped_defs = true check_untyped_defs = true
disallow_incomplete_defs = true disallow_incomplete_defs = true