Replace pylint protected-access with Ruff SLF001 (#115735)

This commit is contained in:
Sid 2024-05-06 20:33:26 +02:00 committed by GitHub
parent 460c05dc43
commit b456d97e65
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
90 changed files with 168 additions and 223 deletions

View File

@ -680,7 +680,7 @@ class _WatchPendingSetups:
if remaining_with_setup_started:
_LOGGER.debug("Integration remaining: %s", remaining_with_setup_started)
elif waiting_tasks := self._hass._active_tasks: # pylint: disable=protected-access
elif waiting_tasks := self._hass._active_tasks: # noqa: SLF001
_LOGGER.debug("Waiting on tasks: %s", waiting_tasks)
self._async_dispatch(remaining_with_setup_started)
if (
@ -984,7 +984,7 @@ async def _async_set_up_integrations(
except TimeoutError:
_LOGGER.warning(
"Setup timed out for stage 1 waiting on %s - moving forward",
hass._active_tasks, # pylint: disable=protected-access
hass._active_tasks, # noqa: SLF001
)
# Add after dependencies when setting up stage 2 domains
@ -1000,7 +1000,7 @@ async def _async_set_up_integrations(
except TimeoutError:
_LOGGER.warning(
"Setup timed out for stage 2 waiting on %s - moving forward",
hass._active_tasks, # pylint: disable=protected-access
hass._active_tasks, # noqa: SLF001
)
# Wrap up startup
@ -1011,7 +1011,7 @@ async def _async_set_up_integrations(
except TimeoutError:
_LOGGER.warning(
"Setup timed out for bootstrap waiting on %s - moving forward",
hass._active_tasks, # pylint: disable=protected-access
hass._active_tasks, # noqa: SLF001
)
watcher.async_stop()

View File

@ -80,11 +80,11 @@ class AgentCamera(MjpegCamera):
"""Initialize as a subclass of MjpegCamera."""
self.device = device
self._removed = False
self._attr_unique_id = f"{device._client.unique}_{device.typeID}_{device.id}"
self._attr_unique_id = f"{device.client.unique}_{device.typeID}_{device.id}"
super().__init__(
name=device.name,
mjpeg_url=f"{device.client._server_url}{device.mjpeg_image_url}&size={device.mjpegStreamWidth}x{device.mjpegStreamHeight}",
still_image_url=f"{device.client._server_url}{device.still_image_url}&size={device.mjpegStreamWidth}x{device.mjpegStreamHeight}",
mjpeg_url=f"{device.client._server_url}{device.mjpeg_image_url}&size={device.mjpegStreamWidth}x{device.mjpegStreamHeight}", # noqa: SLF001
still_image_url=f"{device.client._server_url}{device.still_image_url}&size={device.mjpegStreamWidth}x{device.mjpegStreamHeight}", # noqa: SLF001
)
self._attr_device_info = DeviceInfo(
identifiers={(AGENT_DOMAIN, self.unique_id)},

View File

@ -85,14 +85,12 @@ def adb_decorator(
err,
)
await self.aftv.adb_close()
# pylint: disable-next=protected-access
self._attr_available = False
return None
except Exception:
# An unforeseen exception occurred. Close the ADB connection so that
# it doesn't happen over and over again, then raise the exception.
await self.aftv.adb_close()
# pylint: disable-next=protected-access
self._attr_available = False
raise

View File

@ -95,10 +95,9 @@ def deserialize_entity_description(
descriptions_class: type[EntityDescription], data: dict[str, Any]
) -> EntityDescription:
"""Deserialize an entity description."""
# pylint: disable=protected-access
result: dict[str, Any] = {}
if hasattr(descriptions_class, "_dataclass"):
descriptions_class = descriptions_class._dataclass
descriptions_class = descriptions_class._dataclass # noqa: SLF001
for field in cached_fields(descriptions_class):
field_name = field.name
# It would be nice if field.type returned the actual

View File

@ -24,14 +24,14 @@ async def async_get_config_entry_diagnostics(
"data": async_redact_data(entry.data, TO_REDACT),
},
"hub": {
"version": hub._version, # pylint: disable=protected-access
"version": hub._version, # noqa: SLF001
},
"devices": [
{
"device_id": device.device_id,
"props": device.props,
"attrs": device._attrs, # pylint: disable=protected-access
"supported_actions": device._supported_actions, # pylint: disable=protected-access
"attrs": device._attrs, # noqa: SLF001
"supported_actions": device._supported_actions, # noqa: SLF001
}
for device in hub.devices
],

View File

@ -162,7 +162,7 @@ class CastStatusListener(
self._valid = True
self._mz_mgr = mz_mgr
if cast_device._cast_info.is_audio_group:
if cast_device._cast_info.is_audio_group: # noqa: SLF001
self._mz_mgr.add_multizone(chromecast)
if mz_only:
return
@ -170,7 +170,7 @@ class CastStatusListener(
chromecast.register_status_listener(self)
chromecast.socket_client.media_controller.register_status_listener(self)
chromecast.register_connection_listener(self)
if not cast_device._cast_info.is_audio_group:
if not cast_device._cast_info.is_audio_group: # noqa: SLF001
self._mz_mgr.register_listener(chromecast.uuid, self)
def new_cast_status(self, status):
@ -214,8 +214,7 @@ class CastStatusListener(
All following callbacks won't be forwarded.
"""
# pylint: disable-next=protected-access
if self._cast_device._cast_info.is_audio_group:
if self._cast_device._cast_info.is_audio_group: # noqa: SLF001
self._mz_mgr.remove_multizone(self._uuid)
else:
self._mz_mgr.deregister_listener(self._uuid, self)

View File

@ -109,11 +109,9 @@ async def websocket_detect_config(
# We don't want any integrations to use the name of the unit system
# so we are using the private attribute here
if location_info.use_metric:
# pylint: disable-next=protected-access
info["unit_system"] = unit_system._CONF_UNIT_SYSTEM_METRIC
info["unit_system"] = unit_system._CONF_UNIT_SYSTEM_METRIC # noqa: SLF001
else:
# pylint: disable-next=protected-access
info["unit_system"] = unit_system._CONF_UNIT_SYSTEM_US_CUSTOMARY
info["unit_system"] = unit_system._CONF_UNIT_SYSTEM_US_CUSTOMARY # noqa: SLF001
if location_info.latitude:
info["latitude"] = location_info.latitude

View File

@ -82,8 +82,7 @@ def retry(
"Decora connect error for device %s. Reconnecting",
device.name,
)
# pylint: disable-next=protected-access
device._switch.connect()
device._switch.connect() # noqa: SLF001
return wrapper_retry

View File

@ -177,7 +177,6 @@ def async_log_errors(
async def wrapper(
self: _DenonDeviceT, *args: _P.args, **kwargs: _P.kwargs
) -> _R | None:
# pylint: disable=protected-access
available = True
try:
return await func(self, *args, **kwargs)

View File

@ -54,8 +54,8 @@ class ElectricKiwiSelectHOPEntity(
"""Initialise the HOP selection entity."""
super().__init__(coordinator)
self._attr_unique_id = (
f"{coordinator._ek_api.customer_number}"
f"_{coordinator._ek_api.connection_id}_{description.key}"
f"{coordinator._ek_api.customer_number}" # noqa: SLF001
f"_{coordinator._ek_api.connection_id}_{description.key}" # noqa: SLF001
)
self.entity_description = description
self.values_dict = coordinator.get_hop_options()

View File

@ -167,8 +167,8 @@ class ElectricKiwiAccountEntity(
super().__init__(coordinator)
self._attr_unique_id = (
f"{coordinator._ek_api.customer_number}"
f"_{coordinator._ek_api.connection_id}_{description.key}"
f"{coordinator._ek_api.customer_number}" # noqa: SLF001
f"_{coordinator._ek_api.connection_id}_{description.key}" # noqa: SLF001
)
self.entity_description = description
@ -196,8 +196,8 @@ class ElectricKiwiHOPEntity(
super().__init__(coordinator)
self._attr_unique_id = (
f"{coordinator._ek_api.customer_number}"
f"_{coordinator._ek_api.connection_id}_{description.key}"
f"{coordinator._ek_api.customer_number}" # noqa: SLF001
f"_{coordinator._ek_api.connection_id}_{description.key}" # noqa: SLF001
)
self.entity_description = description

View File

@ -136,8 +136,7 @@ async def async_setup(hass: HomeAssistant, yaml_config: ConfigType) -> bool:
# We misunderstood the startup signal. You're not allowed to change
# anything during startup. Temp workaround.
# pylint: disable-next=protected-access
app._on_startup.freeze()
app._on_startup.freeze() # noqa: SLF001
await app.startup()
DescriptionXmlView(config).register(hass, app, app.router)

View File

@ -130,7 +130,6 @@ def esphome_state_property(
@functools.wraps(func)
def _wrapper(self: _EntityT) -> _R | None:
# pylint: disable-next=protected-access
if not self._has_state:
return None
val = func(self)

View File

@ -451,7 +451,7 @@ class EvoBroker:
self._location: evo.Location = client.locations[loc_idx]
self.config = client.installation_info[loc_idx][GWS][0][TCS][0]
self.tcs: evo.ControlSystem = self._location._gateways[0]._control_systems[0]
self.tcs: evo.ControlSystem = self._location._gateways[0]._control_systems[0] # noqa: SLF001
self.tcs_utc_offset = timedelta(minutes=self._location.timeZone[UTC_OFFSET])
self.temps: dict[str, float | None] = {}

View File

@ -128,7 +128,7 @@ class FileUploadView(HomeAssistantView):
async def _upload_file(self, request: web.Request) -> web.Response:
"""Handle uploaded file."""
# Increase max payload
request._client_max_size = MAX_SIZE # pylint: disable=protected-access
request._client_max_size = MAX_SIZE # noqa: SLF001
reader = await request.multipart()
file_field_reader = await reader.next()

View File

@ -34,7 +34,7 @@ async def validate_setup(
"""Check path is a folder."""
value: str = user_input[CONF_FOLDER]
dir_in = os.path.expanduser(str(value))
handler.parent_handler._async_abort_entries_match({CONF_FOLDER: value}) # pylint: disable=protected-access
handler.parent_handler._async_abort_entries_match({CONF_FOLDER: value}) # noqa: SLF001
if not os.path.isdir(dir_in):
raise SchemaFlowError("not_dir")

View File

@ -215,8 +215,8 @@ class GeniusBroker:
"""Make any useful debug log entries."""
_LOGGER.debug(
"Raw JSON: \n\nclient._zones = %s \n\nclient._devices = %s",
self.client._zones, # pylint: disable=protected-access
self.client._devices, # pylint: disable=protected-access
self.client._zones, # noqa: SLF001
self.client._devices, # noqa: SLF001
)
@ -309,8 +309,7 @@ class GeniusZone(GeniusEntity):
mode = payload["data"][ATTR_ZONE_MODE]
# pylint: disable-next=protected-access
if mode == "footprint" and not self._zone._has_pir:
if mode == "footprint" and not self._zone._has_pir: # noqa: SLF001
raise TypeError(
f"'{self.entity_id}' cannot support footprint mode (it has no PIR)"
)

View File

@ -396,8 +396,7 @@ async def async_get_users(hass: HomeAssistant) -> list[str]:
This is called by the cloud integration to import from the previously shared store.
"""
# pylint: disable-next=protected-access
path = hass.config.path(STORAGE_DIR, GoogleConfigStore._STORAGE_KEY)
path = hass.config.path(STORAGE_DIR, GoogleConfigStore._STORAGE_KEY) # noqa: SLF001
try:
store_data = await hass.async_add_executor_job(json_util.load_json, path)
except HomeAssistantError:

View File

@ -158,10 +158,8 @@ class HassIOView(HomeAssistantView):
if path == "backups/new/upload":
# We need to reuse the full content type that includes the boundary
if TYPE_CHECKING:
# pylint: disable-next=protected-access
assert isinstance(request._stored_content_type, str)
# pylint: disable-next=protected-access
headers[CONTENT_TYPE] = request._stored_content_type
assert isinstance(request._stored_content_type, str) # noqa: SLF001
headers[CONTENT_TYPE] = request._stored_content_type # noqa: SLF001
try:
client = await self._websession.request(

View File

@ -127,7 +127,6 @@ class SupervisorIssueRepairFlow(RepairsFlow):
self: SupervisorIssueRepairFlow, user_input: dict[str, str] | None = None
) -> FlowResult:
"""Handle a flow step for a suggestion."""
# pylint: disable-next=protected-access
return await self._async_step_apply_suggestion(
suggestion, confirmed=user_input is not None
)

View File

@ -143,15 +143,13 @@ async def async_attach_trigger(
if event_context_items:
# Fast path for simple items comparison
# This is safe because we do not mutate the event context
# pylint: disable-next=protected-access
if not (event.context._as_dict.items() >= event_context_items):
if not (event.context._as_dict.items() >= event_context_items): # noqa: SLF001
return
elif event_context_schema:
try:
# Slow path for schema validation
# This is safe because we make a copy of the event context
# pylint: disable-next=protected-access
event_context_schema(dict(event.context._as_dict))
event_context_schema(dict(event.context._as_dict)) # noqa: SLF001
except vol.Invalid:
# If event doesn't match, skip event
return

View File

@ -150,8 +150,7 @@ async def async_send_command(hass: HomeAssistant, data: Mapping[str, Any]) -> No
else:
_LOGGER.debug("Sending command '%s'", command)
await hass.async_add_executor_job(
# pylint: disable-next=protected-access
homeworks_data.controller._send,
homeworks_data.controller._send, # noqa: SLF001
command,
)
@ -312,8 +311,7 @@ class HomeworksKeypad:
def _request_keypad_led_states(self) -> None:
"""Query keypad led state."""
# pylint: disable-next=protected-access
self._controller._send(f"RKLS, {self._addr}")
self._controller._send(f"RKLS, {self._addr}") # noqa: SLF001
async def request_keypad_led_states(self) -> None:
"""Query keypad led state.

View File

@ -71,16 +71,13 @@ class HomeworksButton(HomeworksEntity, ButtonEntity):
async def async_press(self) -> None:
"""Press the button."""
await self.hass.async_add_executor_job(
# pylint: disable-next=protected-access
self._controller._send,
self._controller._send, # noqa: SLF001
f"KBP, {self._addr}, {self._idx}",
)
if not self._release_delay:
return
await asyncio.sleep(self._release_delay)
# pylint: disable-next=protected-access
await self.hass.async_add_executor_job(
# pylint: disable-next=protected-access
self._controller._send,
self._controller._send, # noqa: SLF001
f"KBR, {self._addr}, {self._idx}",
)

View File

@ -103,14 +103,14 @@ async def validate_add_controller(
user_input[CONF_CONTROLLER_ID] = slugify(user_input[CONF_NAME])
user_input[CONF_PORT] = int(user_input[CONF_PORT])
try:
handler._async_abort_entries_match( # pylint: disable=protected-access
handler._async_abort_entries_match( # noqa: SLF001
{CONF_HOST: user_input[CONF_HOST], CONF_PORT: user_input[CONF_PORT]}
)
except AbortFlow as err:
raise SchemaFlowError("duplicated_host_port") from err
try:
handler._async_abort_entries_match( # pylint: disable=protected-access
handler._async_abort_entries_match( # noqa: SLF001
{CONF_CONTROLLER_ID: user_input[CONF_CONTROLLER_ID]}
)
except AbortFlow as err:

View File

@ -214,13 +214,13 @@ class HoneywellUSThermostat(ClimateEntity):
ClimateEntityFeature.TURN_OFF | ClimateEntityFeature.TURN_ON
)
if device._data.get("canControlHumidification"):
if device._data.get("canControlHumidification"): # noqa: SLF001
self._attr_supported_features |= ClimateEntityFeature.TARGET_HUMIDITY
if device.raw_ui_data.get("SwitchEmergencyHeatAllowed"):
self._attr_supported_features |= ClimateEntityFeature.AUX_HEAT
if not device._data.get("hasFan"):
if not device._data.get("hasFan"): # noqa: SLF001
return
# not all honeywell fans support all modes

View File

@ -557,8 +557,7 @@ class HomeAssistantHTTP:
# However in Home Assistant components can be discovered after boot.
# This will now raise a RunTimeError.
# To work around this we now prevent the router from getting frozen
# pylint: disable-next=protected-access
self.app._router.freeze = lambda: None # type: ignore[method-assign]
self.app._router.freeze = lambda: None # type: ignore[method-assign] # noqa: SLF001
self.runner = web.AppRunner(
self.app, handler_cancellation=True, shutdown_timeout=10

View File

@ -87,7 +87,7 @@ class HassAqualinkThermostat(AqualinkEntity, ClimateEntity):
@property
def hvac_action(self) -> HVACAction:
"""Return the current HVAC action."""
state = AqualinkState(self.dev._heater.state)
state = AqualinkState(self.dev._heater.state) # noqa: SLF001
if state == AqualinkState.ON:
return HVACAction.HEATING
if state == AqualinkState.ENABLED:

View File

@ -160,7 +160,7 @@ class ImageUploadView(HomeAssistantView):
async def post(self, request: web.Request) -> web.Response:
"""Handle upload."""
# Increase max payload
request._client_max_size = MAX_SIZE # pylint: disable=protected-access
request._client_max_size = MAX_SIZE # noqa: SLF001
data = await request.post()
item = await request.app[KEY_HASS].data[DOMAIN].async_create_item(data)

View File

@ -250,7 +250,7 @@ class InputSelect(collection.CollectionEntity, SelectEntity, RestoreEntity):
"""Representation of a select input."""
_entity_component_unrecorded_attributes = (
SelectEntity._entity_component_unrecorded_attributes - {ATTR_OPTIONS}
SelectEntity._entity_component_unrecorded_attributes - {ATTR_OPTIONS} # noqa: SLF001
)
_unrecorded_attributes = frozenset({ATTR_EDITABLE})

View File

@ -153,7 +153,7 @@ class KNXClimate(KnxEntity, ClimateEntity):
f"{self._device.temperature.group_address_state}_"
f"{self._device.target_temperature.group_address_state}_"
f"{self._device.target_temperature.group_address}_"
f"{self._device._setpoint_shift.group_address}"
f"{self._device._setpoint_shift.group_address}" # noqa: SLF001
)
self.default_hvac_mode: HVACMode = config[
ClimateSchema.CONF_DEFAULT_CONTROLLER_MODE

View File

@ -83,7 +83,7 @@ class KNXWeather(KnxEntity, WeatherEntity):
def __init__(self, xknx: XKNX, config: ConfigType) -> None:
"""Initialize of a KNX sensor."""
super().__init__(_create_weather(xknx, config))
self._attr_unique_id = str(self._device._temperature.group_address_state)
self._attr_unique_id = str(self._device._temperature.group_address_state) # noqa: SLF001
self._attr_entity_category = config.get(CONF_ENTITY_CATEGORY)
@property

View File

@ -162,7 +162,7 @@ class LGNetCast(config_entries.ConfigFlow, domain=DOMAIN):
try:
await self.hass.async_add_executor_job(
self.client._get_session_id # pylint: disable=protected-access
self.client._get_session_id # noqa: SLF001
)
except AccessTokenError:
if user_input is not None:
@ -194,7 +194,7 @@ class LGNetCast(config_entries.ConfigFlow, domain=DOMAIN):
assert self.client is not None
with contextlib.suppress(AccessTokenError, SessionIdError):
await self.hass.async_add_executor_job(
self.client._get_session_id # pylint: disable=protected-access
self.client._get_session_id # noqa: SLF001
)
@callback

View File

@ -368,7 +368,7 @@ def filter_turn_on_params(light: LightEntity, params: dict[str, Any]) -> dict[st
params.pop(ATTR_TRANSITION, None)
supported_color_modes = (
light._light_internal_supported_color_modes # pylint:disable=protected-access
light._light_internal_supported_color_modes # noqa: SLF001
)
if not brightness_supported(supported_color_modes):
params.pop(ATTR_BRIGHTNESS, None)
@ -445,8 +445,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: # noqa:
):
profiles.apply_default(light.entity_id, light.is_on, params)
# pylint: disable-next=protected-access
legacy_supported_color_modes = light._light_internal_supported_color_modes
legacy_supported_color_modes = light._light_internal_supported_color_modes # noqa: SLF001
supported_color_modes = light.supported_color_modes
# If a color temperature is specified, emulate it if not supported by the light

View File

@ -200,14 +200,14 @@ def state(
transition_time = DEFAULT_TRANSITION
if self.effect == EFFECT_COLORLOOP:
self.group.stop()
self._attr_effect = None # pylint: disable=protected-access
self._attr_effect = None
# Set transition time.
if ATTR_TRANSITION in kwargs:
transition_time = int(cast(float, kwargs[ATTR_TRANSITION]))
# Do group type-specific work.
function(self, transition_time, pipeline, *args, **kwargs)
# Update state.
self._attr_is_on = new_state # pylint: disable=protected-access
self._attr_is_on = new_state
self.group.enqueue(pipeline)
self.schedule_update_ha_state()

View File

@ -81,7 +81,7 @@ class LutronEventEntity(LutronKeypad, EventEntity):
"""Unregister callbacks."""
await super().async_will_remove_from_hass()
# Temporary solution until https://github.com/thecynic/pylutron/pull/93 gets merged
self._lutron_device._subscribers.remove((self.handle_event, None)) # pylint: disable=protected-access
self._lutron_device._subscribers.remove((self.handle_event, None)) # noqa: SLF001
@callback
def handle_event(

View File

@ -257,7 +257,7 @@ class UploadMediaView(http.HomeAssistantView):
async def post(self, request: web.Request) -> web.Response:
"""Handle upload."""
# Increase max payload
request._client_max_size = MAX_UPLOAD_SIZE # pylint: disable=protected-access
request._client_max_size = MAX_UPLOAD_SIZE # noqa: SLF001
try:
data = self.schema(dict(await request.post()))

View File

@ -46,8 +46,7 @@ def get_minio_notification_response(
):
"""Start listening to minio events. Copied from minio-py."""
query = {"prefix": prefix, "suffix": suffix, "events": events}
# pylint: disable-next=protected-access
return minio_client._url_open(
return minio_client._url_open( # noqa: SLF001
"GET", bucket_name=bucket_name, query=query, preload_content=False
)

View File

@ -39,7 +39,7 @@ class MotionCoordinatorEntity(CoordinatorEntity[DataUpdateCoordinatorMotionBlind
if blind.device_type in DEVICE_TYPES_GATEWAY:
gateway = blind
else:
gateway = blind._gateway
gateway = blind._gateway # noqa: SLF001
if gateway.firmware is not None:
sw_version = f"{gateway.firmware}, protocol: {gateway.protocol}"
else:
@ -70,7 +70,7 @@ class MotionCoordinatorEntity(CoordinatorEntity[DataUpdateCoordinatorMotionBlind
manufacturer=MANUFACTURER,
model=blind.blind_type,
name=device_name(blind),
via_device=(DOMAIN, blind._gateway.mac),
via_device=(DOMAIN, blind._gateway.mac), # noqa: SLF001
hw_version=blind.wireless_name,
)

View File

@ -67,7 +67,7 @@ class NikoHomeControlLight(LightEntity):
self._attr_is_on = light.is_on
self._attr_color_mode = ColorMode.ONOFF
self._attr_supported_color_modes = {ColorMode.ONOFF}
if light._state["type"] == 2:
if light._state["type"] == 2: # noqa: SLF001
self._attr_color_mode = ColorMode.BRIGHTNESS
self._attr_supported_color_modes = {ColorMode.BRIGHTNESS}

View File

@ -134,8 +134,7 @@ class NX584Watcher(threading.Thread):
zone = event["zone"]
if not (zone_sensor := self._zone_sensors.get(zone)):
return
# pylint: disable-next=protected-access
zone_sensor._zone["state"] = event["zone_state"]
zone_sensor._zone["state"] = event["zone_state"] # noqa: SLF001
zone_sensor.schedule_update_ha_state()
def _process_events(self, events):

View File

@ -67,7 +67,7 @@ def wsdiscovery() -> list[Service]:
finally:
discovery.stop()
# Stop the threads started by WSDiscovery since otherwise there is a leak.
discovery._stopThreads() # pylint: disable=protected-access
discovery._stopThreads() # noqa: SLF001
async def async_discovery(hass: HomeAssistant) -> list[dict[str, Any]]:

View File

@ -160,7 +160,7 @@ class EventManager:
#
# Our parser expects the topic to be
# tns1:RuleEngine/CellMotionDetector/Motion
topic = msg.Topic._value_1.rstrip("/.") # pylint: disable=protected-access
topic = msg.Topic._value_1.rstrip("/.") # noqa: SLF001
if not (parser := PARSERS.get(topic)):
if topic not in UNHANDLED_TOPICS:

View File

@ -23,7 +23,7 @@ VIDEO_SOURCE_MAPPING = {
def extract_message(msg: Any) -> tuple[str, Any]:
"""Extract the message content and the topic."""
return msg.Topic._value_1, msg.Message._value_1 # pylint: disable=protected-access
return msg.Topic._value_1, msg.Message._value_1 # noqa: SLF001
def _normalize_video_source(source: str) -> str:

View File

@ -324,7 +324,7 @@ def library_section_payload(section):
children_media_class = ITEM_TYPE_MEDIA_CLASS[section.TYPE]
except KeyError as err:
raise UnknownMediaType(f"Unknown type received: {section.TYPE}") from err
server_id = section._server.machineIdentifier # pylint: disable=protected-access
server_id = section._server.machineIdentifier # noqa: SLF001
return BrowseMedia(
title=section.title,
media_class=MediaClass.DIRECTORY,
@ -357,7 +357,7 @@ def hub_payload(hub):
media_content_id = f"{hub.librarySectionID}/{hub.hubIdentifier}"
else:
media_content_id = f"server/{hub.hubIdentifier}"
server_id = hub._server.machineIdentifier # pylint: disable=protected-access
server_id = hub._server.machineIdentifier # noqa: SLF001
payload = {
"title": hub.title,
"media_class": MediaClass.DIRECTORY,
@ -371,7 +371,7 @@ def hub_payload(hub):
def station_payload(station):
"""Create response payload for a music station."""
server_id = station._server.machineIdentifier # pylint: disable=protected-access
server_id = station._server.machineIdentifier # noqa: SLF001
return BrowseMedia(
title=station.title,
media_class=ITEM_TYPE_MEDIA_CLASS[station.type],

View File

@ -571,7 +571,7 @@ class PlexServer:
@property
def url_in_use(self):
"""Return URL used for connected Plex server."""
return self._plex_server._baseurl # pylint: disable=protected-access
return self._plex_server._baseurl # noqa: SLF001
@property
def option_ignore_new_shared_users(self):

View File

@ -233,7 +233,7 @@ async def async_setup_entry( # noqa: C901
async def _async_dump_thread_frames(call: ServiceCall) -> None:
"""Log all thread frames."""
frames = sys._current_frames() # pylint: disable=protected-access
frames = sys._current_frames() # noqa: SLF001
main_thread = threading.main_thread()
for thread in threading.enumerate():
if thread == main_thread:

View File

@ -103,8 +103,7 @@ def _validate_table_schema_has_correct_collation(
collate = (
dialect_kwargs.get("mysql_collate")
or dialect_kwargs.get("mariadb_collate")
# pylint: disable-next=protected-access
or connection.dialect._fetch_setting(connection, "collation_server") # type: ignore[attr-defined]
or connection.dialect._fetch_setting(connection, "collation_server") # type: ignore[attr-defined] # noqa: SLF001
)
if collate and collate != "utf8mb4_unicode_ci":
_LOGGER.debug(

View File

@ -106,7 +106,7 @@ class RecorderPool(SingletonThreadPool, NullPool):
exclude_integrations={"recorder"},
error_if_core=False,
)
return NullPool._create_connection(self)
return NullPool._create_connection(self) # noqa: SLF001
class MutexPool(StaticPool):

View File

@ -242,7 +242,7 @@ class WaitTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance._queue_watch.set() # pylint: disable=[protected-access]
instance._queue_watch.set() # noqa: SLF001
@dataclass(slots=True)
@ -255,7 +255,7 @@ class DatabaseLockTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance._lock_database(self) # pylint: disable=[protected-access]
instance._lock_database(self) # noqa: SLF001
@dataclass(slots=True)
@ -277,8 +277,7 @@ class KeepAliveTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
# pylint: disable-next=[protected-access]
instance._send_keep_alive()
instance._send_keep_alive() # noqa: SLF001
@dataclass(slots=True)
@ -289,8 +288,7 @@ class CommitTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
# pylint: disable-next=[protected-access]
instance._commit_event_session_or_retry()
instance._commit_event_session_or_retry() # noqa: SLF001
@dataclass(slots=True)
@ -333,7 +331,7 @@ class PostSchemaMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance._post_schema_migration( # pylint: disable=[protected-access]
instance._post_schema_migration( # noqa: SLF001
self.old_version, self.new_version
)
@ -357,7 +355,7 @@ class AdjustLRUSizeTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task to adjust the size."""
instance._adjust_lru_size() # pylint: disable=[protected-access]
instance._adjust_lru_size() # noqa: SLF001
@dataclass(slots=True)
@ -369,7 +367,7 @@ class StatesContextIDMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run context id migration task."""
if (
not instance._migrate_states_context_ids() # pylint: disable=[protected-access]
not instance._migrate_states_context_ids() # noqa: SLF001
):
# Schedule a new migration task if this one didn't finish
instance.queue_task(StatesContextIDMigrationTask())
@ -384,7 +382,7 @@ class EventsContextIDMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run context id migration task."""
if (
not instance._migrate_events_context_ids() # pylint: disable=[protected-access]
not instance._migrate_events_context_ids() # noqa: SLF001
):
# Schedule a new migration task if this one didn't finish
instance.queue_task(EventsContextIDMigrationTask())
@ -401,7 +399,7 @@ class EventTypeIDMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run event type id migration task."""
if not instance._migrate_event_type_ids(): # pylint: disable=[protected-access]
if not instance._migrate_event_type_ids(): # noqa: SLF001
# Schedule a new migration task if this one didn't finish
instance.queue_task(EventTypeIDMigrationTask())
@ -417,7 +415,7 @@ class EntityIDMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run entity_id migration task."""
if not instance._migrate_entity_ids(): # pylint: disable=[protected-access]
if not instance._migrate_entity_ids(): # noqa: SLF001
# Schedule a new migration task if this one didn't finish
instance.queue_task(EntityIDMigrationTask())
else:
@ -436,7 +434,7 @@ class EntityIDPostMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run entity_id post migration task."""
if (
not instance._post_migrate_entity_ids() # pylint: disable=[protected-access]
not instance._post_migrate_entity_ids() # noqa: SLF001
):
# Schedule a new migration task if this one didn't finish
instance.queue_task(EntityIDPostMigrationTask())
@ -453,7 +451,7 @@ class EventIdMigrationTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Clean up the legacy event_id index on states."""
instance._cleanup_legacy_states_event_ids() # pylint: disable=[protected-access]
instance._cleanup_legacy_states_event_ids() # noqa: SLF001
@dataclass(slots=True)

View File

@ -99,8 +99,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
with suppress(TypeError):
process.kill()
# https://bugs.python.org/issue43884
# pylint: disable-next=protected-access
process._transport.close() # type: ignore[attr-defined]
process._transport.close() # type: ignore[attr-defined] # noqa: SLF001
del process
raise HomeAssistantError(

View File

@ -159,8 +159,7 @@ class Monitor(threading.Thread, SensorEntity):
)
if SKIP_HANDLE_LOOKUP:
# HACK: inject handle mapping collected offline
# pylint: disable-next=protected-access
device._characteristics[UUID(BLE_TEMP_UUID)] = cached_char
device._characteristics[UUID(BLE_TEMP_UUID)] = cached_char # noqa: SLF001
# Magic: writing this makes device happy
device.char_write_handle(0x1B, bytearray([255]), False)
device.subscribe(BLE_TEMP_UUID, self._update)

View File

@ -103,7 +103,7 @@ def _find_target_identifier(instance: Any, fallback_soco: SoCo | None) -> str |
if soco := getattr(instance, "soco", fallback_soco):
# Holds a SoCo instance attribute
# Only use attributes with no I/O
return soco._player_name or soco.ip_address # pylint: disable=protected-access
return soco._player_name or soco.ip_address # noqa: SLF001
return None

View File

@ -98,7 +98,6 @@ def spotify_exception_handler(
def wrapper(
self: _SpotifyMediaPlayerT, *args: _P.args, **kwargs: _P.kwargs
) -> _R | None:
# pylint: disable=protected-access
try:
result = func(self, *args, **kwargs)
except requests.RequestException:

View File

@ -79,7 +79,7 @@ class SynoDSMCamera(SynologyDSMBaseEntity[SynologyDSMCameraUpdateCoordinator], C
camera_id
].is_enabled,
)
self.snapshot_quality = api._entry.options.get(
self.snapshot_quality = api._entry.options.get( # noqa: SLF001
CONF_SNAPSHOT_QUALITY, DEFAULT_SNAPSHOT_QUALITY
)
super().__init__(api, coordinator, description)

View File

@ -40,7 +40,7 @@ async def async_get_config_entry_diagnostics(
"utilisation": {},
"is_system_loaded": True,
"api_details": {
"fetching_entities": syno_api._fetching_entities, # pylint: disable=protected-access
"fetching_entities": syno_api._fetching_entities, # noqa: SLF001
},
}

View File

@ -106,7 +106,7 @@ def _figure_out_source(
# and since this code is running in the event loop, we need to avoid
# blocking I/O.
frame = sys._getframe(4) # pylint: disable=protected-access
frame = sys._getframe(4) # noqa: SLF001
#
# We use _getframe with 4 to skip the following frames:
#

View File

@ -464,8 +464,7 @@ class TemplateEntity(Entity):
template_var_tup = TrackTemplate(template, variables)
is_availability_template = False
for attribute in attributes:
# pylint: disable-next=protected-access
if attribute._attribute == "_attr_available":
if attribute._attribute == "_attr_available": # noqa: SLF001
has_availability_template = True
is_availability_template = True
attribute.async_setup()

View File

@ -21,7 +21,6 @@ TO_REDACT = [
]
# Private variable access needed for diagnostics
# pylint: disable=protected-access
async def async_get_config_entry_diagnostics(
@ -33,17 +32,17 @@ async def async_get_config_entry_diagnostics(
data: dict[str, Any] = {}
data["client"] = {
"auto_bypass_low_battery": client.auto_bypass_low_battery,
"module_flags": client._module_flags,
"module_flags": client._module_flags, # noqa: SLF001
"retry_delay": client.retry_delay,
"invalid_credentials": client._invalid_credentials,
"invalid_credentials": client._invalid_credentials, # noqa: SLF001
}
data["user"] = {
"master": client._user._master_user,
"user_admin": client._user._user_admin,
"config_admin": client._user._config_admin,
"security_problem": client._user.security_problem(),
"features": client._user._features,
"master": client._user._master_user, # noqa: SLF001
"user_admin": client._user._user_admin, # noqa: SLF001
"config_admin": client._user._config_admin, # noqa: SLF001
"security_problem": client._user.security_problem(), # noqa: SLF001
"features": client._user._features, # noqa: SLF001
}
data["locations"] = []
@ -51,7 +50,7 @@ async def async_get_config_entry_diagnostics(
new_location = {
"location_id": location.location_id,
"name": location.location_name,
"module_flags": location._module_flags,
"module_flags": location._module_flags, # noqa: SLF001
"security_device_id": location.security_device_id,
"ac_loss": location.ac_loss,
"low_battery": location.low_battery,

View File

@ -754,7 +754,7 @@ class ProtectEventSensor(EventEntityMixin, SensorEntity):
@callback
def _async_update_device_from_protect(self, device: ProtectModelWithId) -> None:
# do not call ProtectDeviceSensor method since we want event to get value here
EventEntityMixin._async_update_device_from_protect(self, device)
EventEntityMixin._async_update_device_from_protect(self, device) # noqa: SLF001
event = self._event
entity_description = self.entity_description
is_on = entity_description.get_is_on(self.device, self._event)

View File

@ -247,8 +247,7 @@ class UnifiVideoCamera(Camera):
(
uri
for i, uri in enumerate(channel["rtspUris"])
# pylint: disable-next=protected-access
if re.search(self._nvr._host, uri)
if re.search(self._nvr._host, uri) # noqa: SLF001
)
)

View File

@ -59,7 +59,6 @@ def catch_vlc_errors(
except CommandError as err:
LOGGER.error("Command error: %s", err)
except ConnectError as err:
# pylint: disable=protected-access
if self._attr_available:
LOGGER.error("Connection error: %s", err)
self._attr_available = False

View File

@ -1059,8 +1059,7 @@ async def async_get_forecasts_service(
if native_forecast_list is None:
converted_forecast_list = []
else:
# pylint: disable-next=protected-access
converted_forecast_list = weather._convert_forecast(native_forecast_list)
converted_forecast_list = weather._convert_forecast(native_forecast_list) # noqa: SLF001
return {
"forecast": converted_forecast_list,
}

View File

@ -34,8 +34,7 @@ async def validate_user_input(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Validate user input."""
# pylint: disable-next=protected-access
handler.parent_handler._async_abort_entries_match(
handler.parent_handler._async_abort_entries_match( # noqa: SLF001
{CONF_HOST: user_input[CONF_HOST]}
)
instance, _ = get_instance_from_options(handler.parent_handler.hass, user_input)

View File

@ -56,11 +56,10 @@ def async_register_command(
schema: vol.Schema | None = None,
) -> None:
"""Register a websocket command."""
# pylint: disable=protected-access
if handler is None:
handler = cast(const.WebSocketCommandHandler, command_or_handler)
command = handler._ws_command # type: ignore[attr-defined]
schema = handler._ws_schema # type: ignore[attr-defined]
command = handler._ws_command # type: ignore[attr-defined] # noqa: SLF001
schema = handler._ws_schema # type: ignore[attr-defined] # noqa: SLF001
else:
command = command_or_handler
if (handlers := hass.data.get(DOMAIN)) is None:

View File

@ -144,11 +144,10 @@ def websocket_command(
def decorate(func: const.WebSocketCommandHandler) -> const.WebSocketCommandHandler:
"""Decorate ws command function."""
# pylint: disable=protected-access
if is_dict and len(schema) == 1: # type only empty schema
func._ws_schema = False # type: ignore[attr-defined]
func._ws_schema = False # type: ignore[attr-defined] # noqa: SLF001
elif is_dict:
func._ws_schema = messages.BASE_COMMAND_MESSAGE_SCHEMA.extend(schema) # type: ignore[attr-defined]
func._ws_schema = messages.BASE_COMMAND_MESSAGE_SCHEMA.extend(schema) # type: ignore[attr-defined] # noqa: SLF001
else:
if TYPE_CHECKING:
assert not isinstance(schema, dict)
@ -158,8 +157,8 @@ def websocket_command(
),
*schema.validators[1:],
)
func._ws_schema = extended_schema # type: ignore[attr-defined]
func._ws_command = command # type: ignore[attr-defined]
func._ws_schema = extended_schema # type: ignore[attr-defined] # noqa: SLF001
func._ws_command = command # type: ignore[attr-defined] # noqa: SLF001
return func
return decorate

View File

@ -295,7 +295,7 @@ class WebSocketHandler:
EVENT_HOMEASSISTANT_STOP, self._async_handle_hass_stop
)
writer = wsock._writer # pylint: disable=protected-access
writer = wsock._writer # noqa: SLF001
if TYPE_CHECKING:
assert writer is not None
@ -378,7 +378,7 @@ class WebSocketHandler:
# added a way to set the limit, but there is no way to actually
# reach the code to set the limit, so we have to set it directly.
#
writer._limit = 2**20 # pylint: disable=protected-access
writer._limit = 2**20 # noqa: SLF001
async_handle_str = connection.async_handle
async_handle_binary = connection.async_handle_binary

View File

@ -256,10 +256,10 @@ class XiaomiGenericSelector(XiaomiSelector):
if description.options_map:
self._options_map = {}
for key, val in enum_class._member_map_.items():
for key, val in enum_class._member_map_.items(): # noqa: SLF001
self._options_map[description.options_map[key]] = val
else:
self._options_map = enum_class._member_map_
self._options_map = enum_class._member_map_ # noqa: SLF001
self._reverse_map = {val: key for key, val in self._options_map.items()}
self._enum_class = enum_class

View File

@ -296,7 +296,7 @@ class ZHAGateway:
@property
def radio_concurrency(self) -> int:
"""Maximum configured radio concurrency."""
return self.application_controller._concurrent_requests_semaphore.max_value # pylint: disable=protected-access
return self.application_controller._concurrent_requests_semaphore.max_value # noqa: SLF001
async def async_fetch_updated_state_mains(self) -> None:
"""Fetch updated state for mains powered devices."""

View File

@ -1136,13 +1136,13 @@ class LightGroup(BaseLight, ZhaGroupEntity):
# time of any members.
if member.device.manufacturer in DEFAULT_MIN_TRANSITION_MANUFACTURERS:
self._DEFAULT_MIN_TRANSITION_TIME = (
MinTransitionLight._DEFAULT_MIN_TRANSITION_TIME
MinTransitionLight._DEFAULT_MIN_TRANSITION_TIME # noqa: SLF001
)
# Check all group members to see if they support execute_if_off.
# If at least one member has a color cluster and doesn't support it,
# it's not used.
for endpoint in member.device._endpoints.values():
for endpoint in member.device._endpoints.values(): # noqa: SLF001
for cluster_handler in endpoint.all_cluster_handlers.values():
if (
cluster_handler.name == CLUSTER_HANDLER_COLOR

View File

@ -376,7 +376,7 @@ class EnumSensor(Sensor):
def _init_from_quirks_metadata(self, entity_metadata: ZCLEnumMetadata) -> None:
"""Init this entity from the quirks metadata."""
ZhaEntity._init_from_quirks_metadata(self, entity_metadata) # pylint: disable=protected-access
ZhaEntity._init_from_quirks_metadata(self, entity_metadata) # noqa: SLF001
self._attribute_name = entity_metadata.attribute_name
self._enum = entity_metadata.enum

View File

@ -363,7 +363,7 @@ class Zone(collection.CollectionEntity):
"""Return entity instance initialized from storage."""
zone = cls(config)
zone.editable = True
zone._generate_attrs()
zone._generate_attrs() # noqa: SLF001
return zone
@classmethod
@ -371,7 +371,7 @@ class Zone(collection.CollectionEntity):
"""Return entity instance initialized from yaml."""
zone = cls(config)
zone.editable = False
zone._generate_attrs()
zone._generate_attrs() # noqa: SLF001
return zone
@property

View File

@ -2210,7 +2210,7 @@ class FirmwareUploadView(HomeAssistantView):
assert node.client.driver
# Increase max payload
request._client_max_size = 1024 * 1024 * 10 # pylint: disable=protected-access
request._client_max_size = 1024 * 1024 * 10 # noqa: SLF001
data = await request.post()

View File

@ -153,7 +153,7 @@ class ConfigEntryState(Enum):
"""Create new ConfigEntryState."""
obj = object.__new__(cls)
obj._value_ = value
obj._recoverable = recoverable
obj._recoverable = recoverable # noqa: SLF001
return obj
@property
@ -887,8 +887,7 @@ class ConfigEntry(Generic[_DataT]):
)
return False
if result:
# pylint: disable-next=protected-access
hass.config_entries._async_schedule_save()
hass.config_entries._async_schedule_save() # noqa: SLF001
except Exception: # pylint: disable=broad-except
_LOGGER.exception(
"Error migrating entry %s for %s", self.title, self.domain

View File

@ -1230,12 +1230,11 @@ class HomeAssistant:
def _cancel_cancellable_timers(self) -> None:
"""Cancel timer handles marked as cancellable."""
# pylint: disable-next=protected-access
handles: Iterable[asyncio.TimerHandle] = self.loop._scheduled # type: ignore[attr-defined]
handles: Iterable[asyncio.TimerHandle] = self.loop._scheduled # type: ignore[attr-defined] # noqa: SLF001
for handle in handles:
if (
not handle.cancelled()
and (args := handle._args) # pylint: disable=protected-access
and (args := handle._args) # noqa: SLF001
and type(job := args[0]) is HassJob
and job.cancel_on_shutdown
):
@ -1347,7 +1346,7 @@ class Event(Generic[_DataT]):
# _as_dict is marked as protected
# to avoid callers outside of this module
# from misusing it by mistake.
"context": self.context._as_dict, # pylint: disable=protected-access
"context": self.context._as_dict, # noqa: SLF001
}
def as_dict(self) -> ReadOnlyDict[str, Any]:
@ -1842,7 +1841,7 @@ class State:
# _as_dict is marked as protected
# to avoid callers outside of this module
# from misusing it by mistake.
"context": self.context._as_dict, # pylint: disable=protected-access
"context": self.context._as_dict, # noqa: SLF001
}
def as_dict(
@ -1897,7 +1896,7 @@ class State:
# _as_dict is marked as protected
# to avoid callers outside of this module
# from misusing it by mistake.
context = state_context._as_dict # pylint: disable=protected-access
context = state_context._as_dict # noqa: SLF001
compressed_state: CompressedState = {
COMPRESSED_STATE_STATE: self.state,
COMPRESSED_STATE_ATTRIBUTES: self.attributes,
@ -3078,7 +3077,7 @@ class Config:
"elevation": self.elevation,
# We don't want any integrations to use the name of the unit system
# so we are using the private attribute here
"unit_system_v2": self.units._name, # pylint: disable=protected-access
"unit_system_v2": self.units._name, # noqa: SLF001
"location_name": self.location_name,
"time_zone": self.time_zone,
"external_url": self.external_url,

View File

@ -155,8 +155,7 @@ def _async_create_clientsession(
# It's important that we identify as Home Assistant
# If a package requires a different user agent, override it by passing a headers
# dictionary to the request method.
# pylint: disable-next=protected-access
clientsession._default_headers = MappingProxyType( # type: ignore[assignment]
clientsession._default_headers = MappingProxyType( # type: ignore[assignment] # noqa: SLF001
{USER_AGENT: SERVER_SOFTWARE},
)

View File

@ -75,7 +75,7 @@ def get_integration_logger(fallback_name: str) -> logging.Logger:
def get_current_frame(depth: int = 0) -> FrameType:
"""Return the current frame."""
# Add one to depth since get_current_frame is included
return sys._getframe(depth + 1) # pylint: disable=protected-access
return sys._getframe(depth + 1) # noqa: SLF001
def get_integration_frame(exclude_integrations: set | None = None) -> IntegrationFrame:

View File

@ -356,7 +356,6 @@ class SchemaConfigFlowHandler(ConfigFlow, ABC):
self: SchemaConfigFlowHandler, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a config flow step."""
# pylint: disable-next=protected-access
return await self._common_handler.async_step(step_id, user_input)
return _async_step
@ -450,7 +449,6 @@ class SchemaOptionsFlowHandler(OptionsFlowWithConfigEntry):
self: SchemaConfigFlowHandler, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle an options flow step."""
# pylint: disable-next=protected-access
return await self._common_handler.async_step(step_id, user_input)
return _async_step

View File

@ -414,16 +414,15 @@ class _ScriptRun:
def _changed(self) -> None:
if not self._stop.done():
self._script._changed() # pylint: disable=protected-access
self._script._changed() # noqa: SLF001
async def _async_get_condition(self, config):
# pylint: disable-next=protected-access
return await self._script._async_get_condition(config)
return await self._script._async_get_condition(config) # noqa: SLF001
def _log(
self, msg: str, *args: Any, level: int = logging.INFO, **kwargs: Any
) -> None:
self._script._log( # pylint: disable=protected-access
self._script._log( # noqa: SLF001
msg, *args, level=level, **kwargs
)
@ -509,7 +508,7 @@ class _ScriptRun:
trace_element.update_variables(self._variables)
def _finish(self) -> None:
self._script._runs.remove(self) # pylint: disable=protected-access
self._script._runs.remove(self) # noqa: SLF001
if not self._script.is_running:
self._script.last_action = None
self._changed()
@ -848,8 +847,7 @@ class _ScriptRun:
repeat_vars["item"] = item
self._variables["repeat"] = repeat_vars
# pylint: disable-next=protected-access
script = self._script._get_repeat_script(self._step)
script = self._script._get_repeat_script(self._step) # noqa: SLF001
warned_too_many_loops = False
async def async_run_sequence(iteration, extra_msg=""):
@ -1005,8 +1003,7 @@ class _ScriptRun:
async def _async_choose_step(self) -> None:
"""Choose a sequence."""
# pylint: disable-next=protected-access
choose_data = await self._script._async_get_choose_data(self._step)
choose_data = await self._script._async_get_choose_data(self._step) # noqa: SLF001
with trace_path("choose"):
for idx, (conditions, script) in enumerate(choose_data["choices"]):
@ -1027,8 +1024,7 @@ class _ScriptRun:
async def _async_if_step(self) -> None:
"""If sequence."""
# pylint: disable-next=protected-access
if_data = await self._script._async_get_if_data(self._step)
if_data = await self._script._async_get_if_data(self._step) # noqa: SLF001
test_conditions = False
try:
@ -1190,8 +1186,7 @@ class _ScriptRun:
@async_trace_path("parallel")
async def _async_parallel_step(self) -> None:
"""Run a sequence in parallel."""
# pylint: disable-next=protected-access
scripts = await self._script._async_get_parallel_scripts(self._step)
scripts = await self._script._async_get_parallel_scripts(self._step) # noqa: SLF001
async def async_run_with_trace(idx: int, script: Script) -> None:
"""Run a script with a trace path."""
@ -1229,7 +1224,7 @@ class _QueuedScriptRun(_ScriptRun):
# shared lock. At the same time monitor if we've been told to stop.
try:
async with async_interrupt.interrupt(self._stop, ScriptStoppedError, None):
await self._script._queue_lck.acquire() # pylint: disable=protected-access
await self._script._queue_lck.acquire() # noqa: SLF001
except ScriptStoppedError as ex:
# If we've been told to stop, then just finish up.
self._finish()
@ -1241,7 +1236,7 @@ class _QueuedScriptRun(_ScriptRun):
def _finish(self) -> None:
if self.lock_acquired:
self._script._queue_lck.release() # pylint: disable=protected-access
self._script._queue_lck.release() # noqa: SLF001
self.lock_acquired = False
super()._finish()

View File

@ -702,15 +702,14 @@ class Template:
render_info = RenderInfo(self)
# pylint: disable=protected-access
if self.is_static:
render_info._result = self.template.strip()
render_info._freeze_static()
render_info._result = self.template.strip() # noqa: SLF001
render_info._freeze_static() # noqa: SLF001
return render_info
token = _render_info.set(render_info)
try:
render_info._result = self.async_render(
render_info._result = self.async_render( # noqa: SLF001
variables, strict=strict, log_fn=log_fn, **kwargs
)
except TemplateError as ex:
@ -718,7 +717,7 @@ class Template:
finally:
_render_info.reset(token)
render_info._freeze()
render_info._freeze() # noqa: SLF001
return render_info
def render_with_possible_json_value(self, value, error_value=_SENTINEL):
@ -1169,7 +1168,7 @@ def _state_generator(
#
container: Iterable[State]
if domain is None:
container = states._states.values() # pylint: disable=protected-access
container = states._states.values() # noqa: SLF001
else:
container = states.async_all(domain)
for state in container:

View File

@ -32,7 +32,7 @@ class UndefinedType(Enum):
_singleton = 0
UNDEFINED = UndefinedType._singleton # pylint: disable=protected-access
UNDEFINED = UndefinedType._singleton # noqa: SLF001
# The following types should not used and

View File

@ -88,7 +88,7 @@ class HassEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
Back ported from cpython 3.12
"""
with events._lock: # type: ignore[attr-defined] # pylint: disable=protected-access
with events._lock: # type: ignore[attr-defined] # noqa: SLF001
if self._watcher is None: # pragma: no branch
if can_use_pidfd():
self._watcher = asyncio.PidfdChildWatcher()
@ -96,7 +96,7 @@ class HassEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
self._watcher = asyncio.ThreadedChildWatcher()
if threading.current_thread() is threading.main_thread():
self._watcher.attach_loop(
self._local._loop # type: ignore[attr-defined] # pylint: disable=protected-access
self._local._loop # type: ignore[attr-defined] # noqa: SLF001
)
@property
@ -159,15 +159,14 @@ async def setup_and_run_hass(runtime_config: RuntimeConfig) -> int:
return 1
# threading._shutdown can deadlock forever
# pylint: disable-next=protected-access
threading._shutdown = deadlock_safe_shutdown # type: ignore[attr-defined]
threading._shutdown = deadlock_safe_shutdown # type: ignore[attr-defined] # noqa: SLF001
return await hass.async_run()
def _enable_posix_spawn() -> None:
"""Enable posix_spawn on Alpine Linux."""
if subprocess._USE_POSIX_SPAWN: # pylint: disable=protected-access
if subprocess._USE_POSIX_SPAWN: # noqa: SLF001
return
# The subprocess module does not know about Alpine Linux/musl
@ -175,8 +174,7 @@ def _enable_posix_spawn() -> None:
# less efficient. This is a workaround to force posix_spawn()
# when using musl since cpython is not aware its supported.
tag = next(packaging.tags.sys_tags())
# pylint: disable-next=protected-access
subprocess._USE_POSIX_SPAWN = "musllinux" in tag.platform
subprocess._USE_POSIX_SPAWN = "musllinux" in tag.platform # noqa: SLF001
def run(runtime_config: RuntimeConfig) -> int:

View File

@ -215,7 +215,7 @@ def check(config_dir, secrets=False):
def secrets_proxy(*args):
secrets = Secrets(*args)
res["secret_cache"] = secrets._cache # pylint: disable=protected-access
res["secret_cache"] = secrets._cache # noqa: SLF001
return secrets
try:

View File

@ -171,14 +171,12 @@ class Throttle:
else:
host = args[0] if args else wrapper
# pylint: disable=protected-access
if not hasattr(host, "_throttle"):
host._throttle = {}
host._throttle = {} # noqa: SLF001
if id(self) not in host._throttle:
host._throttle[id(self)] = [threading.Lock(), None]
throttle = host._throttle[id(self)]
# pylint: enable=protected-access
if id(self) not in host._throttle: # noqa: SLF001
host._throttle[id(self)] = [threading.Lock(), None] # noqa: SLF001
throttle = host._throttle[id(self)] # noqa: SLF001
if not throttle[0].acquire(False):
return throttled_value()

View File

@ -90,8 +90,7 @@ def serialize_response(response: web.Response) -> dict[str, Any]:
if (body := response.body) is None:
body_decoded = None
elif isinstance(body, payload.StringPayload):
# pylint: disable-next=protected-access
body_decoded = body._value.decode(body.encoding)
body_decoded = body._value.decode(body.encoding) # noqa: SLF001
elif isinstance(body, bytes):
body_decoded = body.decode(response.charset or "utf-8")
else:

View File

@ -24,7 +24,7 @@ EXECUTOR_SHUTDOWN_TIMEOUT = 10
def _log_thread_running_at_shutdown(name: str, ident: int) -> None:
"""Log the stack of a thread that was still running at shutdown."""
frames = sys._current_frames() # pylint: disable=protected-access
frames = sys._current_frames() # noqa: SLF001
stack = frames.get(ident)
formatted_stack = traceback.format_stack(stack)
_LOGGER.warning(

View File

@ -16,7 +16,6 @@ def _class_fields(cls: type, kw_only: bool) -> list[tuple[str, Any, Any]]:
Extracted from dataclasses._process_class.
"""
# pylint: disable=protected-access
cls_annotations = cls.__dict__.get("__annotations__", {})
cls_fields: list[dataclasses.Field[Any]] = []
@ -24,20 +23,20 @@ def _class_fields(cls: type, kw_only: bool) -> list[tuple[str, Any, Any]]:
_dataclasses = sys.modules[dataclasses.__name__]
for name, _type in cls_annotations.items():
# See if this is a marker to change the value of kw_only.
if dataclasses._is_kw_only(type, _dataclasses) or ( # type: ignore[attr-defined]
if dataclasses._is_kw_only(type, _dataclasses) or ( # type: ignore[attr-defined] # noqa: SLF001
isinstance(_type, str)
and dataclasses._is_type( # type: ignore[attr-defined]
and dataclasses._is_type( # type: ignore[attr-defined] # noqa: SLF001
_type,
cls,
_dataclasses,
dataclasses.KW_ONLY,
dataclasses._is_kw_only, # type: ignore[attr-defined]
dataclasses._is_kw_only, # type: ignore[attr-defined] # noqa: SLF001
)
):
kw_only = True
else:
# Otherwise it's a field of some type.
cls_fields.append(dataclasses._get_field(cls, name, _type, kw_only)) # type: ignore[attr-defined]
cls_fields.append(dataclasses._get_field(cls, name, _type, kw_only)) # type: ignore[attr-defined] # noqa: SLF001
return [(field.name, field.type, field) for field in cls_fields]

View File

@ -310,6 +310,7 @@ disable = [
"no-else-continue", # RET507
"no-else-raise", # RET506
"no-else-return", # RET505
"protected-access", # SLF001
# "no-self-use", # PLR6301 # Optional plugin, not enabled
# Handled by mypy
@ -399,9 +400,8 @@ enable = [
]
per-file-ignores = [
# hass-component-root-import: Tests test non-public APIs
# protected-access: Tests do often test internals a lot
# redefined-outer-name: Tests reference fixtures in the test function
"/tests/:hass-component-root-import,protected-access,redefined-outer-name",
"/tests/:hass-component-root-import,redefined-outer-name",
]
[tool.pylint.REPORTS]
@ -726,6 +726,7 @@ select = [
"S608", # hardcoded-sql-expression
"S609", # unix-command-wildcard-injection
"SIM", # flake8-simplify
"SLF", # flake8-self
"SLOT", # flake8-slots
"T100", # Trace found: {name} used
"T20", # flake8-print

View File

@ -44,7 +44,7 @@ async def test_full_flow(
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
state = config_entry_oauth2_flow._encode_jwt(
state = config_entry_oauth2_flow._encode_jwt( # noqa: SLF001
hass,
{
"flow_id": result["flow_id"],

View File

@ -104,7 +104,7 @@ def bump_version(
raise ValueError(f"Unsupported type: {bump_type}")
temp = Version("0")
temp._version = version._version._replace(**to_change)
temp._version = version._version._replace(**to_change) # noqa: SLF001
return Version(str(temp))

View File

@ -7,6 +7,7 @@ extend-ignore = [
"B904", # Use raise from to specify exception cause
"N815", # Variable {name} in class scope should not be mixedCase
"RUF018", # Avoid assignment expressions in assert statements
"SLF001", # Private member accessed: Tests do often test internals a lot
]
[lint.isort]