mirror of
https://github.com/home-assistant/core.git
synced 2025-07-23 05:07:41 +00:00
Use http.HTTPStatus in components/r* (#58288)
This commit is contained in:
parent
5958e6a3f9
commit
5626cc4524
@ -1,4 +1,5 @@
|
||||
"""Config flow for Rachio integration."""
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
from rachiopy import Rachio
|
||||
@ -6,7 +7,7 @@ from requests.exceptions import ConnectTimeout
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries, core, exceptions
|
||||
from homeassistant.const import CONF_API_KEY, HTTP_OK
|
||||
from homeassistant.const import CONF_API_KEY
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .const import (
|
||||
@ -33,13 +34,13 @@ async def validate_input(hass: core.HomeAssistant, data):
|
||||
try:
|
||||
data = await hass.async_add_executor_job(rachio.person.info)
|
||||
_LOGGER.debug("rachio.person.getInfo: %s", data)
|
||||
if int(data[0][KEY_STATUS]) != HTTP_OK:
|
||||
if int(data[0][KEY_STATUS]) != HTTPStatus.OK:
|
||||
raise InvalidAuth
|
||||
|
||||
rachio_id = data[1][KEY_ID]
|
||||
data = await hass.async_add_executor_job(rachio.person.get, rachio_id)
|
||||
_LOGGER.debug("rachio.person.get: %s", data)
|
||||
if int(data[0][KEY_STATUS]) != HTTP_OK:
|
||||
if int(data[0][KEY_STATUS]) != HTTPStatus.OK:
|
||||
raise CannotConnect
|
||||
|
||||
username = data[1][KEY_USERNAME]
|
||||
|
@ -1,11 +1,12 @@
|
||||
"""Adapter to wrap the rachiopy api for home assistant."""
|
||||
from __future__ import annotations
|
||||
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, HTTP_OK
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
|
||||
from .const import (
|
||||
@ -123,12 +124,12 @@ class RachioPerson:
|
||||
rachio = self.rachio
|
||||
|
||||
response = rachio.person.info()
|
||||
assert int(response[0][KEY_STATUS]) == HTTP_OK, "API key error"
|
||||
assert int(response[0][KEY_STATUS]) == HTTPStatus.OK, "API key error"
|
||||
self._id = response[1][KEY_ID]
|
||||
|
||||
# Use user ID to get user data
|
||||
data = rachio.person.get(self._id)
|
||||
assert int(data[0][KEY_STATUS]) == HTTP_OK, "User ID error"
|
||||
assert int(data[0][KEY_STATUS]) == HTTPStatus.OK, "User ID error"
|
||||
self.username = data[1][KEY_USERNAME]
|
||||
devices = data[1][KEY_DEVICES]
|
||||
for controller in devices:
|
||||
|
@ -2,6 +2,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
import time
|
||||
from typing import Any
|
||||
@ -29,7 +30,6 @@ from homeassistant.const import (
|
||||
DATA_TERABYTES,
|
||||
DATA_YOTTABYTES,
|
||||
DATA_ZETTABYTES,
|
||||
HTTP_OK,
|
||||
)
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.util import dt as dt_util
|
||||
@ -215,7 +215,7 @@ class RadarrSensor(SensorEntity):
|
||||
self._attr_native_value = None
|
||||
return
|
||||
|
||||
if res.status_code == HTTP_OK:
|
||||
if res.status_code == HTTPStatus.OK:
|
||||
if sensor_type in ("upcoming", "movies", "commands"):
|
||||
self.data = res.json()
|
||||
self._attr_native_value = len(self.data)
|
||||
|
@ -1,4 +1,5 @@
|
||||
"""RESTful platform for notify component."""
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
import requests
|
||||
@ -22,11 +23,8 @@ from homeassistant.const import (
|
||||
CONF_RESOURCE,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
HTTP_BAD_REQUEST,
|
||||
HTTP_BASIC_AUTHENTICATION,
|
||||
HTTP_DIGEST_AUTHENTICATION,
|
||||
HTTP_INTERNAL_SERVER_ERROR,
|
||||
HTTP_OK,
|
||||
)
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.template import Template
|
||||
@ -203,20 +201,23 @@ class RestNotificationService(BaseNotificationService):
|
||||
)
|
||||
|
||||
if (
|
||||
response.status_code >= HTTP_INTERNAL_SERVER_ERROR
|
||||
response.status_code >= HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
and response.status_code < 600
|
||||
):
|
||||
_LOGGER.exception(
|
||||
"Server error. Response %d: %s:", response.status_code, response.reason
|
||||
)
|
||||
elif (
|
||||
response.status_code >= HTTP_BAD_REQUEST
|
||||
and response.status_code < HTTP_INTERNAL_SERVER_ERROR
|
||||
response.status_code >= HTTPStatus.BAD_REQUEST
|
||||
and response.status_code < HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
):
|
||||
_LOGGER.exception(
|
||||
"Client error. Response %d: %s:", response.status_code, response.reason
|
||||
)
|
||||
elif response.status_code >= HTTP_OK and response.status_code < 300:
|
||||
elif (
|
||||
response.status_code >= HTTPStatus.OK
|
||||
and response.status_code < HTTPStatus.MULTIPLE_CHOICES
|
||||
):
|
||||
_LOGGER.debug(
|
||||
"Success. Response %d: %s:", response.status_code, response.reason
|
||||
)
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Support for RESTful switches."""
|
||||
import asyncio
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
import aiohttp
|
||||
@ -22,8 +23,6 @@ from homeassistant.const import (
|
||||
CONF_TIMEOUT,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
HTTP_BAD_REQUEST,
|
||||
HTTP_OK,
|
||||
)
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
@ -111,7 +110,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
|
||||
)
|
||||
|
||||
req = await switch.get_device_state(hass)
|
||||
if req.status >= HTTP_BAD_REQUEST:
|
||||
if req.status >= HTTPStatus.BAD_REQUEST:
|
||||
_LOGGER.error("Got non-ok response from resource: %s", req.status)
|
||||
else:
|
||||
async_add_entities([switch])
|
||||
@ -177,7 +176,7 @@ class RestSwitch(SwitchEntity):
|
||||
try:
|
||||
req = await self.set_device_state(body_on_t)
|
||||
|
||||
if req.status == HTTP_OK:
|
||||
if req.status == HTTPStatus.OK:
|
||||
self._state = True
|
||||
else:
|
||||
_LOGGER.error(
|
||||
@ -192,7 +191,7 @@ class RestSwitch(SwitchEntity):
|
||||
|
||||
try:
|
||||
req = await self.set_device_state(body_off_t)
|
||||
if req.status == HTTP_OK:
|
||||
if req.status == HTTPStatus.OK:
|
||||
self._state = False
|
||||
else:
|
||||
_LOGGER.error(
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""Support for exposing regular REST commands as services."""
|
||||
import asyncio
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
import aiohttp
|
||||
@ -15,7 +16,6 @@ from homeassistant.const import (
|
||||
CONF_URL,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
HTTP_BAD_REQUEST,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
@ -125,7 +125,7 @@ async def async_setup(hass, config):
|
||||
timeout=timeout,
|
||||
) as response:
|
||||
|
||||
if response.status < HTTP_BAD_REQUEST:
|
||||
if response.status < HTTPStatus.BAD_REQUEST:
|
||||
_LOGGER.debug(
|
||||
"Success. Url: %s. Status code: %d. Payload: %s",
|
||||
response.url,
|
||||
|
@ -1,4 +1,5 @@
|
||||
"""Rocket.Chat notification service."""
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
from rocketchat_API.APIExceptions.RocketExceptions import (
|
||||
@ -13,13 +14,7 @@ from homeassistant.components.notify import (
|
||||
PLATFORM_SCHEMA,
|
||||
BaseNotificationService,
|
||||
)
|
||||
from homeassistant.const import (
|
||||
CONF_PASSWORD,
|
||||
CONF_ROOM,
|
||||
CONF_URL,
|
||||
CONF_USERNAME,
|
||||
HTTP_OK,
|
||||
)
|
||||
from homeassistant.const import CONF_PASSWORD, CONF_ROOM, CONF_URL, CONF_USERNAME
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@ -68,7 +63,7 @@ class RocketChatNotificationService(BaseNotificationService):
|
||||
"""Send a message to Rocket.Chat."""
|
||||
data = kwargs.get(ATTR_DATA) or {}
|
||||
resp = self._server.chat_post_message(message, channel=self._room, **data)
|
||||
if resp.status_code == HTTP_OK:
|
||||
if resp.status_code == HTTPStatus.OK:
|
||||
if not resp.json()["success"]:
|
||||
_LOGGER.error("Unable to post Rocket.Chat message")
|
||||
else:
|
||||
|
@ -2,13 +2,14 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
|
||||
import boto3
|
||||
import requests
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import CONF_DOMAIN, CONF_TTL, CONF_ZONE, HTTP_OK
|
||||
from homeassistant.const import CONF_DOMAIN, CONF_TTL, CONF_ZONE
|
||||
import homeassistant.helpers.config_validation as cv
|
||||
from homeassistant.helpers.event import track_time_interval
|
||||
|
||||
@ -121,5 +122,5 @@ def _update_route53(
|
||||
)
|
||||
_LOGGER.debug("Response is %s", response)
|
||||
|
||||
if response["ResponseMetadata"]["HTTPStatusCode"] != HTTP_OK:
|
||||
if response["ResponseMetadata"]["HTTPStatusCode"] != HTTPStatus.OK:
|
||||
_LOGGER.warning(response)
|
||||
|
@ -1,6 +1,7 @@
|
||||
"""The tests for the REST binary sensor platform."""
|
||||
|
||||
import asyncio
|
||||
from http import HTTPStatus
|
||||
from os import path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
@ -92,7 +93,7 @@ async def test_setup_timeout(hass):
|
||||
@respx.mock
|
||||
async def test_setup_minimum(hass):
|
||||
"""Test setup with minimum configuration."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
binary_sensor.DOMAIN,
|
||||
@ -111,7 +112,7 @@ async def test_setup_minimum(hass):
|
||||
@respx.mock
|
||||
async def test_setup_minimum_resource_template(hass):
|
||||
"""Test setup with minimum configuration (resource_template)."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
binary_sensor.DOMAIN,
|
||||
@ -129,7 +130,7 @@ async def test_setup_minimum_resource_template(hass):
|
||||
@respx.mock
|
||||
async def test_setup_duplicate_resource_template(hass):
|
||||
"""Test setup with duplicate resources."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
binary_sensor.DOMAIN,
|
||||
@ -148,7 +149,7 @@ async def test_setup_duplicate_resource_template(hass):
|
||||
@respx.mock
|
||||
async def test_setup_get(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.get("http://localhost").respond(status_code=200, json={})
|
||||
respx.get("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"binary_sensor",
|
||||
@ -181,7 +182,7 @@ async def test_setup_get(hass):
|
||||
@respx.mock
|
||||
async def test_setup_get_digest_auth(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.get("http://localhost").respond(status_code=200, json={})
|
||||
respx.get("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"binary_sensor",
|
||||
@ -209,7 +210,7 @@ async def test_setup_get_digest_auth(hass):
|
||||
@respx.mock
|
||||
async def test_setup_post(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.post("http://localhost").respond(status_code=200, json={})
|
||||
respx.post("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"binary_sensor",
|
||||
@ -238,7 +239,7 @@ async def test_setup_post(hass):
|
||||
async def test_setup_get_off(hass):
|
||||
"""Test setup with valid off configuration."""
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/json"},
|
||||
json={"dog": False},
|
||||
)
|
||||
@ -268,7 +269,7 @@ async def test_setup_get_off(hass):
|
||||
async def test_setup_get_on(hass):
|
||||
"""Test setup with valid on configuration."""
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/json"},
|
||||
json={"dog": True},
|
||||
)
|
||||
@ -297,7 +298,7 @@ async def test_setup_get_on(hass):
|
||||
@respx.mock
|
||||
async def test_setup_with_exception(hass):
|
||||
"""Test setup with exception."""
|
||||
respx.get("http://localhost").respond(status_code=200, json={})
|
||||
respx.get("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"binary_sensor",
|
||||
@ -340,7 +341,7 @@ async def test_setup_with_exception(hass):
|
||||
async def test_reload(hass):
|
||||
"""Verify we can reload reset sensors."""
|
||||
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
|
||||
await async_setup_component(
|
||||
hass,
|
||||
@ -383,7 +384,7 @@ async def test_reload(hass):
|
||||
@respx.mock
|
||||
async def test_setup_query_params(hass):
|
||||
"""Test setup with query params."""
|
||||
respx.get("http://localhost", params={"search": "something"}) % 200
|
||||
respx.get("http://localhost", params={"search": "something"}) % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
binary_sensor.DOMAIN,
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
from http import HTTPStatus
|
||||
from os import path
|
||||
from unittest.mock import patch
|
||||
|
||||
@ -67,7 +68,7 @@ async def test_setup_with_endpoint_timeout_with_recovery(hass):
|
||||
assert len(hass.states.async_all()) == 0
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"sensor1": "1",
|
||||
"sensor2": "2",
|
||||
@ -107,7 +108,7 @@ async def test_setup_with_endpoint_timeout_with_recovery(hass):
|
||||
# endpoint is working again
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"sensor1": "1",
|
||||
"sensor2": "2",
|
||||
@ -133,7 +134,7 @@ async def test_setup_minimum_resource_template(hass):
|
||||
"""Test setup with minimum configuration (resource_template)."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"sensor1": "1",
|
||||
"sensor2": "2",
|
||||
@ -190,7 +191,7 @@ async def test_setup_minimum_resource_template(hass):
|
||||
async def test_reload(hass):
|
||||
"""Verify we can reload."""
|
||||
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
@ -242,7 +243,7 @@ async def test_reload(hass):
|
||||
async def test_reload_and_remove_all(hass):
|
||||
"""Verify we can reload and remove all."""
|
||||
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
@ -292,7 +293,7 @@ async def test_reload_and_remove_all(hass):
|
||||
async def test_reload_fails_to_read_configuration(hass):
|
||||
"""Verify reload when configuration is missing or broken."""
|
||||
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
@ -345,7 +346,7 @@ async def test_multiple_rest_endpoints(hass):
|
||||
"""Test multiple rest endpoints."""
|
||||
|
||||
respx.get("http://date.jsontest.com").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"date": "03-17-2021",
|
||||
"milliseconds_since_epoch": 1616008268573,
|
||||
@ -354,7 +355,7 @@ async def test_multiple_rest_endpoints(hass):
|
||||
)
|
||||
|
||||
respx.get("http://time.jsontest.com").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"date": "03-17-2021",
|
||||
"milliseconds_since_epoch": 1616008299665,
|
||||
@ -362,7 +363,7 @@ async def test_multiple_rest_endpoints(hass):
|
||||
},
|
||||
)
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"value": "1",
|
||||
},
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""The tests for the REST sensor platform."""
|
||||
import asyncio
|
||||
from http import HTTPStatus
|
||||
from os import path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
@ -81,7 +82,7 @@ async def test_setup_timeout(hass):
|
||||
@respx.mock
|
||||
async def test_setup_minimum(hass):
|
||||
"""Test setup with minimum configuration."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
sensor.DOMAIN,
|
||||
@ -101,7 +102,9 @@ async def test_setup_minimum(hass):
|
||||
async def test_manual_update(hass):
|
||||
"""Test setup with minimum configuration."""
|
||||
await async_setup_component(hass, "homeassistant", {})
|
||||
respx.get("http://localhost").respond(status_code=200, json={"data": "first"})
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=HTTPStatus.OK, json={"data": "first"}
|
||||
)
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
sensor.DOMAIN,
|
||||
@ -119,7 +122,9 @@ async def test_manual_update(hass):
|
||||
assert len(hass.states.async_all("sensor")) == 1
|
||||
assert hass.states.get("sensor.mysensor").state == "first"
|
||||
|
||||
respx.get("http://localhost").respond(status_code=200, json={"data": "second"})
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=HTTPStatus.OK, json={"data": "second"}
|
||||
)
|
||||
await hass.services.async_call(
|
||||
"homeassistant",
|
||||
"update_entity",
|
||||
@ -132,7 +137,7 @@ async def test_manual_update(hass):
|
||||
@respx.mock
|
||||
async def test_setup_minimum_resource_template(hass):
|
||||
"""Test setup with minimum configuration (resource_template)."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
sensor.DOMAIN,
|
||||
@ -150,7 +155,7 @@ async def test_setup_minimum_resource_template(hass):
|
||||
@respx.mock
|
||||
async def test_setup_duplicate_resource_template(hass):
|
||||
"""Test setup with duplicate resources."""
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
sensor.DOMAIN,
|
||||
@ -169,7 +174,7 @@ async def test_setup_duplicate_resource_template(hass):
|
||||
@respx.mock
|
||||
async def test_setup_get(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.get("http://localhost").respond(status_code=200, json={})
|
||||
respx.get("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"sensor",
|
||||
@ -215,7 +220,7 @@ async def test_setup_get(hass):
|
||||
@respx.mock
|
||||
async def test_setup_get_digest_auth(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.get("http://localhost").respond(status_code=200, json={})
|
||||
respx.get("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"sensor",
|
||||
@ -244,7 +249,7 @@ async def test_setup_get_digest_auth(hass):
|
||||
@respx.mock
|
||||
async def test_setup_post(hass):
|
||||
"""Test setup with valid configuration."""
|
||||
respx.post("http://localhost").respond(status_code=200, json={})
|
||||
respx.post("http://localhost").respond(status_code=HTTPStatus.OK, json={})
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
"sensor",
|
||||
@ -274,7 +279,7 @@ async def test_setup_post(hass):
|
||||
async def test_setup_get_xml(hass):
|
||||
"""Test setup with valid xml configuration."""
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/xml"},
|
||||
content="<dog>abc</dog>",
|
||||
)
|
||||
@ -305,7 +310,7 @@ async def test_setup_get_xml(hass):
|
||||
@respx.mock
|
||||
async def test_setup_query_params(hass):
|
||||
"""Test setup with query params."""
|
||||
respx.get("http://localhost", params={"search": "something"}) % 200
|
||||
respx.get("http://localhost", params={"search": "something"}) % HTTPStatus.OK
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
sensor.DOMAIN,
|
||||
@ -327,7 +332,7 @@ async def test_update_with_json_attrs(hass):
|
||||
"""Test attributes get extracted from a JSON result."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={"key": "some_json_value"},
|
||||
)
|
||||
assert await async_setup_component(
|
||||
@ -360,7 +365,7 @@ async def test_update_with_no_template(hass):
|
||||
"""Test update when there is no value template."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={"key": "some_json_value"},
|
||||
)
|
||||
assert await async_setup_component(
|
||||
@ -392,7 +397,7 @@ async def test_update_with_json_attrs_no_data(hass, caplog):
|
||||
"""Test attributes when no JSON result fetched."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": CONTENT_TYPE_JSON},
|
||||
content="",
|
||||
)
|
||||
@ -428,7 +433,7 @@ async def test_update_with_json_attrs_not_dict(hass, caplog):
|
||||
"""Test attributes get extracted from a JSON result."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json=["list", "of", "things"],
|
||||
)
|
||||
assert await async_setup_component(
|
||||
@ -463,7 +468,7 @@ async def test_update_with_json_attrs_bad_JSON(hass, caplog):
|
||||
"""Test attributes get extracted from a JSON result."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": CONTENT_TYPE_JSON},
|
||||
content="This is text rather than JSON data.",
|
||||
)
|
||||
@ -499,7 +504,7 @@ async def test_update_with_json_attrs_with_json_attrs_path(hass):
|
||||
"""Test attributes get extracted from a JSON result with a template for the attributes."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
json={
|
||||
"toplevel": {
|
||||
"master_value": "master",
|
||||
@ -543,7 +548,7 @@ async def test_update_with_xml_convert_json_attrs_with_json_attrs_path(hass):
|
||||
"""Test attributes get extracted from a JSON result that was converted from XML with a template for the attributes."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/xml"},
|
||||
content="<toplevel><master_value>master</master_value><second_level><some_json_key>some_json_value</some_json_key><some_json_key2>some_json_value2</some_json_key2></second_level></toplevel>",
|
||||
)
|
||||
@ -579,7 +584,7 @@ async def test_update_with_xml_convert_json_attrs_with_jsonattr_template(hass):
|
||||
"""Test attributes get extracted from a JSON result that was converted from XML."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/xml"},
|
||||
content='<?xml version="1.0" encoding="utf-8"?><response><scan>0</scan><ver>12556</ver><count>48</count><ssid>alexander</ssid><bss><valid>0</valid><name>0</name><privacy>0</privacy><wlan>bogus</wlan><strength>0</strength></bss><led0>0</led0><led1>0</led1><led2>0</led2><led3>0</led3><led4>0</led4><led5>0</led5><led6>0</led6><led7>0</led7><btn0>up</btn0><btn1>up</btn1><btn2>up</btn2><btn3>up</btn3><pot0>0</pot0><usr0>0</usr0><temp0>0x0XF0x0XF</temp0><time0> 0</time0></response>',
|
||||
)
|
||||
@ -620,7 +625,7 @@ async def test_update_with_application_xml_convert_json_attrs_with_jsonattr_temp
|
||||
"""Test attributes get extracted from a JSON result that was converted from XML with application/xml mime type."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "application/xml"},
|
||||
content="<main><dog>1</dog><cat>3</cat></main>",
|
||||
)
|
||||
@ -656,7 +661,7 @@ async def test_update_with_xml_convert_bad_xml(hass, caplog):
|
||||
"""Test attributes get extracted from a XML result with bad xml."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/xml"},
|
||||
content="",
|
||||
)
|
||||
@ -691,7 +696,7 @@ async def test_update_with_failed_get(hass, caplog):
|
||||
"""Test attributes get extracted from a XML result with bad xml."""
|
||||
|
||||
respx.get("http://localhost").respond(
|
||||
status_code=200,
|
||||
status_code=HTTPStatus.OK,
|
||||
headers={"content-type": "text/xml"},
|
||||
content="",
|
||||
)
|
||||
@ -725,7 +730,7 @@ async def test_update_with_failed_get(hass, caplog):
|
||||
async def test_reload(hass):
|
||||
"""Verify we can reload reset sensors."""
|
||||
|
||||
respx.get("http://localhost") % 200
|
||||
respx.get("http://localhost") % HTTPStatus.OK
|
||||
|
||||
await async_setup_component(
|
||||
hass,
|
||||
|
@ -1,5 +1,6 @@
|
||||
"""The tests for the rest command platform."""
|
||||
import asyncio
|
||||
from http import HTTPStatus
|
||||
|
||||
import aiohttp
|
||||
|
||||
@ -104,7 +105,7 @@ class TestRestCommandComponent:
|
||||
with assert_setup_component(5):
|
||||
setup_component(self.hass, rc.DOMAIN, self.config)
|
||||
|
||||
aioclient_mock.get(self.url, status=400)
|
||||
aioclient_mock.get(self.url, status=HTTPStatus.BAD_REQUEST)
|
||||
|
||||
self.hass.services.call(rc.DOMAIN, "get_test", {})
|
||||
self.hass.block_till_done()
|
||||
|
@ -1,4 +1,5 @@
|
||||
"""Test the Rituals Perfume Genie config flow."""
|
||||
from http import HTTPStatus
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from aiohttp import ClientResponseError
|
||||
@ -103,7 +104,9 @@ async def test_form_cannot_connect(hass):
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.rituals_perfume_genie.config_flow.Account.authenticate",
|
||||
side_effect=ClientResponseError(None, None, status=500),
|
||||
side_effect=ClientResponseError(
|
||||
None, None, status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
),
|
||||
):
|
||||
result2 = await hass.config_entries.flow.async_configure(
|
||||
result["flow_id"],
|
||||
|
@ -1,4 +1,5 @@
|
||||
"""Tests for the Roku component."""
|
||||
from http import HTTPStatus
|
||||
import re
|
||||
from socket import gaierror as SocketGIAError
|
||||
|
||||
@ -150,15 +151,29 @@ def mock_connection_server_error(
|
||||
"""Mock the Roku server error."""
|
||||
roku_url = f"http://{host}:8060"
|
||||
|
||||
aioclient_mock.get(f"{roku_url}/query/device-info", status=500)
|
||||
aioclient_mock.get(f"{roku_url}/query/apps", status=500)
|
||||
aioclient_mock.get(f"{roku_url}/query/active-app", status=500)
|
||||
aioclient_mock.get(f"{roku_url}/query/tv-active-channel", status=500)
|
||||
aioclient_mock.get(f"{roku_url}/query/tv-channels", status=500)
|
||||
aioclient_mock.get(
|
||||
f"{roku_url}/query/device-info", status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.get(
|
||||
f"{roku_url}/query/apps", status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.get(
|
||||
f"{roku_url}/query/active-app", status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.get(
|
||||
f"{roku_url}/query/tv-active-channel", status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.get(
|
||||
f"{roku_url}/query/tv-channels", status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
|
||||
aioclient_mock.post(re.compile(f"{roku_url}/keypress/.*"), status=500)
|
||||
aioclient_mock.post(re.compile(f"{roku_url}/launch/.*"), status=500)
|
||||
aioclient_mock.post(f"{roku_url}/search", status=500)
|
||||
aioclient_mock.post(
|
||||
re.compile(f"{roku_url}/keypress/.*"), status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.post(
|
||||
re.compile(f"{roku_url}/launch/.*"), status=HTTPStatus.INTERNAL_SERVER_ERROR
|
||||
)
|
||||
aioclient_mock.post(f"{roku_url}/search", status=HTTPStatus.INTERNAL_SERVER_ERROR)
|
||||
|
||||
|
||||
async def setup_integration(
|
||||
|
@ -1,8 +1,9 @@
|
||||
"""The tests for the rss_feed_api component."""
|
||||
from http import HTTPStatus
|
||||
|
||||
from defusedxml import ElementTree
|
||||
import pytest
|
||||
|
||||
from homeassistant.const import HTTP_NOT_FOUND
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
|
||||
@ -30,7 +31,7 @@ def mock_http_client(loop, hass, hass_client):
|
||||
async def test_get_nonexistant_feed(mock_http_client):
|
||||
"""Test if we can retrieve the correct rss feed."""
|
||||
resp = await mock_http_client.get("/api/rss_template/otherfeed")
|
||||
assert resp.status == HTTP_NOT_FOUND
|
||||
assert resp.status == HTTPStatus.NOT_FOUND
|
||||
|
||||
|
||||
async def test_get_rss_feed(mock_http_client, hass):
|
||||
@ -40,7 +41,7 @@ async def test_get_rss_feed(mock_http_client, hass):
|
||||
hass.states.async_set("test.test3", "a_state_3")
|
||||
|
||||
resp = await mock_http_client.get("/api/rss_template/testfeed")
|
||||
assert resp.status == 200
|
||||
assert resp.status == HTTPStatus.OK
|
||||
|
||||
text = await resp.text()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user