Expose time module in Python Scripts (#9736)

* Expose time module in Python Scripts

* Make dt_util available in Python Scripts

* Limit methods in time module

* Add time.mktime

* Limit access to datetime

* Add warning to time.sleep

* Lint
This commit is contained in:
Paulus Schoutsen 2017-10-08 23:51:32 -07:00 committed by Pascal Vizeli
parent 35484ca086
commit 414900fefb
2 changed files with 85 additions and 11 deletions

View File

@ -1,8 +1,9 @@
"""Component to allow running Python scripts.""" """Component to allow running Python scripts."""
import glob
import os
import logging
import datetime import datetime
import glob
import logging
import os
import time
import voluptuous as vol import voluptuous as vol
@ -10,6 +11,7 @@ from homeassistant.const import SERVICE_RELOAD
from homeassistant.exceptions import HomeAssistantError from homeassistant.exceptions import HomeAssistantError
from homeassistant.loader import bind_hass from homeassistant.loader import bind_hass
from homeassistant.util import sanitize_filename from homeassistant.util import sanitize_filename
import homeassistant.util.dt as dt_util
DOMAIN = 'python_script' DOMAIN = 'python_script'
REQUIREMENTS = ['restrictedpython==4.0a3'] REQUIREMENTS = ['restrictedpython==4.0a3']
@ -25,6 +27,13 @@ ALLOWED_EVENTBUS = set(['fire'])
ALLOWED_STATEMACHINE = set(['entity_ids', 'all', 'get', 'is_state', ALLOWED_STATEMACHINE = set(['entity_ids', 'all', 'get', 'is_state',
'is_state_attr', 'remove', 'set']) 'is_state_attr', 'remove', 'set'])
ALLOWED_SERVICEREGISTRY = set(['services', 'has_service', 'call']) ALLOWED_SERVICEREGISTRY = set(['services', 'has_service', 'call'])
ALLOWED_TIME = set(['sleep', 'strftime', 'strptime', 'gmtime', 'localtime',
'ctime', 'time', 'mktime'])
ALLOWED_DATETIME = set(['date', 'time', 'datetime', 'timedelta', 'tzinfo'])
ALLOWED_DT_UTIL = set([
'utcnow', 'now', 'as_utc', 'as_timestamp', 'as_local',
'utc_from_timestamp', 'start_of_local_day', 'parse_datetime', 'parse_date',
'get_age'])
class ScriptError(HomeAssistantError): class ScriptError(HomeAssistantError):
@ -111,7 +120,10 @@ def execute(hass, filename, source, data=None):
elif (obj is hass and name not in ALLOWED_HASS or elif (obj is hass and name not in ALLOWED_HASS or
obj is hass.bus and name not in ALLOWED_EVENTBUS or obj is hass.bus and name not in ALLOWED_EVENTBUS or
obj is hass.states and name not in ALLOWED_STATEMACHINE or obj is hass.states and name not in ALLOWED_STATEMACHINE or
obj is hass.services and name not in ALLOWED_SERVICEREGISTRY): obj is hass.services and name not in ALLOWED_SERVICEREGISTRY or
obj is dt_util and name not in ALLOWED_DT_UTIL or
obj is datetime and name not in ALLOWED_DATETIME or
isinstance(obj, TimeWrapper) and name not in ALLOWED_TIME):
raise ScriptError('Not allowed to access {}.{}'.format( raise ScriptError('Not allowed to access {}.{}'.format(
obj.__class__.__name__, name)) obj.__class__.__name__, name))
@ -120,6 +132,8 @@ def execute(hass, filename, source, data=None):
builtins = safe_builtins.copy() builtins = safe_builtins.copy()
builtins.update(utility_builtins) builtins.update(utility_builtins)
builtins['datetime'] = datetime builtins['datetime'] = datetime
builtins['time'] = TimeWrapper()
builtins['dt_util'] = dt_util
restricted_globals = { restricted_globals = {
'__builtins__': builtins, '__builtins__': builtins,
'_print_': StubPrinter, '_print_': StubPrinter,
@ -159,3 +173,24 @@ class StubPrinter:
# pylint: disable=no-self-use # pylint: disable=no-self-use
_LOGGER.warning( _LOGGER.warning(
"Don't use print() inside scripts. Use logger.info() instead.") "Don't use print() inside scripts. Use logger.info() instead.")
class TimeWrapper:
"""Wrapper of the time module."""
# Class variable, only going to warn once per Home Assistant run
warned = False
# pylint: disable=no-self-use
def sleep(self, *args, **kwargs):
"""Sleep method that warns once."""
if not TimeWrapper.warned:
TimeWrapper.warned = True
_LOGGER.warning('Using time.sleep can reduce the performance of '
'Home Assistant')
time.sleep(*args, **kwargs)
def __getattr__(self, attr):
"""Fetch an attribute from Time module."""
return getattr(time, attr)

View File

@ -157,14 +157,17 @@ logger.info('Logging from inside script: %s %s' % (mydict["a"], mylist[2]))
def test_accessing_forbidden_methods(hass, caplog): def test_accessing_forbidden_methods(hass, caplog):
"""Test compile error logs error.""" """Test compile error logs error."""
caplog.set_level(logging.ERROR) caplog.set_level(logging.ERROR)
source = """
hass.stop()
"""
for source, name in {
'hass.stop()': 'HomeAssistant.stop',
'dt_util.set_default_time_zone()': 'module.set_default_time_zone',
'datetime.non_existing': 'module.non_existing',
'time.tzset()': 'TimeWrapper.tzset',
}.items():
caplog.records.clear()
hass.async_add_job(execute, hass, 'test.py', source, {}) hass.async_add_job(execute, hass, 'test.py', source, {})
yield from hass.async_block_till_done() yield from hass.async_block_till_done()
assert "Not allowed to access {}".format(name) in caplog.text
assert "Not allowed to access HomeAssistant.stop" in caplog.text
@asyncio.coroutine @asyncio.coroutine
@ -205,6 +208,26 @@ hass.states.set('hello.ab_list', '{}'.format(ab_list))
assert caplog.text == '' assert caplog.text == ''
@asyncio.coroutine
def test_exposed_modules(hass, caplog):
"""Test datetime and time modules exposed."""
caplog.set_level(logging.ERROR)
source = """
hass.states.set('module.time', time.strftime('%Y', time.gmtime(521276400)))
hass.states.set('module.datetime',
datetime.timedelta(minutes=1).total_seconds())
"""
hass.async_add_job(execute, hass, 'test.py', source, {})
yield from hass.async_block_till_done()
assert hass.states.is_state('module.time', '1986')
assert hass.states.is_state('module.datetime', '60.0')
# No errors logged = good
assert caplog.text == ''
@asyncio.coroutine @asyncio.coroutine
def test_reload(hass): def test_reload(hass):
"""Test we can re-discover scripts.""" """Test we can re-discover scripts."""
@ -238,3 +261,19 @@ def test_reload(hass):
assert hass.services.has_service('python_script', 'hello2') assert hass.services.has_service('python_script', 'hello2')
assert hass.services.has_service('python_script', 'world_beer') assert hass.services.has_service('python_script', 'world_beer')
assert hass.services.has_service('python_script', 'reload') assert hass.services.has_service('python_script', 'reload')
@asyncio.coroutine
def test_sleep_warns_one(hass, caplog):
"""Test time.sleep warns once."""
caplog.set_level(logging.WARNING)
source = """
time.sleep(2)
time.sleep(5)
"""
with patch('homeassistant.components.python_script.time.sleep'):
hass.async_add_job(execute, hass, 'test.py', source, {})
yield from hass.async_block_till_done()
assert caplog.text.count('time.sleep') == 1