mirror of
https://github.com/home-assistant/core.git
synced 2025-07-22 20:57:21 +00:00
Improve type hints in numato (#126022)
This commit is contained in:
parent
457f63cce0
commit
56d00fd0c8
@ -1,5 +1,6 @@
|
|||||||
"""Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
|
"""Support for controlling GPIO pins of a Numato Labs USB GPIO expander."""
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import numato_gpio as gpio
|
import numato_gpio as gpio
|
||||||
@ -16,7 +17,7 @@ from homeassistant.const import (
|
|||||||
PERCENTAGE,
|
PERCENTAGE,
|
||||||
Platform,
|
Platform,
|
||||||
)
|
)
|
||||||
from homeassistant.core import HomeAssistant
|
from homeassistant.core import Event, HomeAssistant
|
||||||
import homeassistant.helpers.config_validation as cv
|
import homeassistant.helpers.config_validation as cv
|
||||||
from homeassistant.helpers.discovery import load_platform
|
from homeassistant.helpers.discovery import load_platform
|
||||||
from homeassistant.helpers.typing import ConfigType
|
from homeassistant.helpers.typing import ConfigType
|
||||||
@ -149,14 +150,14 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||||||
|
|
||||||
hass.data[DOMAIN][DATA_API] = NumatoAPI()
|
hass.data[DOMAIN][DATA_API] = NumatoAPI()
|
||||||
|
|
||||||
def cleanup_gpio(event):
|
def cleanup_gpio(event: Event) -> None:
|
||||||
"""Stuff to do before stopping."""
|
"""Stuff to do before stopping."""
|
||||||
_LOGGER.debug("Clean up Numato GPIO")
|
_LOGGER.debug("Clean up Numato GPIO")
|
||||||
gpio.cleanup()
|
gpio.cleanup()
|
||||||
if DATA_API in hass.data[DOMAIN]:
|
if DATA_API in hass.data[DOMAIN]:
|
||||||
hass.data[DOMAIN][DATA_API].ports_registered.clear()
|
hass.data[DOMAIN][DATA_API].ports_registered.clear()
|
||||||
|
|
||||||
def prepare_gpio(event):
|
def prepare_gpio(event: Event) -> None:
|
||||||
"""Stuff to do when home assistant starts."""
|
"""Stuff to do when home assistant starts."""
|
||||||
_LOGGER.debug("Setup cleanup at stop for Numato GPIO")
|
_LOGGER.debug("Setup cleanup at stop for Numato GPIO")
|
||||||
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
|
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
|
||||||
@ -172,11 +173,11 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
|||||||
class NumatoAPI:
|
class NumatoAPI:
|
||||||
"""Home-Assistant specific API for numato device access."""
|
"""Home-Assistant specific API for numato device access."""
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self) -> None:
|
||||||
"""Initialize API state."""
|
"""Initialize API state."""
|
||||||
self.ports_registered = {}
|
self.ports_registered: dict[tuple[int, int], int] = {}
|
||||||
|
|
||||||
def check_port_free(self, device_id, port, direction):
|
def check_port_free(self, device_id: int, port: int, direction: int) -> None:
|
||||||
"""Check whether a port is still free set up.
|
"""Check whether a port is still free set up.
|
||||||
|
|
||||||
Fail with exception if it has already been registered.
|
Fail with exception if it has already been registered.
|
||||||
@ -194,7 +195,7 @@ class NumatoAPI:
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
def check_device_id(self, device_id):
|
def check_device_id(self, device_id: int) -> None:
|
||||||
"""Check whether a device has been discovered.
|
"""Check whether a device has been discovered.
|
||||||
|
|
||||||
Fail with exception.
|
Fail with exception.
|
||||||
@ -202,7 +203,7 @@ class NumatoAPI:
|
|||||||
if device_id not in gpio.devices:
|
if device_id not in gpio.devices:
|
||||||
raise gpio.NumatoGpioError(f"Device {device_id} not available.")
|
raise gpio.NumatoGpioError(f"Device {device_id} not available.")
|
||||||
|
|
||||||
def check_port(self, device_id, port, direction):
|
def check_port(self, device_id: int, port: int, direction: int) -> None:
|
||||||
"""Raise an error if the port setup doesn't match the direction."""
|
"""Raise an error if the port setup doesn't match the direction."""
|
||||||
self.check_device_id(device_id)
|
self.check_device_id(device_id)
|
||||||
if (device_id, port) not in self.ports_registered:
|
if (device_id, port) not in self.ports_registered:
|
||||||
@ -220,35 +221,37 @@ class NumatoAPI:
|
|||||||
if self.ports_registered[(device_id, port)] != direction:
|
if self.ports_registered[(device_id, port)] != direction:
|
||||||
raise gpio.NumatoGpioError(msg[direction])
|
raise gpio.NumatoGpioError(msg[direction])
|
||||||
|
|
||||||
def setup_output(self, device_id, port):
|
def setup_output(self, device_id: int, port: int) -> None:
|
||||||
"""Set up a GPIO as output."""
|
"""Set up a GPIO as output."""
|
||||||
self.check_device_id(device_id)
|
self.check_device_id(device_id)
|
||||||
self.check_port_free(device_id, port, gpio.OUT)
|
self.check_port_free(device_id, port, gpio.OUT)
|
||||||
gpio.devices[device_id].setup(port, gpio.OUT)
|
gpio.devices[device_id].setup(port, gpio.OUT)
|
||||||
|
|
||||||
def setup_input(self, device_id, port):
|
def setup_input(self, device_id: int, port: int) -> None:
|
||||||
"""Set up a GPIO as input."""
|
"""Set up a GPIO as input."""
|
||||||
self.check_device_id(device_id)
|
self.check_device_id(device_id)
|
||||||
gpio.devices[device_id].setup(port, gpio.IN)
|
gpio.devices[device_id].setup(port, gpio.IN)
|
||||||
self.check_port_free(device_id, port, gpio.IN)
|
self.check_port_free(device_id, port, gpio.IN)
|
||||||
|
|
||||||
def write_output(self, device_id, port, value):
|
def write_output(self, device_id: int, port: int, value: int) -> None:
|
||||||
"""Write a value to a GPIO."""
|
"""Write a value to a GPIO."""
|
||||||
self.check_port(device_id, port, gpio.OUT)
|
self.check_port(device_id, port, gpio.OUT)
|
||||||
gpio.devices[device_id].write(port, value)
|
gpio.devices[device_id].write(port, value)
|
||||||
|
|
||||||
def read_input(self, device_id, port):
|
def read_input(self, device_id: int, port: int) -> int:
|
||||||
"""Read a value from a GPIO."""
|
"""Read a value from a GPIO."""
|
||||||
self.check_port(device_id, port, gpio.IN)
|
self.check_port(device_id, port, gpio.IN)
|
||||||
return gpio.devices[device_id].read(port)
|
return gpio.devices[device_id].read(port)
|
||||||
|
|
||||||
def read_adc_input(self, device_id, port):
|
def read_adc_input(self, device_id: int, port: int) -> int:
|
||||||
"""Read an ADC value from a GPIO ADC port."""
|
"""Read an ADC value from a GPIO ADC port."""
|
||||||
self.check_port(device_id, port, gpio.IN)
|
self.check_port(device_id, port, gpio.IN)
|
||||||
self.check_device_id(device_id)
|
self.check_device_id(device_id)
|
||||||
return gpio.devices[device_id].adc_read(port)
|
return gpio.devices[device_id].adc_read(port)
|
||||||
|
|
||||||
def edge_detect(self, device_id, port, event_callback):
|
def edge_detect(
|
||||||
|
self, device_id: int, port: int, event_callback: Callable[[int, bool], None]
|
||||||
|
) -> None:
|
||||||
"""Add detection for RISING and FALLING events."""
|
"""Add detection for RISING and FALLING events."""
|
||||||
self.check_port(device_id, port, gpio.IN)
|
self.check_port(device_id, port, gpio.IN)
|
||||||
gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
|
gpio.devices[device_id].add_event_detect(port, event_callback, gpio.BOTH)
|
||||||
|
@ -39,7 +39,7 @@ def setup_platform(
|
|||||||
if discovery_info is None:
|
if discovery_info is None:
|
||||||
return
|
return
|
||||||
|
|
||||||
def read_gpio(device_id, port, level):
|
def read_gpio(device_id: int, port: int, level: bool) -> None:
|
||||||
"""Send signal to entity to have it update state."""
|
"""Send signal to entity to have it update state."""
|
||||||
dispatcher_send(hass, NUMATO_SIGNAL.format(device_id, port), level)
|
dispatcher_send(hass, NUMATO_SIGNAL.format(device_id, port), level)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user