Add new data types to ADS integration (#125201)

* feat: Introduce new data types to ADS integration.

* refactor: ADS data unpacking based on PLC data type

* refactor: handle BOOL and STRING as special cases.
This commit is contained in:
Adam Pasztor 2024-09-05 12:07:19 +02:00 committed by GitHub
parent ba7f36328d
commit 70966c2b63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -29,18 +29,40 @@ DATA_ADS = "data_ads"
# Supported Types # Supported Types
ADSTYPE_BOOL = "bool" ADSTYPE_BOOL = "bool"
ADSTYPE_BYTE = "byte" ADSTYPE_BYTE = "byte"
ADSTYPE_DINT = "dint"
ADSTYPE_INT = "int" ADSTYPE_INT = "int"
ADSTYPE_UDINT = "udint"
ADSTYPE_UINT = "uint" ADSTYPE_UINT = "uint"
ADSTYPE_SINT = "sint"
ADSTYPE_USINT = "usint"
ADSTYPE_DINT = "dint"
ADSTYPE_UDINT = "udint"
ADSTYPE_WORD = "word"
ADSTYPE_DWORD = "dword"
ADSTYPE_LREAL = "lreal"
ADSTYPE_REAL = "real"
ADSTYPE_STRING = "string"
ADSTYPE_TIME = "time"
ADSTYPE_DATE = "date"
ADSTYPE_DATE_AND_TIME = "dt"
ADSTYPE_TOD = "tod"
ADS_TYPEMAP = { ADS_TYPEMAP = {
ADSTYPE_BOOL: pyads.PLCTYPE_BOOL, ADSTYPE_BOOL: pyads.PLCTYPE_BOOL,
ADSTYPE_BYTE: pyads.PLCTYPE_BYTE, ADSTYPE_BYTE: pyads.PLCTYPE_BYTE,
ADSTYPE_DINT: pyads.PLCTYPE_DINT,
ADSTYPE_INT: pyads.PLCTYPE_INT, ADSTYPE_INT: pyads.PLCTYPE_INT,
ADSTYPE_UDINT: pyads.PLCTYPE_UDINT,
ADSTYPE_UINT: pyads.PLCTYPE_UINT, ADSTYPE_UINT: pyads.PLCTYPE_UINT,
ADSTYPE_SINT: pyads.PLCTYPE_SINT,
ADSTYPE_USINT: pyads.PLCTYPE_USINT,
ADSTYPE_DINT: pyads.PLCTYPE_DINT,
ADSTYPE_UDINT: pyads.PLCTYPE_UDINT,
ADSTYPE_WORD: pyads.PLCTYPE_WORD,
ADSTYPE_DWORD: pyads.PLCTYPE_DWORD,
ADSTYPE_REAL: pyads.PLCTYPE_REAL,
ADSTYPE_LREAL: pyads.PLCTYPE_LREAL,
ADSTYPE_STRING: pyads.PLCTYPE_STRING,
ADSTYPE_TIME: pyads.PLCTYPE_TIME,
ADSTYPE_DATE: pyads.PLCTYPE_DATE,
ADSTYPE_DATE_AND_TIME: pyads.PLCTYPE_DT,
ADSTYPE_TOD: pyads.PLCTYPE_TOD,
} }
CONF_ADS_FACTOR = "factor" CONF_ADS_FACTOR = "factor"
@ -75,12 +97,23 @@ SCHEMA_SERVICE_WRITE_DATA_BY_NAME = vol.Schema(
{ {
vol.Required(CONF_ADS_TYPE): vol.In( vol.Required(CONF_ADS_TYPE): vol.In(
[ [
ADSTYPE_BOOL,
ADSTYPE_BYTE,
ADSTYPE_INT, ADSTYPE_INT,
ADSTYPE_UINT, ADSTYPE_UINT,
ADSTYPE_BYTE, ADSTYPE_SINT,
ADSTYPE_BOOL, ADSTYPE_USINT,
ADSTYPE_DINT, ADSTYPE_DINT,
ADSTYPE_UDINT, ADSTYPE_UDINT,
ADSTYPE_WORD,
ADSTYPE_DWORD,
ADSTYPE_REAL,
ADSTYPE_LREAL,
ADSTYPE_STRING,
ADSTYPE_TIME,
ADSTYPE_DATE,
ADSTYPE_DATE_AND_TIME,
ADSTYPE_TOD,
] ]
), ),
vol.Required(CONF_ADS_VALUE): vol.Coerce(int), vol.Required(CONF_ADS_VALUE): vol.Coerce(int),
@ -222,37 +255,53 @@ class AdsHub:
def _device_notification_callback(self, notification, name): def _device_notification_callback(self, notification, name):
"""Handle device notifications.""" """Handle device notifications."""
contents = notification.contents contents = notification.contents
hnotify = int(contents.hNotification) hnotify = int(contents.hNotification)
_LOGGER.debug("Received notification %d", hnotify) _LOGGER.debug("Received notification %d", hnotify)
# get dynamically sized data array # Get dynamically sized data array
data_size = contents.cbSampleSize data_size = contents.cbSampleSize
data = (ctypes.c_ubyte * data_size).from_address( data_address = (
ctypes.addressof(contents) ctypes.addressof(contents)
+ pyads.structs.SAdsNotificationHeader.data.offset + pyads.structs.SAdsNotificationHeader.data.offset
) )
data = (ctypes.c_ubyte * data_size).from_address(data_address)
try: # Acquire notification item
with self._lock: with self._lock:
notification_item = self._notification_items[hnotify] notification_item = self._notification_items.get(hnotify)
except KeyError:
if not notification_item:
_LOGGER.error("Unknown device notification handle: %d", hnotify) _LOGGER.error("Unknown device notification handle: %d", hnotify)
return return
# Parse data to desired datatype # Data parsing based on PLC data type
if notification_item.plc_datatype == pyads.PLCTYPE_BOOL: plc_datatype = notification_item.plc_datatype
unpack_formats = {
pyads.PLCTYPE_BYTE: "<b",
pyads.PLCTYPE_INT: "<h",
pyads.PLCTYPE_UINT: "<H",
pyads.PLCTYPE_SINT: "<b",
pyads.PLCTYPE_USINT: "<B",
pyads.PLCTYPE_DINT: "<i",
pyads.PLCTYPE_UDINT: "<I",
pyads.PLCTYPE_WORD: "<H",
pyads.PLCTYPE_DWORD: "<I",
pyads.PLCTYPE_LREAL: "<d",
pyads.PLCTYPE_REAL: "<f",
pyads.PLCTYPE_TOD: "<i", # Treat as DINT
pyads.PLCTYPE_DATE: "<i", # Treat as DINT
pyads.PLCTYPE_DT: "<i", # Treat as DINT
pyads.PLCTYPE_TIME: "<i", # Treat as DINT
}
if plc_datatype == pyads.PLCTYPE_BOOL:
value = bool(struct.unpack("<?", bytearray(data))[0]) value = bool(struct.unpack("<?", bytearray(data))[0])
elif notification_item.plc_datatype == pyads.PLCTYPE_INT: elif plc_datatype == pyads.PLCTYPE_STRING:
value = struct.unpack("<h", bytearray(data))[0] value = (
elif notification_item.plc_datatype == pyads.PLCTYPE_BYTE: bytearray(data).split(b"\x00", 1)[0].decode("utf-8", errors="ignore")
value = struct.unpack("<B", bytearray(data))[0] )
elif notification_item.plc_datatype == pyads.PLCTYPE_UINT: elif plc_datatype in unpack_formats:
value = struct.unpack("<H", bytearray(data))[0] value = struct.unpack(unpack_formats[plc_datatype], bytearray(data))[0]
elif notification_item.plc_datatype == pyads.PLCTYPE_DINT:
value = struct.unpack("<i", bytearray(data))[0]
elif notification_item.plc_datatype == pyads.PLCTYPE_UDINT:
value = struct.unpack("<I", bytearray(data))[0]
else: else:
value = bytearray(data) value = bytearray(data)
_LOGGER.warning("No callback available for this datatype") _LOGGER.warning("No callback available for this datatype")