mirror of https://github.com/home-assistant/core.git
synced 2025-04-23 00:37:53 +00:00
Use f-strings in integrations starting with "F" and "G" (#32150)
* Use f-strings in integrations starting with F
* Use f-strings in tests for integrations starting with F
* Use f-strings in integrations starting with G
* Use f-strings in tests for integrations starting with G
* Fix pylint error
* Fix broken test
This commit is contained in:
parent 496bd3dddf
commit ad102b3840
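The conversion below is mechanical: every str.format() call becomes an equivalent f-string, with no change in behavior. The following sketch is illustrative only and is not part of the commit; the helper and its arguments are invented for demonstration. It shows the three patterns that recur throughout the hunks: inlining positional format() arguments, carrying format specs across unchanged, and switching quote styles for lookups nested inside an f-string.

# Illustrative sketch only -- not part of this commit. Names and values
# are made up; the patterns mirror the conversions applied in the diff.

def demo(room_name: str, device_name: str, device_id: int) -> None:
    # 1. Positional str.format() arguments are inlined into the f-string.
    old_id = "scene_{}_{}_{}".format(room_name, device_name, device_id)
    new_id = f"scene_{room_name}_{device_name}_{device_id}"
    assert old_id == new_id

    # 2. Format specs after the colon carry over unchanged.
    assert "{0:,}".format(1234567) == f"{1234567:,}"
    assert "{0:06x}".format(255) == f"{255:06x}"

    # 3. Inside a double-quoted f-string, nested lookups switch to single
    #    quotes (the Python versions targeted here forbid reusing the outer
    #    quote character inside a replacement field).
    data = {"name": "kitchen"}
    assert "beacon_{}".format(data["name"]) == f"beacon_{data['name']}"

demo("Living Room", "Lamp", 42)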
@ -247,8 +247,8 @@ class FibaroController:
|
||||
room_name = self._room_map[device.roomID].name
|
||||
device.room_name = room_name
|
||||
device.friendly_name = f"{room_name} {device.name}"
|
||||
device.ha_id = "scene_{}_{}_{}".format(
|
||||
slugify(room_name), slugify(device.name), device.id
|
||||
device.ha_id = (
|
||||
f"scene_{slugify(room_name)}_{slugify(device.name)}_{device.id}"
|
||||
)
|
||||
device.unique_id_str = f"{self.hub_serial}.scene.{device.id}"
|
||||
self._scene_map[device.id] = device
|
||||
@ -269,8 +269,8 @@ class FibaroController:
|
||||
room_name = self._room_map[device.roomID].name
|
||||
device.room_name = room_name
|
||||
device.friendly_name = f"{room_name} {device.name}"
|
||||
device.ha_id = "{}_{}_{}".format(
|
||||
slugify(room_name), slugify(device.name), device.id
|
||||
device.ha_id = (
|
||||
f"{slugify(room_name)}_{slugify(device.name)}_{device.id}"
|
||||
)
|
||||
if (
|
||||
device.enabled
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""Support for Fibaro binary sensors."""
|
||||
import logging
|
||||
|
||||
from homeassistant.components.binary_sensor import ENTITY_ID_FORMAT, BinarySensorDevice
|
||||
from homeassistant.components.binary_sensor import DOMAIN, BinarySensorDevice
|
||||
from homeassistant.const import CONF_DEVICE_CLASS, CONF_ICON
|
||||
|
||||
from . import FIBARO_DEVICES, FibaroDevice
|
||||
@ -40,7 +40,7 @@ class FibaroBinarySensor(FibaroDevice, BinarySensorDevice):
|
||||
"""Initialize the binary_sensor."""
|
||||
self._state = None
|
||||
super().__init__(fibaro_device)
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(self.ha_id)
|
||||
self.entity_id = f"{DOMAIN}.{self.ha_id}"
|
||||
stype = None
|
||||
devconf = fibaro_device.device_config
|
||||
if fibaro_device.type in SENSOR_TYPES:
|
||||
|
@ -4,7 +4,7 @@ import logging
|
||||
from homeassistant.components.cover import (
|
||||
ATTR_POSITION,
|
||||
ATTR_TILT_POSITION,
|
||||
ENTITY_ID_FORMAT,
|
||||
DOMAIN,
|
||||
CoverDevice,
|
||||
)
|
||||
|
||||
@ -29,7 +29,7 @@ class FibaroCover(FibaroDevice, CoverDevice):
|
||||
def __init__(self, fibaro_device):
|
||||
"""Initialize the Vera device."""
|
||||
super().__init__(fibaro_device)
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(self.ha_id)
|
||||
self.entity_id = f"{DOMAIN}.{self.ha_id}"
|
||||
|
||||
@staticmethod
|
||||
def bound(position):
|
||||
|
@ -7,7 +7,7 @@ from homeassistant.components.light import (
|
||||
ATTR_BRIGHTNESS,
|
||||
ATTR_HS_COLOR,
|
||||
ATTR_WHITE_VALUE,
|
||||
ENTITY_ID_FORMAT,
|
||||
DOMAIN,
|
||||
SUPPORT_BRIGHTNESS,
|
||||
SUPPORT_COLOR,
|
||||
SUPPORT_WHITE_VALUE,
|
||||
@ -77,7 +77,7 @@ class FibaroLight(FibaroDevice, Light):
|
||||
self._supported_flags |= SUPPORT_WHITE_VALUE
|
||||
|
||||
super().__init__(fibaro_device)
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(self.ha_id)
|
||||
self.entity_id = f"{DOMAIN}.{self.ha_id}"
|
||||
|
||||
@property
|
||||
def brightness(self):
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""Support for Fibaro sensors."""
|
||||
import logging
|
||||
|
||||
from homeassistant.components.sensor import ENTITY_ID_FORMAT
|
||||
from homeassistant.components.sensor import DOMAIN
|
||||
from homeassistant.const import (
|
||||
CONCENTRATION_PARTS_PER_MILLION,
|
||||
DEVICE_CLASS_HUMIDITY,
|
||||
@ -53,7 +53,7 @@ class FibaroSensor(FibaroDevice, Entity):
|
||||
self.current_value = None
|
||||
self.last_changed_time = None
|
||||
super().__init__(fibaro_device)
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(self.ha_id)
|
||||
self.entity_id = f"{DOMAIN}.{self.ha_id}"
|
||||
if fibaro_device.type in SENSOR_TYPES:
|
||||
self._unit = SENSOR_TYPES[fibaro_device.type][1]
|
||||
self._icon = SENSOR_TYPES[fibaro_device.type][2]
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""Support for Fibaro switches."""
|
||||
import logging
|
||||
|
||||
from homeassistant.components.switch import ENTITY_ID_FORMAT, SwitchDevice
|
||||
from homeassistant.components.switch import DOMAIN, SwitchDevice
|
||||
from homeassistant.util import convert
|
||||
|
||||
from . import FIBARO_DEVICES, FibaroDevice
|
||||
@ -26,7 +26,7 @@ class FibaroSwitch(FibaroDevice, SwitchDevice):
|
||||
"""Initialize the Fibaro device."""
|
||||
self._state = False
|
||||
super().__init__(fibaro_device)
|
||||
self.entity_id = ENTITY_ID_FORMAT.format(self.ha_id)
|
||||
self.entity_id = f"{DOMAIN}.{self.ha_id}"
|
||||
|
||||
def turn_on(self, **kwargs):
|
||||
"""Turn device on."""
|
||||
|
@ -46,15 +46,11 @@ class FileNotificationService(BaseNotificationService):
|
||||
"""Send a message to a file."""
|
||||
with open(self.filepath, "a") as file:
|
||||
if os.stat(self.filepath).st_size == 0:
|
||||
title = "{} notifications (Log started: {})\n{}\n".format(
|
||||
kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT),
|
||||
dt_util.utcnow().isoformat(),
|
||||
"-" * 80,
|
||||
)
|
||||
title = f"{kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)} notifications (Log started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n"
|
||||
file.write(title)
|
||||
|
||||
if self.add_timestamp:
|
||||
text = "{} {}\n".format(dt_util.utcnow().isoformat(), message)
|
||||
text = f"{dt_util.utcnow().isoformat()} {message}\n"
|
||||
else:
|
||||
text = f"{message}\n"
|
||||
file.write(text)
|
||||
|
@ -181,16 +181,14 @@ def request_app_setup(hass, config, add_entities, config_path, discovery_info=No
|
||||
|
||||
start_url = f"{hass.config.api.base_url}{FITBIT_AUTH_CALLBACK_PATH}"
|
||||
|
||||
description = """Please create a Fitbit developer app at
|
||||
description = f"""Please create a Fitbit developer app at
|
||||
https://dev.fitbit.com/apps/new.
|
||||
For the OAuth 2.0 Application Type choose Personal.
|
||||
Set the Callback URL to {}.
|
||||
Set the Callback URL to {start_url}.
|
||||
They will provide you a Client ID and secret.
|
||||
These need to be saved into the file located at: {}.
|
||||
These need to be saved into the file located at: {config_path}.
|
||||
Then come back here and hit the below button.
|
||||
""".format(
|
||||
start_url, config_path
|
||||
)
|
||||
"""
|
||||
|
||||
submit = "I have saved my Client ID and Client Secret into fitbit.conf."
|
||||
|
||||
@ -308,9 +306,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
||||
config_file.get(ATTR_CLIENT_ID), config_file.get(ATTR_CLIENT_SECRET)
|
||||
)
|
||||
|
||||
redirect_uri = "{}{}".format(
|
||||
hass.config.api.base_url, FITBIT_AUTH_CALLBACK_PATH
|
||||
)
|
||||
redirect_uri = f"{hass.config.api.base_url}{FITBIT_AUTH_CALLBACK_PATH}"
|
||||
|
||||
fitbit_auth_start_url, _ = oauth.authorize_token_url(
|
||||
redirect_uri=redirect_uri,
|
||||
@ -355,26 +351,20 @@ class FitbitAuthCallbackView(HomeAssistantView):
|
||||
|
||||
result = None
|
||||
if data.get("code") is not None:
|
||||
redirect_uri = "{}{}".format(
|
||||
hass.config.api.base_url, FITBIT_AUTH_CALLBACK_PATH
|
||||
)
|
||||
redirect_uri = f"{hass.config.api.base_url}{FITBIT_AUTH_CALLBACK_PATH}"
|
||||
|
||||
try:
|
||||
result = self.oauth.fetch_access_token(data.get("code"), redirect_uri)
|
||||
except MissingTokenError as error:
|
||||
_LOGGER.error("Missing token: %s", error)
|
||||
response_message = """Something went wrong when
|
||||
response_message = f"""Something went wrong when
|
||||
attempting authenticating with Fitbit. The error
|
||||
encountered was {}. Please try again!""".format(
|
||||
error
|
||||
)
|
||||
encountered was {error}. Please try again!"""
|
||||
except MismatchingStateError as error:
|
||||
_LOGGER.error("Mismatched state, CSRF error: %s", error)
|
||||
response_message = """Something went wrong when
|
||||
response_message = f"""Something went wrong when
|
||||
attempting authenticating with Fitbit. The error
|
||||
encountered was {}. Please try again!""".format(
|
||||
error
|
||||
)
|
||||
encountered was {error}. Please try again!"""
|
||||
else:
|
||||
_LOGGER.error("Unknown error when authing")
|
||||
response_message = """Something went wrong when
|
||||
@ -389,10 +379,8 @@ class FitbitAuthCallbackView(HomeAssistantView):
|
||||
An unknown error occurred. Please try again!
|
||||
"""
|
||||
|
||||
html_response = """<html><head><title>Fitbit Auth</title></head>
|
||||
<body><h1>{}</h1></body></html>""".format(
|
||||
response_message
|
||||
)
|
||||
html_response = f"""<html><head><title>Fitbit Auth</title></head>
|
||||
<body><h1>{response_message}</h1></body></html>"""
|
||||
|
||||
if result:
|
||||
config_contents = {
|
||||
@ -424,7 +412,7 @@ class FitbitSensor(Entity):
|
||||
self.extra = extra
|
||||
self._name = FITBIT_RESOURCES_LIST[self.resource_type][0]
|
||||
if self.extra:
|
||||
self._name = "{0} Battery".format(self.extra.get("deviceVersion"))
|
||||
self._name = f"{self.extra.get('deviceVersion')} Battery"
|
||||
unit_type = FITBIT_RESOURCES_LIST[self.resource_type][1]
|
||||
if unit_type == "":
|
||||
split_resource = self.resource_type.split("/")
|
||||
@ -460,7 +448,7 @@ class FitbitSensor(Entity):
|
||||
if self.resource_type == "devices/battery" and self.extra:
|
||||
battery_level = BATTERY_LEVELS[self.extra.get("battery")]
|
||||
return icon_for_battery_level(battery_level=battery_level, charging=None)
|
||||
return "mdi:{}".format(FITBIT_RESOURCES_LIST[self.resource_type][2])
|
||||
return f"mdi:{FITBIT_RESOURCES_LIST[self.resource_type][2]}"
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
@ -513,7 +501,7 @@ class FitbitSensor(Entity):
|
||||
self._state = raw_state
|
||||
else:
|
||||
try:
|
||||
self._state = "{0:,}".format(int(raw_state))
|
||||
self._state = f"{int(raw_state):,}"
|
||||
except TypeError:
|
||||
self._state = raw_state
|
||||
|
||||
|
@ -168,7 +168,7 @@ class FlicButton(BinarySensorDevice):
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the device."""
|
||||
return "flic_{}".format(self.address.replace(":", ""))
|
||||
return f"flic_{self.address.replace(':', '')}"
|
||||
|
||||
@property
|
||||
def address(self):
|
||||
@ -192,9 +192,7 @@ class FlicButton(BinarySensorDevice):
|
||||
|
||||
def _queued_event_check(self, click_type, time_diff):
|
||||
"""Generate a log message and returns true if timeout exceeded."""
|
||||
time_string = "{:d} {}".format(
|
||||
time_diff, "second" if time_diff == 1 else "seconds"
|
||||
)
|
||||
time_string = f"{time_diff:d} {'second' if time_diff == 1 else 'seconds'}"
|
||||
|
||||
if time_diff > self._timeout:
|
||||
_LOGGER.warning(
|
||||
|
@ -164,7 +164,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
||||
"""Update lights."""
|
||||
await flux.async_flux_update()
|
||||
|
||||
service_name = slugify("{} {}".format(name, "update"))
|
||||
service_name = slugify(f"{name} update")
|
||||
hass.services.async_register(DOMAIN, service_name, async_update)
|
||||
|
||||
|
||||
|
@ -167,7 +167,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
||||
ipaddr = device["ipaddr"]
|
||||
if ipaddr in light_ips:
|
||||
continue
|
||||
device["name"] = "{} {}".format(device["id"], ipaddr)
|
||||
device["name"] = f"{device['id']} {ipaddr}"
|
||||
device[ATTR_MODE] = None
|
||||
device[CONF_PROTOCOL] = None
|
||||
device[CONF_CUSTOM_EFFECT] = None
|
||||
|
@ -101,7 +101,7 @@ class FoobotSensor(Entity):
|
||||
"""Initialize the sensor."""
|
||||
self._uuid = device["uuid"]
|
||||
self.foobot_data = data
|
||||
self._name = "Foobot {} {}".format(device["name"], SENSOR_TYPES[sensor_type][0])
|
||||
self._name = f"Foobot {device['name']} {SENSOR_TYPES[sensor_type][0]}"
|
||||
self.type = sensor_type
|
||||
self._unit_of_measurement = SENSOR_TYPES[sensor_type][1]
|
||||
|
||||
|
@ -190,12 +190,7 @@ class HassFoscamCamera(Camera):
|
||||
async def stream_source(self):
|
||||
"""Return the stream source."""
|
||||
if self._rtsp_port:
|
||||
return "rtsp://{}:{}@{}:{}/videoMain".format(
|
||||
self._username,
|
||||
self._password,
|
||||
self._foscam_session.host,
|
||||
self._rtsp_port,
|
||||
)
|
||||
return f"rtsp://{self._username}:{self._password}@{self._foscam_session.host}:{self._rtsp_port}/videoMain"
|
||||
return None
|
||||
|
||||
@property
|
||||
|
@ -52,12 +52,7 @@ def setup(hass, config):
|
||||
|
||||
def checkin_user(call):
|
||||
"""Check a user in on Swarm."""
|
||||
url = (
|
||||
"https://api.foursquare.com/v2/checkins/add"
|
||||
"?oauth_token={}"
|
||||
"&v=20160802"
|
||||
"&m=swarm"
|
||||
).format(config[CONF_ACCESS_TOKEN])
|
||||
url = f"https://api.foursquare.com/v2/checkins/add?oauth_token={config[CONF_ACCESS_TOKEN]}&v=20160802&m=swarm"
|
||||
response = requests.post(url, data=call.data, timeout=10)
|
||||
|
||||
if response.status_code not in (200, 201):
|
||||
|
@ -78,9 +78,9 @@ class FritzboxSwitch(SwitchDevice):
|
||||
attrs[ATTR_STATE_LOCKED] = self._device.lock
|
||||
|
||||
if self._device.has_powermeter:
|
||||
attrs[ATTR_TOTAL_CONSUMPTION] = "{:.3f}".format(
|
||||
(self._device.energy or 0.0) / 1000
|
||||
)
|
||||
attrs[
|
||||
ATTR_TOTAL_CONSUMPTION
|
||||
] = f"{((self._device.energy or 0.0) / 1000):.3f}"
|
||||
attrs[ATTR_TOTAL_CONSUMPTION_UNIT] = ATTR_TOTAL_CONSUMPTION_UNIT_VALUE
|
||||
if self._device.has_temperature_sensor:
|
||||
attrs[ATTR_TEMPERATURE] = str(
|
||||
|
@ -90,11 +90,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
||||
device = condition[CONF_DEVICE]
|
||||
sensor_type = condition[CONF_SENSOR_TYPE]
|
||||
scope = condition[CONF_SCOPE]
|
||||
name = "Fronius {} {} {}".format(
|
||||
condition[CONF_SENSOR_TYPE].replace("_", " ").capitalize(),
|
||||
device if scope == SCOPE_DEVICE else SCOPE_SYSTEM,
|
||||
config[CONF_RESOURCE],
|
||||
)
|
||||
name = f"Fronius {condition[CONF_SENSOR_TYPE].replace('_', ' ').capitalize()} {device if scope == SCOPE_DEVICE else SCOPE_SYSTEM} {config[CONF_RESOURCE]}"
|
||||
if sensor_type == TYPE_INVERTER:
|
||||
if scope == SCOPE_SYSTEM:
|
||||
adapter_cls = FroniusInverterSystem
|
||||
@ -258,9 +254,7 @@ class FroniusTemplateSensor(Entity):
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return "{} {}".format(
|
||||
self._name.replace("_", " ").capitalize(), self.parent.name
|
||||
)
|
||||
return f"{self._name.replace('_', ' ').capitalize()} {self.parent.name}"
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
|
@ -50,8 +50,8 @@ MANIFEST_JSON = {
|
||||
"display": "standalone",
|
||||
"icons": [
|
||||
{
|
||||
"src": "/static/icons/favicon-{size}x{size}.png".format(size=size),
|
||||
"sizes": "{size}x{size}".format(size=size),
|
||||
"src": f"/static/icons/favicon-{size}x{size}.png",
|
||||
"sizes": f"{size}x{size}",
|
||||
"type": "image/png",
|
||||
"purpose": "maskable any",
|
||||
}
|
||||
|
@ -9,7 +9,6 @@ from homeassistant.components import websocket_api
|
||||
|
||||
DATA_STORAGE = "frontend_storage"
|
||||
STORAGE_VERSION_USER_DATA = 1
|
||||
STORAGE_KEY_USER_DATA = "frontend.user_data_{}"
|
||||
|
||||
|
||||
async def async_setup_frontend_storage(hass):
|
||||
@ -31,8 +30,7 @@ def with_store(orig_func):
|
||||
|
||||
if store is None:
|
||||
store = stores[user_id] = hass.helpers.storage.Store(
|
||||
STORAGE_VERSION_USER_DATA,
|
||||
STORAGE_KEY_USER_DATA.format(connection.user.id),
|
||||
STORAGE_VERSION_USER_DATA, f"frontend.user_data_{connection.user.id}"
|
||||
)
|
||||
|
||||
if user_id not in data:
|
||||
|
@ -55,7 +55,6 @@ SUPPORT_FRONTIER_SILICON = (
|
||||
|
||||
DEFAULT_PORT = 80
|
||||
DEFAULT_PASSWORD = "1234"
|
||||
DEVICE_URL = "http://{0}:{1}/device"
|
||||
|
||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
||||
{
|
||||
@ -83,7 +82,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
||||
|
||||
try:
|
||||
async_add_entities(
|
||||
[AFSAPIDevice(DEVICE_URL.format(host, port), password, name)], True
|
||||
[AFSAPIDevice(f"http://{host}:{port}/device", password, name)], True
|
||||
)
|
||||
_LOGGER.debug("FSAPI device %s:%s -> %s", host, port, password)
|
||||
return True
|
||||
|
@ -251,9 +251,7 @@ class GaradgetCover(CoverDevice):
|
||||
|
||||
def _get_variable(self, var):
|
||||
"""Get latest status."""
|
||||
url = "{}/v1/devices/{}/{}?access_token={}".format(
|
||||
self.particle_url, self.device_id, var, self.access_token
|
||||
)
|
||||
url = f"{self.particle_url}/v1/devices/{self.device_id}/{var}?access_token={self.access_token}"
|
||||
ret = requests.get(url, timeout=10)
|
||||
result = {}
|
||||
for pairs in ret.json()["result"].split("|"):
|
||||
|
@ -28,10 +28,6 @@ from .const import (
|
||||
DOMAIN,
|
||||
FEED,
|
||||
PLATFORMS,
|
||||
SIGNAL_DELETE_ENTITY,
|
||||
SIGNAL_NEW_GEOLOCATION,
|
||||
SIGNAL_STATUS,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
VALID_CATEGORIES,
|
||||
)
|
||||
|
||||
@ -181,7 +177,7 @@ class GdacsFeedEntityManager:
|
||||
@callback
|
||||
def async_event_new_entity(self):
|
||||
"""Return manager specific event to signal new entity."""
|
||||
return SIGNAL_NEW_GEOLOCATION.format(self._config_entry_id)
|
||||
return f"gdacs_new_geolocation_{self._config_entry_id}"
|
||||
|
||||
def get_entry(self, external_id):
|
||||
"""Get feed entry by external id."""
|
||||
@ -199,14 +195,14 @@ class GdacsFeedEntityManager:
|
||||
|
||||
async def _update_entity(self, external_id):
|
||||
"""Update entity."""
|
||||
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
|
||||
async_dispatcher_send(self._hass, f"gdacs_update_{external_id}")
|
||||
|
||||
async def _remove_entity(self, external_id):
|
||||
"""Remove entity."""
|
||||
async_dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
|
||||
async_dispatcher_send(self._hass, f"gdacs_delete_{external_id}")
|
||||
|
||||
async def _status_update(self, status_info):
|
||||
"""Propagate status update."""
|
||||
_LOGGER.debug("Status update received: %s", status_info)
|
||||
self._status_info = status_info
|
||||
async_dispatcher_send(self._hass, SIGNAL_STATUS.format(self._config_entry_id))
|
||||
async_dispatcher_send(self._hass, f"gdacs_status_{self._config_entry_id}")
|
||||
|
@ -15,11 +15,5 @@ DEFAULT_ICON = "mdi:alert"
|
||||
DEFAULT_RADIUS = 500.0
|
||||
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
|
||||
|
||||
SIGNAL_DELETE_ENTITY = "gdacs_delete_{}"
|
||||
SIGNAL_UPDATE_ENTITY = "gdacs_update_{}"
|
||||
SIGNAL_STATUS = "gdacs_status_{}"
|
||||
|
||||
SIGNAL_NEW_GEOLOCATION = "gdacs_new_geolocation_{}"
|
||||
|
||||
# Fetch valid categories from integration library.
|
||||
VALID_CATEGORIES = list(EVENT_TYPE_MAP.values())
|
||||
|
@ -13,13 +13,7 @@ from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.util.unit_system import IMPERIAL_SYSTEM
|
||||
|
||||
from .const import (
|
||||
DEFAULT_ICON,
|
||||
DOMAIN,
|
||||
FEED,
|
||||
SIGNAL_DELETE_ENTITY,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
)
|
||||
from .const import DEFAULT_ICON, DOMAIN, FEED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -102,14 +96,10 @@ class GdacsEvent(GeolocationEvent):
|
||||
async def async_added_to_hass(self):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_delete = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_DELETE_ENTITY.format(self._external_id),
|
||||
self._delete_callback,
|
||||
self.hass, f"gdacs_delete_{self._external_id}", self._delete_callback
|
||||
)
|
||||
self._remove_signal_update = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_UPDATE_ENTITY.format(self._external_id),
|
||||
self._update_callback,
|
||||
self.hass, f"gdacs_update_{self._external_id}", self._update_callback
|
||||
)
|
||||
|
||||
async def async_will_remove_from_hass(self) -> None:
|
||||
|
@ -7,7 +7,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.util import dt
|
||||
|
||||
from .const import DEFAULT_ICON, DOMAIN, FEED, SIGNAL_STATUS
|
||||
from .const import DEFAULT_ICON, DOMAIN, FEED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -55,7 +55,7 @@ class GdacsSensor(Entity):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_status = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_STATUS.format(self._config_entry_id),
|
||||
f"gdacs_status_{self._config_entry_id}",
|
||||
self._update_status_callback,
|
||||
)
|
||||
_LOGGER.debug("Waiting for updates %s", self._config_entry_id)
|
||||
|
@ -29,9 +29,6 @@ DEFAULT_UNIT_OF_MEASUREMENT = "km"
|
||||
|
||||
SCAN_INTERVAL = timedelta(minutes=5)
|
||||
|
||||
SIGNAL_DELETE_ENTITY = "geo_json_events_delete_{}"
|
||||
SIGNAL_UPDATE_ENTITY = "geo_json_events_update_{}"
|
||||
|
||||
SOURCE = "geo_json_events"
|
||||
|
||||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
|
||||
@ -108,11 +105,11 @@ class GeoJsonFeedEntityManager:
|
||||
|
||||
def _update_entity(self, external_id):
|
||||
"""Update entity."""
|
||||
dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
|
||||
dispatcher_send(self._hass, f"geo_json_events_update_{external_id}")
|
||||
|
||||
def _remove_entity(self, external_id):
|
||||
"""Remove entity."""
|
||||
dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
|
||||
dispatcher_send(self._hass, f"geo_json_events_delete_{external_id}")
|
||||
|
||||
|
||||
class GeoJsonLocationEvent(GeolocationEvent):
|
||||
@ -133,12 +130,12 @@ class GeoJsonLocationEvent(GeolocationEvent):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_delete = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_DELETE_ENTITY.format(self._external_id),
|
||||
f"geo_json_events_delete_{self._external_id}",
|
||||
self._delete_callback,
|
||||
)
|
||||
self._remove_signal_update = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_UPDATE_ENTITY.format(self._external_id),
|
||||
f"geo_json_events_update_{self._external_id}",
|
||||
self._update_callback,
|
||||
)
|
||||
|
||||
|
@ -118,9 +118,7 @@ class GeoRssServiceSensor(Entity):
|
||||
@property
|
||||
def name(self):
|
||||
"""Return the name of the sensor."""
|
||||
return "{} {}".format(
|
||||
self._service_name, "Any" if self._category is None else self._category
|
||||
)
|
||||
return f"{self._service_name} {'Any' if self._category is None else self._category}"
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
|
@ -114,7 +114,7 @@ def _is_mobile_beacon(data, mobile_beacons):
|
||||
def _device_name(data):
|
||||
"""Return name of device tracker."""
|
||||
if ATTR_BEACON_ID in data:
|
||||
return "{}_{}".format(BEACON_DEV_PREFIX, data["name"])
|
||||
return f"{BEACON_DEV_PREFIX}_{data['name']}"
|
||||
return data["device"]
|
||||
|
||||
|
||||
|
@ -34,10 +34,6 @@ from .const import (
|
||||
DOMAIN,
|
||||
FEED,
|
||||
PLATFORMS,
|
||||
SIGNAL_DELETE_ENTITY,
|
||||
SIGNAL_NEW_GEOLOCATION,
|
||||
SIGNAL_STATUS,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -200,7 +196,7 @@ class GeonetnzQuakesFeedEntityManager:
|
||||
@callback
|
||||
def async_event_new_entity(self):
|
||||
"""Return manager specific event to signal new entity."""
|
||||
return SIGNAL_NEW_GEOLOCATION.format(self._config_entry_id)
|
||||
return f"geonetnz_quakes_new_geolocation_{self._config_entry_id}"
|
||||
|
||||
def get_entry(self, external_id):
|
||||
"""Get feed entry by external id."""
|
||||
@ -222,14 +218,16 @@ class GeonetnzQuakesFeedEntityManager:
|
||||
|
||||
async def _update_entity(self, external_id):
|
||||
"""Update entity."""
|
||||
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
|
||||
async_dispatcher_send(self._hass, f"geonetnz_quakes_update_{external_id}")
|
||||
|
||||
async def _remove_entity(self, external_id):
|
||||
"""Remove entity."""
|
||||
async_dispatcher_send(self._hass, SIGNAL_DELETE_ENTITY.format(external_id))
|
||||
async_dispatcher_send(self._hass, f"geonetnz_quakes_delete_{external_id}")
|
||||
|
||||
async def _status_update(self, status_info):
|
||||
"""Propagate status update."""
|
||||
_LOGGER.debug("Status update received: %s", status_info)
|
||||
self._status_info = status_info
|
||||
async_dispatcher_send(self._hass, SIGNAL_STATUS.format(self._config_entry_id))
|
||||
async_dispatcher_send(
|
||||
self._hass, f"geonetnz_quakes_status_{self._config_entry_id}"
|
||||
)
|
||||
|
@ -15,9 +15,3 @@ DEFAULT_MINIMUM_MAGNITUDE = 0.0
|
||||
DEFAULT_MMI = 3
|
||||
DEFAULT_RADIUS = 50.0
|
||||
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
|
||||
|
||||
SIGNAL_DELETE_ENTITY = "geonetnz_quakes_delete_{}"
|
||||
SIGNAL_UPDATE_ENTITY = "geonetnz_quakes_update_{}"
|
||||
SIGNAL_STATUS = "geonetnz_quakes_status_{}"
|
||||
|
||||
SIGNAL_NEW_GEOLOCATION = "geonetnz_quakes_new_geolocation_{}"
|
||||
|
@ -14,7 +14,7 @@ from homeassistant.core import callback
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.util.unit_system import IMPERIAL_SYSTEM
|
||||
|
||||
from .const import DOMAIN, FEED, SIGNAL_DELETE_ENTITY, SIGNAL_UPDATE_ENTITY
|
||||
from .const import DOMAIN, FEED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -75,12 +75,12 @@ class GeonetnzQuakesEvent(GeolocationEvent):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_delete = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_DELETE_ENTITY.format(self._external_id),
|
||||
f"geonetnz_quakes_delete_{self._external_id}",
|
||||
self._delete_callback,
|
||||
)
|
||||
self._remove_signal_update = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_UPDATE_ENTITY.format(self._external_id),
|
||||
f"geonetnz_quakes_update_{self._external_id}",
|
||||
self._update_callback,
|
||||
)
|
||||
|
||||
|
@ -7,7 +7,7 @@ from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.helpers.entity import Entity
|
||||
from homeassistant.util import dt
|
||||
|
||||
from .const import DOMAIN, FEED, SIGNAL_STATUS
|
||||
from .const import DOMAIN, FEED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -53,7 +53,7 @@ class GeonetnzQuakesSensor(Entity):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_status = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_STATUS.format(self._config_entry_id),
|
||||
f"geonetnz_quakes_status_{self._config_entry_id}",
|
||||
self._update_status_callback,
|
||||
)
|
||||
_LOGGER.debug("Waiting for updates %s", self._config_entry_id)
|
||||
|
@ -24,14 +24,7 @@ from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.util.unit_system import METRIC_SYSTEM
|
||||
|
||||
from .config_flow import configured_instances
|
||||
from .const import (
|
||||
DEFAULT_RADIUS,
|
||||
DEFAULT_SCAN_INTERVAL,
|
||||
DOMAIN,
|
||||
FEED,
|
||||
SIGNAL_NEW_SENSOR,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
)
|
||||
from .const import DEFAULT_RADIUS, DEFAULT_SCAN_INTERVAL, DOMAIN, FEED
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
@ -173,7 +166,7 @@ class GeonetnzVolcanoFeedEntityManager:
|
||||
@callback
|
||||
def async_event_new_entity(self):
|
||||
"""Return manager specific event to signal new entity."""
|
||||
return SIGNAL_NEW_SENSOR.format(self._config_entry_id)
|
||||
return f"geonetnz_volcano_new_sensor_{self._config_entry_id}"
|
||||
|
||||
def get_entry(self, external_id):
|
||||
"""Get feed entry by external id."""
|
||||
@ -199,7 +192,7 @@ class GeonetnzVolcanoFeedEntityManager:
|
||||
|
||||
async def _update_entity(self, external_id):
|
||||
"""Update entity."""
|
||||
async_dispatcher_send(self._hass, SIGNAL_UPDATE_ENTITY.format(external_id))
|
||||
async_dispatcher_send(self._hass, f"geonetnz_volcano_update_{external_id}")
|
||||
|
||||
async def _remove_entity(self, external_id):
|
||||
"""Ignore removing entity."""
|
||||
|
@ -14,6 +14,3 @@ ATTR_HAZARDS = "hazards"
|
||||
DEFAULT_ICON = "mdi:image-filter-hdr"
|
||||
DEFAULT_RADIUS = 50.0
|
||||
DEFAULT_SCAN_INTERVAL = timedelta(minutes=5)
|
||||
|
||||
SIGNAL_NEW_SENSOR = "geonetnz_volcano_new_sensor_{}"
|
||||
SIGNAL_UPDATE_ENTITY = "geonetnz_volcano_update_{}"
|
||||
|
@ -23,7 +23,6 @@ from .const import (
|
||||
DEFAULT_ICON,
|
||||
DOMAIN,
|
||||
FEED,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -79,7 +78,7 @@ class GeonetnzVolcanoSensor(Entity):
|
||||
"""Call when entity is added to hass."""
|
||||
self._remove_signal_update = async_dispatcher_connect(
|
||||
self.hass,
|
||||
SIGNAL_UPDATE_ENTITY.format(self._external_id),
|
||||
f"geonetnz_volcano_update_{self._external_id}",
|
||||
self._update_callback,
|
||||
)
|
||||
|
||||
|
@ -51,9 +51,7 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
|
||||
except (TypeError, KeyError, NameError, ValueError) as ex:
|
||||
_LOGGER.error("%s", ex)
|
||||
hass.components.persistent_notification.create(
|
||||
"Error: {}<br />"
|
||||
"You will need to restart hass after fixing."
|
||||
"".format(ex),
|
||||
(f"Error: {ex}<br />You will need to restart hass after fixing."),
|
||||
title=NOTIFICATION_TITLE,
|
||||
notification_id=NOTIFICATION_ID,
|
||||
)
|
||||
|
@ -144,17 +144,17 @@ def do_authentication(hass, hass_config, config):
|
||||
dev_flow = oauth.step1_get_device_and_user_codes()
|
||||
except OAuth2DeviceCodeError as err:
|
||||
hass.components.persistent_notification.create(
|
||||
"Error: {}<br />You will need to restart hass after fixing." "".format(err),
|
||||
f"Error: {err}<br />You will need to restart hass after fixing." "",
|
||||
title=NOTIFICATION_TITLE,
|
||||
notification_id=NOTIFICATION_ID,
|
||||
)
|
||||
return False
|
||||
|
||||
hass.components.persistent_notification.create(
|
||||
"In order to authorize Home-Assistant to view your calendars "
|
||||
'you must visit: <a href="{}" target="_blank">{}</a> and enter '
|
||||
"code: {}".format(
|
||||
dev_flow.verification_url, dev_flow.verification_url, dev_flow.user_code
|
||||
(
|
||||
f"In order to authorize Home-Assistant to view your calendars "
|
||||
f'you must visit: <a href="{dev_flow.verification_url}" target="_blank">{dev_flow.verification_url}</a> and enter '
|
||||
f"code: {dev_flow.user_code}"
|
||||
),
|
||||
title=NOTIFICATION_TITLE,
|
||||
notification_id=NOTIFICATION_ID,
|
||||
@ -182,8 +182,10 @@ def do_authentication(hass, hass_config, config):
|
||||
do_setup(hass, hass_config, config)
|
||||
listener()
|
||||
hass.components.persistent_notification.create(
|
||||
"We are all setup now. Check {} for calendars that have "
|
||||
"been found".format(YAML_DEVICES),
|
||||
(
|
||||
f"We are all setup now. Check {YAML_DEVICES} for calendars that have "
|
||||
f"been found"
|
||||
),
|
||||
title=NOTIFICATION_TITLE,
|
||||
notification_id=NOTIFICATION_ID,
|
||||
)
|
||||
|
@ -53,7 +53,7 @@ def _get_homegraph_jwt(time, iss, key):
|
||||
|
||||
async def _get_homegraph_token(hass, jwt_signed):
|
||||
headers = {
|
||||
"Authorization": "Bearer {}".format(jwt_signed),
|
||||
"Authorization": f"Bearer {jwt_signed}",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
}
|
||||
data = {
|
||||
@ -185,7 +185,7 @@ class GoogleConfig(AbstractConfig):
|
||||
|
||||
async def _call():
|
||||
headers = {
|
||||
"Authorization": "Bearer {}".format(self._access_token),
|
||||
"Authorization": f"Bearer {self._access_token}",
|
||||
"X-GFE-SSL": "yes",
|
||||
}
|
||||
async with session.post(url, headers=headers, json=data) as res:
|
||||
|
@ -392,9 +392,7 @@ class ColorSettingTrait(_Trait):
|
||||
if temp < min_temp or temp > max_temp:
|
||||
raise SmartHomeError(
|
||||
ERR_VALUE_OUT_OF_RANGE,
|
||||
"Temperature should be between {} and {}".format(
|
||||
min_temp, max_temp
|
||||
),
|
||||
f"Temperature should be between {min_temp} and {max_temp}",
|
||||
)
|
||||
|
||||
await self.hass.services.async_call(
|
||||
@ -407,7 +405,7 @@ class ColorSettingTrait(_Trait):
|
||||
|
||||
elif "spectrumRGB" in params["color"]:
|
||||
# Convert integer to hex format and left pad with 0's till length 6
|
||||
hex_value = "{0:06x}".format(params["color"]["spectrumRGB"])
|
||||
hex_value = f"{params['color']['spectrumRGB']:06x}"
|
||||
color = color_util.color_RGB_to_hs(
|
||||
*color_util.rgb_hex_to_rgb_list(hex_value)
|
||||
)
|
||||
@ -746,9 +744,7 @@ class TemperatureSettingTrait(_Trait):
|
||||
if temp < min_temp or temp > max_temp:
|
||||
raise SmartHomeError(
|
||||
ERR_VALUE_OUT_OF_RANGE,
|
||||
"Temperature should be between {} and {}".format(
|
||||
min_temp, max_temp
|
||||
),
|
||||
f"Temperature should be between {min_temp} and {max_temp}",
|
||||
)
|
||||
|
||||
await self.hass.services.async_call(
|
||||
@ -769,8 +765,10 @@ class TemperatureSettingTrait(_Trait):
|
||||
if temp_high < min_temp or temp_high > max_temp:
|
||||
raise SmartHomeError(
|
||||
ERR_VALUE_OUT_OF_RANGE,
|
||||
"Upper bound for temperature range should be between "
|
||||
"{} and {}".format(min_temp, max_temp),
|
||||
(
|
||||
f"Upper bound for temperature range should be between "
|
||||
f"{min_temp} and {max_temp}"
|
||||
),
|
||||
)
|
||||
|
||||
temp_low = temp_util.convert(
|
||||
@ -782,8 +780,10 @@ class TemperatureSettingTrait(_Trait):
|
||||
if temp_low < min_temp or temp_low > max_temp:
|
||||
raise SmartHomeError(
|
||||
ERR_VALUE_OUT_OF_RANGE,
|
||||
"Lower bound for temperature range should be between "
|
||||
"{} and {}".format(min_temp, max_temp),
|
||||
(
|
||||
f"Lower bound for temperature range should be between "
|
||||
f"{min_temp} and {max_temp}"
|
||||
),
|
||||
)
|
||||
|
||||
supported = self.state.attributes.get(ATTR_SUPPORTED_FEATURES)
|
||||
|
@ -18,8 +18,6 @@ INTERVAL = timedelta(minutes=5)
|
||||
|
||||
DEFAULT_TIMEOUT = 10
|
||||
|
||||
UPDATE_URL = "https://{}:{}@domains.google.com/nic/update"
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
{
|
||||
DOMAIN: vol.Schema(
|
||||
@ -62,7 +60,7 @@ async def async_setup(hass, config):
|
||||
|
||||
async def _update_google_domains(hass, session, domain, user, password, timeout):
|
||||
"""Update Google Domains."""
|
||||
url = UPDATE_URL.format(user, password)
|
||||
url = f"https://{user}:{password}@domains.google.com/nic/update"
|
||||
|
||||
params = {"hostname": domain}
|
||||
|
||||
|
@ -55,9 +55,7 @@ class GoogleMapsScanner:
|
||||
self.scan_interval = config.get(CONF_SCAN_INTERVAL) or timedelta(seconds=60)
|
||||
self._prev_seen = {}
|
||||
|
||||
credfile = "{}.{}".format(
|
||||
hass.config.path(CREDENTIALS_FILE), slugify(self.username)
|
||||
)
|
||||
credfile = f"{hass.config.path(CREDENTIALS_FILE)}.{slugify(self.username)}"
|
||||
try:
|
||||
self.service = Service(credfile, self.username)
|
||||
self._update_info()
|
||||
@ -75,7 +73,7 @@ class GoogleMapsScanner:
|
||||
def _update_info(self, now=None):
|
||||
for person in self.service.get_all_people():
|
||||
try:
|
||||
dev_id = "google_maps_{0}".format(slugify(person.id))
|
||||
dev_id = f"google_maps_{slugify(person.id)}"
|
||||
except TypeError:
|
||||
_LOGGER.warning("No location(s) shared with this account")
|
||||
return
|
||||
|
@ -163,7 +163,7 @@ def setup_platform(hass, config, add_entities_callback, discovery_info=None):
|
||||
options[CONF_MODE] = travel_mode
|
||||
|
||||
titled_mode = options.get(CONF_MODE).title()
|
||||
formatted_name = "{} - {}".format(DEFAULT_NAME, titled_mode)
|
||||
formatted_name = f"{DEFAULT_NAME} - {titled_mode}"
|
||||
name = config.get(CONF_NAME, formatted_name)
|
||||
api_key = config.get(CONF_API_KEY)
|
||||
origin = config.get(CONF_ORIGIN)
|
||||
|
@ -107,11 +107,7 @@ class GEMSensor(Entity):
|
||||
@property
|
||||
def unique_id(self):
|
||||
"""Return a unique ID for this sensor."""
|
||||
return "{serial}-{sensor_type}-{number}".format(
|
||||
serial=self._monitor_serial_number,
|
||||
sensor_type=self._sensor_type,
|
||||
number=self._number,
|
||||
)
|
||||
return f"{self._monitor_serial_number}-{self._sensor_type }-{self._number}"
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
@ -249,9 +245,7 @@ class PulseCounter(GEMSensor):
|
||||
@property
|
||||
def unit_of_measurement(self):
|
||||
"""Return the unit of measurement for this pulse counter."""
|
||||
return "{counted_quantity}/{time_unit}".format(
|
||||
counted_quantity=self._counted_quantity, time_unit=self._time_unit
|
||||
)
|
||||
return f"{self._counted_quantity}/{self._time_unit}"
|
||||
|
||||
@property
|
||||
def device_state_attributes(self):
|
||||
|
@ -74,7 +74,7 @@ GROUP_SCHEMA = vol.All(
|
||||
CONF_ICON: cv.icon,
|
||||
CONF_ALL: cv.boolean,
|
||||
}
|
||||
),
|
||||
)
|
||||
)
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema(
|
||||
@ -231,7 +231,7 @@ async def async_setup(hass, config):
|
||||
async def groups_service_handler(service):
|
||||
"""Handle dynamic group service functions."""
|
||||
object_id = service.data[ATTR_OBJECT_ID]
|
||||
entity_id = ENTITY_ID_FORMAT.format(object_id)
|
||||
entity_id = f"{DOMAIN}.{object_id}"
|
||||
group = component.get_entity(entity_id)
|
||||
|
||||
# new group
|
||||
@ -311,7 +311,7 @@ async def async_setup(hass, config):
|
||||
vol.Exclusive(ATTR_ENTITIES, "entities"): cv.entity_ids,
|
||||
vol.Exclusive(ATTR_ADD_ENTITIES, "entities"): cv.entity_ids,
|
||||
}
|
||||
),
|
||||
)
|
||||
),
|
||||
)
|
||||
|
||||
@ -336,7 +336,7 @@ async def _async_process_config(hass, config, component):
|
||||
# Don't create tasks and await them all. The order is important as
|
||||
# groups get a number based on creation order.
|
||||
await Group.async_create_group(
|
||||
hass, name, entity_ids, icon=icon, object_id=object_id, mode=mode,
|
||||
hass, name, entity_ids, icon=icon, object_id=object_id, mode=mode
|
||||
)
|
||||
|
||||
|
||||
@ -388,7 +388,7 @@ class Group(Entity):
|
||||
"""Initialize a group."""
|
||||
return asyncio.run_coroutine_threadsafe(
|
||||
Group.async_create_group(
|
||||
hass, name, entity_ids, user_defined, icon, object_id, mode,
|
||||
hass, name, entity_ids, user_defined, icon, object_id, mode
|
||||
),
|
||||
hass.loop,
|
||||
).result()
|
||||
|
@ -143,7 +143,7 @@ def get_next_departure(
|
||||
tomorrow_where = f"OR calendar.{tomorrow_name} = 1"
|
||||
tomorrow_order = f"calendar.{tomorrow_name} DESC,"
|
||||
|
||||
sql_query = """
|
||||
sql_query = f"""
|
||||
SELECT trip.trip_id, trip.route_id,
|
||||
time(origin_stop_time.arrival_time) AS origin_arrival_time,
|
||||
time(origin_stop_time.departure_time) AS origin_depart_time,
|
||||
@ -162,8 +162,8 @@ def get_next_departure(
|
||||
destination_stop_time.stop_headsign AS dest_stop_headsign,
|
||||
destination_stop_time.stop_sequence AS dest_stop_sequence,
|
||||
destination_stop_time.timepoint AS dest_stop_timepoint,
|
||||
calendar.{yesterday_name} AS yesterday,
|
||||
calendar.{today_name} AS today,
|
||||
calendar.{yesterday.strftime("%A").lower()} AS yesterday,
|
||||
calendar.{now.strftime("%A").lower()} AS today,
|
||||
{tomorrow_select}
|
||||
calendar.start_date AS start_date,
|
||||
calendar.end_date AS end_date
|
||||
@ -178,8 +178,8 @@ def get_next_departure(
|
||||
ON trip.trip_id = destination_stop_time.trip_id
|
||||
INNER JOIN stops end_station
|
||||
ON destination_stop_time.stop_id = end_station.stop_id
|
||||
WHERE (calendar.{yesterday_name} = 1
|
||||
OR calendar.{today_name} = 1
|
||||
WHERE (calendar.{yesterday.strftime("%A").lower()} = 1
|
||||
OR calendar.{now.strftime("%A").lower()} = 1
|
||||
{tomorrow_where}
|
||||
)
|
||||
AND start_station.stop_id = :origin_station_id
|
||||
@ -187,18 +187,12 @@ def get_next_departure(
|
||||
AND origin_stop_sequence < dest_stop_sequence
|
||||
AND calendar.start_date <= :today
|
||||
AND calendar.end_date >= :today
|
||||
ORDER BY calendar.{yesterday_name} DESC,
|
||||
calendar.{today_name} DESC,
|
||||
ORDER BY calendar.{yesterday.strftime("%A").lower()} DESC,
|
||||
calendar.{now.strftime("%A").lower()} DESC,
|
||||
{tomorrow_order}
|
||||
origin_stop_time.departure_time
|
||||
LIMIT :limit
|
||||
""".format(
|
||||
yesterday_name=yesterday.strftime("%A").lower(),
|
||||
today_name=now.strftime("%A").lower(),
|
||||
tomorrow_select=tomorrow_select,
|
||||
tomorrow_where=tomorrow_where,
|
||||
tomorrow_order=tomorrow_order,
|
||||
)
|
||||
"""
|
||||
result = schedule.engine.execute(
|
||||
text(sql_query),
|
||||
origin_station_id=start_station_id,
|
||||
@ -220,7 +214,7 @@ def get_next_departure(
|
||||
if yesterday_start is None:
|
||||
yesterday_start = row["origin_depart_date"]
|
||||
if yesterday_start != row["origin_depart_date"]:
|
||||
idx = "{} {}".format(now_date, row["origin_depart_time"])
|
||||
idx = f"{now_date} {row['origin_depart_time']}"
|
||||
timetable[idx] = {**row, **extras}
|
||||
yesterday_last = idx
|
||||
|
||||
@ -233,7 +227,7 @@ def get_next_departure(
|
||||
idx_prefix = now_date
|
||||
else:
|
||||
idx_prefix = tomorrow_date
|
||||
idx = "{} {}".format(idx_prefix, row["origin_depart_time"])
|
||||
idx = f"{idx_prefix} {row['origin_depart_time']}"
|
||||
timetable[idx] = {**row, **extras}
|
||||
today_last = idx
|
||||
|
||||
@ -247,7 +241,7 @@ def get_next_departure(
|
||||
tomorrow_start = row["origin_depart_date"]
|
||||
extras["first"] = True
|
||||
if tomorrow_start == row["origin_depart_date"]:
|
||||
idx = "{} {}".format(tomorrow_date, row["origin_depart_time"])
|
||||
idx = f"{tomorrow_date} {row['origin_depart_time']}"
|
||||
timetable[idx] = {**row, **extras}
|
||||
|
||||
# Flag last departures.
|
||||
@ -273,24 +267,27 @@ def get_next_departure(
|
||||
origin_arrival = now
|
||||
if item["origin_arrival_time"] > item["origin_depart_time"]:
|
||||
origin_arrival -= datetime.timedelta(days=1)
|
||||
origin_arrival_time = "{} {}".format(
|
||||
origin_arrival.strftime(dt_util.DATE_STR_FORMAT), item["origin_arrival_time"]
|
||||
origin_arrival_time = (
|
||||
f"{origin_arrival.strftime(dt_util.DATE_STR_FORMAT)} "
|
||||
f"{item['origin_arrival_time']}"
|
||||
)
|
||||
|
||||
origin_depart_time = "{} {}".format(now_date, item["origin_depart_time"])
|
||||
origin_depart_time = f"{now_date} {item['origin_depart_time']}"
|
||||
|
||||
dest_arrival = now
|
||||
if item["dest_arrival_time"] < item["origin_depart_time"]:
|
||||
dest_arrival += datetime.timedelta(days=1)
|
||||
dest_arrival_time = "{} {}".format(
|
||||
dest_arrival.strftime(dt_util.DATE_STR_FORMAT), item["dest_arrival_time"]
|
||||
dest_arrival_time = (
|
||||
f"{dest_arrival.strftime(dt_util.DATE_STR_FORMAT)} "
|
||||
f"{item['dest_arrival_time']}"
|
||||
)
|
||||
|
||||
dest_depart = dest_arrival
|
||||
if item["dest_depart_time"] < item["dest_arrival_time"]:
|
||||
dest_depart += datetime.timedelta(days=1)
|
||||
dest_depart_time = "{} {}".format(
|
||||
dest_depart.strftime(dt_util.DATE_STR_FORMAT), item["dest_depart_time"]
|
||||
dest_depart_time = (
|
||||
f"{dest_depart.strftime(dt_util.DATE_STR_FORMAT)} "
|
||||
f"{item['dest_depart_time']}"
|
||||
)
|
||||
|
||||
depart_time = dt_util.parse_datetime(origin_depart_time)
|
||||
@ -511,15 +508,13 @@ class GTFSDepartureSensor(Entity):
|
||||
else:
|
||||
self._icon = ICON
|
||||
|
||||
name = "{agency} {origin} to {destination} next departure"
|
||||
if not self._departure:
|
||||
name = "{default}"
|
||||
self._name = self._custom_name or name.format(
|
||||
agency=getattr(self._agency, "agency_name", DEFAULT_NAME),
|
||||
default=DEFAULT_NAME,
|
||||
origin=self.origin,
|
||||
destination=self.destination,
|
||||
name = (
|
||||
f"{getattr(self._agency, 'agency_name', DEFAULT_NAME)} "
|
||||
f"{self.origin} to {self.destination} next departure"
|
||||
)
|
||||
if not self._departure:
|
||||
name = f"{DEFAULT_NAME}"
|
||||
self._name = self._custom_name or name
|
||||
|
||||
def update_attributes(self) -> None:
|
||||
"""Update state attributes."""
|
||||
|
@ -119,7 +119,7 @@ def mock_open_file():
|
||||
def test_check_box_health(caplog):
|
||||
"""Test check box health."""
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/healthz".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/healthz"
|
||||
mock_req.get(url, status_code=HTTP_OK, json=MOCK_HEALTH)
|
||||
assert fb.check_box_health(url, "user", "pass") == MOCK_BOX_ID
|
||||
|
||||
@ -184,7 +184,7 @@ async def test_process_image(hass, mock_healthybox, mock_image):
|
||||
hass.bus.async_listen("image_processing.detect_face", mock_face_event)
|
||||
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/check".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/check"
|
||||
mock_req.post(url, json=MOCK_JSON)
|
||||
data = {ATTR_ENTITY_ID: VALID_ENTITY_ID}
|
||||
await hass.services.async_call(ip.DOMAIN, ip.SERVICE_SCAN, service_data=data)
|
||||
@ -219,7 +219,7 @@ async def test_process_image_errors(hass, mock_healthybox, mock_image, caplog):
|
||||
|
||||
# Test connection error.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/check".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/check"
|
||||
mock_req.register_uri("POST", url, exc=requests.exceptions.ConnectTimeout)
|
||||
data = {ATTR_ENTITY_ID: VALID_ENTITY_ID}
|
||||
await hass.services.async_call(ip.DOMAIN, ip.SERVICE_SCAN, service_data=data)
|
||||
@ -233,7 +233,7 @@ async def test_process_image_errors(hass, mock_healthybox, mock_image, caplog):
|
||||
|
||||
# Now test with bad auth.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/check".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/check"
|
||||
mock_req.register_uri("POST", url, status_code=HTTP_UNAUTHORIZED)
|
||||
data = {ATTR_ENTITY_ID: VALID_ENTITY_ID}
|
||||
await hass.services.async_call(ip.DOMAIN, ip.SERVICE_SCAN, service_data=data)
|
||||
@ -253,7 +253,7 @@ async def test_teach_service(
|
||||
|
||||
# Test successful teach.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/teach".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/teach"
|
||||
mock_req.post(url, status_code=HTTP_OK)
|
||||
data = {
|
||||
ATTR_ENTITY_ID: VALID_ENTITY_ID,
|
||||
@ -267,7 +267,7 @@ async def test_teach_service(
|
||||
|
||||
# Now test with bad auth.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/teach".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/teach"
|
||||
mock_req.post(url, status_code=HTTP_UNAUTHORIZED)
|
||||
data = {
|
||||
ATTR_ENTITY_ID: VALID_ENTITY_ID,
|
||||
@ -282,7 +282,7 @@ async def test_teach_service(
|
||||
|
||||
# Now test the failed teaching.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/teach".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/teach"
|
||||
mock_req.post(url, status_code=HTTP_BAD_REQUEST, text=MOCK_ERROR_NO_FACE)
|
||||
data = {
|
||||
ATTR_ENTITY_ID: VALID_ENTITY_ID,
|
||||
@ -297,7 +297,7 @@ async def test_teach_service(
|
||||
|
||||
# Now test connection error.
|
||||
with requests_mock.Mocker() as mock_req:
|
||||
url = "http://{}:{}/facebox/teach".format(MOCK_IP, MOCK_PORT)
|
||||
url = f"http://{MOCK_IP}:{MOCK_PORT}/facebox/teach"
|
||||
mock_req.post(url, exc=requests.exceptions.ConnectTimeout)
|
||||
data = {
|
||||
ATTR_ENTITY_ID: VALID_ENTITY_ID,
|
||||
@ -313,7 +313,7 @@ async def test_teach_service(
|
||||
|
||||
async def test_setup_platform_with_name(hass, mock_healthybox):
|
||||
"""Set up platform with one entity and a name."""
|
||||
named_entity_id = "image_processing.{}".format(MOCK_NAME)
|
||||
named_entity_id = f"image_processing.{MOCK_NAME}"
|
||||
|
||||
valid_config_named = VALID_CONFIG.copy()
|
||||
valid_config_named[ip.DOMAIN][ip.CONF_SOURCE][ip.CONF_NAME] = MOCK_NAME
|
||||
|
@ -119,14 +119,10 @@ async def test_if_fires_on_state_change(hass, calls):
|
||||
hass.states.async_set("fan.entity", STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0].data["some"] == "turn_on - device - {} - off - on - None".format(
|
||||
"fan.entity"
|
||||
)
|
||||
assert calls[0].data["some"] == "turn_on - device - fan.entity - off - on - None"
|
||||
|
||||
# Fake that the entity is turning off.
|
||||
hass.states.async_set("fan.entity", STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
assert calls[1].data["some"] == "turn_off - device - {} - on - off - None".format(
|
||||
"fan.entity"
|
||||
)
|
||||
assert calls[1].data["some"] == "turn_off - device - fan.entity - on - off - None"
|
||||
|
@ -39,7 +39,7 @@ class TestFeedreaderComponent(unittest.TestCase):
|
||||
"""Initialize values for this testcase class."""
|
||||
self.hass = get_test_home_assistant()
|
||||
# Delete any previously stored data
|
||||
data_file = self.hass.config.path("{}.pickle".format("feedreader"))
|
||||
data_file = self.hass.config.path(f"{feedreader.DOMAIN}.pickle")
|
||||
if exists(data_file):
|
||||
remove(data_file)
|
||||
|
||||
@ -85,7 +85,7 @@ class TestFeedreaderComponent(unittest.TestCase):
|
||||
# Loading raw data from fixture and plug in to data object as URL
|
||||
# works since the third-party feedparser library accepts a URL
|
||||
# as well as the actual data.
|
||||
data_file = self.hass.config.path("{}.pickle".format(feedreader.DOMAIN))
|
||||
data_file = self.hass.config.path(f"{feedreader.DOMAIN}.pickle")
|
||||
storage = StoredData(data_file)
|
||||
with patch(
|
||||
"homeassistant.components.feedreader.track_time_interval"
|
||||
@ -179,7 +179,7 @@ class TestFeedreaderComponent(unittest.TestCase):
|
||||
@mock.patch("feedparser.parse", return_value=None)
|
||||
def test_feed_parsing_failed(self, mock_parse):
|
||||
"""Test feed where parsing fails."""
|
||||
data_file = self.hass.config.path("{}.pickle".format(feedreader.DOMAIN))
|
||||
data_file = self.hass.config.path(f"{feedreader.DOMAIN}.pickle")
|
||||
storage = StoredData(data_file)
|
||||
manager = FeedManager(
|
||||
"FEED DATA", DEFAULT_SCAN_INTERVAL, DEFAULT_MAX_ENTRIES, self.hass, storage
|
||||
|
@ -56,8 +56,9 @@ class TestNotifyFile(unittest.TestCase):
|
||||
):
|
||||
|
||||
mock_st.return_value.st_size = 0
|
||||
title = "{} notifications (Log started: {})\n{}\n".format(
|
||||
ATTR_TITLE_DEFAULT, dt_util.utcnow().isoformat(), "-" * 80
|
||||
title = (
|
||||
f"{ATTR_TITLE_DEFAULT} notifications "
|
||||
f"(Log started: {dt_util.utcnow().isoformat()})\n{'-' * 80}\n"
|
||||
)
|
||||
|
||||
self.hass.services.call(
|
||||
@ -72,12 +73,12 @@ class TestNotifyFile(unittest.TestCase):
|
||||
if not timestamp:
|
||||
assert m_open.return_value.write.call_args_list == [
|
||||
call(title),
|
||||
call("{}\n".format(message)),
|
||||
call(f"{message}\n"),
|
||||
]
|
||||
else:
|
||||
assert m_open.return_value.write.call_args_list == [
|
||||
call(title),
|
||||
call("{} {}\n".format(dt_util.utcnow().isoformat(), message)),
|
||||
call(f"{dt_util.utcnow().isoformat()} {message}\n"),
|
||||
]
|
||||
|
||||
def test_notify_file(self):
|
||||
|
@ -923,9 +923,9 @@ async def test_flux_with_multiple_lights(hass):
|
||||
|
||||
def event_date(hass, event, now=None):
|
||||
if event == SUN_EVENT_SUNRISE:
|
||||
print("sunrise {}".format(sunrise_time))
|
||||
print(f"sunrise {sunrise_time}")
|
||||
return sunrise_time
|
||||
print("sunset {}".format(sunset_time))
|
||||
print(f"sunset {sunset_time}")
|
||||
return sunset_time
|
||||
|
||||
with patch(
|
||||
|
@ -1,7 +1,7 @@
|
||||
"""The tests for frontend storage."""
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.frontend import storage
|
||||
from homeassistant.components.frontend import DOMAIN
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ async def test_get_user_data_empty(hass, hass_ws_client, hass_storage):
|
||||
|
||||
async def test_get_user_data(hass, hass_ws_client, hass_admin_user, hass_storage):
|
||||
"""Test get_user_data command."""
|
||||
storage_key = storage.STORAGE_KEY_USER_DATA.format(hass_admin_user.id)
|
||||
storage_key = f"{DOMAIN}.user_data_{hass_admin_user.id}"
|
||||
hass_storage[storage_key] = {
|
||||
"key": storage_key,
|
||||
"version": 1,
|
||||
@ -102,7 +102,7 @@ async def test_set_user_data_empty(hass, hass_ws_client, hass_storage):
|
||||
|
||||
async def test_set_user_data(hass, hass_ws_client, hass_storage, hass_admin_user):
|
||||
"""Test set_user_data command with initial data."""
|
||||
storage_key = storage.STORAGE_KEY_USER_DATA.format(hass_admin_user.id)
|
||||
storage_key = f"{DOMAIN}.user_data_{hass_admin_user.id}"
|
||||
hass_storage[storage_key] = {
|
||||
"version": 1,
|
||||
"data": {"test-key": "test-value", "test-complex": "string"},
|
||||
|
@ -5,8 +5,6 @@ from homeassistant.components import geo_location
|
||||
from homeassistant.components.geo_json_events.geo_location import (
|
||||
ATTR_EXTERNAL_ID,
|
||||
SCAN_INTERVAL,
|
||||
SIGNAL_DELETE_ENTITY,
|
||||
SIGNAL_UPDATE_ENTITY,
|
||||
)
|
||||
from homeassistant.components.geo_location import ATTR_SOURCE
|
||||
from homeassistant.const import (
|
||||
@ -190,8 +188,8 @@ async def test_setup_race_condition(hass):
|
||||
|
||||
# Set up some mock feed entries for this test.
|
||||
mock_entry_1 = _generate_mock_feed_entry("1234", "Title 1", 15.5, (-31.0, 150.0))
|
||||
delete_signal = SIGNAL_DELETE_ENTITY.format("1234")
|
||||
update_signal = SIGNAL_UPDATE_ENTITY.format("1234")
|
||||
delete_signal = f"geo_json_events_delete_1234"
|
||||
update_signal = f"geo_json_events_update_1234"
|
||||
|
||||
# Patching 'utcnow' to gain more control over the timed update.
|
||||
utcnow = dt_util.utcnow()
|
||||
|
@ -163,7 +163,7 @@ async def webhook_id(hass, geofency_client):

async def test_data_validation(geofency_client, webhook_id):
    """Test data validation."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    # No data
    req = await geofency_client.post(url)
@ -181,14 +181,14 @@ async def test_data_validation(geofency_client, webhook_id):

async def test_gps_enter_and_exit_home(hass, geofency_client, webhook_id):
    """Test GPS based zone enter and exit."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    # Enter the Home zone
    req = await geofency_client.post(url, data=GPS_ENTER_HOME)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify(GPS_ENTER_HOME["device"])
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_HOME == state_name

    # Exit the Home zone
@ -196,7 +196,7 @@ async def test_gps_enter_and_exit_home(hass, geofency_client, webhook_id):
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify(GPS_EXIT_HOME["device"])
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_NOT_HOME == state_name

    # Exit the Home zone with "Send Current Position" enabled
@ -208,13 +208,13 @@ async def test_gps_enter_and_exit_home(hass, geofency_client, webhook_id):
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify(GPS_EXIT_HOME["device"])
    current_latitude = hass.states.get(
        "{}.{}".format("device_tracker", device_name)
    ).attributes["latitude"]
    current_latitude = hass.states.get(f"device_tracker.{device_name}").attributes[
        "latitude"
    ]
    assert NOT_HOME_LATITUDE == current_latitude
    current_longitude = hass.states.get(
        "{}.{}".format("device_tracker", device_name)
    ).attributes["longitude"]
    current_longitude = hass.states.get(f"device_tracker.{device_name}").attributes[
        "longitude"
    ]
    assert NOT_HOME_LONGITUDE == current_longitude

    dev_reg = await hass.helpers.device_registry.async_get_registry()
@ -226,43 +226,43 @@ async def test_gps_enter_and_exit_home(hass, geofency_client, webhook_id):

async def test_beacon_enter_and_exit_home(hass, geofency_client, webhook_id):
    """Test iBeacon based zone enter and exit - a.k.a stationary iBeacon."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    # Enter the Home zone
    req = await geofency_client.post(url, data=BEACON_ENTER_HOME)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(BEACON_ENTER_HOME["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{BEACON_ENTER_HOME['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_HOME == state_name

    # Exit the Home zone
    req = await geofency_client.post(url, data=BEACON_EXIT_HOME)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(BEACON_ENTER_HOME["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{BEACON_ENTER_HOME['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_NOT_HOME == state_name


async def test_beacon_enter_and_exit_car(hass, geofency_client, webhook_id):
    """Test use of mobile iBeacon."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    # Enter the Car away from Home zone
    req = await geofency_client.post(url, data=BEACON_ENTER_CAR)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(BEACON_ENTER_CAR["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{BEACON_ENTER_CAR['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_NOT_HOME == state_name

    # Exit the Car away from Home zone
    req = await geofency_client.post(url, data=BEACON_EXIT_CAR)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(BEACON_ENTER_CAR["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{BEACON_ENTER_CAR['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_NOT_HOME == state_name

    # Enter the Car in the Home zone
@ -272,29 +272,29 @@ async def test_beacon_enter_and_exit_car(hass, geofency_client, webhook_id):
    req = await geofency_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(data["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{data['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_HOME == state_name

    # Exit the Car in the Home zone
    req = await geofency_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify("beacon_{}".format(data["name"]))
    state_name = hass.states.get("{}.{}".format("device_tracker", device_name)).state
    device_name = slugify(f"beacon_{data['name']}")
    state_name = hass.states.get(f"device_tracker.{device_name}").state
    assert STATE_HOME == state_name


async def test_load_unload_entry(hass, geofency_client, webhook_id):
    """Test that the appropriate dispatch signals are added and removed."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    # Enter the Home zone
    req = await geofency_client.post(url, data=GPS_ENTER_HOME)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    device_name = slugify(GPS_ENTER_HOME["device"])
    state_1 = hass.states.get("{}.{}".format("device_tracker", device_name))
    state_1 = hass.states.get(f"device_tracker.{device_name}")
    assert STATE_HOME == state_1.state

    assert len(hass.data[DOMAIN]["devices"]) == 1
@ -307,7 +307,7 @@ async def test_load_unload_entry(hass, geofency_client, webhook_id):
    assert await hass.config_entries.async_setup(entry.entry_id)
    await hass.async_block_till_done()

    state_2 = hass.states.get("{}.{}".format("device_tracker", device_name))
    state_2 = hass.states.get(f"device_tracker.{device_name}")
    assert state_2 is not None
    assert state_1 is not state_2

@ -218,7 +218,7 @@ async def test_offset_in_progress_event(hass, mock_next_event):
    event = copy.deepcopy(TEST_EVENT)
    event["start"]["dateTime"] = start
    event["end"]["dateTime"] = end
    event["summary"] = "{} !!-15".format(event_summary)
    event["summary"] = f"{event_summary} !!-15"
    mock_next_event.return_value.event = event

    assert await async_setup_component(hass, "google", {"google": GOOGLE_CONFIG})
@ -250,7 +250,7 @@ async def test_all_day_offset_in_progress_event(hass, mock_next_event):
    event = copy.deepcopy(TEST_EVENT)
    event["start"]["date"] = start
    event["end"]["date"] = end
    event["summary"] = "{} !!-25:0".format(event_summary)
    event["summary"] = f"{event_summary} !!-25:0"
    mock_next_event.return_value.event = event

    assert await async_setup_component(hass, "google", {"google": GOOGLE_CONFIG})
@ -282,7 +282,7 @@ async def test_all_day_offset_event(hass, mock_next_event):
    event = copy.deepcopy(TEST_EVENT)
    event["start"]["date"] = start
    event["end"]["date"] = end
    event["summary"] = "{} !!-{}:0".format(event_summary, offset_hours)
    event["summary"] = f"{event_summary} !!-{offset_hours}:0"
    mock_next_event.return_value.event = event

    assert await async_setup_component(hass, "google", {"google": GOOGLE_CONFIG})

@ -31,7 +31,7 @@ ACCESS_TOKEN = "superdoublesecret"
@pytest.fixture
def auth_header(hass_access_token):
    """Generate an HTTP header with bearer token authorization."""
    return {AUTHORIZATION: "Bearer {}".format(hass_access_token)}
    return {AUTHORIZATION: f"Bearer {hass_access_token}"}


@pytest.fixture

@ -27,7 +27,7 @@ MOCK_TOKEN = {"access_token": "dummtoken", "expires_in": 3600}
MOCK_JSON = {"devices": {}}
MOCK_URL = "https://dummy"
MOCK_HEADER = {
    "Authorization": "Bearer {}".format(MOCK_TOKEN["access_token"]),
    "Authorization": f"Bearer {MOCK_TOKEN['access_token']}",
    "X-GFE-SSL": "yes",
}

@ -57,7 +57,7 @@ async def test_get_access_token(hass, aioclient_mock):
    await _get_homegraph_token(hass, jwt)
    assert aioclient_mock.call_count == 1
    assert aioclient_mock.mock_calls[0][3] == {
        "Authorization": "Bearer {}".format(jwt),
        "Authorization": f"Bearer {jwt}",
        "Content-Type": "application/x-www-form-urlencoded",
    }

@ -13,7 +13,7 @@ DOMAIN = "test.example.com"
USERNAME = "abc123"
PASSWORD = "xyz789"

UPDATE_URL = google_domains.UPDATE_URL.format(USERNAME, PASSWORD)
UPDATE_URL = f"https://{USERNAME}:{PASSWORD}@domains.google.com/nic/update"


@pytest.fixture

@ -48,9 +48,7 @@ class TestGoogleWifiSetup(unittest.TestCase):
    @requests_mock.Mocker()
    def test_setup_minimum(self, mock_req):
        """Test setup with minimum configuration."""
        resource = "{}{}{}".format(
            "http://", google_wifi.DEFAULT_HOST, google_wifi.ENDPOINT
        )
        resource = f"http://{google_wifi.DEFAULT_HOST}{google_wifi.ENDPOINT}"
        mock_req.get(resource, status_code=200)
        assert setup_component(
            self.hass,
@ -62,7 +60,7 @@ class TestGoogleWifiSetup(unittest.TestCase):
    @requests_mock.Mocker()
    def test_setup_get(self, mock_req):
        """Test setup with full configuration."""
        resource = "{}{}{}".format("http://", "localhost", google_wifi.ENDPOINT)
        resource = f"http://localhost{google_wifi.ENDPOINT}"
        mock_req.get(resource, status_code=200)
        assert setup_component(
            self.hass,
@ -101,7 +99,7 @@ class TestGoogleWifiSensor(unittest.TestCase):

    def setup_api(self, data, mock_req):
        """Set up API with fake data."""
        resource = "{}{}{}".format("http://", "localhost", google_wifi.ENDPOINT)
        resource = f"http://localhost{google_wifi.ENDPOINT}"
        now = datetime(1970, month=1, day=1)
        with patch("homeassistant.util.dt.now", return_value=now):
            mock_req.get(resource, text=data, status_code=200)
@ -111,7 +109,7 @@ class TestGoogleWifiSensor(unittest.TestCase):
        self.sensor_dict = dict()
        for condition, cond_list in google_wifi.MONITORED_CONDITIONS.items():
            sensor = google_wifi.GoogleWifiSensor(self.api, self.name, condition)
            name = "{}_{}".format(self.name, condition)
            name = f"{self.name}_{condition}"
            units = cond_list[1]
            icon = cond_list[2]
            self.sensor_dict[condition] = {

@ -77,7 +77,7 @@ async def webhook_id(hass, gpslogger_client):

async def test_missing_data(hass, gpslogger_client, webhook_id):
    """Test missing data."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    data = {"latitude": 1.0, "longitude": 1.1, "device": "123"}

@ -103,7 +103,7 @@ async def test_missing_data(hass, gpslogger_client, webhook_id):

async def test_enter_and_exit(hass, gpslogger_client, webhook_id):
    """Test when there is a known zone."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    data = {"latitude": HOME_LATITUDE, "longitude": HOME_LONGITUDE, "device": "123"}

@ -111,18 +111,14 @@ async def test_enter_and_exit(hass, gpslogger_client, webhook_id):
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state_name = hass.states.get(
        "{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"])
    ).state
    state_name = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}").state
    assert STATE_HOME == state_name

    # Enter Home again
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state_name = hass.states.get(
        "{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"])
    ).state
    state_name = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}").state
    assert STATE_HOME == state_name

    data["longitude"] = 0
@ -132,9 +128,7 @@ async def test_enter_and_exit(hass, gpslogger_client, webhook_id):
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state_name = hass.states.get(
        "{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"])
    ).state
    state_name = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}").state
    assert STATE_NOT_HOME == state_name

    dev_reg = await hass.helpers.device_registry.async_get_registry()
@ -146,7 +140,7 @@ async def test_enter_and_exit(hass, gpslogger_client, webhook_id):

async def test_enter_with_attrs(hass, gpslogger_client, webhook_id):
    """Test when additional attributes are present."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"

    data = {
        "latitude": 1.0,
@ -164,7 +158,7 @@ async def test_enter_with_attrs(hass, gpslogger_client, webhook_id):
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state = hass.states.get("{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"]))
    state = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}")
    assert state.state == STATE_NOT_HOME
    assert state.attributes["gps_accuracy"] == 10.5
    assert state.attributes["battery_level"] == 10.0
@ -190,7 +184,7 @@ async def test_enter_with_attrs(hass, gpslogger_client, webhook_id):
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state = hass.states.get("{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"]))
    state = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}")
    assert state.state == STATE_HOME
    assert state.attributes["gps_accuracy"] == 123
    assert state.attributes["battery_level"] == 23
@ -206,16 +200,14 @@ async def test_enter_with_attrs(hass, gpslogger_client, webhook_id):
)
async def test_load_unload_entry(hass, gpslogger_client, webhook_id):
    """Test that the appropriate dispatch signals are added and removed."""
    url = "/api/webhook/{}".format(webhook_id)
    url = f"/api/webhook/{webhook_id}"
    data = {"latitude": HOME_LATITUDE, "longitude": HOME_LONGITUDE, "device": "123"}

    # Enter the Home
    req = await gpslogger_client.post(url, data=data)
    await hass.async_block_till_done()
    assert req.status == HTTP_OK
    state_name = hass.states.get(
        "{}.{}".format(DEVICE_TRACKER_DOMAIN, data["device"])
    ).state
    state_name = hass.states.get(f"{DEVICE_TRACKER_DOMAIN}.{data['device']}").state
    assert STATE_HOME == state_name
    assert len(hass.data[DATA_DISPATCHER][TRACKER_UPDATE]) == 1

@ -43,10 +43,7 @@ class TestComponentsGroup(unittest.TestCase):
        )

        assert (
            STATE_ON
            == self.hass.states.get(
                group.ENTITY_ID_FORMAT.format("person_and_light")
            ).state
            STATE_ON == self.hass.states.get(f"{group.DOMAIN}.person_and_light").state
        )

    def test_setup_group_with_a_non_existing_state(self):
@ -296,9 +293,7 @@ class TestComponentsGroup(unittest.TestCase):

        setup_component(self.hass, "group", {"group": group_conf})

        group_state = self.hass.states.get(
            group.ENTITY_ID_FORMAT.format("second_group")
        )
        group_state = self.hass.states.get(f"{group.DOMAIN}.second_group")
        assert STATE_ON == group_state.state
        assert set((test_group.entity_id, "light.bowl")) == set(
            group_state.attributes["entity_id"]
@ -307,7 +302,7 @@ class TestComponentsGroup(unittest.TestCase):
        assert "mdi:work" == group_state.attributes.get(ATTR_ICON)
        assert 1 == group_state.attributes.get(group.ATTR_ORDER)

        group_state = self.hass.states.get(group.ENTITY_ID_FORMAT.format("test_group"))
        group_state = self.hass.states.get(f"{group.DOMAIN}.test_group")
        assert STATE_UNKNOWN == group_state.state
        assert set(("sensor.happy", "hello.world")) == set(
            group_state.attributes["entity_id"]
@ -373,10 +368,7 @@ class TestComponentsGroup(unittest.TestCase):
        )
        self.hass.states.set("device_tracker.Adam", "cool_state_not_home")
        self.hass.block_till_done()
        assert (
            STATE_NOT_HOME
            == self.hass.states.get(group.ENTITY_ID_FORMAT.format("peeps")).state
        )
        assert STATE_NOT_HOME == self.hass.states.get(f"{group.DOMAIN}.peeps").state

    def test_reloading_groups(self):
        """Test reloading the group config."""
@ -431,9 +423,7 @@ class TestComponentsGroup(unittest.TestCase):
        common.set_group(self.hass, "modify_group", icon="mdi:play")
        self.hass.block_till_done()

        group_state = self.hass.states.get(
            group.ENTITY_ID_FORMAT.format("modify_group")
        )
        group_state = self.hass.states.get(f"{group.DOMAIN}.modify_group")

        assert self.hass.states.entity_ids() == ["group.modify_group"]
        assert group_state.attributes.get(ATTR_ICON) == "mdi:play"
@ -463,9 +453,7 @@ async def test_service_group_set_group_remove_group(hass):
    assert group_state.attributes[group.ATTR_AUTO]
    assert group_state.attributes["friendly_name"] == "Test"

    common.async_set_group(
        hass, "user_test_group", entity_ids=["test.entity_bla1"],
    )
    common.async_set_group(hass, "user_test_group", entity_ids=["test.entity_bla1"])
    await hass.async_block_till_done()

    group_state = hass.states.get("group.user_test_group")