mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 21:27:38 +00:00
Revert "Add bytes support for bitwise template operations" (#60854)
This commit is contained in:
parent
a64ff2ae27
commit
9f4a99fe81
@ -52,12 +52,7 @@ from homeassistant.helpers import (
|
|||||||
)
|
)
|
||||||
from homeassistant.helpers.typing import TemplateVarsType
|
from homeassistant.helpers.typing import TemplateVarsType
|
||||||
from homeassistant.loader import bind_hass
|
from homeassistant.loader import bind_hass
|
||||||
from homeassistant.util import (
|
from homeassistant.util import convert, dt as dt_util, location as loc_util
|
||||||
convert,
|
|
||||||
convert_to_int,
|
|
||||||
dt as dt_util,
|
|
||||||
location as loc_util,
|
|
||||||
)
|
|
||||||
from homeassistant.util.async_ import run_callback_threadsafe
|
from homeassistant.util.async_ import run_callback_threadsafe
|
||||||
from homeassistant.util.thread import ThreadWithException
|
from homeassistant.util.thread import ThreadWithException
|
||||||
|
|
||||||
@ -1634,18 +1629,14 @@ def regex_findall(value, find="", ignorecase=False):
|
|||||||
return re.findall(find, value, flags)
|
return re.findall(find, value, flags)
|
||||||
|
|
||||||
|
|
||||||
def bitwise_and(first_value, second_value, little_endian=False):
|
def bitwise_and(first_value, second_value):
|
||||||
"""Perform a bitwise and operation."""
|
"""Perform a bitwise and operation."""
|
||||||
return convert_to_int(first_value, little_endian=little_endian) & convert_to_int(
|
return first_value & second_value
|
||||||
second_value, little_endian=little_endian
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def bitwise_or(first_value, second_value, little_endian=False):
|
def bitwise_or(first_value, second_value):
|
||||||
"""Perform a bitwise or operation."""
|
"""Perform a bitwise or operation."""
|
||||||
return convert_to_int(first_value, little_endian=little_endian) | convert_to_int(
|
return first_value | second_value
|
||||||
second_value, little_endian=little_endian
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def base64_encode(value):
|
def base64_encode(value):
|
||||||
|
@ -76,31 +76,6 @@ def convert(
|
|||||||
return default
|
return default
|
||||||
|
|
||||||
|
|
||||||
def convert_to_int(
|
|
||||||
value: Any, default: int | None = None, little_endian: bool = False
|
|
||||||
) -> int | None:
|
|
||||||
"""Convert value or bytes to int, returns default if fails.
|
|
||||||
|
|
||||||
This supports bitwise integer operations on `bytes` objects.
|
|
||||||
By default the conversion is in Big-endian style (The last byte contains the least significant bit).
|
|
||||||
In Little-endian style the first byte contains the least significant bit.
|
|
||||||
"""
|
|
||||||
if isinstance(value, int):
|
|
||||||
return value
|
|
||||||
if isinstance(value, bytes) and value:
|
|
||||||
bytes_value = bytearray(value)
|
|
||||||
return_value = 0
|
|
||||||
while len(bytes_value):
|
|
||||||
return_value <<= 8
|
|
||||||
if little_endian:
|
|
||||||
return_value |= bytes_value.pop(len(bytes_value) - 1)
|
|
||||||
else:
|
|
||||||
return_value |= bytes_value.pop(0)
|
|
||||||
|
|
||||||
return return_value
|
|
||||||
return convert(value, int, default=default)
|
|
||||||
|
|
||||||
|
|
||||||
def ensure_unique_string(
|
def ensure_unique_string(
|
||||||
preferred_string: str, current_strings: Iterable[str] | KeysView[str]
|
preferred_string: str, current_strings: Iterable[str] | KeysView[str]
|
||||||
) -> str:
|
) -> str:
|
||||||
|
@ -373,39 +373,6 @@ async def test_setting_sensor_value_via_mqtt_message_and_template2(
|
|||||||
assert "template output: 'ILLEGAL'" in caplog.text
|
assert "template output: 'ILLEGAL'" in caplog.text
|
||||||
|
|
||||||
|
|
||||||
async def test_setting_sensor_value_via_mqtt_message_and_template_and_raw_state_encoding(
|
|
||||||
hass, mqtt_mock, caplog
|
|
||||||
):
|
|
||||||
"""Test processing a raw value via MQTT."""
|
|
||||||
assert await async_setup_component(
|
|
||||||
hass,
|
|
||||||
binary_sensor.DOMAIN,
|
|
||||||
{
|
|
||||||
binary_sensor.DOMAIN: {
|
|
||||||
"platform": "mqtt",
|
|
||||||
"name": "test",
|
|
||||||
"encoding": "",
|
|
||||||
"state_topic": "test-topic",
|
|
||||||
"payload_on": "ON",
|
|
||||||
"payload_off": "OFF",
|
|
||||||
"value_template": "{%if value|bitwise_and(1)-%}ON{%else%}OFF{%-endif-%}",
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
state = hass.states.get("binary_sensor.test")
|
|
||||||
assert state.state == STATE_OFF
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", b"\x01")
|
|
||||||
state = hass.states.get("binary_sensor.test")
|
|
||||||
assert state.state == STATE_ON
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", b"\x00")
|
|
||||||
state = hass.states.get("binary_sensor.test")
|
|
||||||
assert state.state == STATE_OFF
|
|
||||||
|
|
||||||
|
|
||||||
async def test_setting_sensor_value_via_mqtt_message_empty_template(
|
async def test_setting_sensor_value_via_mqtt_message_empty_template(
|
||||||
hass, mqtt_mock, caplog
|
hass, mqtt_mock, caplog
|
||||||
):
|
):
|
||||||
|
@ -737,100 +737,6 @@ async def test_discovery_expansion_3(hass, mqtt_mock, caplog):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def test_discovery_expansion_without_encoding_and_value_template_1(
|
|
||||||
hass, mqtt_mock, caplog
|
|
||||||
):
|
|
||||||
"""Test expansion of raw availability payload with a template as list."""
|
|
||||||
data = (
|
|
||||||
'{ "~": "some/base/topic",'
|
|
||||||
' "name": "DiscoveryExpansionTest1",'
|
|
||||||
' "stat_t": "test_topic/~",'
|
|
||||||
' "cmd_t": "~/test_topic",'
|
|
||||||
' "encoding":"",'
|
|
||||||
' "availability": [{'
|
|
||||||
' "topic":"~/avail_item1",'
|
|
||||||
' "payload_available": "1",'
|
|
||||||
' "payload_not_available": "0",'
|
|
||||||
' "value_template":"{{ value | bitwise_and(1) }}"'
|
|
||||||
" }],"
|
|
||||||
' "dev":{'
|
|
||||||
' "ids":["5706DF"],'
|
|
||||||
' "name":"DiscoveryExpansionTest1 Device",'
|
|
||||||
' "mdl":"Generic",'
|
|
||||||
' "sw":"1.2.3.4",'
|
|
||||||
' "mf":"None",'
|
|
||||||
' "sa":"default_area"'
|
|
||||||
" }"
|
|
||||||
"}"
|
|
||||||
)
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "homeassistant/switch/bla/config", data)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state.state == STATE_UNAVAILABLE
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "some/base/topic/avail_item1", b"\x01")
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state is not None
|
|
||||||
assert state.name == "DiscoveryExpansionTest1"
|
|
||||||
assert ("switch", "bla") in hass.data[ALREADY_DISCOVERED]
|
|
||||||
assert state.state == STATE_OFF
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "some/base/topic/avail_item1", b"\x00")
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state.state == STATE_UNAVAILABLE
|
|
||||||
|
|
||||||
|
|
||||||
async def test_discovery_expansion_without_encoding_and_value_template_2(
|
|
||||||
hass, mqtt_mock, caplog
|
|
||||||
):
|
|
||||||
"""Test expansion of raw availability payload with a template directly."""
|
|
||||||
data = (
|
|
||||||
'{ "~": "some/base/topic",'
|
|
||||||
' "name": "DiscoveryExpansionTest1",'
|
|
||||||
' "stat_t": "test_topic/~",'
|
|
||||||
' "cmd_t": "~/test_topic",'
|
|
||||||
' "availability_topic":"~/avail_item1",'
|
|
||||||
' "payload_available": "1",'
|
|
||||||
' "payload_not_available": "0",'
|
|
||||||
' "encoding":"",'
|
|
||||||
' "availability_template":"{{ value | bitwise_and(1) }}",'
|
|
||||||
' "dev":{'
|
|
||||||
' "ids":["5706DF"],'
|
|
||||||
' "name":"DiscoveryExpansionTest1 Device",'
|
|
||||||
' "mdl":"Generic",'
|
|
||||||
' "sw":"1.2.3.4",'
|
|
||||||
' "mf":"None",'
|
|
||||||
' "sa":"default_area"'
|
|
||||||
" }"
|
|
||||||
"}"
|
|
||||||
)
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "homeassistant/switch/bla/config", data)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state.state == STATE_UNAVAILABLE
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "some/base/topic/avail_item1", b"\x01")
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state is not None
|
|
||||||
assert state.name == "DiscoveryExpansionTest1"
|
|
||||||
assert ("switch", "bla") in hass.data[ALREADY_DISCOVERED]
|
|
||||||
assert state.state == STATE_OFF
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "some/base/topic/avail_item1", b"\x00")
|
|
||||||
|
|
||||||
state = hass.states.get("switch.DiscoveryExpansionTest1")
|
|
||||||
assert state.state == STATE_UNAVAILABLE
|
|
||||||
|
|
||||||
|
|
||||||
ABBREVIATIONS_WHITE_LIST = [
|
ABBREVIATIONS_WHITE_LIST = [
|
||||||
# MQTT client/server/trigger settings
|
# MQTT client/server/trigger settings
|
||||||
"CONF_BIRTH_MESSAGE",
|
"CONF_BIRTH_MESSAGE",
|
||||||
|
@ -893,35 +893,6 @@ async def test_entity_category(hass, mqtt_mock):
|
|||||||
await help_test_entity_category(hass, mqtt_mock, sensor.DOMAIN, DEFAULT_CONFIG)
|
await help_test_entity_category(hass, mqtt_mock, sensor.DOMAIN, DEFAULT_CONFIG)
|
||||||
|
|
||||||
|
|
||||||
async def test_value_template_with_raw_data(hass, mqtt_mock):
|
|
||||||
"""Test processing a raw value via MQTT."""
|
|
||||||
assert await async_setup_component(
|
|
||||||
hass,
|
|
||||||
sensor.DOMAIN,
|
|
||||||
{
|
|
||||||
sensor.DOMAIN: {
|
|
||||||
"platform": "mqtt",
|
|
||||||
"name": "test",
|
|
||||||
"encoding": "",
|
|
||||||
"state_topic": "test-topic",
|
|
||||||
"unit_of_measurement": "fav unit",
|
|
||||||
"value_template": "{{ value | bitwise_and(255) }}",
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
await hass.async_block_till_done()
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", b"\xff")
|
|
||||||
state = hass.states.get("sensor.test")
|
|
||||||
|
|
||||||
assert state.state == "255"
|
|
||||||
|
|
||||||
async_fire_mqtt_message(hass, "test-topic", b"\x01\x10")
|
|
||||||
state = hass.states.get("sensor.test")
|
|
||||||
|
|
||||||
assert state.state == "16"
|
|
||||||
|
|
||||||
|
|
||||||
async def test_value_template_with_entity_id(hass, mqtt_mock):
|
async def test_value_template_with_entity_id(hass, mqtt_mock):
|
||||||
"""Test the access to attributes in value_template via the entity_id."""
|
"""Test the access to attributes in value_template via the entity_id."""
|
||||||
assert await async_setup_component(
|
assert await async_setup_component(
|
||||||
|
@ -1479,24 +1479,6 @@ def test_bitwise_and(hass):
|
|||||||
)
|
)
|
||||||
assert tpl.async_render() == 8 & 2
|
assert tpl.async_render() == 8 & 2
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ ( value_a ) | bitwise_and(value_b) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {"value_a": b"\x9b\xc2", "value_b": 0xFF00}
|
|
||||||
assert tpl.async_render(variables=variables) == 0x9B00
|
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ ( value_a ) | bitwise_and(value_b, little_endian=True) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {"value_a": b"\xc2\x9b", "value_b": 0xFFFF}
|
|
||||||
assert tpl.async_render(variables=variables) == 0x9BC2
|
|
||||||
|
|
||||||
|
|
||||||
def test_bitwise_or(hass):
|
def test_bitwise_or(hass):
|
||||||
"""Test bitwise_or method."""
|
"""Test bitwise_or method."""
|
||||||
@ -1522,54 +1504,6 @@ def test_bitwise_or(hass):
|
|||||||
)
|
)
|
||||||
assert tpl.async_render() == 8 | 2
|
assert tpl.async_render() == 8 | 2
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ value_a | bitwise_or(value_b) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {
|
|
||||||
"value_a": b"\xc2\x9b",
|
|
||||||
"value_b": 0xFFFF,
|
|
||||||
}
|
|
||||||
assert tpl.async_render(variables=variables) == 65535 # 39874
|
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ ( value_a ) | bitwise_or(value_b) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {
|
|
||||||
"value_a": 0xFF00,
|
|
||||||
"value_b": b"\xc2\x9b",
|
|
||||||
}
|
|
||||||
assert tpl.async_render(variables=variables) == 0xFF9B
|
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ ( value_a ) | bitwise_or(value_b) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {
|
|
||||||
"value_a": b"\xc2\x9b",
|
|
||||||
"value_b": 0x0000,
|
|
||||||
}
|
|
||||||
assert tpl.async_render(variables=variables) == 0xC29B
|
|
||||||
|
|
||||||
tpl = template.Template(
|
|
||||||
"""
|
|
||||||
{{ ( value_a ) | bitwise_or(value_b, little_endian=True) }}
|
|
||||||
""",
|
|
||||||
hass,
|
|
||||||
)
|
|
||||||
variables = {
|
|
||||||
"value_a": b"\xc2\x9b",
|
|
||||||
"value_b": 0,
|
|
||||||
}
|
|
||||||
assert tpl.async_render(variables=variables) == 0x9BC2
|
|
||||||
|
|
||||||
|
|
||||||
def test_distance_function_with_1_state(hass):
|
def test_distance_function_with_1_state(hass):
|
||||||
"""Test distance function with 1 state."""
|
"""Test distance function with 1 state."""
|
||||||
|
@ -82,22 +82,6 @@ def test_convert():
|
|||||||
assert util.convert(object, int, 1) == 1
|
assert util.convert(object, int, 1) == 1
|
||||||
|
|
||||||
|
|
||||||
def test_convert_to_int():
|
|
||||||
"""Test convert of bytes and numbers to int."""
|
|
||||||
assert util.convert_to_int(b"\x9b\xc2") == 39874
|
|
||||||
assert util.convert_to_int(b"") is None
|
|
||||||
assert util.convert_to_int(b"\x9b\xc2", 10) == 39874
|
|
||||||
assert util.convert_to_int(b"\xc2\x9b", little_endian=True) == 39874
|
|
||||||
assert util.convert_to_int(b"\xc2\x9b", 10, little_endian=True) == 39874
|
|
||||||
assert util.convert_to_int("abc", 10) == 10
|
|
||||||
assert util.convert_to_int("11.0", 10) == 10
|
|
||||||
assert util.convert_to_int("12", 10) == 12
|
|
||||||
assert util.convert_to_int("\xc2\x9b", 10) == 10
|
|
||||||
assert util.convert_to_int(None, 10) == 10
|
|
||||||
assert util.convert_to_int(None) is None
|
|
||||||
assert util.convert_to_int("NOT A NUMBER", 1) == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_ensure_unique_string():
|
def test_ensure_unique_string():
|
||||||
"""Test ensure_unique_string."""
|
"""Test ensure_unique_string."""
|
||||||
assert util.ensure_unique_string("Beer", ["Beer", "Beer_2"]) == "Beer_3"
|
assert util.ensure_unique_string("Beer", ["Beer", "Beer_2"]) == "Beer_3"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user