Remove deprecated Raspihats integration (#67380)

This commit is contained in:
Franck Nijhof 2022-02-28 23:10:58 +01:00 committed by GitHub
parent e3709590cb
commit 487f4dcd90
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 0 additions and 579 deletions

View File

@ -948,7 +948,6 @@ omit =
homeassistant/components/rainmachine/model.py
homeassistant/components/rainmachine/sensor.py
homeassistant/components/rainmachine/switch.py
homeassistant/components/raspihats/*
homeassistant/components/raspyrfm/*
homeassistant/components/recollect_waste/__init__.py
homeassistant/components/recollect_waste/sensor.py

View File

@ -135,11 +135,9 @@ jobs:
sed -i "s|# bluepy|bluepy|g" ${requirement_file}
sed -i "s|# beacontools|beacontools|g" ${requirement_file}
sed -i "s|# RPi.GPIO|RPi.GPIO|g" ${requirement_file}
sed -i "s|# raspihats|raspihats|g" ${requirement_file}
sed -i "s|# fritzconnection|fritzconnection|g" ${requirement_file}
sed -i "s|# pyuserinput|pyuserinput|g" ${requirement_file}
sed -i "s|# evdev|evdev|g" ${requirement_file}
sed -i "s|# smbus-cffi|smbus-cffi|g" ${requirement_file}
sed -i "s|# python-eq3bt|python-eq3bt|g" ${requirement_file}
sed -i "s|# pycups|pycups|g" ${requirement_file}
sed -i "s|# homekit|homekit|g" ${requirement_file}

View File

@ -1,253 +0,0 @@
"""Support for controlling raspihats boards."""
import logging
import threading
import time
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STOP
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
_LOGGER = logging.getLogger(__name__)
DOMAIN = "raspihats"
CONF_I2C_HATS = "i2c_hats"
CONF_BOARD = "board"
CONF_CHANNELS = "channels"
CONF_INDEX = "index"
CONF_INVERT_LOGIC = "invert_logic"
CONF_INITIAL_STATE = "initial_state"
I2C_HAT_NAMES = [
"Di16",
"Rly10",
"Di6Rly6",
"DI16ac",
"DQ10rly",
"DQ16oc",
"DI6acDQ6rly",
]
I2C_HATS_MANAGER = "I2CH_MNG"
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the raspihats component."""
_LOGGER.warning(
"The Raspihats pHAT integration is deprecated and will be removed "
"in Home Assistant Core 2022.4; this integration is removed under "
"Architectural Decision Record 0019, more information can be found here: "
"https://github.com/home-assistant/architecture/blob/master/adr/0019-GPIO.md"
)
hass.data[DOMAIN] = {I2C_HATS_MANAGER: I2CHatsManager()}
def start_i2c_hats_keep_alive(event):
"""Start I2C-HATs keep alive."""
hass.data[DOMAIN][I2C_HATS_MANAGER].start_keep_alive()
def stop_i2c_hats_keep_alive(event):
"""Stop I2C-HATs keep alive."""
hass.data[DOMAIN][I2C_HATS_MANAGER].stop_keep_alive()
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, start_i2c_hats_keep_alive)
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, stop_i2c_hats_keep_alive)
return True
def log_message(source, *parts):
"""Build log message."""
message = source.__class__.__name__
for part in parts:
message += f": {part!s}"
return message
class I2CHatsException(Exception):
"""I2C-HATs exception."""
class I2CHatsDIScanner:
"""Scan Digital Inputs and fire callbacks."""
_DIGITAL_INPUTS = "di"
_OLD_VALUE = "old_value"
_CALLBACKS = "callbacks"
def setup(self, i2c_hat):
"""Set up the I2C-HAT instance for digital inputs scanner."""
if hasattr(i2c_hat, self._DIGITAL_INPUTS):
digital_inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
old_value = None
# Add old value attribute
setattr(digital_inputs, self._OLD_VALUE, old_value)
# Add callbacks dict attribute {channel: callback}
setattr(digital_inputs, self._CALLBACKS, {})
def register_callback(self, i2c_hat, channel, callback):
"""Register edge callback."""
if hasattr(i2c_hat, self._DIGITAL_INPUTS):
digital_inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
callbacks = getattr(digital_inputs, self._CALLBACKS)
callbacks[channel] = callback
setattr(digital_inputs, self._CALLBACKS, callbacks)
def scan(self, i2c_hat):
"""Scan I2C-HATs digital inputs and fire callbacks."""
if hasattr(i2c_hat, self._DIGITAL_INPUTS):
digital_inputs = getattr(i2c_hat, self._DIGITAL_INPUTS)
callbacks = getattr(digital_inputs, self._CALLBACKS)
old_value = getattr(digital_inputs, self._OLD_VALUE)
value = digital_inputs.value # i2c data transfer
if old_value is not None and value != old_value:
for channel in range(0, len(digital_inputs.channels)):
state = (value >> channel) & 0x01
old_state = (old_value >> channel) & 0x01
if state != old_state:
callback = callbacks.get(channel)
if callback is not None:
callback(state)
setattr(digital_inputs, self._OLD_VALUE, value)
class I2CHatsManager(threading.Thread):
"""Manages all I2C-HATs instances."""
_EXCEPTION = "exception"
_CALLBACKS = "callbacks"
def __init__(self):
"""Init I2C-HATs Manager."""
threading.Thread.__init__(self)
self._lock = threading.Lock()
self._i2c_hats = {}
self._run = False
self._di_scanner = I2CHatsDIScanner()
def register_board(self, board, address):
"""Register I2C-HAT."""
with self._lock:
if (i2c_hat := self._i2c_hats.get(address)) is None:
# This is a Pi module and can't be installed in CI without
# breaking the build.
# pylint: disable=import-outside-toplevel,import-error
import raspihats.i2c_hats as module
constructor = getattr(module, board)
i2c_hat = constructor(address)
setattr(i2c_hat, self._CALLBACKS, {})
# Setting exception attribute will trigger online callbacks
# when keep alive thread starts.
setattr(i2c_hat, self._EXCEPTION, None)
self._di_scanner.setup(i2c_hat)
self._i2c_hats[address] = i2c_hat
status_word = i2c_hat.status # read status_word to reset bits
_LOGGER.info(log_message(self, i2c_hat, "registered", status_word))
def run(self):
"""Keep alive for I2C-HATs."""
# This is a Pi module and can't be installed in CI without
# breaking the build.
# pylint: disable=import-outside-toplevel,import-error
from raspihats.i2c_hats import ResponseException
_LOGGER.info(log_message(self, "starting"))
while self._run:
with self._lock:
for i2c_hat in list(self._i2c_hats.values()):
try:
self._di_scanner.scan(i2c_hat)
self._read_status(i2c_hat)
if hasattr(i2c_hat, self._EXCEPTION):
if getattr(i2c_hat, self._EXCEPTION) is not None:
_LOGGER.warning(
log_message(self, i2c_hat, "online again")
)
delattr(i2c_hat, self._EXCEPTION)
# trigger online callbacks
callbacks = getattr(i2c_hat, self._CALLBACKS)
for callback in list(callbacks.values()):
callback()
except ResponseException as ex:
if not hasattr(i2c_hat, self._EXCEPTION):
_LOGGER.error(log_message(self, i2c_hat, ex))
setattr(i2c_hat, self._EXCEPTION, ex)
time.sleep(0.05)
_LOGGER.info(log_message(self, "exiting"))
def _read_status(self, i2c_hat):
"""Read I2C-HATs status."""
status_word = i2c_hat.status
if status_word.value != 0x00:
_LOGGER.error(log_message(self, i2c_hat, status_word))
def start_keep_alive(self):
"""Start keep alive mechanism."""
self._run = True
threading.Thread.start(self)
def stop_keep_alive(self):
"""Stop keep alive mechanism."""
self._run = False
self.join()
def register_di_callback(self, address, channel, callback):
"""Register I2C-HAT digital input edge callback."""
with self._lock:
i2c_hat = self._i2c_hats[address]
self._di_scanner.register_callback(i2c_hat, channel, callback)
def register_online_callback(self, address, channel, callback):
"""Register I2C-HAT online callback."""
with self._lock:
i2c_hat = self._i2c_hats[address]
callbacks = getattr(i2c_hat, self._CALLBACKS)
callbacks[channel] = callback
setattr(i2c_hat, self._CALLBACKS, callbacks)
def read_di(self, address, channel):
"""Read a value from a I2C-HAT digital input."""
# This is a Pi module and can't be installed in CI without
# breaking the build.
# pylint: disable=import-outside-toplevel,import-error
from raspihats.i2c_hats import ResponseException
with self._lock:
i2c_hat = self._i2c_hats[address]
try:
value = i2c_hat.di.value
return (value >> channel) & 0x01
except ResponseException as ex:
raise I2CHatsException(str(ex)) from ex
def write_dq(self, address, channel, value):
"""Write a value to a I2C-HAT digital output."""
# This is a Pi module and can't be installed in CI without
# breaking the build.
# pylint: disable=import-outside-toplevel,import-error
from raspihats.i2c_hats import ResponseException
with self._lock:
i2c_hat = self._i2c_hats[address]
try:
i2c_hat.dq.channels[channel] = value
except ResponseException as ex:
raise I2CHatsException(str(ex)) from ex
def read_dq(self, address, channel):
"""Read a value from a I2C-HAT digital output."""
# This is a Pi module and can't be installed in CI without
# breaking the build.
# pylint: disable=import-outside-toplevel,import-error
from raspihats.i2c_hats import ResponseException
with self._lock:
i2c_hat = self._i2c_hats[address]
try:
return i2c_hat.dq.channels[channel]
except ResponseException as ex:
raise I2CHatsException(str(ex)) from ex

View File

@ -1,146 +0,0 @@
"""Support for raspihats board binary sensors."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorEntity
from homeassistant.const import (
CONF_ADDRESS,
CONF_DEVICE_CLASS,
CONF_NAME,
DEVICE_DEFAULT_NAME,
)
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import (
CONF_BOARD,
CONF_CHANNELS,
CONF_I2C_HATS,
CONF_INDEX,
CONF_INVERT_LOGIC,
DOMAIN,
I2C_HAT_NAMES,
I2C_HATS_MANAGER,
I2CHatsException,
I2CHatsManager,
)
_LOGGER = logging.getLogger(__name__)
DEFAULT_INVERT_LOGIC = False
DEFAULT_DEVICE_CLASS = None
_CHANNELS_SCHEMA = vol.Schema(
[
{
vol.Required(CONF_INDEX): cv.positive_int,
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_INVERT_LOGIC, default=DEFAULT_INVERT_LOGIC): cv.boolean,
vol.Optional(CONF_DEVICE_CLASS, default=DEFAULT_DEVICE_CLASS): cv.string,
}
]
)
_I2C_HATS_SCHEMA = vol.Schema(
[
{
vol.Required(CONF_BOARD): vol.In(I2C_HAT_NAMES),
vol.Required(CONF_ADDRESS): vol.Coerce(int),
vol.Required(CONF_CHANNELS): _CHANNELS_SCHEMA,
}
]
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Optional(CONF_I2C_HATS): _I2C_HATS_SCHEMA}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the raspihats binary_sensor devices."""
I2CHatBinarySensor.I2C_HATS_MANAGER = hass.data[DOMAIN][I2C_HATS_MANAGER]
binary_sensors = []
i2c_hat_configs = config.get(CONF_I2C_HATS, [])
for i2c_hat_config in i2c_hat_configs:
address = i2c_hat_config[CONF_ADDRESS]
board = i2c_hat_config[CONF_BOARD]
try:
assert I2CHatBinarySensor.I2C_HATS_MANAGER
I2CHatBinarySensor.I2C_HATS_MANAGER.register_board(board, address)
for channel_config in i2c_hat_config[CONF_CHANNELS]:
binary_sensors.append(
I2CHatBinarySensor(
address,
channel_config[CONF_INDEX],
channel_config[CONF_NAME],
channel_config[CONF_INVERT_LOGIC],
channel_config[CONF_DEVICE_CLASS],
)
)
except I2CHatsException as ex:
_LOGGER.error(
"Failed to register %s I2CHat@%s %s", board, hex(address), str(ex)
)
add_entities(binary_sensors)
class I2CHatBinarySensor(BinarySensorEntity):
"""Representation of a binary sensor that uses a I2C-HAT digital input."""
I2C_HATS_MANAGER: I2CHatsManager | None = None
def __init__(self, address, channel, name, invert_logic, device_class):
"""Initialize the raspihats sensor."""
self._address = address
self._channel = channel
self._name = name or DEVICE_DEFAULT_NAME
self._invert_logic = invert_logic
self._device_class = device_class
self._state = self.I2C_HATS_MANAGER.read_di(self._address, self._channel)
def online_callback():
"""Call fired when board is online."""
self.schedule_update_ha_state()
self.I2C_HATS_MANAGER.register_online_callback(
self._address, self._channel, online_callback
)
def edge_callback(state):
"""Read digital input state."""
self._state = state
self.schedule_update_ha_state()
self.I2C_HATS_MANAGER.register_di_callback(
self._address, self._channel, edge_callback
)
@property
def device_class(self):
"""Return the class of this sensor."""
return self._device_class
@property
def name(self):
"""Return the name of this sensor."""
return self._name
@property
def should_poll(self):
"""No polling needed for this sensor."""
return False
@property
def is_on(self):
"""Return the state of this sensor."""
return self._state != self._invert_logic

View File

@ -1,8 +0,0 @@
{
"domain": "raspihats",
"name": "Raspihats",
"documentation": "https://www.home-assistant.io/integrations/raspihats",
"requirements": ["raspihats==2.2.3", "smbus-cffi==0.5.1"],
"codeowners": [],
"iot_class": "local_push"
}

View File

@ -1,161 +0,0 @@
"""Support for raspihats board switches."""
from __future__ import annotations
import logging
import voluptuous as vol
from homeassistant.components.switch import PLATFORM_SCHEMA, SwitchEntity
from homeassistant.const import CONF_ADDRESS, CONF_NAME, DEVICE_DEFAULT_NAME
from homeassistant.core import HomeAssistant
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import (
CONF_BOARD,
CONF_CHANNELS,
CONF_I2C_HATS,
CONF_INDEX,
CONF_INITIAL_STATE,
CONF_INVERT_LOGIC,
DOMAIN,
I2C_HAT_NAMES,
I2C_HATS_MANAGER,
I2CHatsException,
I2CHatsManager,
)
_LOGGER = logging.getLogger(__name__)
_CHANNELS_SCHEMA = vol.Schema(
[
{
vol.Required(CONF_INDEX): cv.positive_int,
vol.Required(CONF_NAME): cv.string,
vol.Optional(CONF_INVERT_LOGIC, default=False): cv.boolean,
vol.Optional(CONF_INITIAL_STATE): cv.boolean,
}
]
)
_I2C_HATS_SCHEMA = vol.Schema(
[
{
vol.Required(CONF_BOARD): vol.In(I2C_HAT_NAMES),
vol.Required(CONF_ADDRESS): vol.Coerce(int),
vol.Required(CONF_CHANNELS): _CHANNELS_SCHEMA,
}
]
)
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{vol.Optional(CONF_I2C_HATS): _I2C_HATS_SCHEMA}
)
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the raspihats switch devices."""
I2CHatSwitch.I2C_HATS_MANAGER = hass.data[DOMAIN][I2C_HATS_MANAGER]
switches = []
i2c_hat_configs = config.get(CONF_I2C_HATS, [])
for i2c_hat_config in i2c_hat_configs:
board = i2c_hat_config[CONF_BOARD]
address = i2c_hat_config[CONF_ADDRESS]
try:
assert I2CHatSwitch.I2C_HATS_MANAGER
I2CHatSwitch.I2C_HATS_MANAGER.register_board(board, address)
for channel_config in i2c_hat_config[CONF_CHANNELS]:
switches.append(
I2CHatSwitch(
board,
address,
channel_config[CONF_INDEX],
channel_config[CONF_NAME],
channel_config[CONF_INVERT_LOGIC],
channel_config.get(CONF_INITIAL_STATE),
)
)
except I2CHatsException as ex:
_LOGGER.error(
"Failed to register %s I2CHat@%s %s", board, hex(address), str(ex)
)
add_entities(switches)
class I2CHatSwitch(SwitchEntity):
"""Representation a switch that uses a I2C-HAT digital output."""
I2C_HATS_MANAGER: I2CHatsManager | None = None
def __init__(self, board, address, channel, name, invert_logic, initial_state):
"""Initialize switch."""
self._board = board
self._address = address
self._channel = channel
self._name = name or DEVICE_DEFAULT_NAME
self._invert_logic = invert_logic
if initial_state is not None:
if self._invert_logic:
state = not initial_state
else:
state = initial_state
self.I2C_HATS_MANAGER.write_dq(self._address, self._channel, state)
def online_callback():
"""Call fired when board is online."""
self.schedule_update_ha_state()
self.I2C_HATS_MANAGER.register_online_callback(
self._address, self._channel, online_callback
)
def _log_message(self, message):
"""Create log message."""
string = f"{self._name} "
string += f"{self._board}I2CHat@{hex(self._address)} "
string += f"channel:{str(self._channel)}{message}"
return string
@property
def name(self):
"""Return the name of the switch."""
return self._name
@property
def should_poll(self):
"""Return the polling state."""
return False
@property
def is_on(self):
"""Return true if device is on."""
try:
state = self.I2C_HATS_MANAGER.read_dq(self._address, self._channel)
return state != self._invert_logic
except I2CHatsException as ex:
_LOGGER.error(self._log_message(f"Is ON check failed, {ex!s}"))
return False
def turn_on(self, **kwargs):
"""Turn the device on."""
try:
state = self._invert_logic is False
self.I2C_HATS_MANAGER.write_dq(self._address, self._channel, state)
self.schedule_update_ha_state()
except I2CHatsException as ex:
_LOGGER.error(self._log_message(f"Turn ON failed, {ex!s}"))
def turn_off(self, **kwargs):
"""Turn the device off."""
try:
state = self._invert_logic is not False
self.I2C_HATS_MANAGER.write_dq(self._address, self._channel, state)
self.schedule_update_ha_state()
except I2CHatsException as ex:
_LOGGER.error(self._log_message(f"Turn OFF failed, {ex!s}"))

View File

@ -2032,9 +2032,6 @@ radiotherm==2.1.0
# homeassistant.components.raincloud
raincloudy==0.0.7
# homeassistant.components.raspihats
# raspihats==2.2.3
# homeassistant.components.raspyrfm
raspyrfm-client==1.2.8
@ -2159,9 +2156,6 @@ smart-meter-texas==0.4.7
# homeassistant.components.smarthab
smarthab==0.21
# homeassistant.components.raspihats
# smbus-cffi==0.5.1
# homeassistant.components.smhi
smhi-pkg==1.0.15

View File

@ -33,9 +33,7 @@ COMMENT_REQUIREMENTS = (
"python-gammu",
"python-lirc",
"pyuserinput",
"raspihats",
"RPi.GPIO",
"smbus-cffi",
"tensorflow",
"tf-models-official",
)