Use assignment expressions 11 (#57792)

This commit is contained in:
Marc Mueller 2021-10-17 20:15:48 +02:00 committed by GitHub
parent 238b488642
commit aa7dc78a1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 45 additions and 111 deletions

View File

@ -211,8 +211,7 @@ class HistoryPeriodView(HomeAssistantView):
if start_time > now: if start_time > now:
return self.json([]) return self.json([])
end_time_str = request.query.get("end_time") if end_time_str := request.query.get("end_time"):
if end_time_str:
end_time = dt_util.parse_datetime(end_time_str) end_time = dt_util.parse_datetime(end_time_str)
if end_time: if end_time:
end_time = dt_util.as_utc(end_time) end_time = dt_util.as_utc(end_time)
@ -304,13 +303,11 @@ class HistoryPeriodView(HomeAssistantView):
def sqlalchemy_filter_from_include_exclude_conf(conf): def sqlalchemy_filter_from_include_exclude_conf(conf):
"""Build a sql filter from config.""" """Build a sql filter from config."""
filters = Filters() filters = Filters()
exclude = conf.get(CONF_EXCLUDE) if exclude := conf.get(CONF_EXCLUDE):
if exclude:
filters.excluded_entities = exclude.get(CONF_ENTITIES, []) filters.excluded_entities = exclude.get(CONF_ENTITIES, [])
filters.excluded_domains = exclude.get(CONF_DOMAINS, []) filters.excluded_domains = exclude.get(CONF_DOMAINS, [])
filters.excluded_entity_globs = exclude.get(CONF_ENTITY_GLOBS, []) filters.excluded_entity_globs = exclude.get(CONF_ENTITY_GLOBS, [])
include = conf.get(CONF_INCLUDE) if include := conf.get(CONF_INCLUDE):
if include:
filters.included_entities = include.get(CONF_ENTITIES, []) filters.included_entities = include.get(CONF_ENTITIES, [])
filters.included_domains = include.get(CONF_DOMAINS, []) filters.included_domains = include.get(CONF_DOMAINS, [])
filters.included_entity_globs = include.get(CONF_ENTITY_GLOBS, []) filters.included_entity_globs = include.get(CONF_ENTITY_GLOBS, [])

View File

@ -150,9 +150,7 @@ def entities_in_scene(hass: HomeAssistant, entity_id: str) -> list[str]:
platform = hass.data[DATA_PLATFORM] platform = hass.data[DATA_PLATFORM]
entity = platform.entities.get(entity_id) if (entity := platform.entities.get(entity_id)) is None:
if entity is None:
return [] return []
return list(entity.scene_config.states) return list(entity.scene_config.states)
@ -233,8 +231,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
entities = call.data[CONF_ENTITIES] entities = call.data[CONF_ENTITIES]
for entity_id in snapshot: for entity_id in snapshot:
state = hass.states.get(entity_id) if (state := hass.states.get(entity_id)) is None:
if state is None:
_LOGGER.warning( _LOGGER.warning(
"Entity %s does not exist and therefore cannot be snapshotted", "Entity %s does not exist and therefore cannot be snapshotted",
entity_id, entity_id,
@ -248,8 +245,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
scene_config = SceneConfig(None, call.data[CONF_SCENE_ID], None, entities) scene_config = SceneConfig(None, call.data[CONF_SCENE_ID], None, entities)
entity_id = f"{SCENE_DOMAIN}.{scene_config.name}" entity_id = f"{SCENE_DOMAIN}.{scene_config.name}"
old = platform.entities.get(entity_id) if (old := platform.entities.get(entity_id)) is not None:
if old is not None:
if not old.from_service: if not old.from_service:
_LOGGER.warning("The scene %s already exists", entity_id) _LOGGER.warning("The scene %s already exists", entity_id)
return return
@ -263,10 +259,8 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
def _process_scenes_config(hass, async_add_entities, config): def _process_scenes_config(hass, async_add_entities, config):
"""Process multiple scenes and add them.""" """Process multiple scenes and add them."""
scene_config = config[STATES]
# Check empty list # Check empty list
if not scene_config: if not (scene_config := config[STATES]):
return return
async_add_entities( async_add_entities(

View File

@ -79,13 +79,11 @@ async def async_attach_trigger(hass, config, action, automation_info):
# Check state of entity. If valid, set up a listener. # Check state of entity. If valid, set up a listener.
if new_state.domain == "input_datetime": if new_state.domain == "input_datetime":
has_date = new_state.attributes["has_date"] if has_date := new_state.attributes["has_date"]:
if has_date:
year = new_state.attributes["year"] year = new_state.attributes["year"]
month = new_state.attributes["month"] month = new_state.attributes["month"]
day = new_state.attributes["day"] day = new_state.attributes["day"]
has_time = new_state.attributes["has_time"] if has_time := new_state.attributes["has_time"]:
if has_time:
hour = new_state.attributes["hour"] hour = new_state.attributes["hour"]
minute = new_state.attributes["minute"] minute = new_state.attributes["minute"]
second = new_state.attributes["second"] second = new_state.attributes["second"]

View File

@ -29,9 +29,7 @@ def async_sign_path(
hass: HomeAssistant, refresh_token_id: str, path: str, expiration: timedelta hass: HomeAssistant, refresh_token_id: str, path: str, expiration: timedelta
) -> str: ) -> str:
"""Sign a path for temporary access without auth header.""" """Sign a path for temporary access without auth header."""
secret = hass.data.get(DATA_SIGN_SECRET) if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
if secret is None:
secret = hass.data[DATA_SIGN_SECRET] = secrets.token_hex() secret = hass.data[DATA_SIGN_SECRET] = secrets.token_hex()
now = dt_util.utcnow() now = dt_util.utcnow()
@ -80,14 +78,10 @@ def setup_auth(hass: HomeAssistant, app: Application) -> None:
async def async_validate_signed_request(request: Request) -> bool: async def async_validate_signed_request(request: Request) -> bool:
"""Validate a signed request.""" """Validate a signed request."""
secret = hass.data.get(DATA_SIGN_SECRET) if (secret := hass.data.get(DATA_SIGN_SECRET)) is None:
if secret is None:
return False return False
signature = request.query.get(SIGN_QUERY_PARAM) if (signature := request.query.get(SIGN_QUERY_PARAM)) is None:
if signature is None:
return False return False
try: try:

View File

@ -40,8 +40,7 @@ class HomeAssistantView:
@staticmethod @staticmethod
def context(request: web.Request) -> Context: def context(request: web.Request) -> Context:
"""Generate a context from a request.""" """Generate a context from a request."""
user = request.get("hass_user") if (user := request.get("hass_user")) is None:
if user is None:
return Context() return Context()
return Context(user_id=user.id) return Context(user_id=user.id)

View File

@ -28,9 +28,7 @@ async def _async_reproduce_states(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce input boolean states.""" """Reproduce input boolean states."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -83,8 +83,7 @@ def has_date_or_time(conf):
def valid_initial(conf): def valid_initial(conf):
"""Check the initial value is valid.""" """Check the initial value is valid."""
initial = conf.get(CONF_INITIAL) if not (initial := conf.get(CONF_INITIAL)):
if not initial:
return conf return conf
if conf[CONF_HAS_DATE] and conf[CONF_HAS_TIME]: if conf[CONF_HAS_DATE] and conf[CONF_HAS_TIME]:
@ -226,8 +225,7 @@ class InputDatetime(RestoreEntity):
self.editable = True self.editable = True
self._current_datetime = None self._current_datetime = None
initial = config.get(CONF_INITIAL) if not (initial := config.get(CONF_INITIAL)):
if not initial:
return return
if self.has_date and self.has_time: if self.has_date and self.has_time:

View File

@ -41,9 +41,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -31,10 +31,8 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
# Return if we can't find entity # Return if we can't find entity
if cur_state is None: if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -22,10 +22,8 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id)
# Return if we can't find the entity # Return if we can't find the entity
if cur_state is None: if (cur_state := hass.states.get(state.entity_id)) is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -125,13 +125,11 @@ def get_supported_color_modes(hass: HomeAssistant, entity_id: str) -> set | None
First try the statemachine, then entity registry. First try the statemachine, then entity registry.
This is the equivalent of entity helper get_supported_features. This is the equivalent of entity helper get_supported_features.
""" """
state = hass.states.get(entity_id) if state := hass.states.get(entity_id):
if state:
return state.attributes.get(ATTR_SUPPORTED_COLOR_MODES) return state.attributes.get(ATTR_SUPPORTED_COLOR_MODES)
entity_registry = er.async_get(hass) entity_registry = er.async_get(hass)
entry = entity_registry.async_get(entity_id) if not (entry := entity_registry.async_get(entity_id)):
if not entry:
raise HomeAssistantError(f"Unknown entity {entity_id}") raise HomeAssistantError(f"Unknown entity {entity_id}")
if not entry.capabilities: if not entry.capabilities:
return None return None
@ -629,9 +627,7 @@ class Profiles:
@callback @callback
def apply_profile(self, name: str, params: dict) -> None: def apply_profile(self, name: str, params: dict) -> None:
"""Apply a profile.""" """Apply a profile."""
profile = self.data.get(name) if (profile := self.data.get(name)) is None:
if profile is None:
return return
if profile.hs_color is not None: if profile.hs_color is not None:

View File

@ -123,9 +123,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -150,9 +150,7 @@ async def async_setup(hass, config):
"logbook", "logbook", "hass:format-list-bulleted-type" "logbook", "logbook", "hass:format-list-bulleted-type"
) )
conf = config.get(DOMAIN, {}) if conf := config.get(DOMAIN, {}):
if conf:
filters = sqlalchemy_filter_from_include_exclude_conf(conf) filters = sqlalchemy_filter_from_include_exclude_conf(conf)
entities_filter = convert_include_exclude_filter(conf) entities_filter = convert_include_exclude_filter(conf)
else: else:
@ -202,8 +200,7 @@ class LogbookView(HomeAssistantView):
else: else:
datetime = dt_util.start_of_local_day() datetime = dt_util.start_of_local_day()
period = request.query.get("period") if (period := request.query.get("period")) is None:
if period is None:
period = 1 period = 1
else: else:
period = int(period) period = int(period)
@ -218,8 +215,7 @@ class LogbookView(HomeAssistantView):
"Format should be <domain>.<object_id>" "Format should be <domain>.<object_id>"
) from vol.Invalid ) from vol.Invalid
end_time = request.query.get("end_time") if (end_time := request.query.get("end_time")) is None:
if end_time is None:
start_day = dt_util.as_utc(datetime) - timedelta(days=period - 1) start_day = dt_util.as_utc(datetime) - timedelta(days=period - 1)
end_day = start_day + timedelta(days=period) end_day = start_day + timedelta(days=period)
else: else:
@ -605,9 +601,7 @@ def _keep_event(hass, event, entities_filter):
def _augment_data_with_context( def _augment_data_with_context(
data, entity_id, event, context_lookup, entity_attr_cache, external_events data, entity_id, event, context_lookup, entity_attr_cache, external_events
): ):
context_event = context_lookup.get(event.context_id) if not (context_event := context_lookup.get(event.context_id)):
if not context_event:
return return
if event == context_event: if event == context_event:
@ -663,8 +657,7 @@ def _augment_data_with_context(
if event_type in external_events: if event_type in external_events:
domain, describe_event = external_events[event_type] domain, describe_event = external_events[event_type]
data["context_domain"] = domain data["context_domain"] = domain
name = describe_event(context_event).get(ATTR_NAME) if name := describe_event(context_event).get(ATTR_NAME):
if name:
data["context_name"] = name data["context_name"] = name
@ -789,8 +782,7 @@ class EntityAttributeCache:
else: else:
self._cache[entity_id] = {} self._cache[entity_id] = {}
current_state = self._hass.states.get(entity_id) if current_state := self._hass.states.get(entity_id):
if current_state:
# Try the current state as its faster than decoding the # Try the current state as its faster than decoding the
# attributes # attributes
self._cache[entity_id][attribute] = current_state.attributes.get(attribute) self._cache[entity_id][attribute] = current_state.attributes.get(attribute)

View File

@ -984,8 +984,7 @@ class MediaPlayerEntity(Entity):
response = await websession.get(url) response = await websession.get(url)
if response.status == HTTP_OK: if response.status == HTTP_OK:
content = await response.read() content = await response.read()
content_type = response.headers.get(CONTENT_TYPE) if content_type := response.headers.get(CONTENT_TYPE):
if content_type:
content_type = content_type.split(";")[0] content_type = content_type.split(";")[0]
if content is None: if content is None:

View File

@ -22,9 +22,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -71,8 +71,7 @@ def async_create(
context: Context | None = None, context: Context | None = None,
) -> None: ) -> None:
"""Generate a notification.""" """Generate a notification."""
notifications = hass.data.get(DOMAIN) if (notifications := hass.data.get(DOMAIN)) is None:
if notifications is None:
notifications = hass.data[DOMAIN] = {} notifications = hass.data[DOMAIN] = {}
if notification_id is not None: if notification_id is not None:
@ -134,8 +133,7 @@ def async_dismiss(
hass: HomeAssistant, notification_id: str, *, context: Context | None = None hass: HomeAssistant, notification_id: str, *, context: Context | None = None
) -> None: ) -> None:
"""Remove a notification.""" """Remove a notification."""
notifications = hass.data.get(DOMAIN) if (notifications := hass.data.get(DOMAIN)) is None:
if notifications is None:
notifications = hass.data[DOMAIN] = {} notifications = hass.data[DOMAIN] = {}
entity_id = ENTITY_ID_FORMAT.format(slugify(notification_id)) entity_id = ENTITY_ID_FORMAT.format(slugify(notification_id))

View File

@ -226,9 +226,7 @@ class PersonStorageCollection(collection.StorageCollection):
"""Validate the config is valid.""" """Validate the config is valid."""
data = self.CREATE_SCHEMA(data) data = self.CREATE_SCHEMA(data)
user_id = data.get(CONF_USER_ID) if (user_id := data.get(CONF_USER_ID)) is not None:
if user_id is not None:
await self._validate_user_id(user_id) await self._validate_user_id(user_id)
return data return data
@ -410,8 +408,7 @@ class Person(RestoreEntity):
data[ATTR_GPS_ACCURACY] = self._gps_accuracy data[ATTR_GPS_ACCURACY] = self._gps_accuracy
if self._source is not None: if self._source is not None:
data[ATTR_SOURCE] = self._source data[ATTR_SOURCE] = self._source
user_id = self._config.get(CONF_USER_ID) if (user_id := self._config.get(CONF_USER_ID)) is not None:
if user_id is not None:
data[ATTR_USER_ID] = user_id data[ATTR_USER_ID] = user_id
return data return data
@ -448,9 +445,7 @@ class Person(RestoreEntity):
self._unsub_track_device() self._unsub_track_device()
self._unsub_track_device = None self._unsub_track_device = None
trackers = self._config[CONF_DEVICE_TRACKERS] if trackers := self._config[CONF_DEVICE_TRACKERS]:
if trackers:
_LOGGER.debug("Subscribe to device trackers for %s", self.entity_id) _LOGGER.debug("Subscribe to device trackers for %s", self.entity_id)
self._unsub_track_device = async_track_state_change_event( self._unsub_track_device = async_track_state_change_event(

View File

@ -463,9 +463,7 @@ class Recorder(threading.Thread):
if event.event_type in self.exclude_t: if event.event_type in self.exclude_t:
return False return False
entity_id = event.data.get(ATTR_ENTITY_ID) if (entity_id := event.data.get(ATTR_ENTITY_ID)) is None:
if entity_id is None:
return True return True
if isinstance(entity_id, str): if isinstance(entity_id, str):
@ -496,8 +494,7 @@ class Recorder(threading.Thread):
def do_adhoc_statistics(self, **kwargs): def do_adhoc_statistics(self, **kwargs):
"""Trigger an adhoc statistics run.""" """Trigger an adhoc statistics run."""
start = kwargs.get("start") if not (start := kwargs.get("start")):
if not start:
start = statistics.get_start_time() start = statistics.get_start_time()
self.queue.put(StatisticsTask(start)) self.queue.put(StatisticsTask(start))

View File

@ -523,8 +523,7 @@ def list_statistic_ids(
metadata = get_metadata_with_session(hass, session, None, statistic_type) metadata = get_metadata_with_session(hass, session, None, statistic_type)
for _, meta in metadata.values(): for _, meta in metadata.values():
unit = meta["unit_of_measurement"] if (unit := meta["unit_of_measurement"]) is not None:
if unit is not None:
# Display unit according to user settings # Display unit according to user settings
unit = _configured_unit(unit, units) unit = _configured_unit(unit, units)
meta["unit_of_measurement"] = unit meta["unit_of_measurement"] = unit

View File

@ -30,9 +30,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -33,9 +33,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -50,9 +50,7 @@ async def _async_reproduce_state(
reproduce_options: dict[str, Any] | None = None, reproduce_options: dict[str, Any] | None = None,
) -> None: ) -> None:
"""Reproduce a single state.""" """Reproduce a single state."""
cur_state = hass.states.get(state.entity_id) if (cur_state := hass.states.get(state.entity_id)) is None:
if cur_state is None:
_LOGGER.warning("Unable to find entity %s", state.entity_id) _LOGGER.warning("Unable to find entity %s", state.entity_id)
return return

View File

@ -81,10 +81,9 @@ def async_generate_path(webhook_id: str) -> str:
async def async_handle_webhook(hass, webhook_id, request): async def async_handle_webhook(hass, webhook_id, request):
"""Handle a webhook.""" """Handle a webhook."""
handlers = hass.data.setdefault(DOMAIN, {}) handlers = hass.data.setdefault(DOMAIN, {})
webhook = handlers.get(webhook_id)
# Always respond successfully to not give away if a hook exists or not. # Always respond successfully to not give away if a hook exists or not.
if webhook is None: if (webhook := handlers.get(webhook_id)) is None:
if isinstance(request, MockRequest): if isinstance(request, MockRequest):
received_from = request.mock_source received_from = request.mock_source
else: else:

View File

@ -58,8 +58,7 @@ def async_register_command(
schema = handler._ws_schema # type: ignore[attr-defined] schema = handler._ws_schema # type: ignore[attr-defined]
else: else:
command = command_or_handler command = command_or_handler
handlers = hass.data.get(DOMAIN) if (handlers := hass.data.get(DOMAIN)) is None:
if handlers is None:
handlers = hass.data[DOMAIN] = {} handlers = hass.data[DOMAIN] = {}
handlers[command] = (handler, schema) handlers[command] = (handler, schema)

View File

@ -420,9 +420,7 @@ def handle_entity_source(
perm_category=CAT_ENTITIES, perm_category=CAT_ENTITIES,
) )
source = raw_sources.get(entity_id) if (source := raw_sources.get(entity_id)) is None:
if source is None:
connection.send_error(msg["id"], ERR_NOT_FOUND, "Entity not found") connection.send_error(msg["id"], ERR_NOT_FOUND, "Entity not found")
return return