Compare commits

..

No commits in common. "main" and "untagged-4666ac712dd34f0dc1a3" have entirely different histories.

10 changed files with 174 additions and 171 deletions

View File

@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: 🚀 Run stale
uses: actions/stale@v9.1.0
uses: actions/stale@v9.0.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
days-before-stale: 14

View File

@ -23,7 +23,7 @@ jobs:
with:
fetch-depth: 2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5.6.0
uses: actions/setup-python@v5.3.0
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies

View File

@ -194,6 +194,7 @@ switch:
| `persistent` | no | `false` | boolean | If true, the switch state will be persistent in HA and will be restored if HA restart / crash |
| `pull_mode` | no | `AS_IS` | string | Type of internal pull resistor to use: `UP` - pull-up resistor, `DOWN` - pull-down resistor, `AS-IS` no change |
| `drive` |no | `PUSH_PULL`|string | control drive configuration of the GPIO, determines how the line behaves when it is set to output mode; `PUSH_PULL`, GPIO line can both source and sink current, can actively drive the line to both high and low states. `OPEN-DRAIN`, GPPIO can only sink current (drive the line to low) and is otherwise left floating, and `OPEN-SOURCE` the reverse.
|`persistent` | no | `false` | boolean | If true, the switch state will be persistent in HA and will be restored if HA restart / crash. |
For more details about the GPIO layout, visit the Wikipedia [article](https://en.wikipedia.org/wiki/Raspberry_Pi#General_purpose_input-output_(GPIO)_connector) about the Raspberry Pi.
@ -210,4 +211,4 @@ switch:
```
# Reporting issues
*Before* reporting issues please enable debug logging as described [here](https://www.home-assistant.io/docs/configuration/troubleshooting/#enabling-debug-logging), check logs and report issue attaching the log file and the relevant YAML section.
*Before* reporting issues please enable debug logging as described [here](https://www.home-assistant.io/docs/configuration/troubleshooting/#enabling-debug-logging), check logs and report issue attaching the log file.

View File

@ -25,8 +25,6 @@ CONFIG_SCHEMA = vol.Schema(
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the GPIO component."""
version = getattr(hass.data["integrations"][DOMAIN], "version", 0)
_LOGGER.debug(f"{DOMAIN} integration starting. Version: {version}")
path = config.get(DOMAIN, {}).get(CONF_PATH)
hub = Hub(hass, path)
hass.data[DOMAIN] = hub

View File

@ -12,7 +12,6 @@ from homeassistant.helpers.config_validation import PLATFORM_SCHEMA
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.const import CONF_SENSORS, CONF_NAME, CONF_PORT, CONF_UNIQUE_ID
from .hub import BIAS
CONF_INVERT_LOGIC = "invert_logic"
DEFAULT_INVERT_LOGIC = False
CONF_BOUNCETIME = "bouncetime"
@ -52,20 +51,17 @@ async def async_setup_platform(
sensors = []
for sensor in config.get(CONF_SENSORS):
try:
sensors.append(
GPIODBinarySensor(
hub,
sensor[CONF_NAME],
sensor[CONF_PORT],
sensor.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{sensor[CONF_PORT]}_{sensor[CONF_NAME].lower().replace(' ', '_')}",
sensor.get(CONF_INVERT_LOGIC),
sensor.get(CONF_PULL_MODE),
sensor.get(CONF_BOUNCETIME)
)
sensors.append(
GPIODBinarySensor(
hub,
sensor[CONF_NAME],
sensor[CONF_PORT],
sensor.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{sensor[CONF_PORT]}_{sensor[CONF_NAME].lower().replace(' ', '_')}",
sensor.get(CONF_INVERT_LOGIC),
sensor.get(CONF_PULL_MODE),
sensor.get(CONF_BOUNCETIME)
)
except Exception as e:
_LOGGER.error(f"Failed to add binary sensor {sensor[CONF_NAME]} for port {sensor[CONF_PORT]}: {e}")
)
async_add_entities(sensors)
@ -82,22 +78,12 @@ class GPIODBinarySensor(BinarySensorEntity):
self._active_low = active_low
self._bias = bias
self._debounce = debounce
self._line, current_is_on = self._hub.add_sensor(self._port, self._active_low, self._bias, self._debounce)
self._attr_is_on = current_is_on
async def async_added_to_hass(self) -> None:
await super().async_added_to_hass()
_LOGGER.debug(f"GPIODBinarySensor async_added_to_hass: Adding fd:{self._line.fd}")
self._hub._hass.loop.add_reader(self._line.fd, self.handle_event)
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODBinarySensor async_will_remove_from_hass: Removing fd:{self._line.fd}")
self._hub._hass.loop.remove_reader(self._line.fd)
self._line.release()
self._hub.add_sensor(self, self._port, self._active_low, self._bias, self._debounce)
self.async_write_ha_state()
def handle_event(self):
for event in self._line.read_edge_events():
self._attr_is_on = True if event.event_type is event.Type.RISING_EDGE else False
_LOGGER.debug(f"Event: {event}. New line value: {self._attr_is_on}")
self._attr_is_on = self._hub.get_line_value(self._port)
self.schedule_update_ha_state(False)

View File

@ -17,7 +17,6 @@ from homeassistant.const import CONF_COVERS, CONF_NAME, CONF_UNIQUE_ID
from .hub import BIAS, DRIVE
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
import asyncio
CONF_RELAY_PIN = "relay_pin"
CONF_RELAY_TIME = "relay_time"
@ -71,24 +70,21 @@ async def async_setup_platform(
invert_relay = config[CONF_INVERT_RELAY]
covers = []
for cover in config.get(CONF_COVERS):
try:
covers.append(
GPIODCover(
hub,
cover[CONF_NAME],
cover.get(CONF_RELAY_PIN),
relay_time,
invert_relay,
"AS_IS",
"PUSH_PULL",
cover.get(CONF_STATE_PIN),
state_pull_mode,
invert_state,
cover.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{cover.get(CONF_RELAY_PIN)}_{cover[CONF_NAME].lower().replace(' ', '_')}",
)
covers.append(
GPIODCover(
hub,
cover[CONF_NAME],
cover.get(CONF_RELAY_PIN),
relay_time,
invert_relay,
"AS_IS",
"PUSH_PULL",
cover.get(CONF_STATE_PIN),
state_pull_mode,
invert_state,
cover.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{cover.get(CONF_RELAY_PORT) or cover.get("relay_pin")}_{cover[CONF_NAME].lower().replace(' ', '_')}",
)
except Exception as e:
_LOGGER.error(f"Failed to add cover {cover[CONF_NAME]} for port {cover.get(CONF_RELAY_PIN)}:{cover.get(CONF_STATE_PIN)}: {e}")
)
async_add_entities(covers)
@ -109,66 +105,51 @@ class GPIODCover(CoverEntity):
self._state_port = state_port
self._state_bias = state_bias
self._state_active_low = state_active_low
self._relay_line, self._state_line, current_is_on = self._hub.add_cover(
self._relay_port, self._relay_active_low, self._relay_bias, self._relay_drive,
self._state_port, self._state_bias, self._state_active_low)
self._attr_is_closed = current_is_on
self.is_on = current_is_on
self._attr_is_closed = False != state_active_low
async def async_added_to_hass(self) -> None:
await super().async_added_to_hass()
_LOGGER.debug(f"GPIODCover async_added_to_hass: Adding fd:{self._state_line.fd}")
self._hub._hass.loop.add_reader(self._state_line.fd, self.handle_event)
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODCover async_will_remove_from_hass: Removing fd:{self._state_line.fd}")
self._hub._hass.loop.remove_reader(self._state_line.fd)
self._relay_line.release()
self._state_line.release()
self._hub.add_cover(self, self._relay_port, self._relay_active_low, self._relay_bias,
self._relay_drive, self._state_port, self._state_bias, self._state_active_low)
self.async_write_ha_state()
def handle_event(self):
for event in self._state_line.read_edge_events():
self._attr_is_closed = True if event.event_type is event.Type.RISING_EDGE else False
_LOGGER.debug(f"Event: {event}. New _attr_is_closed value: {self._attr_is_closed}")
self._attr_is_closed = self._hub.get_line_value(self._state_port)
self.schedule_update_ha_state(False)
async def async_close_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_close_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
def close_cover(self, **kwargs):
if self.is_closed:
return
self._hub.turn_on(self._relay_line, self._relay_port)
self._hub.turn_on(self._relay_port)
self._attr_is_closing = True
self.async_write_ha_state()
await asyncio.sleep(self._relay_time)
self.schedule_update_ha_state(False)
sleep(self._relay_time)
if not self.is_closing:
# closing stopped
return
self._hub.turn_off(self._relay_line, self._relay_port)
self._hub.turn_off(self._relay_port)
self._attr_is_closing = False
self.async_write_ha_state()
self.handle_event()
async def async_open_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_open_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
def open_cover(self, **kwargs):
if not self.is_closed:
return
self._hub.turn_on(self._relay_line, self._relay_port)
self._hub.turn_on(self._relay_port)
self._attr_is_opening = True
self.async_write_ha_state()
await asyncio.sleep(self._relay_time)
self.schedule_update_ha_state(False)
sleep(self._relay_time)
if not self.is_opening:
# opening stopped
return
self._hub.turn_off(self._relay_line, self._relay_port)
self._hub.turn_off(self._relay_port)
self._attr_is_opening = False
self.async_write_ha_state()
self.handle_event()
async def async_stop_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_stop_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
def stop_cover(self, **kwargs):
if not (self.is_closing or self.is_opening):
return
self._hub.turn_off(self._relay_line, self._relay_port)
self._hub.turn_off(self._relay_port)
self._attr_is_opening = False
self._attr_is_closing = False
self.async_write_ha_state()
self.schedule_update_ha_state(False)

View File

@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__)
from homeassistant.core import HomeAssistant
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START
from homeassistant.exceptions import HomeAssistantError,ServiceValidationError
from homeassistant.exceptions import HomeAssistantError
from typing import Dict
from datetime import timedelta
@ -39,6 +39,10 @@ class Hub:
self._id = path
self._hass = hass
self._online = False
self._lines : gpiod.LineRequest = None
self._config : Dict[int, gpiod.LineSettings] = {}
self._edge_events = False
self._entities = {}
if path:
# use config
@ -58,8 +62,13 @@ class Hub:
break
self.verify_online()
_LOGGER.debug(f"using gpio_device: {self._path}")
# startup and shutdown triggers of hass
self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, self.startup)
self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.cleanup)
def verify_online(self):
if not self._online:
_LOGGER.error("No gpio device detected, bailing out")
@ -73,7 +82,6 @@ class Hub:
_LOGGER.debug(f"verify_gpiochip: {path} is a gpiochip_device")
self._chip = gpiod.Chip(path)
info = self._chip.get_info()
_LOGGER.debug(f"verify_gpiochip: {path} info is: {info}")
if not "pinctrl" in info.label:
_LOGGER.debug(f"verify_gpiochip: {path} no pinctrl {info.label}")
return False
@ -81,69 +89,105 @@ class Hub:
_LOGGER.debug(f"verify_gpiochip gpiodevice: {path} has pinctrl")
return True
def verify_port_ready(self, port: int):
info = self._chip.get_line_info(port)
_LOGGER.debug(f"original port {port} info: {info}")
if info.used:
if info.consumer != DOMAIN:
raise HomeAssistantError(f"Port {port} already in use by {info.consumer}")
else:
raise HomeAssistantError(f"Port {port} already in use by another entity, check your config for duplicates port usage")
async def startup(self, _):
"""Stuff to do after starting."""
_LOGGER.debug(f"startup {DOMAIN} hub")
if not self._online:
return
# setup lines
self.update_lines()
if not self._edge_events:
return
_LOGGER.debug("Start listener")
self._hass.loop.add_reader(self._lines.fd, self.handle_events)
def cleanup(self, _):
"""Stuff to do before stopping."""
_LOGGER.debug(f"cleanup {DOMAIN} hub")
if self._config:
self._config.clear()
if self._lines:
self._lines.release()
if self._chip:
self._chip.close()
self._online = False
@property
def hub_id(self) -> str:
"""ID for hub"""
return self._id
def add_switch(self, port, active_low, bias, drive_mode, init_state) -> gpiod.LineRequest:
_LOGGER.debug(f"add_switch - port: {port}, active_low: {active_low}, bias: {bias}, drive_mode: {drive_mode}, init_state: {init_state}")
self.verify_online()
self.verify_port_ready(port)
def update_lines(self) -> None:
if not self._online:
_LOGGER.debug(f"gpiod hub not online {self._path}")
if not self._config:
_LOGGER.debug(f"gpiod config is empty")
if self._lines:
self._lines.release()
line_request = self._chip.request_lines(
consumer=DOMAIN,
config={port: gpiod.LineSettings(
direction = Direction.OUTPUT,
bias = BIAS[bias],
drive = DRIVE[drive_mode],
active_low = active_low,
output_value = Value.ACTIVE if init_state is not None and init_state else Value.INACTIVE)})
_LOGGER.debug(f"add_switch line_request: {line_request}")
return line_request
_LOGGER.debug(f"updating lines: {self._config}")
self._lines = gpiod.request_lines(
self._path,
consumer = "rpi_gpio",
config = self._config
)
def turn_on(self, line, port) -> None:
def handle_events(self):
for event in self._lines.read_edge_events():
_LOGGER.debug(f"Event: {event}")
self._entities[event.line_offset].handle_event()
def add_switch(self, entity, port, active_low, bias, drive_mode, init_output_value = True) -> None:
_LOGGER.debug(f"in add_switch {port}")
self._entities[port] = entity
self._config[port] = gpiod.LineSettings(
direction = Direction.OUTPUT,
bias = BIAS[bias],
drive = DRIVE[drive_mode],
active_low = active_low,
output_value = Value.ACTIVE if init_output_value and entity.is_on else Value.INACTIVE
)
def turn_on(self, port) -> None:
_LOGGER.debug(f"in turn_on {port}")
self.verify_online()
line.set_value(port, Value.ACTIVE)
self._lines.set_value(port, Value.ACTIVE)
def turn_off(self, line, port) -> None:
def turn_off(self, port) -> None:
_LOGGER.debug(f"in turn_off {port}")
self.verify_online()
line.set_value(port, Value.INACTIVE)
self._lines.set_value(port, Value.INACTIVE)
def add_sensor(self, port, active_low, bias, debounce) -> gpiod.LineRequest:
_LOGGER.debug(f"add_sensor - port: {port}, active_low: {active_low}, bias: {bias}, debounce: {debounce}")
self.verify_online()
self.verify_port_ready(port)
def add_sensor(self, entity, port, active_low, bias, debounce) -> None:
_LOGGER.debug(f"in add_sensor {port}")
# read current status of the sensor
line = self._chip.request_lines({ port: {} })
value = True if line.get_value(port) == Value.ACTIVE else False
entity.is_on = True if value ^ active_low else False
line.release()
_LOGGER.debug(f"current value for port {port}: {entity.is_on}")
line_request = self._chip.request_lines(
consumer=DOMAIN,
config={port: gpiod.LineSettings(
direction = Direction.INPUT,
edge_detection = Edge.BOTH,
bias = BIAS[bias],
active_low = active_low,
debounce_period = timedelta(milliseconds=debounce),
event_clock = Clock.REALTIME)})
_LOGGER.debug(f"add_sensor line_request: {line_request}")
current_is_on = True if line_request.get_value(port) == Value.ACTIVE else False
_LOGGER.debug(f"add_sensor current state: {current_is_on}")
return line_request, current_is_on
self._entities[port] = entity
self._config[port] = gpiod.LineSettings(
direction = Direction.INPUT,
edge_detection = Edge.BOTH,
bias = BIAS[bias],
active_low = active_low,
debounce_period = timedelta(milliseconds=debounce),
event_clock = Clock.REALTIME,
output_value = Value.ACTIVE if entity.is_on else Value.INACTIVE,
)
self._edge_events = True
def add_cover(self, relay_port, relay_active_low, relay_bias, relay_drive,
state_port, state_bias, state_active_low):
_LOGGER.debug(f"add_cover - relay_port: {relay_port}, state_port: {state_port}")
relay_line = self.add_switch(relay_port, relay_active_low, relay_bias, relay_drive, False)
state_line, current_is_on = self.add_sensor(state_port, state_active_low, state_bias, 50)
return relay_line, state_line, current_is_on
def get_line_value(self, port, **kwargs):
return self._lines.get_value(port) == Value.ACTIVE
def add_cover(self, entity, relay_port, relay_active_low, relay_bias, relay_drive,
state_port, state_bias, state_active_low) -> None:
_LOGGER.debug(f"in add_cover {relay_port} {state_port}")
self.add_switch(entity, relay_port, relay_active_low, relay_bias, relay_drive, init_output_value = False)
self.add_sensor(entity, state_port, state_active_low, state_bias, 50)

View File

@ -6,6 +6,6 @@
"integration_type": "hub",
"iot_class": "local_push",
"issue_tracker": "https://github.com/thecode/ha-rpi_gpio/issues",
"requirements": [ "gpiod>=2.2.1" ],
"version": "2025.2.1"
"requirements": [ "gpiod>=2.0.2" ],
"version": "2024.10.0"
}

View File

@ -56,21 +56,18 @@ async def async_setup_platform(
switches = []
for switch in config.get(CONF_SWITCHES):
try:
switches.append(
GPIODSwitch(
hub,
switch[CONF_NAME],
switch[CONF_PORT],
switch.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{switch[CONF_PORT]}_{switch[CONF_NAME].lower().replace(' ', '_')}",
switch.get(CONF_INVERT_LOGIC),
switch.get(CONF_PULL_MODE),
switch.get(CONF_DRIVE),
switch[CONF_PERSISTENT]
)
switches.append(
GPIODSwitch(
hub,
switch[CONF_NAME],
switch[CONF_PORT],
switch.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{switch[CONF_PORT]}_{switch[CONF_NAME].lower().replace(' ', '_')}",
switch.get(CONF_INVERT_LOGIC),
switch.get(CONF_PULL_MODE),
switch.get(CONF_DRIVE),
switch[CONF_PERSISTENT]
)
except Exception as e:
_LOGGER.error(f"Failed to add switch {switch[CONF_NAME]} for port {switch[CONF_PORT]}: {e}")
)
async_add_entities(switches)
@ -88,9 +85,7 @@ class GPIODSwitch(SwitchEntity, RestoreEntity):
self._bias = bias
self._drive_mode = drive
self._persistent = persistent
self._line = None
self._hub.verify_port_ready(self._port)
async def async_added_to_hass(self) -> None:
"""Call when the switch is added to hass."""
await super().async_added_to_hass()
@ -100,21 +95,19 @@ class GPIODSwitch(SwitchEntity, RestoreEntity):
else:
_LOGGER.debug(f"setting initial persistent state for: {self._port}. state: {state.state}")
self._attr_is_on = True if state.state == STATE_ON else False
self.async_write_ha_state()
self._line = self._hub.add_switch(self._port, self._active_low, self._bias, self._drive_mode, self._attr_is_on)
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODSwitch async_will_remove_from_hass")
if self._line:
self._line.release()
self._hub.add_switch(self, self._port, self._active_low, self._bias, self._drive_mode)
self.async_write_ha_state()
async def async_turn_on(self, **kwargs: Any) -> None:
self._hub.turn_on(self._line, self._port)
self._hub.turn_on(self._port)
self._attr_is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
self._hub.turn_off(self._line, self._port)
self._hub.turn_off(self._port)
self._attr_is_on = False
self.async_write_ha_state()
def handle_event(self):
self._attr_is_on = self._hub.get_line_value(self._port)
self.schedule_update_ha_state(False)

View File

@ -1,7 +1,7 @@
black==25.1.0
flake8==7.3.0
isort==6.0.1
mypy==1.17.0
pre-commit==4.2.0
black==24.10.0
flake8==7.1.1
isort==5.13.2
mypy==1.12.1
pre-commit==4.0.1
pydocstyle==6.3.0
pylint==3.3.7
pylint==3.3.1