Add config flow to Raspberry pi GPIO (#103)

* Add config flow to rpi_gpio

* remove title

* fix removing entities in config_flow and in cover.py

* Set config entry per port
Limit PR to binary_sensor

* restore original files

* Add name key to give the device a name
This commit is contained in:
Rami Mosleh 2022-10-05 14:09:44 +03:00 committed by GitHub
parent 8f1d3116d7
commit c87c8595fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 517 additions and 118 deletions

View File

@ -1,59 +1,128 @@
"""Support for controlling GPIO pins of a Raspberry Pi."""
"""The Raspberry Pi GPIO integration."""
from __future__ import annotations
import asyncio
from typing import Any
from RPi import GPIO # pylint: disable=import-error
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
EVENT_HOMEASSISTANT_START,
CONF_PLATFORM,
CONF_PORT,
EVENT_HOMEASSISTANT_STOP,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
DOMAIN = "rpi_gpio"
PLATFORMS = [
Platform.BINARY_SENSOR,
Platform.COVER,
Platform.SWITCH,
]
from .const import (
CONF_BOUNCETIME,
CONF_CONFIGURED_PORTS,
CONF_GPIO,
CONF_PULL_MODE,
DEFAULT_BOUNCETIME,
DEFAULT_PULL_MODE,
DOMAIN,
)
PLATFORMS: list[Platform] = [Platform.BINARY_SENSOR]
def setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Raspberry PI GPIO component."""
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Raspberry Pi GPIO from a config entry."""
def cleanup_gpio(event):
"""Stuff to do before stopping."""
@callback
def cleanup_gpio(event: Any) -> None:
"""Cleanup before stopping."""
GPIO.cleanup()
def prepare_gpio(event):
"""Stuff to do when Home Assistant starts."""
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
if DOMAIN not in hass.data:
# actions that should be done the first time only
hass.data[DOMAIN] = {}
hass.data[DOMAIN][CONF_CONFIGURED_PORTS] = []
hass.data[DOMAIN][CONF_GPIO] = RpiGPIO(hass)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, cleanup_gpio)
await hass.config_entries.async_forward_entry_setup(
entry, entry.data[CONF_PLATFORM]
)
hass.data[DOMAIN][CONF_CONFIGURED_PORTS].append(entry.data[CONF_PORT])
entry.async_on_unload(entry.add_update_listener(options_updated))
hass.bus.listen_once(EVENT_HOMEASSISTANT_START, prepare_gpio)
GPIO.setmode(GPIO.BCM)
return True
def setup_output(port):
"""Set up a GPIO as output."""
GPIO.setup(port, GPIO.OUT)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_forward_entry_unload(
entry, entry.data[CONF_PLATFORM]
):
if not hass.data[DOMAIN][CONF_CONFIGURED_PORTS]:
del hass.data[DOMAIN]
return unload_ok
def setup_input(port, pull_mode):
"""Set up a GPIO as input."""
GPIO.setup(port, GPIO.IN, GPIO.PUD_DOWN if pull_mode == "DOWN" else GPIO.PUD_UP)
async def options_updated(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Update when config_entry options update."""
await hass.config_entries.async_reload(entry.entry_id)
def write_output(port, value):
"""Write a value to a GPIO."""
GPIO.output(port, value)
class RpiGPIO:
"""Base class for Rpi GPIOs."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the class."""
self.hass = hass
GPIO.setmode(GPIO.BCM)
def read_input(port):
"""Read a value from a GPIO."""
return GPIO.input(port)
async def async_reset_port(self, port: str) -> None:
"""Reset removed port to input."""
await self.hass.async_add_executor_job(GPIO.setup, int(port), GPIO.IN)
async def async_read_input(self, port: str) -> bool:
"""Read a value from a GPIO."""
return await self.hass.async_add_executor_job(GPIO.input, int(port))
def edge_detect(port, event_callback, bounce):
"""Add detection for RISING and FALLING events."""
GPIO.add_event_detect(port, GPIO.BOTH, callback=event_callback, bouncetime=bounce)
async def async_write_output(self, port: str, value: int) -> None:
"""Write value to a GPIO."""
await self.hass.async_add_executor_job(GPIO.output, int(port), value)
async def async_remove_edge_detection(self, port: str) -> None:
"""Remove edge detection if input is deleted."""
await self.hass.async_add_executor_job(GPIO.remove_event_detect, int(port))
@callback
async def async_signal_edge_detected(self, port: int, bounce_time: int) -> None:
"""Send signal that input edge is detected."""
await asyncio.sleep(float(bounce_time / 1000))
async_dispatcher_send(self.hass, f"port_{port}_edge_detected")
def setup_port(self, entry: ConfigEntry) -> None:
"""Setup GPIO ports."""
@callback
def edge_detected(port: int) -> None:
"""Edge detection handler."""
self.hass.add_job(
self.async_signal_edge_detected,
port,
entry.options.get(CONF_BOUNCETIME, DEFAULT_BOUNCETIME),
)
if entry.data[CONF_PLATFORM] == Platform.BINARY_SENSOR:
# Setup input
GPIO.setup(
int(entry.data[CONF_PORT]),
GPIO.IN,
int(entry.options.get(CONF_PULL_MODE, DEFAULT_PULL_MODE)),
)
# Add edge detection
GPIO.add_event_detect(
int(entry.data[CONF_PORT]),
GPIO.BOTH,
callback=edge_detected,
bouncetime=entry.options.get(CONF_BOUNCETIME, DEFAULT_BOUNCETIME),
)

View File

@ -1,34 +1,38 @@
"""Support for binary sensor using RPi GPIO."""
from __future__ import annotations
import asyncio
import voluptuous as vol
from homeassistant.components.binary_sensor import PLATFORM_SCHEMA, BinarySensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_NAME,
CONF_PLATFORM,
CONF_PORT,
CONF_SENSORS,
CONF_UNIQUE_ID,
DEVICE_DEFAULT_NAME,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.reload import setup_reload_service
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import DOMAIN, PLATFORMS, edge_detect, read_input, setup_input
CONF_BOUNCETIME = "bouncetime"
CONF_INVERT_LOGIC = "invert_logic"
CONF_PORTS = "ports"
CONF_PULL_MODE = "pull_mode"
DEFAULT_BOUNCETIME = 50
DEFAULT_INVERT_LOGIC = False
DEFAULT_PULL_MODE = "UP"
from . import RpiGPIO
from .const import (
CONF_BOUNCETIME,
CONF_GPIO,
CONF_INVERT_LOGIC,
CONF_PORTS,
CONF_PULL_MODE,
DEFAULT_BOUNCETIME,
DEFAULT_INVERT_LOGIC,
DEFAULT_PULL_MODE,
DOMAIN,
)
from .entity import RpiGPIOEntity
_SENSORS_LEGACY_SCHEMA = vol.Schema({cv.positive_int: cv.string})
@ -59,83 +63,62 @@ PLATFORM_SCHEMA = vol.All(
)
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Raspberry PI GPIO devices."""
setup_reload_service(hass, DOMAIN, PLATFORMS)
sensors = []
sensors_conf = config.get(CONF_SENSORS)
if sensors_conf is not None:
for sensor in sensors_conf:
sensors.append(
RPiGPIOBinarySensor(
sensor[CONF_NAME],
sensor[CONF_PORT],
sensor[CONF_PULL_MODE],
sensor[CONF_BOUNCETIME],
sensor[CONF_INVERT_LOGIC],
sensor.get(CONF_UNIQUE_ID),
)
)
add_entities(sensors, True)
return
pull_mode = config[CONF_PULL_MODE]
bouncetime = config[CONF_BOUNCETIME]
invert_logic = config[CONF_INVERT_LOGIC]
ports = config[CONF_PORTS]
for port_num, port_name in ports.items():
sensors.append(
RPiGPIOBinarySensor(
port_name, port_num, pull_mode, bouncetime, invert_logic
)
)
add_entities(sensors, True)
async_create_issue(
hass,
DOMAIN,
"deprecated_yaml",
breaks_in_ha_version="2022.11.0",
is_fixable=False,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
)
class RPiGPIOBinarySensor(BinarySensorEntity):
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up rpi_power binary sensor."""
rpi_gpio: RpiGPIO = hass.data[DOMAIN][CONF_GPIO]
await hass.async_add_executor_job(rpi_gpio.setup_port, entry)
async_add_entities([RPiGPIOBinarySensor(hass, entry, rpi_gpio)], True)
class RPiGPIOBinarySensor(RpiGPIOEntity, BinarySensorEntity):
"""Represent a binary sensor that uses Raspberry Pi GPIO."""
async def async_read_gpio(self):
"""Read state from GPIO."""
await asyncio.sleep(float(self._bouncetime) / 1000)
self._state = await self.hass.async_add_executor_job(read_input, self._port)
self.async_write_ha_state()
async def async_update(self) -> None:
"""Update entity."""
self._attr_is_on = (
await self.rpi_gpio.async_read_input(self.port) != self.invert_logic
)
def __init__(self, name, port, pull_mode, bouncetime, invert_logic, unique_id=None):
"""Initialize the RPi binary sensor."""
self._attr_name = name or DEVICE_DEFAULT_NAME
self._attr_unique_id = unique_id
self._attr_should_poll = False
self._port = port
self._pull_mode = pull_mode
self._bouncetime = bouncetime
self._invert_logic = invert_logic
self._state = None
@callback
def _update_callback(self) -> None:
"""Call update method."""
self.async_schedule_update_ha_state(True)
setup_input(self._port, self._pull_mode)
async def async_added_to_hass(self) -> None:
"""Register callbacks"""
def edge_detected(port):
"""Edge detection handler."""
if self.hass is not None:
self.hass.add_job(self.async_read_gpio)
self.async_on_remove(
async_dispatcher_connect(
self.hass, f"port_{self.port}_edge_detected", self._update_callback
)
)
await super().async_added_to_hass()
edge_detect(self._port, edge_detected, self._bouncetime)
@property
def is_on(self):
"""Return the state of the entity."""
return self._state != self._invert_logic
def update(self):
"""Update the GPIO state."""
self._state = read_input(self._port)
async def async_will_remove_from_hass(self) -> None:
"""Remove edge detection."""
await self.rpi_gpio.async_remove_edge_detection(self.port)
await super().async_will_remove_from_hass()

View File

@ -0,0 +1,179 @@
"""Config flow for Raspberry Pi Power Supply Checker."""
from __future__ import annotations
from typing import Any
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.const import (
CONF_NAME,
CONF_PLATFORM,
CONF_PORT,
TIME_MILLISECONDS,
Platform,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import FlowResult
from homeassistant.helpers import selector
from .const import (
CONF_BOUNCETIME,
CONF_CONFIGURED_PORTS,
CONF_INVERT_LOGIC,
CONF_PULL_MODE,
DEFAULT_BOUNCETIME,
DEFAULT_INVERT_LOGIC,
DEFAULT_PULL_MODE,
DOMAIN,
GPIO_PIN_MAP,
PUD_DOWN,
PUD_UP,
)
PULL_MODES = [
selector.SelectOptionDict(value=PUD_UP, label="UP"),
selector.SelectOptionDict(value=PUD_DOWN, label="DOWN"),
]
BINARY_SENSOR_OPTIONS_SCHEMA = {
vol.Optional(CONF_INVERT_LOGIC, default=False): selector.BooleanSelector(),
vol.Optional(CONF_BOUNCETIME, default=DEFAULT_BOUNCETIME): vol.All(
selector.NumberSelector(
selector.NumberSelectorConfig(
min=50,
mode=selector.NumberSelectorMode.BOX,
unit_of_measurement=TIME_MILLISECONDS,
)
),
vol.Coerce(int),
),
vol.Optional(CONF_PULL_MODE, default=PUD_UP): selector.SelectSelector(
selector.SelectSelectorConfig(options=PULL_MODES)
),
}
def _get_options_schema(platform: Platform, options: dict[str, Any]) -> vol.Schema:
"""Return options schema based on platform."""
if platform == Platform.BINARY_SENSOR:
return vol.Schema(
{
vol.Optional(
CONF_INVERT_LOGIC,
default=options.get(CONF_INVERT_LOGIC, DEFAULT_INVERT_LOGIC),
): selector.BooleanSelector(),
vol.Optional(
CONF_BOUNCETIME,
default=options.get(CONF_BOUNCETIME, DEFAULT_BOUNCETIME),
): vol.All(
selector.NumberSelector(
selector.NumberSelectorConfig(
min=50,
mode=selector.NumberSelectorMode.BOX,
unit_of_measurement=TIME_MILLISECONDS,
)
),
vol.Coerce(int),
),
vol.Optional(
CONF_PULL_MODE,
default=options.get(CONF_PULL_MODE, DEFAULT_PULL_MODE),
): selector.SelectSelector(
selector.SelectSelectorConfig(options=PULL_MODES)
),
}
)
def _get_avaiable_ports(hass: HomeAssistant) -> list[selector.SelectOptionDict]:
"""Return schema with availble ports."""
if DOMAIN in hass.data:
configured_ports = hass.data[DOMAIN][CONF_CONFIGURED_PORTS]
else:
configured_ports = []
return [
selector.SelectOptionDict(
value=port,
label=f"GPIO{port} - PIN {GPIO_PIN_MAP[port]}",
)
for port in list(GPIO_PIN_MAP)
if port not in configured_ports
]
class ConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Raspberry Pi GPIO."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, str] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user."""
return self.async_show_menu(
step_id="user",
menu_options=["add_binary_sensor"],
)
async def async_step_add_binary_sensor(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle adding a binary sensor entry."""
if user_input is None:
return self.async_show_form(
step_id="add_binary_sensor",
data_schema=vol.Schema(
{
vol.Required(CONF_NAME): selector.TextSelector(),
vol.Required(CONF_PORT, default=[]): selector.SelectSelector(
selector.SelectSelectorConfig(
options=_get_avaiable_ports(self.hass),
mode=selector.SelectSelectorMode.DROPDOWN,
)
),
}
),
)
await self.async_set_unique_id(user_input[CONF_PORT])
self._abort_if_unique_id_configured()
return self.async_create_entry(
title=f"{user_input[CONF_NAME]} (GPIO {user_input[CONF_PORT]})",
data={CONF_PLATFORM: Platform.BINARY_SENSOR, **user_input},
)
@staticmethod
@callback
def async_get_options_flow(
config_entry: config_entries.ConfigEntry,
) -> RpiGPIOOptionsFlowHandler:
"""Options callback for AccuWeather."""
return RpiGPIOOptionsFlowHandler(config_entry)
class RpiGPIOOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle integration options."""
def __init__(self, config_entry: config_entries.ConfigEntry) -> None:
"""Initialize integration options flow."""
self.config_entry = config_entry
async def async_step_init(
self, user_input: dict[str, str] | None = None
) -> FlowResult:
"""Manage the options."""
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
return self.async_show_form(
step_id="init",
data_schema=_get_options_schema(
self.config_entry.data[CONF_PLATFORM],
dict(self.config_entry.options),
),
)

View File

@ -0,0 +1,52 @@
"""Constants for the Raspberry Pi GPIO integration."""
from typing import Final
DOMAIN: Final = "rpi_gpio"
CONF_GPIO: Final = "gpio"
CONF_CONFIGURED_PORTS: Final = "configured_ports"
CONF_PORTS: Final = "ports"
CONF_BOUNCETIME: Final = "bouncetime"
CONF_PULL_MODE: Final = "pull_mode"
CONF_INVERT_LOGIC: Final = "invert_logic"
PUD_DOWN: Final = "21"
PUD_UP: Final = "22"
DEFAULT_BOUNCETIME: Final = 50
DEFAULT_INVERT_LOGIC: Final = False
DEFAULT_PULL_MODE: Final = PUD_UP
GPIO_PIN_MAP: Final = {
"0": "27",
"1": "28",
"2": "3",
"3": "5",
"4": "7",
"5": "29",
"6": "31",
"7": "26",
"8": "24",
"9": "21",
"10": "19",
"11": "23",
"12": "32",
"13": "33",
"14": "8",
"15": "10",
"16": "36",
"17": "11",
"18": "12",
"19": "35",
"20": "38",
"21": "40",
"22": "15",
"23": "16",
"24": "18",
"25": "22",
"26": "37",
"27": "13",
}

View File

@ -0,0 +1,46 @@
"""Base entity for Rpi GPIO ports."""
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_NAME, CONF_PORT
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo, Entity
from . import RpiGPIO
from .const import (
CONF_CONFIGURED_PORTS,
CONF_INVERT_LOGIC,
DEFAULT_INVERT_LOGIC,
DOMAIN,
)
class RpiGPIOEntity(Entity):
"""Representation of a Raspberry Pi GPIO."""
_attr_has_entity_name = True
_attr_should_poll = False
def __init__(
self, hass: HomeAssistant, entry: ConfigEntry, rpi_gpio: RpiGPIO
) -> None:
"""Initialize the RPi GPIO entity."""
self.hass = hass
self.entry = entry
self.rpi_gpio = rpi_gpio
self.port: str = entry.data[CONF_PORT]
self._attr_unique_id = self.port
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, self._attr_unique_id)},
name=f"{entry.data[CONF_NAME]} (GPIO {self.port})",
manufacturer="Raspberry Pi",
)
@property
def invert_logic(self) -> bool:
"""Return if port state should be inverted."""
return self.entry.options.get(CONF_INVERT_LOGIC, DEFAULT_INVERT_LOGIC)
async def async_will_remove_from_hass(self) -> None:
"""Reset port to input."""
await self.rpi_gpio.async_reset_port(self.port)
self.hass.data[DOMAIN][CONF_CONFIGURED_PORTS].remove(self.port)

View File

@ -1,10 +1,11 @@
{
"domain": "rpi_gpio",
"name": "Raspberry Pi GPIO",
"config_flow": true,
"documentation": "https://github.com/thecode/ha-rpi_gpio",
"issue_tracker": "https://github.com/thecode/ha-rpi_gpio/issues",
"requirements": ["RPi.GPIO==0.7.1"],
"codeowners": ["@thecode"],
"iot_class": "local_push",
"version": "2022.7.0"
"version": "2022.9.0"
}

View File

@ -1,3 +0,0 @@
reload:
name: Reload
description: Reload all rpi_gpio entities.

View File

@ -0,0 +1,36 @@
{
"config": {
"step": {
"add_binary_sensor": {
"data": {
"port": "Select port to set as binary sensor"
}
},
"user": {
"menu_options": {
"add_binary_sensor": "Add Binary sensor",
"add_switch": "Add Switch",
"add_cover": "Add Cover",
"remove": "Remove entity"
}
}
}
},
"options": {
"step": {
"init": {
"data": {
"bouncetime": "Port debounce time (milliseconds)",
"invert_logic": "Invert port state (default is ACTIVE_HIGH)",
"pull_mode": "Set internal pull resistor"
}
}
}
},
"issues": {
"deprecated_yaml": {
"title": "The Raspberry Pi GPIO YAML configuration is being removed",
"description": "Configuring Raspberry Pi GPIO using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the Raspberry Pi GPIO YAML configuration from your configuration.yaml file and restart Home Assistant to fix this issue."
}
}
}

View File

@ -0,0 +1,36 @@
{
"config": {
"step": {
"add_binary_sensor": {
"data": {
"port": "Select port to set as binary sensor"
}
},
"user": {
"menu_options": {
"add_binary_sensor": "Add Binary sensor",
"add_cover": "Add Cover",
"add_switch": "Add Switch",
"remove": "Remove entity"
}
}
}
},
"issues": {
"deprecated_yaml": {
"description": "Configuring Raspberry Pi GPIO using YAML is being removed.\n\nYour existing YAML configuration has been imported into the UI automatically.\n\nRemove the Raspberry Pi GPIO YAML configuration from your configuration.yaml file and restart Home Assistant to fix this issue.",
"title": "The Raspberry Pi GPIO YAML configuration is being removed"
}
},
"options": {
"step": {
"init": {
"data": {
"bouncetime": "Port debounce time (milliseconds)",
"invert_logic": "Invert port state (default is ACTIVE_HIGH)",
"pull_mode": "Set internal pull resistor"
}
}
}
}
}