mirror of
https://github.com/home-assistant/core.git
synced 2025-04-24 17:27:52 +00:00
Use assignment expressions 30 (#58714)
This commit is contained in:
parent
7063c05127
commit
84618fa831
@ -125,8 +125,7 @@ class ArubaDeviceScanner(DeviceScanner):
|
||||
|
||||
devices = {}
|
||||
for device in devices_result:
|
||||
match = _DEVICES_REGEX.search(device.decode("utf-8"))
|
||||
if match:
|
||||
if match := _DEVICES_REGEX.search(device.decode("utf-8")):
|
||||
devices[match.group("ip")] = {
|
||||
"ip": match.group("ip"),
|
||||
"mac": match.group("mac").upper(),
|
||||
|
@ -156,9 +156,7 @@ def entities_in_automation(hass: HomeAssistant, entity_id: str) -> list[str]:
|
||||
|
||||
component = hass.data[DOMAIN]
|
||||
|
||||
automation_entity = component.get_entity(entity_id)
|
||||
|
||||
if automation_entity is None:
|
||||
if (automation_entity := component.get_entity(entity_id)) is None:
|
||||
return []
|
||||
|
||||
return list(automation_entity.referenced_entities)
|
||||
@ -187,9 +185,7 @@ def devices_in_automation(hass: HomeAssistant, entity_id: str) -> list[str]:
|
||||
|
||||
component = hass.data[DOMAIN]
|
||||
|
||||
automation_entity = component.get_entity(entity_id)
|
||||
|
||||
if automation_entity is None:
|
||||
if (automation_entity := component.get_entity(entity_id)) is None:
|
||||
return []
|
||||
|
||||
return list(automation_entity.referenced_devices)
|
||||
@ -218,9 +214,7 @@ def areas_in_automation(hass: HomeAssistant, entity_id: str) -> list[str]:
|
||||
|
||||
component = hass.data[DOMAIN]
|
||||
|
||||
automation_entity = component.get_entity(entity_id)
|
||||
|
||||
if automation_entity is None:
|
||||
if (automation_entity := component.get_entity(entity_id)) is None:
|
||||
return []
|
||||
|
||||
return list(automation_entity.referenced_areas)
|
||||
|
@ -180,8 +180,7 @@ class BondLight(BondBaseLight, BondEntity, LightEntity):
|
||||
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn on the light."""
|
||||
brightness = kwargs.get(ATTR_BRIGHTNESS)
|
||||
if brightness:
|
||||
if brightness := kwargs.get(ATTR_BRIGHTNESS):
|
||||
await self._hub.bond.action(
|
||||
self._device.device_id,
|
||||
Action.set_brightness(round((brightness * 100) / 255)),
|
||||
|
@ -275,9 +275,7 @@ def _get_camera_from_entity_id(hass: HomeAssistant, entity_id: str) -> Camera:
|
||||
if (component := hass.data.get(DOMAIN)) is None:
|
||||
raise HomeAssistantError("Camera integration not set up")
|
||||
|
||||
camera = component.get_entity(entity_id)
|
||||
|
||||
if camera is None:
|
||||
if (camera := component.get_entity(entity_id)) is None:
|
||||
raise HomeAssistantError("Camera not found")
|
||||
|
||||
if not camera.is_on:
|
||||
@ -596,9 +594,7 @@ class CameraView(HomeAssistantView):
|
||||
|
||||
async def get(self, request: web.Request, entity_id: str) -> web.StreamResponse:
|
||||
"""Start a GET request."""
|
||||
camera = self.component.get_entity(entity_id)
|
||||
|
||||
if camera is None:
|
||||
if (camera := self.component.get_entity(entity_id)) is None:
|
||||
raise web.HTTPNotFound()
|
||||
|
||||
camera = cast(Camera, camera)
|
||||
|
@ -81,8 +81,7 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
LOGGER.debug("async_step_user: user_input: %s", user_input)
|
||||
|
||||
if user_input is not None:
|
||||
host = user_input.get(CONF_HOST)
|
||||
if not host:
|
||||
if not (host := user_input.get(CONF_HOST)):
|
||||
# No device chosen, user might want to directly enter an URL
|
||||
return await self.async_step_manual()
|
||||
# User has chosen a device, ask for confirmation
|
||||
|
@ -613,8 +613,7 @@ class DlnaDmrEntity(MediaPlayerEntity):
|
||||
metadata: dict[str, Any] = extra.get("metadata") or {}
|
||||
|
||||
title = extra.get("title") or metadata.get("title") or "Home Assistant"
|
||||
thumb = extra.get("thumb")
|
||||
if thumb:
|
||||
if thumb := extra.get("thumb"):
|
||||
metadata["album_art_uri"] = thumb
|
||||
|
||||
# Translate metadata keys from HA names to DIDL-Lite names
|
||||
@ -666,8 +665,7 @@ class DlnaDmrEntity(MediaPlayerEntity):
|
||||
if not self._device:
|
||||
return None
|
||||
|
||||
play_mode = self._device.play_mode
|
||||
if not play_mode:
|
||||
if not (play_mode := self._device.play_mode):
|
||||
return None
|
||||
|
||||
if play_mode == PlayMode.VENDOR_DEFINED:
|
||||
@ -700,8 +698,7 @@ class DlnaDmrEntity(MediaPlayerEntity):
|
||||
if not self._device:
|
||||
return None
|
||||
|
||||
play_mode = self._device.play_mode
|
||||
if not play_mode:
|
||||
if not (play_mode := self._device.play_mode):
|
||||
return None
|
||||
|
||||
if play_mode == PlayMode.VENDOR_DEFINED:
|
||||
|
@ -182,9 +182,7 @@ class HangoutsBot:
|
||||
"""Detect a matching intent."""
|
||||
for intent_type, data in intents.items():
|
||||
for matcher in data.get(CONF_MATCHERS, []):
|
||||
match = matcher.match(text)
|
||||
|
||||
if not match:
|
||||
if not (match := matcher.match(text)):
|
||||
continue
|
||||
if intent_type == INTENT_HELP:
|
||||
return await self.hass.helpers.intent.async_handle(
|
||||
|
@ -75,8 +75,7 @@ def ensure_pin_format(pin, allow_insecure_setup_codes=None):
|
||||
|
||||
If incorrect code is entered, an exception is raised.
|
||||
"""
|
||||
match = PIN_FORMAT.search(pin.strip())
|
||||
if not match:
|
||||
if not (match := PIN_FORMAT.search(pin.strip())):
|
||||
raise aiohomekit.exceptions.MalformedPinError(f"Invalid PIN code f{pin}")
|
||||
pin_without_dashes = "".join(match.groups())
|
||||
if not allow_insecure_setup_codes and pin_without_dashes in INSECURE_CODES:
|
||||
|
@ -304,10 +304,8 @@ async def async_update_device_config(
|
||||
device_connection: DeviceConnectionType, device_config: ConfigType
|
||||
) -> None:
|
||||
"""Fill missing values in device_config with infos from LCN bus."""
|
||||
is_group = device_config[CONF_ADDRESS][2]
|
||||
|
||||
# fetch serial info if device is module
|
||||
if not is_group: # is module
|
||||
if not (is_group := device_config[CONF_ADDRESS][2]): # is module
|
||||
await device_connection.serial_known
|
||||
if device_config[CONF_HARDWARE_SERIAL] == -1:
|
||||
device_config[CONF_HARDWARE_SERIAL] = device_connection.hardware_serial
|
||||
|
@ -193,9 +193,7 @@ class LogbookView(HomeAssistantView):
|
||||
async def get(self, request, datetime=None):
|
||||
"""Retrieve logbook entries."""
|
||||
if datetime:
|
||||
datetime = dt_util.parse_datetime(datetime)
|
||||
|
||||
if datetime is None:
|
||||
if (datetime := dt_util.parse_datetime(datetime)) is None:
|
||||
return self.json_message("Invalid datetime", HTTPStatus.BAD_REQUEST)
|
||||
else:
|
||||
datetime = dt_util.start_of_local_day()
|
||||
@ -219,8 +217,7 @@ class LogbookView(HomeAssistantView):
|
||||
end_day = start_day + timedelta(days=period)
|
||||
else:
|
||||
start_day = datetime
|
||||
end_day = dt_util.parse_datetime(end_time)
|
||||
if end_day is None:
|
||||
if (end_day := dt_util.parse_datetime(end_time)) is None:
|
||||
return self.json_message("Invalid end_time", HTTPStatus.BAD_REQUEST)
|
||||
|
||||
hass = request.app["hass"]
|
||||
|
@ -1026,8 +1026,7 @@ class MediaPlayerImageView(HomeAssistantView):
|
||||
media_content_id: str | None = None,
|
||||
) -> web.Response:
|
||||
"""Start a get request."""
|
||||
player = self.component.get_entity(entity_id)
|
||||
if player is None:
|
||||
if (player := self.component.get_entity(entity_id)) is None:
|
||||
status = (
|
||||
HTTPStatus.NOT_FOUND
|
||||
if request[KEY_AUTHENTICATED]
|
||||
@ -1071,9 +1070,8 @@ async def websocket_handle_thumbnail(hass, connection, msg):
|
||||
Async friendly.
|
||||
"""
|
||||
component = hass.data[DOMAIN]
|
||||
player = component.get_entity(msg["entity_id"])
|
||||
|
||||
if player is None:
|
||||
if (player := component.get_entity(msg["entity_id"])) is None:
|
||||
connection.send_message(
|
||||
websocket_api.error_message(msg["id"], ERR_NOT_FOUND, "Entity not found")
|
||||
)
|
||||
|
@ -93,9 +93,7 @@ class MediaSourceItem:
|
||||
@classmethod
|
||||
def from_uri(cls, hass: HomeAssistant, uri: str) -> MediaSourceItem:
|
||||
"""Create an item from a uri."""
|
||||
match = URI_SCHEME_REGEX.match(uri)
|
||||
|
||||
if not match:
|
||||
if not (match := URI_SCHEME_REGEX.match(uri)):
|
||||
raise ValueError("Invalid media source URI")
|
||||
|
||||
domain = match.group("domain")
|
||||
|
@ -130,8 +130,7 @@ class MelCloudDevice:
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return a device description for device registry."""
|
||||
model = None
|
||||
unit_infos = self.device.units
|
||||
if unit_infos is not None:
|
||||
if (unit_infos := self.device.units) is not None:
|
||||
model = ", ".join([x["model"] for x in unit_infos if x["model"]])
|
||||
return DeviceInfo(
|
||||
connections={(CONNECTION_NETWORK_MAC, self.device.mac)},
|
||||
|
@ -68,8 +68,7 @@ class OctoPrintBinarySensorBase(CoordinatorEntity, BinarySensorEntity):
|
||||
@property
|
||||
def is_on(self):
|
||||
"""Return true if binary sensor is on."""
|
||||
printer = self.coordinator.data["printer"]
|
||||
if not printer:
|
||||
if not (printer := self.coordinator.data["printer"]):
|
||||
return None
|
||||
|
||||
return bool(self._get_flag_state(printer))
|
||||
|
@ -130,8 +130,7 @@ class OctoPrintJobPercentageSensor(OctoPrintSensorBase):
|
||||
if not job:
|
||||
return None
|
||||
|
||||
state = job.progress.completion
|
||||
if not state:
|
||||
if not (state := job.progress.completion):
|
||||
return 0
|
||||
|
||||
return round(state, 2)
|
||||
|
@ -274,8 +274,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
||||
|
||||
def _update_current_station(self, response):
|
||||
"""Update current station."""
|
||||
station_match = re.search(STATION_PATTERN, response)
|
||||
if station_match:
|
||||
if station_match := re.search(STATION_PATTERN, response):
|
||||
self._station = station_match.group(1)
|
||||
_LOGGER.debug("Got station as: %s", self._station)
|
||||
else:
|
||||
@ -283,8 +282,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
||||
|
||||
def _update_current_song(self, response):
|
||||
"""Update info about current song."""
|
||||
song_match = re.search(CURRENT_SONG_PATTERN, response)
|
||||
if song_match:
|
||||
if song_match := re.search(CURRENT_SONG_PATTERN, response):
|
||||
(
|
||||
self._media_title,
|
||||
self._media_artist,
|
||||
@ -343,8 +341,7 @@ class PandoraMediaPlayer(MediaPlayerEntity):
|
||||
_LOGGER.debug("Getting stations: %s", station_lines)
|
||||
self._stations = []
|
||||
for line in station_lines.split("\r\n"):
|
||||
match = re.search(r"\d+\).....(.+)", line)
|
||||
if match:
|
||||
if match := re.search(r"\d+\).....(.+)", line):
|
||||
station = match.group(1).strip()
|
||||
_LOGGER.debug("Found station %s", station)
|
||||
self._stations.append(station)
|
||||
|
@ -620,8 +620,7 @@ def list_statistic_ids(
|
||||
platform_statistic_ids = platform.list_statistic_ids(hass, statistic_type)
|
||||
|
||||
for statistic_id, info in platform_statistic_ids.items():
|
||||
unit = info["unit_of_measurement"]
|
||||
if unit is not None:
|
||||
if (unit := info["unit_of_measurement"]) is not None:
|
||||
# Display unit according to user settings
|
||||
unit = _configured_unit(unit, units)
|
||||
platform_statistic_ids[statistic_id]["unit_of_measurement"] = unit
|
||||
|
@ -254,8 +254,7 @@ class ToggleRflinkLight(SwitchableRflinkDevice, LightEntity):
|
||||
"""Adjust state if Rflink picks up a remote command for this device."""
|
||||
self.cancel_queued_send_commands()
|
||||
|
||||
command = event["command"]
|
||||
if command == "on":
|
||||
if event["command"] == "on":
|
||||
# if the state is unknown or false, it gets set as true
|
||||
# if the state is true, it gets set as false
|
||||
self._state = self._state in [None, False]
|
||||
|
@ -81,8 +81,7 @@ def _figure_out_source(record, call_stack, hass):
|
||||
for pathname in reversed(stack):
|
||||
|
||||
# Try to match with a file within Home Assistant
|
||||
match = re.match(paths_re, pathname[0])
|
||||
if match:
|
||||
if match := re.match(paths_re, pathname[0]):
|
||||
return [match.group(1), pathname[1]]
|
||||
# Ok, we don't know what this is
|
||||
return (record.pathname, record.lineno)
|
||||
|
@ -398,8 +398,7 @@ class TadoClimate(TadoZoneEntity, ClimateEntity):
|
||||
|
||||
def set_temperature(self, **kwargs):
|
||||
"""Set new target temperature."""
|
||||
temperature = kwargs.get(ATTR_TEMPERATURE)
|
||||
if temperature is None:
|
||||
if (temperature := kwargs.get(ATTR_TEMPERATURE)) is None:
|
||||
return
|
||||
|
||||
if self._current_tado_hvac_mode not in (
|
||||
|
@ -110,8 +110,7 @@ class ThomsonDeviceScanner(DeviceScanner):
|
||||
|
||||
devices = {}
|
||||
for device in devices_result:
|
||||
match = _DEVICES_REGEX.search(device.decode("utf-8"))
|
||||
if match:
|
||||
if match := _DEVICES_REGEX.search(device.decode("utf-8")):
|
||||
devices[match.group("ip")] = {
|
||||
"ip": match.group("ip"),
|
||||
"mac": match.group("mac").upper(),
|
||||
|
@ -233,8 +233,7 @@ def _parse_due_date(data: dict, gmt_string) -> datetime | None:
|
||||
# Add time information to date only strings.
|
||||
if len(data["date"]) == 10:
|
||||
return datetime.fromisoformat(data["date"]).replace(tzinfo=dt.UTC)
|
||||
nowtime = dt.parse_datetime(data["date"])
|
||||
if not nowtime:
|
||||
if not (nowtime := dt.parse_datetime(data["date"])):
|
||||
return None
|
||||
if nowtime.tzinfo is None:
|
||||
data["date"] += gmt_string
|
||||
|
@ -342,9 +342,7 @@ class MusicCastMediaPlayer(MusicCastDeviceEntity, MediaPlayerEntity):
|
||||
parts = media_id.split(":")
|
||||
|
||||
if parts[0] == "list":
|
||||
index = parts[3]
|
||||
|
||||
if index == "-1":
|
||||
if (index := parts[3]) == "-1":
|
||||
index = "0"
|
||||
|
||||
await self.coordinator.musiccast.play_list_media(index, self._zone_id)
|
||||
|
@ -469,9 +469,7 @@ def info_from_service(service: AsyncServiceInfo) -> HaServiceInfo | None:
|
||||
if isinstance(value, bytes):
|
||||
properties[key] = value.decode("utf-8")
|
||||
|
||||
addresses = service.addresses
|
||||
|
||||
if not addresses:
|
||||
if not (addresses := service.addresses):
|
||||
return None
|
||||
if (host := _first_non_link_local_or_v6_address(addresses)) is None:
|
||||
return None
|
||||
|
Loading…
x
Reference in New Issue
Block a user