diff --git a/supervisor/utils/gdbus.py b/supervisor/utils/gdbus.py index 84e4c18f2..89d0b48ad 100644 --- a/supervisor/utils/gdbus.py +++ b/supervisor/utils/gdbus.py @@ -33,12 +33,20 @@ RE_GVARIANT_STRING_ESC: re.Pattern[Any] = re.compile( RE_GVARIANT_STRING: re.Pattern[Any] = re.compile( r"(?<=(?: |{|\[|\(|<))'(.*?)'(?=(?:|]|}|,|\)|>))" ) -RE_GVARIANT_BINARY: re.Pattern[Any] = re.compile(r"\[byte (.*?)\]") +RE_GVARIANT_BINARY: re.Pattern[Any] = re.compile( + r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|\[byte (.*?)\]" +) +RE_GVARIANT_BINARY_STRING: re.Pattern[Any] = re.compile( + r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|?" +) RE_GVARIANT_TUPLE_O: re.Pattern[Any] = re.compile(r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(\()") RE_GVARIANT_TUPLE_C: re.Pattern[Any] = re.compile( r"\"[^\"\\]*(?:\\.[^\"\\]*)*\"|(,?\))" ) +RE_BIN_STRING_OCT: re.Pattern[Any] = re.compile(r"\\\\(\d{3})") +RE_BIN_STRING_HEX: re.Pattern[Any] = re.compile(r"\\\\x(\d{2})") + RE_MONITOR_OUTPUT: re.Pattern[Any] = re.compile(r".+?: (?P[^ ].+) (?P.*)") # Map GDBus to errors @@ -66,6 +74,13 @@ def _convert_bytes(value: str) -> str: return f"[{', '.join(str(char) for char in data)}]" +def _convert_bytes_string(value: str) -> str: + """Convert bytes to string or byte-array.""" + data = RE_BIN_STRING_OCT.sub(lambda x: chr(int(x.group(1), 8)), value) + data = RE_BIN_STRING_HEX.sub(lambda x: chr(int(f"0x{x.group(1)}", 0)), data) + return f"[{', '.join(str(char) for char in list(char for char in data.encode()))}]" + + class DBus: """DBus handler.""" @@ -120,15 +135,23 @@ class DBus: def parse_gvariant(raw: str) -> Any: """Parse GVariant input to python.""" # Process first string - json_raw = RE_GVARIANT_BINARY.sub( - lambda x: _convert_bytes(x.group(1)), - raw, - ) json_raw = RE_GVARIANT_STRING_ESC.sub( - lambda x: x.group(0).replace('"', '\\"'), json_raw + lambda x: x.group(0).replace('"', '\\"'), raw ) json_raw = RE_GVARIANT_STRING.sub(r'"\1"', json_raw) + # Handle Bytes + json_raw = RE_GVARIANT_BINARY.sub( + lambda x: x.group(0) if not x.group(1) else _convert_bytes(x.group(1)), + json_raw, + ) + json_raw = RE_GVARIANT_BINARY_STRING.sub( + lambda x: x.group(0) + if not x.group(1) + else _convert_bytes_string(x.group(1)), + json_raw, + ) + # Remove complex type handling json_raw: str = RE_GVARIANT_TYPE.sub( lambda x: x.group(0) if not x.group(1) else "", json_raw diff --git a/tests/utils/test_gvariant_parser.py b/tests/utils/test_gvariant_parser.py index 5e9d80ed8..a04087fe5 100644 --- a/tests/utils/test_gvariant_parser.py +++ b/tests/utils/test_gvariant_parser.py @@ -404,6 +404,54 @@ def test_networkmanager_binary_data(): ] +def test_networkmanager_binary_string_data(): + """Test NetworkManager Binary string datastrings.""" + raw = "({'802-11-wireless': {'mac-address-blacklist': <@as []>, 'mac-address': , 'mode': <'infrastructure'>, 'security': <'802-11-wireless-security'>, 'seen-bssids': <['7C:2E:BD:98:1B:06']>, 'ssid': <[byte 0x4e, 0x45, 0x54, 0x54]>}, 'connection': {'id': <'NETT'>, 'interface-name': <'wlan0'>, 'permissions': <@as []>, 'timestamp': , 'type': <'802-11-wireless'>, 'uuid': <'13f9af79-a6e9-4e07-9353-165ad57bf1a8'>}, 'ipv6': {'address-data': <@aa{sv} []>, 'addresses': <@a(ayuay) []>, 'dns': <@aay []>, 'dns-search': <@as []>, 'method': <'auto'>, 'route-data': <@aa{sv} []>, 'routes': <@a(ayuayu) []>}, '802-11-wireless-security': {'auth-alg': <'open'>, 'key-mgmt': <'wpa-psk'>}, 'ipv4': {'address-data': <@aa{sv} []>, 'addresses': <@aau []>, 'dns': <@au []>, 'dns-search': <@as []>, 'method': <'auto'>, 'route-data': <@aa{sv} []>, 'routes': <@aau []>}, 'proxy': {}},)" + + data = DBus.parse_gvariant(raw) + + assert data == [ + { + "802-11-wireless": { + "mac-address": [42, 126, 95, 29, 195, 137], + "mac-address-blacklist": [], + "mode": "infrastructure", + "security": "802-11-wireless-security", + "seen-bssids": ["7C:2E:BD:98:1B:06"], + "ssid": [78, 69, 84, 84], + }, + "802-11-wireless-security": {"auth-alg": "open", "key-mgmt": "wpa-psk"}, + "connection": { + "id": "NETT", + "interface-name": "wlan0", + "permissions": [], + "timestamp": 1598526799, + "type": "802-11-wireless", + "uuid": "13f9af79-a6e9-4e07-9353-165ad57bf1a8", + }, + "ipv4": { + "address-data": [], + "addresses": [], + "dns": [], + "dns-search": [], + "method": "auto", + "route-data": [], + "routes": [], + }, + "ipv6": { + "address-data": [], + "addresses": [], + "dns": [], + "dns-search": [], + "method": "auto", + "route-data": [], + "routes": [], + }, + "proxy": {}, + } + ] + + def test_v6(): """Test IPv6 Property.""" raw = "({'addresses': <[([byte 0x20, 0x01, 0x04, 0x70, 0x79, 0x2d, 0x00, 0x01, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10], uint32 64, [byte 0x20, 0x01, 0x04, 0x70, 0x79, 0x2d, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01])]>, 'dns': <[[byte 0x20, 0x01, 0x04, 0x70, 0x79, 0x2d, 0x00, 0x01, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05]]>})"