1
0
mirror of https://github.com/home-assistant/core.git synced 2025-08-13 15:30:03 +00:00
Files
.devcontainer
.github
.vscode
docs
homeassistant
machine
pylint
rootfs
script
tests
auth
backports
components
abode
accuweather
acmeda
adax
adguard
advantage_air
aemet
agent_dvr
air_quality
airly
airnow
airq
airthings
airthings_ble
airtouch4
airvisual
airzone
aladdin_connect
alarm_control_panel
alarmdecoder
alert
alexa
almond
amberelectric
ambiclimate
ambient_station
analytics
android_ip_webcam
androidtv
anthemav
apache_kafka
apcupsd
api
api_streams
apple_tv
application_credentials
apprise
aprs
aranet
arcam_fmj
aseko_pool_live
asuswrt
atag
august
aurora
aurora_abb_powerone
aussie_broadband
auth
automation
awair
aws
axis
azure_devops
azure_event_hub
backup
baf
balboa
bayesian
binary_sensor
blackbird
blebox
blink
bluemaestro
blueprint
bluetooth
bluetooth_le_tracker
bmw_connected_drive
bond
bosch_shc
braviatv
broadlink
brother
brunt
bsblan
bthome
buienradar
button
caldav
calendar
camera
canary
cast
cert_expiry
clicksend_tts
climate
cloud
cloudflare
co2signal
coinbase
color_extractor
comfoconnect
command_line
compensation
config
configurator
control4
conversation
coolmaster
coronavirus
counter
cover
cpuspeed
crownstone
daikin
darksky
datadog
debugpy
deconz
default_config
deluge
demo
denonavr
derivative
device_automation
device_sun_light_trigger
device_tracker
devolo_home_control
devolo_home_network
dexcom
dhcp
diagnostics
dialogflow
directv
discord
discovery
dlna_dmr
dlna_dms
dnsip
doorbird
dsmr
dsmr_reader
dte_energy_bridge
duckdns
dunehd
dynalite
eafm
ecobee
econet
ecowitt
efergy
eight_sleep
elgato
elkm1
elmax
emonitor
emulated_hue
emulated_kasa
emulated_roku
energy
enocean
enphase_envoy
environment_canada
epson
escea
esphome
everlights
evil_genius_labs
ezviz
faa_delays
facebook
facebox
fail2ban
fan
feedreader
ffmpeg
fibaro
fido
file
file_upload
filesize
filter
fireservicerota
firmata
fivem
fjaraskupan
flic
flick_electric
flipr
flo
flume
flux
flux_led
folder
folder_watcher
foobot
forecast_solar
forked_daapd
foscam
freebox
freedns
freedompro
fritz
fritzbox
fritzbox_callmonitor
fronius
frontend
fully_kiosk
garages_amsterdam
gdacs
generic
generic_hygrostat
generic_thermostat
geo_json_events
geo_location
geo_rss_events
geocaching
geofency
geonetnz_quakes
geonetnz_volcano
gios
github
glances
goalzero
gogogate2
goodwe
google
google_assistant
google_domains
google_pubsub
google_sheets
google_translate
google_travel_time
google_wifi
govee_ble
gpslogger
graphite
gree
greeneye_monitor
group
growatt_server
guardian
habitica
hardkernel
hardware
harmony
hassio
hddtemp
hdmi_cec
heos
here_travel_time
hisense_aehw4a1
history
history_stats
hive
hlk_sw16
home_connect
home_plus_control
homeassistant
homeassistant_alerts
homeassistant_hardware
homeassistant_sky_connect
homeassistant_yellow
homekit
homekit_controller
homematic
homematicip_cloud
homewizard
honeywell
html5
http
huawei_lte
hue
huisbaasje
humidifier
hunterdouglas_powerview
hvv_departures
hyperion
ialarm
iaqualink
ibeacon
icloud
ifttt
ign_sismologia
image
image_processing
imap_email_content
influxdb
inkbird
input_boolean
input_button
input_datetime
input_number
input_select
input_text
insteon
integration
intellifire
intent
intent_script
ios
iotawatt
ipma
ipp
iqvia
islamic_prayer_times
iss
isy994
izone
jellyfin
jewish_calendar
juicenet
justnimbus
kaleidescape
keenetic_ndms2
kegtron
keymitt_ble
kira
kmtronic
knx
kodi
konnected
kostal_plenticore
kraken
kulersky
lacrosse_view
lametric
landisgyr_heat_meter
lastfm
launch_library
laundrify
lcn
led_ble
lg_soundbar
lidarr
life360
lifx
light
litejet
litterrobot
livisi
local_file
local_ip
locative
lock
logbook
logentries
logger
logi_circle
london_air
lookin
lovelace
luftdaten
lutron_caseta
lyric
mailbox
mailgun
manual
manual_mqtt
marytts
maxcube
mazda
meater
media_player
media_source
melcloud
melissa
melnor
meraki
met
met_eireann
meteo_france
meteoclimatic
metoffice
mfi
microsoft_face
microsoft_face_detect
microsoft_face_identify
mikrotik
mill
min_max
minecraft_server
minio
mjpeg
moat
mobile_app
mochad
modbus
modem_callerid
modern_forms
moehlenhoff_alpha2
mold_indicator
monoprice
moon
motion_blinds
motioneye
mqtt
mqtt_eventstream
mqtt_json
mqtt_room
mqtt_statestream
mullvad
mutesync
my
myq
mysensors
mythicbeastsdns
nam
namecheapdns
nanoleaf
neato
ness_alarm
nest
netatmo
netgear
network
nexia
nextbus
nextdns
nfandroidtv
nibe_heatpump
nightscout
nina
nmap_tracker
no_ip
nobo_hub
notify
notify_events
notion
nsw_fuel_station
nsw_rural_fire_service_feed
nuheat
nuki
numato
number
nut
nws
nx584
nzbget
octoprint
omnilogic
onboarding
oncue
ondilo_ico
onewire
onvif
open_meteo
openalpr_cloud
openerz
openexchangerates
opengarage
openhardwaremonitor
opentherm_gw
openuv
openweathermap
opnsense
oralb
overkiz
ovo_energy
owntracks
p1_monitor
panasonic_viera
panel_custom
panel_iframe
peco
persistent_notification
person
philips_js
pi_hole
picnic
pilight
ping
plaato
plant
plex
plugwise
plum_lightpad
point
poolsense
powerwall
profiler
progettihwsw
prometheus
prosegur
proximity
prusalink
ps4
pure_energie
push
pushbullet
pushover
pvoutput
pvpc_hourly_pricing
python_script
__init__.py
test_init.py
qingping
qld_bushfire
qnap_qsw
qwikswitch
rachio
radarr
radio_browser
radiotherm
rainforest_eagle
rainmachine
random
raspberry_pi
rdw
recollect_waste
recorder
reddit
remember_the_milk
remote
renault
repairs
rest
rest_command
rflink
rfxtrx
rhasspy
ridwell
ring
risco
rituals_perfume_genie
rmvtransport
roku
roomba
roon
rpi_power
rss_feed_template
rtsp_to_webrtc
ruckus_unleashed
ruuvitag_ble
sabnzbd
safe_mode
samsungtv
scene
schedule
scrape
screenlogic
script
search
season
select
sense
senseme
sensibo
sensirion_ble
sensor
sensorpro
sensorpush
sentry
senz
seventeentrack
sharkiq
shell_command
shelly
shopping_list
sia
sigfox
sighthound
signal_messenger
simplepush
simplisafe
simulated
siren
skybell
slack
sleepiq
slimproto
sma
smappee
smart_meter_texas
smartthings
smarttub
smhi
smtp
snips
snmp
snooz
solaredge
solarlog
solax
soma
somfy_mylink
sonarr
songpal
sonos
soundtouch
spaceapi
spc
speedtestdotnet
spider
spotify
sql
squeezebox
srp_energy
ssdp
starline
startca
statistics
statsd
steam_online
steamist
stookalert
stream
stt
subaru
sun
surepetcare
switch
switch_as_x
switchbee
switchbot
switcher_kis
syncthing
syncthru
synology_dsm
system_bridge
system_health
system_log
tado
tag
tailscale
tankerkoenig
tasmota
tautulli
tcp
telegram
telegram_bot
tellduslive
template
tesla_wall_connector
text
thermobeacon
thermopro
threshold
tibber
tile
tilt_ble
time_date
timer
tod
todoist
tolo
tomato
tomorrowio
toon
totalconnect
tplink
traccar
trace
tractive
tradfri
trafikverket_ferry
trafikverket_train
trafikverket_weatherstation
transmission
transport_nsw
trend
tts
tuya
twentemilieu
twilio
twinkly
twitch
uk_transport
ukraine_alarm
unifi
unifi_direct
unifiprotect
universal
upb
upcloud
update
upnp
uptime
uptimerobot
usb
usgs_earthquakes_feed
utility_meter
uvc
vacuum
vallox
velbus
venstar
vera
verisure
version
vesync
vicare
vilfo
vizio
vlc_telnet
voicerss
volumio
volvooncall
vulcan
vultr
wake_on_lan
wallbox
water_heater
watttime
waze_travel_time
weather
webhook
webostv
websocket_api
wemo
whirlpool
whois
wiffi
wilight
withings
wiz
wled
wolflink
workday
worldclock
ws66i
wsdot
xbox
xiaomi
xiaomi_aqara
xiaomi_ble
xiaomi_miio
yale_smart_alarm
yalexs_ble
yamaha
yamaha_musiccast
yandex_transport
yandextts
yeelight
yolink
youless
zamg
zeroconf
zerproc
zha
zodiac
zone
zwave_js
zwave_me
__init__.py
conftest.py
fixtures
hassfest
helpers
pylint
resources
scripts
test_util
testing_config
util
__init__.py
bandit.yaml
common.py
conftest.py
ignore_uncaught_exceptions.py
test_bootstrap.py
test_config.py
test_config_entries.py
test_core.py
test_data_entry_flow.py
test_exceptions.py
test_loader.py
test_main.py
test_requirements.py
test_runner.py
test_setup.py
test_test_fixtures.py
.core_files.yaml
.coveragerc
.dockerignore
.gitattributes
.gitignore
.hadolint.yaml
.pre-commit-config.yaml
.prettierignore
.readthedocs.yml
.strict-typing
.yamllint
CLA.md
CODEOWNERS
CODE_OF_CONDUCT.md
CONTRIBUTING.md
Dockerfile
Dockerfile.dev
LICENSE.md
MANIFEST.in
README.rst
build.yaml
codecov.yml
mypy.ini
pyproject.toml
requirements.txt
requirements_all.txt
requirements_docs.txt
requirements_test.txt
requirements_test_all.txt
requirements_test_pre_commit.txt
setup.cfg
core/tests/components/python_script/test_init.py

425 lines
13 KiB
Python

"""Test the python_script component."""
import logging
from unittest.mock import mock_open, patch
from homeassistant.components.python_script import DOMAIN, FOLDER, execute
from homeassistant.helpers.service import async_get_all_descriptions
from homeassistant.setup import async_setup_component
from tests.common import patch_yaml_files
async def test_setup(hass):
    """Test that scripts are discovered and exposed as callable services.

    Setup runs with the directory check and the glob patched so that two
    script paths are reported; each must be registered as a service named
    after its file. Calling one service must invoke ``execute`` exactly once
    with the hass instance, the script file name, the file contents and the
    service call data.
    """
    scripts = [
        "/some/config/dir/python_scripts/hello.py",
        "/some/config/dir/python_scripts/world_beer.py",
    ]
    with patch(
        "homeassistant.components.python_script.os.path.isdir", return_value=True
    ), patch("homeassistant.components.python_script.glob.iglob", return_value=scripts):
        res = await async_setup_component(hass, "python_script", {})

    assert res
    assert hass.services.has_service("python_script", "hello")
    assert hass.services.has_service("python_script", "world_beer")

    # The component reads the script through open(); serve fake contents and
    # intercept execute() so only the dispatch is checked.
    with patch(
        "homeassistant.components.python_script.open",
        mock_open(read_data="fake source"),
        create=True,
    ), patch("homeassistant.components.python_script.execute") as mock_ex:
        await hass.services.async_call(
            "python_script", "hello", {"some": "data"}, blocking=True
        )

    assert len(mock_ex.mock_calls) == 1
    # Unpack into a distinct name: rebinding ``hass`` here would reduce the
    # identity check below to ``hass is hass``, which can never fail.
    called_hass, script, source, data = mock_ex.mock_calls[0][1]
    assert called_hass is hass
    assert script == "hello.py"
    assert source == "fake source"
    assert data == {"some": "data"}
async def test_setup_fails_on_no_dir(hass, caplog):
    """Setup must report failure and log the reason when the folder is absent."""
    isdir_target = "homeassistant.components.python_script.os.path.isdir"

    # Pretend the scripts directory does not exist.
    with patch(isdir_target, return_value=False):
        setup_ok = await async_setup_component(hass, "python_script", {})

    assert not setup_ok
    assert "Folder python_scripts not found in configuration folder" in caplog.text
async def test_execute_with_data(hass, caplog):
    """Run a script that copies a value from the call data into a state."""
    caplog.set_level(logging.WARNING)
    script_body = """
hass.states.set('test.entity', data.get('name', 'not set'))
"""
    call_data = {"name": "paulus"}

    hass.async_add_executor_job(execute, hass, "test.py", script_body, call_data)
    await hass.async_block_till_done()

    assert hass.states.is_state("test.entity", "paulus")

    # A clean run leaves nothing in the captured log.
    assert caplog.text == ""
async def test_execute_warns_print(hass, caplog):
    """A script calling print() must cause the print warning to be logged."""
    caplog.set_level(logging.WARNING)
    script_body = """
print("This triggers warning.")
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_warning = "Don't use print() inside scripts."
    assert expected_warning in caplog.text
async def test_execute_logging(hass, caplog):
    """A message sent through the script's ``logger`` must reach the log."""
    caplog.set_level(logging.INFO)
    script_body = """
logger.info('Logging from inside script')
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_message = "Logging from inside script"
    assert expected_message in caplog.text
async def test_execute_compile_error(hass, caplog):
    """A script that is not valid Python must log a load error naming the file."""
    caplog.set_level(logging.ERROR)
    broken_script = """
this is not valid Python
"""

    hass.async_add_executor_job(execute, hass, "test.py", broken_script, {})
    await hass.async_block_till_done()

    expected_error = "Error loading script test.py"
    assert expected_error in caplog.text
async def test_execute_runtime_error(hass, caplog):
    """An exception raised by the script must be logged with its message."""
    caplog.set_level(logging.ERROR)
    raising_script = """
raise Exception('boom')
"""

    hass.async_add_executor_job(execute, hass, "test.py", raising_script, {})
    await hass.async_block_till_done()

    expected_error = "Error executing script: boom"
    assert expected_error in caplog.text
async def test_accessing_async_methods(hass, caplog):
    """A script touching ``hass.async_stop`` must log the async-access error."""
    caplog.set_level(logging.ERROR)
    script_body = """
hass.async_stop()
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_error = "Not allowed to access async methods"
    assert expected_error in caplog.text
async def test_using_complex_structures(hass, caplog):
    """A script may build a dict and a list and index into both."""
    caplog.set_level(logging.INFO)
    script_body = """
mydict = {"a": 1, "b": 2}
mylist = [1, 2, 3, 4]
logger.info('Logging from inside script: %s %s' % (mydict["a"], mylist[2]))
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    # mydict["a"] is 1 and mylist[2] is 3.
    expected_message = "Logging from inside script: 1 3"
    assert expected_message in caplog.text
async def test_accessing_forbidden_methods(hass, caplog):
    """Test that scripts touching blocked attributes get an access error logged.

    Each key is a one-line script and each value is the qualified name the
    error message is expected to mention for it.
    """
    caplog.set_level(logging.ERROR)

    for source, name in {
        "hass.stop()": "HomeAssistant.stop",
        "dt_util.set_default_time_zone()": "module.set_default_time_zone",
        "datetime.non_existing": "module.non_existing",
        "time.tzset()": "TimeWrapper.tzset",
    }.items():
        # caplog.clear() resets both the record list and the captured text.
        # Emptying only ``caplog.records`` leaves earlier output in
        # ``caplog.text``, which is what the assertion below inspects.
        caplog.clear()
        hass.async_add_executor_job(execute, hass, "test.py", source, {})
        await hass.async_block_till_done()
        assert f"Not allowed to access {name}" in caplog.text
async def test_iterating(hass):
    """Test that a script can run a ``for`` loop over a list.

    The script sets one state per list element; both must exist afterwards.
    """
    # The loop body must be indented inside the embedded script, otherwise
    # the script itself would not compile.
    source = """
for i in [1, 2]:
    hass.states.set('hello.{}'.format(i), 'world')
"""

    hass.async_add_executor_job(execute, hass, "test.py", source, {})
    await hass.async_block_till_done()

    assert hass.states.is_state("hello.1", "world")
    assert hass.states.is_state("hello.2", "world")
async def test_using_enumerate(hass):
    """Test that enumerate is accepted and executed.

    The script sets one state per (index, value) pair; both must exist
    afterwards.
    """
    # The loop body must be indented inside the embedded script, otherwise
    # the script itself would not compile.
    source = """
for index, value in enumerate(["earth", "mars"]):
    hass.states.set('hello.{}'.format(index), value)
"""

    # Schedule the synchronous ``execute`` through the executor, the same way
    # every other test in this module does.
    hass.async_add_executor_job(execute, hass, "test.py", source, {})
    await hass.async_block_till_done()

    assert hass.states.is_state("hello.0", "earth")
    assert hass.states.is_state("hello.1", "mars")
async def test_unpacking_sequence(hass, caplog):
    """A script may unpack tuples, both in assignments and in comprehensions."""
    caplog.set_level(logging.ERROR)
    script_body = """
a,b = (1,2)
ab_list = [(a,b) for a,b in [(1, 2), (3, 4)]]
hass.states.set('hello.a', a)
hass.states.set('hello.b', b)
hass.states.set('hello.ab_list', '{}'.format(ab_list))
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_states = (
        ("hello.a", "1"),
        ("hello.b", "2"),
        ("hello.ab_list", "[(1, 2), (3, 4)]"),
    )
    for entity_id, expected in expected_states:
        assert hass.states.is_state(entity_id, expected)

    # A clean run leaves nothing in the captured log.
    assert caplog.text == ""
async def test_execute_sorted(hass, caplog):
    """A script may call sorted() and use the resulting list."""
    caplog.set_level(logging.ERROR)
    script_body = """
a = sorted([3,1,2])
assert(a == [1,2,3])
hass.states.set('hello.a', a[0])
hass.states.set('hello.b', a[1])
hass.states.set('hello.c', a[2])
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_states = (("hello.a", "1"), ("hello.b", "2"), ("hello.c", "3"))
    for entity_id, expected in expected_states:
        assert hass.states.is_state(entity_id, expected)

    # A clean run leaves nothing in the captured log.
    assert caplog.text == ""
async def test_exposed_modules(hass, caplog):
    """A script may use the ``time`` and ``datetime`` names it is given."""
    caplog.set_level(logging.ERROR)
    script_body = """
hass.states.set('module.time', time.strftime('%Y', time.gmtime(521276400)))
hass.states.set('module.time_strptime',
time.strftime('%H:%M', time.strptime('12:34', '%H:%M')))
hass.states.set('module.datetime',
datetime.timedelta(minutes=1).total_seconds())
"""

    hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
    await hass.async_block_till_done()

    expected_states = (
        ("module.time", "1986"),
        ("module.time_strptime", "12:34"),
        ("module.datetime", "60.0"),
    )
    for entity_id, expected in expected_states:
        assert hass.states.is_state(entity_id, expected)

    # A clean run leaves nothing in the captured log.
    assert caplog.text == ""
async def test_execute_functions(hass, caplog):
    """Test functions defined in script can call one another.

    The script defines ``a`` and ``b``, where ``b`` calls ``a``, then invokes
    ``b``; the states written by both functions must exist afterwards.
    """
    caplog.set_level(logging.ERROR)
    # Function bodies must be indented inside the embedded script, otherwise
    # the script itself would not compile.
    source = """
def a():
    hass.states.set('hello.a', 'one')
def b():
    a()
    hass.states.set('hello.b', 'two')
b()
"""

    hass.async_add_executor_job(execute, hass, "test.py", source, {})
    await hass.async_block_till_done()

    assert hass.states.is_state("hello.a", "one")
    assert hass.states.is_state("hello.b", "two")
    # No errors logged = good
    assert caplog.text == ""
async def test_reload(hass):
    """Reloading must drop services for vanished scripts and add new ones."""
    isdir_target = "homeassistant.components.python_script.os.path.isdir"
    iglob_target = "homeassistant.components.python_script.glob.iglob"

    # First discovery: hello.py and world_beer.py.
    initial_scripts = [
        "/some/config/dir/python_scripts/hello.py",
        "/some/config/dir/python_scripts/world_beer.py",
    ]
    with patch(isdir_target, return_value=True), patch(
        iglob_target, return_value=initial_scripts
    ):
        setup_ok = await async_setup_component(hass, "python_script", {})

    assert setup_ok
    for service in ("hello", "world_beer", "reload"):
        assert hass.services.has_service("python_script", service)

    # Second discovery: hello.py is replaced by hello2.py.
    reloaded_scripts = [
        "/some/config/dir/python_scripts/hello2.py",
        "/some/config/dir/python_scripts/world_beer.py",
    ]
    with patch(isdir_target, return_value=True), patch(
        iglob_target, return_value=reloaded_scripts
    ):
        await hass.services.async_call("python_script", "reload", {}, blocking=True)

    assert not hass.services.has_service("python_script", "hello")
    for service in ("hello2", "world_beer", "reload"):
        assert hass.services.has_service("python_script", service)
async def test_service_descriptions(hass):
    """Test that service descriptions are loaded and reloaded correctly.

    A services.yaml under the scripts folder is supplied through
    ``patch_yaml_files``. The first phase sets the component up and checks
    the descriptions it yields; the second phase swaps both the script list
    and the YAML, calls the ``reload`` service and checks them again.
    """
    # Test 1: initial load. hello is fully described in services.yaml,
    # world_beer is not mentioned there at all.
    scripts1 = [
        "/some/config/dir/python_scripts/hello.py",
        "/some/config/dir/python_scripts/world_beer.py",
    ]

    # The nesting matters: ``fake_param`` must sit under ``fields`` and its
    # ``description``/``example`` under ``fake_param`` for the lookups
    # asserted below to resolve.
    service_descriptions1 = (
        "hello:\n"
        "  name: ABC\n"
        "  description: Description of hello.py.\n"
        "  fields:\n"
        "    fake_param:\n"
        "      description: Parameter used by hello.py.\n"
        "      example: 'This is a test of python_script.hello'"
    )
    services_yaml1 = {
        f"{hass.config.config_dir}/{FOLDER}/services.yaml": service_descriptions1
    }

    with patch(
        "homeassistant.components.python_script.os.path.isdir", return_value=True
    ), patch(
        "homeassistant.components.python_script.glob.iglob", return_value=scripts1
    ), patch(
        "homeassistant.components.python_script.os.path.exists", return_value=True
    ), patch_yaml_files(
        services_yaml1
    ):
        await async_setup_component(hass, DOMAIN, {})

        descriptions = await async_get_all_descriptions(hass)

    assert len(descriptions) == 1

    assert descriptions[DOMAIN]["hello"]["name"] == "ABC"
    assert descriptions[DOMAIN]["hello"]["description"] == "Description of hello.py."
    assert (
        descriptions[DOMAIN]["hello"]["fields"]["fake_param"]["description"]
        == "Parameter used by hello.py."
    )
    assert (
        descriptions[DOMAIN]["hello"]["fields"]["fake_param"]["example"]
        == "This is a test of python_script.hello"
    )

    # Verify default name = file name
    assert descriptions[DOMAIN]["world_beer"]["name"] == "world_beer"
    assert descriptions[DOMAIN]["world_beer"]["description"] == ""
    assert not descriptions[DOMAIN]["world_beer"]["fields"]

    # Test 2: reload with a different script list and a services.yaml that
    # now describes hello2 instead of hello.
    scripts2 = [
        "/some/config/dir/python_scripts/hello2.py",
        "/some/config/dir/python_scripts/world_beer.py",
    ]

    service_descriptions2 = (
        "hello2:\n"
        "  description: Description of hello2.py.\n"
        "  fields:\n"
        "    fake_param:\n"
        "      description: Parameter used by hello2.py.\n"
        "      example: 'This is a test of python_script.hello2'"
    )
    services_yaml2 = {
        f"{hass.config.config_dir}/{FOLDER}/services.yaml": service_descriptions2
    }

    with patch(
        "homeassistant.components.python_script.os.path.isdir", return_value=True
    ), patch(
        "homeassistant.components.python_script.glob.iglob", return_value=scripts2
    ), patch(
        "homeassistant.components.python_script.os.path.exists", return_value=True
    ), patch_yaml_files(
        services_yaml2
    ):
        await hass.services.async_call(DOMAIN, "reload", {}, blocking=True)
        descriptions = await async_get_all_descriptions(hass)

    assert len(descriptions) == 1

    assert descriptions[DOMAIN]["hello2"]["description"] == "Description of hello2.py."
    assert (
        descriptions[DOMAIN]["hello2"]["fields"]["fake_param"]["description"]
        == "Parameter used by hello2.py."
    )
    assert (
        descriptions[DOMAIN]["hello2"]["fields"]["fake_param"]["example"]
        == "This is a test of python_script.hello2"
    )
async def test_sleep_warns_one(hass, caplog):
    """Two time.sleep calls in one script must be mentioned in the log once."""
    caplog.set_level(logging.WARNING)
    script_body = """
time.sleep(2)
time.sleep(5)
"""

    # Replace the real sleep so the test does not actually wait.
    sleep_target = "homeassistant.components.python_script.time.sleep"
    with patch(sleep_target):
        hass.async_add_executor_job(execute, hass, "test.py", script_body, {})
        await hass.async_block_till_done()

    mentions = caplog.text.count("time.sleep")
    assert mentions == 1