mirror of
https://github.com/home-assistant/core.git
synced 2025-11-15 05:50:13 +00:00
New component: Python Script (#7950)
* Add initial version
* Fix requirements
* Prefer logging over printing
* Set executor thread name on >Py36 only
* Add tests
* Lint
* Add restrictedpython to test dependencies
* Create python_script.py
From doc:
```
However, an empty dict ({}) is treated as is. If you want to specify a list that can contain anything, specify it as dict:
>>> schema = Schema({}, extra=ALLOW_EXTRA) # don't do this
>>> try:
... schema({'extra': 1})
... raise AssertionError('MultipleInvalid not raised')
... except MultipleInvalid as e:
... exc = e
>>> str(exc) == "not a valid value"
True
>>> schema({})
{}
>>> schema = Schema(dict) # do this instead
>>> schema({})
{}
>>> schema({'extra': 1})
{'extra': 1}
```
This commit is contained in:
committed by
Pascal Vizeli
parent
640c692e1f
commit
db0efc647d
86
homeassistant/components/python_script.py
Normal file
86
homeassistant/components/python_script.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Component to allow running Python scripts."""
|
||||
import glob
|
||||
import os
|
||||
import logging
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
DOMAIN = 'python_script'
|
||||
REQUIREMENTS = ['restrictedpython==4.0a2']
|
||||
FOLDER = 'python_scripts'
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CONFIG_SCHEMA = vol.Schema({
|
||||
DOMAIN: vol.Schema(dict)
|
||||
}, extra=vol.ALLOW_EXTRA)
|
||||
|
||||
|
||||
def setup(hass, config):
|
||||
"""Initialize the python_script component."""
|
||||
path = hass.config.path(FOLDER)
|
||||
|
||||
if not os.path.isdir(path):
|
||||
_LOGGER.warning('Folder %s not found in config folder', FOLDER)
|
||||
return False
|
||||
|
||||
def service_handler(call):
|
||||
"""Handle python script service calls."""
|
||||
filename = '{}.py'.format(call.service)
|
||||
with open(hass.config.path(FOLDER, filename)) as fil:
|
||||
execute(hass, filename, fil.read(), call.data)
|
||||
|
||||
for fil in glob.iglob(os.path.join(path, '*.py')):
|
||||
name = os.path.splitext(os.path.basename(fil))[0]
|
||||
hass.services.register(DOMAIN, name, service_handler)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def execute(hass, filename, source, data):
|
||||
"""Execute a script."""
|
||||
from RestrictedPython import compile_restricted_exec
|
||||
from RestrictedPython.Guards import safe_builtins, full_write_guard
|
||||
|
||||
compiled = compile_restricted_exec(source, filename=filename)
|
||||
|
||||
if compiled.errors:
|
||||
_LOGGER.error('Error loading script %s: %s', filename,
|
||||
', '.join(compiled.errors))
|
||||
return
|
||||
|
||||
if compiled.warnings:
|
||||
_LOGGER.warning('Warning loading script %s: %s', filename,
|
||||
', '.join(compiled.warnings))
|
||||
|
||||
restricted_globals = {
|
||||
'__builtins__': safe_builtins,
|
||||
'_print_': StubPrinter,
|
||||
'_getattr_': getattr,
|
||||
'_write_': full_write_guard,
|
||||
}
|
||||
local = {
|
||||
'hass': hass,
|
||||
'data': data,
|
||||
'logger': logging.getLogger('{}.{}'.format(__name__, filename))
|
||||
}
|
||||
|
||||
try:
|
||||
_LOGGER.info('Executing %s: %s', filename, data)
|
||||
# pylint: disable=exec-used
|
||||
exec(compiled.code, restricted_globals, local)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
_LOGGER.exception('Error executing script %s: %s', filename, err)
|
||||
|
||||
|
||||
class StubPrinter:
|
||||
"""Class to handle printing inside scripts."""
|
||||
|
||||
def __init__(self, _getattr_):
|
||||
"""Initialize our printer."""
|
||||
pass
|
||||
|
||||
def _call_print(self, *objects, **kwargs):
|
||||
"""Print text."""
|
||||
# pylint: disable=no-self-use
|
||||
_LOGGER.warning(
|
||||
"Don't use print() inside scripts. Use logger.info() instead.")
|
||||
Reference in New Issue
Block a user