Enable Ruff SIM105 (#86759)

* Enable Ruff SIM105

* Adjust existing cases
This commit is contained in:
Franck Nijhof 2023-01-27 03:06:22 +01:00 committed by GitHub
parent 71d7098530
commit e738924780
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 36 additions and 47 deletions

View File

@ -1,4 +1,6 @@
"""Config flow to configure Agent devices."""
from contextlib import suppress
from agent import AgentConnectionError, AgentError
from agent.a import Agent
import voluptuous as vol
@ -31,10 +33,8 @@ class AgentFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
server_origin = generate_url(host, port)
agent_client = Agent(server_origin, async_get_clientsession(self.hass))
try:
with suppress(AgentConnectionError, AgentError):
await agent_client.update()
except (AgentConnectionError, AgentError):
pass
await agent_client.close()

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
from contextlib import suppress
from datetime import timedelta
import logging
import os
@ -449,15 +450,13 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: # noqa:
payload = data
# Call API
try:
# The exceptions are logged properly in hassio.send_command
with suppress(HassioAPIError):
await hassio.send_command(
api_endpoint.command.format(addon=addon, slug=slug),
payload=payload,
timeout=api_endpoint.timeout,
)
except HassioAPIError:
# The exceptions are logged properly in hassio.send_command
pass
for service, settings in MAP_SERVICE_API.items():
hass.services.async_register(

View File

@ -1,6 +1,8 @@
"""Utilities for the Huawei LTE integration."""
from __future__ import annotations
from contextlib import suppress
from huawei_lte_api.Session import GetResponseType
from homeassistant.helpers.device_registry import format_mac
@ -18,9 +20,8 @@ def get_device_macs(
device_info.get(x)
for x in ("MacAddress1", "MacAddress2", "WifiMacAddrWl0", "WifiMacAddrWl1")
]
try:
# Assume not supported when exception is thrown
with suppress(Exception): # pylint: disable=broad-except
macs.extend(x.get("WifiMac") for x in wlan_settings["Ssids"]["Ssid"])
except Exception: # pylint: disable=broad-except
# Assume not supported
pass
return sorted({format_mac(str(x)) for x in macs if x})

View File

@ -1,6 +1,7 @@
"""The motionEye integration."""
from __future__ import annotations
from contextlib import suppress
from types import MappingProxyType
from typing import Any
@ -198,10 +199,8 @@ class MotionEyeMjpegCamera(MotionEyeEntity, MjpegCamera):
# which is not available during entity construction.
streaming_url = Template(streaming_template).render(**camera)
else:
try:
with suppress(MotionEyeClientURLParseError):
streaming_url = self._client.get_camera_stream_url(camera)
except MotionEyeClientURLParseError:
pass
return {
CONF_NAME: camera[KEY_NAME],

View File

@ -1,6 +1,7 @@
"""Support for MQTT cover devices."""
from __future__ import annotations
from contextlib import suppress
import functools
import logging
from typing import Any
@ -414,10 +415,8 @@ class MqttCover(MqttEntity, CoverEntity):
_LOGGER.debug("Ignoring empty position message from '%s'", msg.topic)
return
try:
with suppress(*JSON_DECODE_EXCEPTIONS):
payload_dict = json_loads(payload)
except JSON_DECODE_EXCEPTIONS:
pass
if payload_dict and isinstance(payload_dict, dict):
if "position" not in payload_dict:

View File

@ -1,4 +1,5 @@
"""Coordinator to fetch data from the Picnic API."""
from contextlib import suppress
import copy
from datetime import timedelta
import logging
@ -120,13 +121,11 @@ class PicnicUpdateCoordinator(DataUpdateCoordinator):
# Get the next order's position details if there is an undelivered order
delivery_position = {}
if next_delivery and not next_delivery.get("delivery_time"):
try:
# ValueError: If no information yet can mean an empty response
with suppress(ValueError):
delivery_position = self.picnic_api_client.get_delivery_position(
next_delivery["delivery_id"]
)
except ValueError:
# No information yet can mean an empty response
pass
# Determine the ETA, if available, the one from the delivery position API is more precise
# but, it's only available shortly before the actual delivery.

View File

@ -234,10 +234,8 @@ class PrometheusMetrics:
sample.name,
entity_id,
)
try:
with suppress(KeyError):
metric.remove(*sample.labels.values())
except KeyError:
pass
def _handle_attributes(self, state):
for key, value in state.attributes.items():

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
import logging
from synology_dsm import SynologyDSM
@ -263,11 +264,9 @@ class SynoApi:
async def async_unload(self) -> None:
"""Stop interacting with the NAS and prepare for removal from hass."""
try:
# ignore API errors during logout
with suppress(SynologyDSMException):
await self._syno_api_executer(self.dsm.logout)
except SynologyDSMException:
# ignore API errors during logout
pass
async def async_update(self) -> None:
"""Update function for updating API information."""

View File

@ -2,6 +2,7 @@
from __future__ import annotations
from collections.abc import Callable
from contextlib import suppress
import datetime as dt
import json
from typing import Any, cast
@ -262,11 +263,9 @@ def handle_get_states(
# If we can't serialize, we'll filter out unserializable states
serialized = []
for state in states:
try:
# Error is already logged above
with suppress(ValueError, TypeError):
serialized.append(JSON_DUMP(state))
except (ValueError, TypeError):
# Error is already logged above
pass
# We now have partially serialized states. Craft some JSON.
response2 = JSON_DUMP(messages.result_message(msg["id"], ["TO_REPLACE"]))

View File

@ -2,6 +2,7 @@
from __future__ import annotations
import collections
from contextlib import suppress
import json
from typing import Any
@ -583,11 +584,9 @@ class ZhaOptionsFlowHandler(BaseZhaFlow, config_entries.OptionsFlow):
) -> FlowResult:
"""Launch the options flow."""
if user_input is not None:
try:
# OperationNotAllowed: ZHA is not running
with suppress(config_entries.OperationNotAllowed):
await self.hass.config_entries.async_unload(self.config_entry.entry_id)
except config_entries.OperationNotAllowed:
# ZHA is not running
pass
return await self.async_step_prompt_migrate_or_reconfigure()

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import asyncio
import contextlib
from contextlib import suppress
import copy
import logging
import os
@ -320,11 +321,9 @@ class ZhaMultiPANMigrationHelper:
# ZHA is using another radio, do nothing
return False
try:
# OperationNotAllowed: ZHA is not running
with suppress(config_entries.OperationNotAllowed):
await self._hass.config_entries.async_unload(self._config_entry.entry_id)
except config_entries.OperationNotAllowed:
# ZHA is not running
pass
# Temporarily connect to the old radio to read its settings
config_entry_data = self._config_entry.data
@ -381,8 +380,6 @@ class ZhaMultiPANMigrationHelper:
_LOGGER.debug("Restored backup after %s retries", retry)
# Launch ZHA again
try:
# OperationNotAllowed: ZHA is not unloaded
with suppress(config_entries.OperationNotAllowed):
await self._hass.config_entries.async_setup(self._config_entry.entry_id)
except config_entries.OperationNotAllowed:
# ZHA is not unloaded
pass

View File

@ -248,6 +248,7 @@ select = [
"F", # pyflakes/autoflake
"PGH004", # Use specific rule codes when using noqa
"PT001", # Use @pytest.fixture without parentheses
"SIM105", # Use contextlib.suppress({exception}) instead of try-except-pass
"T20", # flake8-print
"UP", # pyupgrade
"W", # pycodestyle

View File

@ -6,6 +6,7 @@ This is NOT a full CI/linting replacement, only a quick check during development
"""
import asyncio
from collections import namedtuple
from contextlib import suppress
import itertools
import os
import re
@ -249,7 +250,5 @@ async def main():
if __name__ == "__main__":
try:
with suppress(FileNotFoundError, KeyboardInterrupt):
asyncio.run(main())
except (FileNotFoundError, KeyboardInterrupt):
pass