Compare commits

..

No commits in common. "main" and "2024.10.0" have entirely different histories.

10 changed files with 180 additions and 183 deletions

View File

@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: 🚀 Run stale - name: 🚀 Run stale
uses: actions/stale@v9.1.0 uses: actions/stale@v9.0.0
with: with:
repo-token: ${{ secrets.GITHUB_TOKEN }} repo-token: ${{ secrets.GITHUB_TOKEN }}
days-before-stale: 14 days-before-stale: 14

View File

@ -23,7 +23,7 @@ jobs:
with: with:
fetch-depth: 2 fetch-depth: 2
- name: Set up Python ${{ matrix.python-version }} - name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5.6.0 uses: actions/setup-python@v5.2.0
with: with:
python-version: ${{ matrix.python-version }} python-version: ${{ matrix.python-version }}
- name: Install dependencies - name: Install dependencies

View File

@ -18,7 +18,7 @@ Copy the `rpi_gpio` folder and all of its contents into your Home Assistant's `c
# Usage # Usage
The `rpi_gpio` platform will be initialized using the path to the gpio chip. When path is not in the config `/dev/gpiochip[0-5]` are tested for a gpiodevice having `pinctrl`, in sequence `[0,4,1,2,3,5]`. So with a raspberry pi you should be OK to leave the path empty. The `gpiod` platform will be initialized using the path to the gpio chip. When path is not in the config `/dev/gpiochip[0-5]` are tested for a gpiodevice having `pinctrl`, in sequence `[0,4,1,2,3,5]`. So with a raspberry pi you should be OK to leave the path empty.
Raspberry Pi | GPIO Device Raspberry Pi | GPIO Device
--- | --- --- | ---
@ -27,7 +27,7 @@ RPi5 | `/dev/gpiochip4`
```yaml ```yaml
# setup gpiod chip; mostly not required # setup gpiod chip; mostly not required
rpi_gpio: gpiod:
path: '/dev/gpiochip0' path: '/dev/gpiochip0'
``` ```
@ -194,6 +194,7 @@ switch:
| `persistent` | no | `false` | boolean | If true, the switch state will be persistent in HA and will be restored if HA restart / crash | | `persistent` | no | `false` | boolean | If true, the switch state will be persistent in HA and will be restored if HA restart / crash |
| `pull_mode` | no | `AS_IS` | string | Type of internal pull resistor to use: `UP` - pull-up resistor, `DOWN` - pull-down resistor, `AS-IS` no change | | `pull_mode` | no | `AS_IS` | string | Type of internal pull resistor to use: `UP` - pull-up resistor, `DOWN` - pull-down resistor, `AS-IS` no change |
| `drive` |no | `PUSH_PULL`|string | control drive configuration of the GPIO, determines how the line behaves when it is set to output mode; `PUSH_PULL`, GPIO line can both source and sink current, can actively drive the line to both high and low states. `OPEN-DRAIN`, GPPIO can only sink current (drive the line to low) and is otherwise left floating, and `OPEN-SOURCE` the reverse. | `drive` |no | `PUSH_PULL`|string | control drive configuration of the GPIO, determines how the line behaves when it is set to output mode; `PUSH_PULL`, GPIO line can both source and sink current, can actively drive the line to both high and low states. `OPEN-DRAIN`, GPPIO can only sink current (drive the line to low) and is otherwise left floating, and `OPEN-SOURCE` the reverse.
|`persistent` | no | `false` | boolean | If true, the switch state will be persistent in HA and will be restored if HA restart / crash. |
For more details about the GPIO layout, visit the Wikipedia [article](https://en.wikipedia.org/wiki/Raspberry_Pi#General_purpose_input-output_(GPIO)_connector) about the Raspberry Pi. For more details about the GPIO layout, visit the Wikipedia [article](https://en.wikipedia.org/wiki/Raspberry_Pi#General_purpose_input-output_(GPIO)_connector) about the Raspberry Pi.
@ -210,4 +211,4 @@ switch:
``` ```
# Reporting issues # Reporting issues
*Before* reporting issues please enable debug logging as described [here](https://www.home-assistant.io/docs/configuration/troubleshooting/#enabling-debug-logging), check logs and report issue attaching the log file and the relevant YAML section. *Before* reporting issues please enable debug logging as described [here](https://www.home-assistant.io/docs/configuration/troubleshooting/#enabling-debug-logging), check logs and report issue attaching the log file.

View File

@ -25,8 +25,6 @@ CONFIG_SCHEMA = vol.Schema(
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the GPIO component.""" """Set up the GPIO component."""
version = getattr(hass.data["integrations"][DOMAIN], "version", 0)
_LOGGER.debug(f"{DOMAIN} integration starting. Version: {version}")
path = config.get(DOMAIN, {}).get(CONF_PATH) path = config.get(DOMAIN, {}).get(CONF_PATH)
hub = Hub(hass, path) hub = Hub(hass, path)
hass.data[DOMAIN] = hub hass.data[DOMAIN] = hub

View File

@ -12,7 +12,6 @@ from homeassistant.helpers.config_validation import PLATFORM_SCHEMA
from homeassistant.components.binary_sensor import BinarySensorEntity from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.const import CONF_SENSORS, CONF_NAME, CONF_PORT, CONF_UNIQUE_ID from homeassistant.const import CONF_SENSORS, CONF_NAME, CONF_PORT, CONF_UNIQUE_ID
from .hub import BIAS from .hub import BIAS
CONF_INVERT_LOGIC = "invert_logic" CONF_INVERT_LOGIC = "invert_logic"
DEFAULT_INVERT_LOGIC = False DEFAULT_INVERT_LOGIC = False
CONF_BOUNCETIME = "bouncetime" CONF_BOUNCETIME = "bouncetime"
@ -52,20 +51,17 @@ async def async_setup_platform(
sensors = [] sensors = []
for sensor in config.get(CONF_SENSORS): for sensor in config.get(CONF_SENSORS):
try: sensors.append(
sensors.append( GPIODBinarySensor(
GPIODBinarySensor( hub,
hub, sensor[CONF_NAME],
sensor[CONF_NAME], sensor[CONF_PORT],
sensor[CONF_PORT], sensor.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{sensor[CONF_PORT]}_{sensor[CONF_NAME].lower().replace(' ', '_')}",
sensor.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{sensor[CONF_PORT]}_{sensor[CONF_NAME].lower().replace(' ', '_')}", sensor.get(CONF_INVERT_LOGIC),
sensor.get(CONF_INVERT_LOGIC), sensor.get(CONF_PULL_MODE),
sensor.get(CONF_PULL_MODE), sensor.get(CONF_BOUNCETIME)
sensor.get(CONF_BOUNCETIME)
)
) )
except Exception as e: )
_LOGGER.error(f"Failed to add binary sensor {sensor[CONF_NAME]} for port {sensor[CONF_PORT]}: {e}")
async_add_entities(sensors) async_add_entities(sensors)
@ -82,22 +78,12 @@ class GPIODBinarySensor(BinarySensorEntity):
self._active_low = active_low self._active_low = active_low
self._bias = bias self._bias = bias
self._debounce = debounce self._debounce = debounce
self._line, current_is_on = self._hub.add_sensor(self._port, self._active_low, self._bias, self._debounce)
self._attr_is_on = current_is_on
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
await super().async_added_to_hass() await super().async_added_to_hass()
_LOGGER.debug(f"GPIODBinarySensor async_added_to_hass: Adding fd:{self._line.fd}") self._hub.add_sensor(self, self._port, self._active_low, self._bias, self._debounce)
self._hub._hass.loop.add_reader(self._line.fd, self.handle_event) self.async_write_ha_state()
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODBinarySensor async_will_remove_from_hass: Removing fd:{self._line.fd}")
self._hub._hass.loop.remove_reader(self._line.fd)
self._line.release()
def handle_event(self): def handle_event(self):
for event in self._line.read_edge_events(): self._attr_is_on = self._hub.update(self._port)
self._attr_is_on = True if event.event_type is event.Type.RISING_EDGE else False
_LOGGER.debug(f"Event: {event}. New line value: {self._attr_is_on}")
self.schedule_update_ha_state(False) self.schedule_update_ha_state(False)

View File

@ -17,7 +17,6 @@ from homeassistant.const import CONF_COVERS, CONF_NAME, CONF_UNIQUE_ID
from .hub import BIAS, DRIVE from .hub import BIAS, DRIVE
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
import voluptuous as vol import voluptuous as vol
import asyncio
CONF_RELAY_PIN = "relay_pin" CONF_RELAY_PIN = "relay_pin"
CONF_RELAY_TIME = "relay_time" CONF_RELAY_TIME = "relay_time"
@ -71,24 +70,21 @@ async def async_setup_platform(
invert_relay = config[CONF_INVERT_RELAY] invert_relay = config[CONF_INVERT_RELAY]
covers = [] covers = []
for cover in config.get(CONF_COVERS): for cover in config.get(CONF_COVERS):
try: covers.append(
covers.append( GPIODCover(
GPIODCover( hub,
hub, cover[CONF_NAME],
cover[CONF_NAME], cover.get(CONF_RELAY_PIN),
cover.get(CONF_RELAY_PIN), relay_time,
relay_time, invert_relay,
invert_relay, "AS_IS",
"AS_IS", "PUSH_PULL",
"PUSH_PULL", cover.get(CONF_STATE_PIN),
cover.get(CONF_STATE_PIN), state_pull_mode,
state_pull_mode, invert_state,
invert_state, cover.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{cover.get(CONF_RELAY_PORT) or cover.get("relay_pin")}_{cover[CONF_NAME].lower().replace(' ', '_')}",
cover.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{cover.get(CONF_RELAY_PIN)}_{cover[CONF_NAME].lower().replace(' ', '_')}",
)
) )
except Exception as e: )
_LOGGER.error(f"Failed to add cover {cover[CONF_NAME]} for port {cover.get(CONF_RELAY_PIN)}:{cover.get(CONF_STATE_PIN)}: {e}")
async_add_entities(covers) async_add_entities(covers)
@ -109,66 +105,51 @@ class GPIODCover(CoverEntity):
self._state_port = state_port self._state_port = state_port
self._state_bias = state_bias self._state_bias = state_bias
self._state_active_low = state_active_low self._state_active_low = state_active_low
self._relay_line, self._state_line, current_is_on = self._hub.add_cover( self._attr_is_closed = False != state_active_low
self._relay_port, self._relay_active_low, self._relay_bias, self._relay_drive,
self._state_port, self._state_bias, self._state_active_low)
self._attr_is_closed = current_is_on
self.is_on = current_is_on
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
await super().async_added_to_hass() await super().async_added_to_hass()
_LOGGER.debug(f"GPIODCover async_added_to_hass: Adding fd:{self._state_line.fd}") self._hub.add_cover(self, self._relay_port, self._relay_active_low, self._relay_bias,
self._hub._hass.loop.add_reader(self._state_line.fd, self.handle_event) self._relay_drive, self._state_port, self._state_bias, self._state_active_low)
self.async_write_ha_state()
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODCover async_will_remove_from_hass: Removing fd:{self._state_line.fd}")
self._hub._hass.loop.remove_reader(self._state_line.fd)
self._relay_line.release()
self._state_line.release()
def handle_event(self): def handle_event(self):
for event in self._state_line.read_edge_events(): self._attr_is_closed = self._hub.update(self._state_port)
self._attr_is_closed = True if event.event_type is event.Type.RISING_EDGE else False
_LOGGER.debug(f"Event: {event}. New _attr_is_closed value: {self._attr_is_closed}")
self.schedule_update_ha_state(False) self.schedule_update_ha_state(False)
async def async_close_cover(self, **kwargs): def close_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_close_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
if self.is_closed: if self.is_closed:
return return
self._hub.turn_on(self._relay_line, self._relay_port) self._hub.turn_on(self._relay_port)
self._attr_is_closing = True self._attr_is_closing = True
self.async_write_ha_state() self.schedule_update_ha_state(False)
await asyncio.sleep(self._relay_time) sleep(self._relay_time)
if not self.is_closing: if not self.is_closing:
# closing stopped # closing stopped
return return
self._hub.turn_off(self._relay_line, self._relay_port) self._hub.turn_off(self._relay_port)
self._attr_is_closing = False self._attr_is_closing = False
self.async_write_ha_state() self.handle_event()
async def async_open_cover(self, **kwargs): def open_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_open_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
if not self.is_closed: if not self.is_closed:
return return
self._hub.turn_on(self._relay_line, self._relay_port) self._hub.turn_on(self._relay_port)
self._attr_is_opening = True self._attr_is_opening = True
self.async_write_ha_state() self.schedule_update_ha_state(False)
await asyncio.sleep(self._relay_time) sleep(self._relay_time)
if not self.is_opening: if not self.is_opening:
# opening stopped # opening stopped
return return
self._hub.turn_off(self._relay_line, self._relay_port) self._hub.turn_off(self._relay_port)
self._attr_is_opening = False self._attr_is_opening = False
self.async_write_ha_state() self.handle_event()
async def async_stop_cover(self, **kwargs): def stop_cover(self, **kwargs):
_LOGGER.debug(f"GPIODCover async_stop_cover: is_closed: {self.is_closed}. is_closing: {self.is_closing}, is_opening: {self.is_opening}")
if not (self.is_closing or self.is_opening): if not (self.is_closing or self.is_opening):
return return
self._hub.turn_off(self._relay_line, self._relay_port) self._hub.turn_off(self._relay_port)
self._attr_is_opening = False self._attr_is_opening = False
self._attr_is_closing = False self._attr_is_closing = False
self.async_write_ha_state() self.schedule_update_ha_state(False)

View File

@ -7,7 +7,7 @@ _LOGGER = logging.getLogger(__name__)
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START from homeassistant.const import EVENT_HOMEASSISTANT_STOP, EVENT_HOMEASSISTANT_START
from homeassistant.exceptions import HomeAssistantError,ServiceValidationError from homeassistant.exceptions import HomeAssistantError
from typing import Dict from typing import Dict
from datetime import timedelta from datetime import timedelta
@ -39,16 +39,18 @@ class Hub:
self._id = path self._id = path
self._hass = hass self._hass = hass
self._online = False self._online = False
self._lines : gpiod.LineRequest = None
self._config : Dict[int, gpiod.LineSettings] = {}
self._edge_events = False
self._entities = {}
if path: if path:
# use config # use config
_LOGGER.debug(f"trying to use configured device: {path}")
if self.verify_gpiochip(path): if self.verify_gpiochip(path):
self._online = True self._online = True
self._path = path self._path = path
else: else:
# discover # discover
_LOGGER.debug(f"auto discovering gpio device")
for d in [0,4,1,2,3,5]: for d in [0,4,1,2,3,5]:
# rpi3,4 using 0. rpi5 using 4 # rpi3,4 using 0. rpi5 using 4
path = f"/dev/gpiochip{d}" path = f"/dev/gpiochip{d}"
@ -57,14 +59,16 @@ class Hub:
self._path = path self._path = path
break break
self.verify_online()
_LOGGER.debug(f"using gpio_device: {self._path}")
def verify_online(self):
if not self._online: if not self._online:
_LOGGER.error("No gpio device detected, bailing out") _LOGGER.error("No gpio device detected, bailing out")
raise HomeAssistantError("No gpio device detected") raise HomeAssistantError("No gpio device detected")
_LOGGER.debug(f"using gpio_device: {self._path}")
# startup and shutdown triggers of hass
self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_START, self.startup)
self._hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, self.cleanup)
def verify_gpiochip(self, path): def verify_gpiochip(self, path):
if not gpiod.is_gpiochip_device(path): if not gpiod.is_gpiochip_device(path):
_LOGGER.debug(f"verify_gpiochip: {path} not a gpiochip_device") _LOGGER.debug(f"verify_gpiochip: {path} not a gpiochip_device")
@ -73,7 +77,6 @@ class Hub:
_LOGGER.debug(f"verify_gpiochip: {path} is a gpiochip_device") _LOGGER.debug(f"verify_gpiochip: {path} is a gpiochip_device")
self._chip = gpiod.Chip(path) self._chip = gpiod.Chip(path)
info = self._chip.get_info() info = self._chip.get_info()
_LOGGER.debug(f"verify_gpiochip: {path} info is: {info}")
if not "pinctrl" in info.label: if not "pinctrl" in info.label:
_LOGGER.debug(f"verify_gpiochip: {path} no pinctrl {info.label}") _LOGGER.debug(f"verify_gpiochip: {path} no pinctrl {info.label}")
return False return False
@ -81,69 +84,104 @@ class Hub:
_LOGGER.debug(f"verify_gpiochip gpiodevice: {path} has pinctrl") _LOGGER.debug(f"verify_gpiochip gpiodevice: {path} has pinctrl")
return True return True
def verify_port_ready(self, port: int): async def startup(self, _):
info = self._chip.get_line_info(port) """Stuff to do after starting."""
_LOGGER.debug(f"original port {port} info: {info}") _LOGGER.debug(f"startup {DOMAIN} hub")
if info.used: if not self._online:
if info.consumer != DOMAIN: return
raise HomeAssistantError(f"Port {port} already in use by {info.consumer}")
else: # setup lines
raise HomeAssistantError(f"Port {port} already in use by another entity, check your config for duplicates port usage") self.update_lines()
if not self._edge_events:
return
_LOGGER.debug("Start listener")
self._hass.loop.add_reader(self._lines.fd, self.handle_events)
def cleanup(self, _):
"""Stuff to do before stopping."""
_LOGGER.debug(f"cleanup {DOMAIN} hub")
if self._config:
self._config.clear()
if self._lines:
self._lines.release()
if self._chip:
self._chip.close()
self._online = False
@property @property
def hub_id(self) -> str: def hub_id(self) -> str:
"""ID for hub""" """ID for hub"""
return self._id return self._id
def add_switch(self, port, active_low, bias, drive_mode, init_state) -> gpiod.LineRequest: def update_lines(self) -> None:
_LOGGER.debug(f"add_switch - port: {port}, active_low: {active_low}, bias: {bias}, drive_mode: {drive_mode}, init_state: {init_state}") if not self._online:
self.verify_online() _LOGGER.debug(f"gpiod hub not online {self._path}")
self.verify_port_ready(port) if not self._config:
_LOGGER.debug(f"gpiod config is empty")
if self._lines:
self._lines.release()
line_request = self._chip.request_lines( _LOGGER.debug(f"updating lines: {self._config}")
consumer=DOMAIN, self._lines = gpiod.request_lines(
config={port: gpiod.LineSettings( self._path,
direction = Direction.OUTPUT, consumer = "rpi_gpio",
bias = BIAS[bias], config = self._config
drive = DRIVE[drive_mode], )
active_low = active_low,
output_value = Value.ACTIVE if init_state is not None and init_state else Value.INACTIVE)})
_LOGGER.debug(f"add_switch line_request: {line_request}")
return line_request
def turn_on(self, line, port) -> None: def handle_events(self):
for event in self._lines.read_edge_events():
_LOGGER.debug(f"Event: {event}")
self._entities[event.line_offset].handle_event()
def add_switch(self, entity, port, active_low, bias, drive_mode, init_output_value = True) -> None:
_LOGGER.debug(f"in add_switch {port}")
self._entities[port] = entity
self._config[port] = gpiod.LineSettings(
direction = Direction.OUTPUT,
bias = BIAS[bias],
drive = DRIVE[drive_mode],
active_low = active_low,
output_value = Value.ACTIVE if init_output_value and entity.is_on else Value.INACTIVE
)
def turn_on(self, port) -> None:
_LOGGER.debug(f"in turn_on {port}") _LOGGER.debug(f"in turn_on {port}")
self.verify_online() self._lines.set_value(port, Value.ACTIVE)
line.set_value(port, Value.ACTIVE)
def turn_off(self, line, port) -> None: def turn_off(self, port) -> None:
_LOGGER.debug(f"in turn_off {port}") _LOGGER.debug(f"in turn_off {port}")
self.verify_online() self._lines.set_value(port, Value.INACTIVE)
line.set_value(port, Value.INACTIVE)
def add_sensor(self, port, active_low, bias, debounce) -> gpiod.LineRequest: def add_sensor(self, entity, port, active_low, bias, debounce) -> None:
_LOGGER.debug(f"add_sensor - port: {port}, active_low: {active_low}, bias: {bias}, debounce: {debounce}") _LOGGER.debug(f"in add_sensor {port}")
self.verify_online() # read current status of the sensor
self.verify_port_ready(port) line = self._chip.request_lines({ port: {} })
value = True if line.get_value(port) == Value.ACTIVE else False
entity.is_on = True if value ^ active_low else False
line.release()
_LOGGER.debug(f"current value for port {port}: {entity.is_on}")
line_request = self._chip.request_lines( self._entities[port] = entity
consumer=DOMAIN, self._config[port] = gpiod.LineSettings(
config={port: gpiod.LineSettings( direction = Direction.INPUT,
direction = Direction.INPUT, edge_detection = Edge.BOTH,
edge_detection = Edge.BOTH, bias = BIAS[bias],
bias = BIAS[bias], active_low = active_low,
active_low = active_low, debounce_period = timedelta(milliseconds=debounce),
debounce_period = timedelta(milliseconds=debounce), event_clock = Clock.REALTIME,
event_clock = Clock.REALTIME)}) output_value = Value.ACTIVE if entity.is_on else Value.INACTIVE,
_LOGGER.debug(f"add_sensor line_request: {line_request}") )
current_is_on = True if line_request.get_value(port) == Value.ACTIVE else False self._edge_events = True
_LOGGER.debug(f"add_sensor current state: {current_is_on}")
return line_request, current_is_on
def add_cover(self, relay_port, relay_active_low, relay_bias, relay_drive, def update(self, port, **kwargs):
state_port, state_bias, state_active_low): return self._lines.get_value(port) == Value.ACTIVE
_LOGGER.debug(f"add_cover - relay_port: {relay_port}, state_port: {state_port}")
relay_line = self.add_switch(relay_port, relay_active_low, relay_bias, relay_drive, False) def add_cover(self, entity, relay_port, relay_active_low, relay_bias, relay_drive,
state_line, current_is_on = self.add_sensor(state_port, state_active_low, state_bias, 50) state_port, state_bias, state_active_low) -> None:
return relay_line, state_line, current_is_on _LOGGER.debug(f"in add_cover {relay_port} {state_port}")
self.add_switch(entity, relay_port, relay_active_low, relay_bias, relay_drive, init_output_value = False)
self.add_sensor(entity, state_port, state_active_low, state_bias, 50)
self.update_lines()

View File

@ -6,6 +6,6 @@
"integration_type": "hub", "integration_type": "hub",
"iot_class": "local_push", "iot_class": "local_push",
"issue_tracker": "https://github.com/thecode/ha-rpi_gpio/issues", "issue_tracker": "https://github.com/thecode/ha-rpi_gpio/issues",
"requirements": [ "gpiod>=2.2.1" ], "requirements": [ "gpiod>=2.0.2" ],
"version": "2025.2.1" "version": "2024.10.0"
} }

View File

@ -56,21 +56,18 @@ async def async_setup_platform(
switches = [] switches = []
for switch in config.get(CONF_SWITCHES): for switch in config.get(CONF_SWITCHES):
try: switches.append(
switches.append( GPIODSwitch(
GPIODSwitch( hub,
hub, switch[CONF_NAME],
switch[CONF_NAME], switch[CONF_PORT],
switch[CONF_PORT], switch.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{switch[CONF_PORT]}_{switch[CONF_NAME].lower().replace(' ', '_')}",
switch.get(CONF_UNIQUE_ID) or f"{DOMAIN}_{switch[CONF_PORT]}_{switch[CONF_NAME].lower().replace(' ', '_')}", switch.get(CONF_INVERT_LOGIC),
switch.get(CONF_INVERT_LOGIC), switch.get(CONF_PULL_MODE),
switch.get(CONF_PULL_MODE), switch.get(CONF_DRIVE),
switch.get(CONF_DRIVE), switch[CONF_PERSISTENT]
switch[CONF_PERSISTENT]
)
) )
except Exception as e: )
_LOGGER.error(f"Failed to add switch {switch[CONF_NAME]} for port {switch[CONF_PORT]}: {e}")
async_add_entities(switches) async_add_entities(switches)
@ -79,7 +76,7 @@ class GPIODSwitch(SwitchEntity, RestoreEntity):
_attr_should_poll = False _attr_should_poll = False
def __init__(self, hub, name, port, unique_id, active_low, bias, drive, persistent): def __init__(self, hub, name, port, unique_id, active_low, bias, drive, persistent):
_LOGGER.debug(f"GPIODSwitch init: {port} - {name} - {unique_id} - active_low: {active_low} - bias: {bias} - drive: {drive} - persistent: {persistent}") _LOGGER.debug(f"GPIODSwitch init: {port} - {name} - {unique_id} - active_low: {active_low} - bias: {bias} - drive: {drive}")
self._hub = hub self._hub = hub
self._attr_name = name self._attr_name = name
self._attr_unique_id = unique_id self._attr_unique_id = unique_id
@ -88,9 +85,7 @@ class GPIODSwitch(SwitchEntity, RestoreEntity):
self._bias = bias self._bias = bias
self._drive_mode = drive self._drive_mode = drive
self._persistent = persistent self._persistent = persistent
self._line = None
self._hub.verify_port_ready(self._port)
async def async_added_to_hass(self) -> None: async def async_added_to_hass(self) -> None:
"""Call when the switch is added to hass.""" """Call when the switch is added to hass."""
await super().async_added_to_hass() await super().async_added_to_hass()
@ -98,23 +93,21 @@ class GPIODSwitch(SwitchEntity, RestoreEntity):
if not state or not self._persistent: if not state or not self._persistent:
self._attr_is_on = False self._attr_is_on = False
else: else:
_LOGGER.debug(f"setting initial persistent state for: {self._port}. state: {state.state}") _LOGGER.debug(f"GPIODSwitch async_added_to_has initial port: {self._port} persistent: {self._persistent} state: {state.state}")
self._attr_is_on = True if state.state == STATE_ON else False self._attr_is_on = True if state.state == STATE_ON else False
self.async_write_ha_state() self._hub.add_switch(self, self._port, self._active_low, self._bias, self._drive_mode)
self._line = self._hub.add_switch(self._port, self._active_low, self._bias, self._drive_mode, self._attr_is_on) self.async_write_ha_state()
async def async_will_remove_from_hass(self) -> None:
await super().async_will_remove_from_hass()
_LOGGER.debug(f"GPIODSwitch async_will_remove_from_hass")
if self._line:
self._line.release()
async def async_turn_on(self, **kwargs: Any) -> None: async def async_turn_on(self, **kwargs: Any) -> None:
self._hub.turn_on(self._line, self._port) self._hub.turn_on(self._port)
self._attr_is_on = True self._attr_is_on = True
self.async_write_ha_state() self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None: async def async_turn_off(self, **kwargs: Any) -> None:
self._hub.turn_off(self._line, self._port) self._hub.turn_off(self._port)
self._attr_is_on = False self._attr_is_on = False
self.async_write_ha_state() self.async_write_ha_state()
def handle_event(self):
self._attr_is_on = self._hub.update(self._port)
self.schedule_update_ha_state(False)

View File

@ -1,7 +1,7 @@
black==25.1.0 black==24.10.0
flake8==7.3.0 flake8==7.1.1
isort==6.0.1 isort==5.13.2
mypy==1.17.0 mypy==1.12.1
pre-commit==4.2.0 pre-commit==4.0.1
pydocstyle==6.3.0 pydocstyle==6.3.0
pylint==3.3.7 pylint==3.3.1