From 56d00fd0c86f40a568f102132792be13a4d3e9cb Mon Sep 17 00:00:00 2001 From: epenet <6771947+epenet@users.noreply.github.com> Date: Mon, 16 Sep 2024 10:19:40 +0200 Subject: [PATCH] Improve type hints in numato (#126022) --- homeassistant/components/numato/__init__.py | 31 ++++++++++--------- .../components/numato/binary_sensor.py | 2 +- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/homeassistant/components/numato/__init__.py b/homeassistant/components/numato/__init__.py index 978264d867e..3b99079f949 100644 --- a/homeassistant/components/numato/__init__.py +++ b/homeassistant/components/numato/__init__.py @@ -1,5 +1,6 @@ """Support for controlling GPIO pins of a Numato Labs USB GPIO expander.""" +from collections.abc import Callable import logging import numato_gpio as gpio @@ -16,7 +17,7 @@ from homeassistant.const import ( PERCENTAGE, Platform, ) -from homeassistant.core import HomeAssistant +from homeassistant.core import Event, HomeAssistant import homeassistant.helpers.config_validation as cv from homeassistant.helpers.discovery import load_platform from homeassistant.helpers.typing import ConfigType @@ -149,14 +150,14 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool: hass.data[DOMAIN][DATA_API] = NumatoAPI() - def cleanup_gpio(event): + def cleanup_gpio(event: Event) -> None: """Stuff to do before stopping.""" _LOGGER.debug("Clean up Numato GPIO") gpio.cleanup() if DATA_API in hass.data[DOMAIN]: hass.data[DOMAIN][DATA_API].ports_registered.clear() - def prepare_gpio(event): + def prepare_gpio(event: Event) -> None: """Stuff to do when home assistant starts.""" _LOGGER.debug("Setup cleanup at stop for Numato GPIO") hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio) @@ -172,11 +173,11 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool: class NumatoAPI: """Home-Assistant specific API for numato device access.""" - def __init__(self): + def __init__(self) -> None: """Initialize API state.""" - self.ports_registered = {} + self.ports_registered: dict[tuple[int, int], int] = {} - def check_port_free(self, device_id, port, direction): + def check_port_free(self, device_id: int, port: int, direction: int) -> None: """Check whether a port is still free set up. Fail with exception if it has already been registered. @@ -194,7 +195,7 @@ class NumatoAPI: ) ) - def check_device_id(self, device_id): + def check_device_id(self, device_id: int) -> None: """Check whether a device has been discovered. Fail with exception. @@ -202,7 +203,7 @@ class NumatoAPI: if device_id not in gpio.devices: raise gpio.NumatoGpioError(f"Device {device_id} not available.") - def check_port(self, device_id, port, direction): + def check_port(self, device_id: int, port: int, direction: int) -> None: """Raise an error if the port setup doesn't match the direction.""" self.check_device_id(device_id) if (device_id, port) not in self.ports_registered: @@ -220,35 +221,37 @@ class NumatoAPI: if self.ports_registered[(device_id, port)] != direction: raise gpio.NumatoGpioError(msg[direction]) - def setup_output(self, device_id, port): + def setup_output(self, device_id: int, port: int) -> None: """Set up a GPIO as output.""" self.check_device_id(device_id) self.check_port_free(device_id, port, gpio.OUT) gpio.devices[device_id].setup(port, gpio.OUT) - def setup_input(self, device_id, port): + def setup_input(self, device_id: int, port: int) -> None: """Set up a GPIO as input.""" self.check_device_id(device_id) gpio.devices[device_id].setup(port, gpio.IN) self.check_port_free(device_id, port, gpio.IN) - def write_output(self, device_id, port, value): + def write_output(self, device_id: int, port: int, value: int) -> None: """Write a value to a GPIO.""" self.check_port(device_id, port, gpio.OUT) gpio.devices[device_id].write(port, value) - def read_input(self, device_id, port): + def read_input(self, device_id: int, port: int) -> int: """Read a value from a GPIO.""" self.check_port(device_id, port, gpio.IN) return gpio.devices[device_id].read(port) - def read_adc_input(self, device_id, port): + def read_adc_input(self, device_id: int, port: int) -> int: """Read an ADC value from a GPIO ADC port.""" self.check_port(device_id, port, gpio.IN) self.check_device_id(device_id) return gpio.devices[device_id].adc_read(port) - def edge_detect(self, device_id, port, event_callback): + def edge_detect( + self, device_id: int, port: int, event_callback: Callable[[int, bool], None] + ) -> None: """Add detection for RISING and FALLING events.""" self.check_port(device_id, port, gpio.IN) gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH) diff --git a/homeassistant/components/numato/binary_sensor.py b/homeassistant/components/numato/binary_sensor.py index 1f664a372ba..47ab248d383 100644 --- a/homeassistant/components/numato/binary_sensor.py +++ b/homeassistant/components/numato/binary_sensor.py @@ -39,7 +39,7 @@ def setup_platform( if discovery_info is None: return - def read_gpio(device_id, port, level): + def read_gpio(device_id: int, port: int, level: bool) -> None: """Send signal to entity to have it update state.""" dispatcher_send(hass, NUMATO_SIGNAL.format(device_id, port), level)