From 31ee54c13366bf3c4641465dd0a20b2c8200d08d Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Mon, 11 May 2020 08:17:10 -0500 Subject: [PATCH] Ensure homekit camera stream can be restarted after failure (#35384) * Ensure camera stream can be restarted after failure * If ffmpeg failed to start, was killed, or the iOS device closed the stream right away, the stream could never be started until the HomeKit bridge was restarted. * watch ffmpeg instead of checking only once * handle forceful shutdowns gracefully * Increase coverage --- homeassistant/components/homekit/const.py | 2 + .../components/homekit/type_cameras.py | 84 +++++++++-- homeassistant/components/homekit/util.py | 10 ++ tests/components/homekit/test_type_cameras.py | 131 ++++++++++++++++-- 4 files changed, 200 insertions(+), 27 deletions(-) diff --git a/homeassistant/components/homekit/const.py b/homeassistant/components/homekit/const.py index 4c917dac3de..7d6ce40e85a 100644 --- a/homeassistant/components/homekit/const.py +++ b/homeassistant/components/homekit/const.py @@ -98,6 +98,7 @@ TYPE_VALVE = "valve" SERV_ACCESSORY_INFO = "AccessoryInformation" SERV_AIR_QUALITY_SENSOR = "AirQualitySensor" SERV_BATTERY_SERVICE = "BatteryService" +SERV_CAMERA_RTP_STREAM_MANAGEMENT = "CameraRTPStreamManagement" SERV_CARBON_DIOXIDE_SENSOR = "CarbonDioxideSensor" SERV_CARBON_MONOXIDE_SENSOR = "CarbonMonoxideSensor" SERV_CONTACT_SENSOR = "ContactSensor" @@ -177,6 +178,7 @@ CHAR_SERIAL_NUMBER = "SerialNumber" CHAR_SLEEP_DISCOVER_MODE = "SleepDiscoveryMode" CHAR_SMOKE_DETECTED = "SmokeDetected" CHAR_STATUS_LOW_BATTERY = "StatusLowBattery" +CHAR_STREAMING_STRATUS = "StreamingStatus" CHAR_SWING_MODE = "SwingMode" CHAR_TARGET_DOOR_STATE = "TargetDoorState" CHAR_TARGET_HEATING_COOLING = "TargetHeatingCoolingState" diff --git a/homeassistant/components/homekit/type_cameras.py b/homeassistant/components/homekit/type_cameras.py index ae91362c5fc..a3ce5fb1400 100644 --- a/homeassistant/components/homekit/type_cameras.py +++ b/homeassistant/components/homekit/type_cameras.py @@ -1,9 +1,11 @@ """Class to hold all camera accessories.""" import asyncio +from datetime import timedelta import logging from haffmpeg.core import HAFFmpeg from pyhap.camera import ( + STREAMING_STATUS, VIDEO_CODEC_PARAM_LEVEL_TYPES, VIDEO_CODEC_PARAM_PROFILE_ID_TYPES, Camera as PyhapCamera, @@ -13,10 +15,12 @@ from pyhap.const import CATEGORY_CAMERA from homeassistant.components.camera.const import DOMAIN as DOMAIN_CAMERA from homeassistant.components.ffmpeg import DATA_FFMPEG from homeassistant.core import callback +from homeassistant.helpers.event import async_track_time_interval from homeassistant.util import get_local_ip from .accessories import TYPES, HomeAccessory from .const import ( + CHAR_STREAMING_STRATUS, CONF_AUDIO_CODEC, CONF_AUDIO_MAP, CONF_AUDIO_PACKET_SIZE, @@ -29,9 +33,10 @@ from .const import ( CONF_VIDEO_CODEC, CONF_VIDEO_MAP, CONF_VIDEO_PACKET_SIZE, + SERV_CAMERA_RTP_STREAM_MANAGEMENT, ) from .img_util import scale_jpeg_camera_image -from .util import CAMERA_SCHEMA +from .util import CAMERA_SCHEMA, pid_is_alive _LOGGER = logging.getLogger(__name__) @@ -84,6 +89,11 @@ RESOLUTIONS = [ VIDEO_PROFILE_NAMES = ["baseline", "main", "high"] +FFMPEG_WATCH_INTERVAL = timedelta(seconds=5) +FFMPEG_WATCHER = "ffmpeg_watcher" +FFMPEG_PID = "ffmpeg_pid" +SESSION_ID = "session_id" + @TYPES.register("Camera") class Camera(HomeAccessory, PyhapCamera): @@ -92,6 +102,7 @@ class Camera(HomeAccessory, PyhapCamera): def __init__(self, hass, driver, name, entity_id, aid, config): """Initialize a Camera accessory object.""" self._ffmpeg = hass.data[DATA_FFMPEG] + self._cur_session = None self._camera = hass.data[DOMAIN_CAMERA] config_w_defaults = CAMERA_SCHEMA(config) @@ -159,11 +170,14 @@ class Camera(HomeAccessory, PyhapCamera): if stream_source: return stream_source try: - return await camera.stream_source() + stream_source = await camera.stream_source() except Exception: # pylint: disable=broad-except _LOGGER.exception( "Failed to get stream source - this could be a transient error or your camera might not be compatible with HomeKit yet" ) + if stream_source: + self.config[CONF_STREAM_SOURCE] = stream_source + return stream_source async def start_stream(self, session_info, stream_config): """Start a new stream with the given configuration.""" @@ -222,7 +236,45 @@ class Camera(HomeAccessory, PyhapCamera): session_info["id"], stream.process.pid, ) - return True + + ffmpeg_watcher = async_track_time_interval( + self.hass, self._async_ffmpeg_watch, FFMPEG_WATCH_INTERVAL + ) + self._cur_session = { + FFMPEG_WATCHER: ffmpeg_watcher, + FFMPEG_PID: stream.process.pid, + SESSION_ID: session_info["id"], + } + + return await self._async_ffmpeg_watch(0) + + async def _async_ffmpeg_watch(self, _): + """Check to make sure ffmpeg is still running and cleanup if not.""" + ffmpeg_pid = self._cur_session[FFMPEG_PID] + session_id = self._cur_session[SESSION_ID] + if pid_is_alive(ffmpeg_pid): + return True + + _LOGGER.warning("Streaming process ended unexpectedly - PID %d", ffmpeg_pid) + self._async_stop_ffmpeg_watch() + self._async_set_streaming_available(session_id) + return False + + @callback + def _async_stop_ffmpeg_watch(self): + """Cleanup a streaming session after stopping.""" + if not self._cur_session: + return + self._cur_session[FFMPEG_WATCHER]() + self._cur_session = None + + @callback + def _async_set_streaming_available(self, session_id): + """Free the session so they can start another.""" + self.streaming_status = STREAMING_STATUS["AVAILABLE"] + self.get_service(SERV_CAMERA_RTP_STREAM_MANAGEMENT).get_characteristic( + CHAR_STREAMING_STRATUS + ).notify() async def stop_stream(self, session_info): """Stop the stream for the given ``session_id``.""" @@ -230,19 +282,23 @@ class Camera(HomeAccessory, PyhapCamera): stream = session_info.get("stream") if not stream: _LOGGER.debug("No stream for session ID %s", session_id) - _LOGGER.info("[%s] Stopping stream.", session_id) - - try: - await stream.close() return - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Failed to gracefully close stream.") - try: - await stream.kill() - except Exception: # pylint: disable=broad-except - _LOGGER.exception("Failed to forcefully close stream.") - _LOGGER.debug("Stream process stopped forcefully.") + self._async_stop_ffmpeg_watch() + + if not pid_is_alive(stream.process.pid): + _LOGGER.info("[%s] Stream already stopped.", session_id) + return True + + for shutdown_method in ["close", "kill"]: + _LOGGER.info("[%s] %s stream.", session_id, shutdown_method) + try: + await getattr(stream, shutdown_method)() + return + except Exception: # pylint: disable=broad-except + _LOGGER.exception( + "[%s] Failed to %s stream.", session_id, shutdown_method + ) async def reconfigure_stream(self, session_info, stream_config): """Reconfigure the stream so that it uses the given ``stream_config``.""" diff --git a/homeassistant/components/homekit/util.py b/homeassistant/components/homekit/util.py index 4c84b17822b..f4943cd603f 100644 --- a/homeassistant/components/homekit/util.py +++ b/homeassistant/components/homekit/util.py @@ -472,3 +472,13 @@ def find_next_available_port(start_port: int): if port == MAX_PORT: raise continue + + +def pid_is_alive(pid): + """Check to see if a process is alive.""" + try: + os.kill(pid, 0) + return True + except OSError: + pass + return False diff --git a/tests/components/homekit/test_type_cameras.py b/tests/components/homekit/test_type_cameras.py index 903b1808463..e3444ca23e4 100644 --- a/tests/components/homekit/test_type_cameras.py +++ b/tests/components/homekit/test_type_cameras.py @@ -14,6 +14,7 @@ from homeassistant.components.homekit.const import ( CONF_SUPPORT_AUDIO, CONF_VIDEO_CODEC, VIDEO_CODEC_COPY, + VIDEO_CODEC_H264_OMX, ) from homeassistant.components.homekit.img_util import TurboJPEGSingleton from homeassistant.components.homekit.type_cameras import Camera @@ -23,12 +24,14 @@ from homeassistant.setup import async_setup_component from .common import mock_turbo_jpeg -from tests.async_mock import AsyncMock, MagicMock, patch +from tests.async_mock import AsyncMock, MagicMock, PropertyMock, patch MOCK_START_STREAM_TLV = "ARUCAQEBEDMD1QMXzEaatnKSQ2pxovYCNAEBAAIJAQECAgECAwEAAwsBAgAFAgLQAgMBHgQXAQFjAgQ768/RAwIrAQQEAAAAPwUCYgUDLAEBAwIMAQEBAgEAAwECBAEUAxYBAW4CBCzq28sDAhgABAQAAKBABgENBAEA" MOCK_END_POINTS_TLV = "ARAzA9UDF8xGmrZykkNqcaL2AgEAAxoBAQACDTE5Mi4xNjguMjA4LjUDAi7IBAKkxwQlAQEAAhDN0+Y0tZ4jzoO0ske9UsjpAw6D76oVXnoi7DbawIG4CwUlAQEAAhCyGcROB8P7vFRDzNF2xrK1Aw6NdcLugju9yCfkWVSaVAYEDoAsAAcEpxV8AA==" MOCK_START_STREAM_SESSION_UUID = UUID("3303d503-17cc-469a-b672-92436a71a2f6") +PID_THAT_WILL_NEVER_BE_ALIVE = 2147483647 + async def _async_start_streaming(hass, acc): """Start streaming a camera.""" @@ -44,13 +47,27 @@ async def _async_setup_endpoints(hass, acc): await hass.async_block_till_done() -async def _async_stop_stream(hass, acc): - """Stop a camera stream.""" +async def _async_reconfigure_stream(hass, acc, session_info, stream_config): + """Reconfigure the stream.""" + await acc.reconfigure_stream(session_info, stream_config) + await acc.run_handler() + await hass.async_block_till_done() + + +async def _async_stop_all_streams(hass, acc): + """Stop all camera streams.""" await acc.stop() await acc.run_handler() await hass.async_block_till_done() +async def _async_stop_stream(hass, acc, session_info): + """Stop a camera stream.""" + await acc.stop_stream(session_info) + await acc.run_handler() + await hass.async_block_till_done() + + @pytest.fixture() def run_driver(hass): """Return a custom AccessoryDriver instance for HomeKit accessory init.""" @@ -66,6 +83,16 @@ def run_driver(hass): ) +def _get_exits_after_startup_mock_ffmpeg(): + """Return a ffmpeg that will have an invalid pid.""" + ffmpeg = MagicMock() + type(ffmpeg.process).pid = PropertyMock(return_value=PID_THAT_WILL_NEVER_BE_ALIVE) + ffmpeg.open = AsyncMock(return_value=True) + ffmpeg.close = AsyncMock(return_value=True) + ffmpeg.kill = AsyncMock(return_value=True) + return ffmpeg + + def _get_working_mock_ffmpeg(): """Return a working ffmpeg.""" ffmpeg = MagicMock() @@ -78,6 +105,7 @@ def _get_working_mock_ffmpeg(): def _get_failing_mock_ffmpeg(): """Return an ffmpeg that fails to shutdown.""" ffmpeg = MagicMock() + type(ffmpeg.process).pid = PropertyMock(return_value=PID_THAT_WILL_NEVER_BE_ALIVE) ffmpeg.open = AsyncMock(return_value=False) ffmpeg.close = AsyncMock(side_effect=OSError) ffmpeg.kill = AsyncMock(side_effect=OSError) @@ -125,7 +153,7 @@ async def test_camera_stream_source_configured(hass, run_driver, events): return_value=working_ffmpeg, ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) expected_output = ( "-map 0:v:0 -an -c:v libx264 -profile:v high -tune zerolatency -pix_fmt " @@ -146,6 +174,10 @@ async def test_camera_stream_source_configured(hass, run_driver, events): stdout_pipe=False, ) + await _async_setup_endpoints(hass, acc) + working_ffmpeg = _get_working_mock_ffmpeg() + session_info = acc.sessions[MOCK_START_STREAM_SESSION_UUID] + with patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", return_value="rtsp://example.local", @@ -154,9 +186,9 @@ async def test_camera_stream_source_configured(hass, run_driver, events): return_value=working_ffmpeg, ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) # Calling a second time should not throw - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) turbo_jpeg = mock_turbo_jpeg( first_width=16, first_height=12, second_width=300, second_height=200 @@ -225,9 +257,9 @@ async def test_camera_stream_source_configured_with_failing_ffmpeg( return_value=_get_failing_mock_ffmpeg(), ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) # Calling a second time should not throw - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) async def test_camera_stream_source_found(hass, run_driver, events): @@ -257,7 +289,9 @@ async def test_camera_stream_source_found(hass, run_driver, events): return_value=_get_working_mock_ffmpeg(), ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) + + await _async_setup_endpoints(hass, acc) with patch( "homeassistant.components.demo.camera.DemoCamera.stream_source", @@ -267,7 +301,7 @@ async def test_camera_stream_source_found(hass, run_driver, events): return_value=_get_working_mock_ffmpeg(), ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) async def test_camera_stream_source_fails(hass, run_driver, events): @@ -297,7 +331,7 @@ async def test_camera_stream_source_fails(hass, run_driver, events): return_value=_get_working_mock_ffmpeg(), ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) async def test_camera_with_no_stream(hass, run_driver, events): @@ -317,7 +351,7 @@ async def test_camera_with_no_stream(hass, run_driver, events): await _async_setup_endpoints(hass, acc) await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_stop_all_streams(hass, acc) with pytest.raises(HomeAssistantError): await hass.async_add_executor_job( @@ -370,7 +404,9 @@ async def test_camera_stream_source_configured_and_copy_codec(hass, run_driver, return_value=working_ffmpeg, ): await _async_start_streaming(hass, acc) - await _async_stop_stream(hass, acc) + await _async_reconfigure_stream(hass, acc, session_info, {}) + await _async_stop_stream(hass, acc, session_info) + await _async_stop_all_streams(hass, acc) expected_output = ( "-map 0:v:0 -an -c:v copy -tune zerolatency -pix_fmt yuv420p -r 30 -b:v 299k " @@ -389,3 +425,72 @@ async def test_camera_stream_source_configured_and_copy_codec(hass, run_driver, output=expected_output.format(**session_info), stdout_pipe=False, ) + + +async def test_camera_streaming_fails_after_starting_ffmpeg(hass, run_driver, events): + """Test a camera that can stream with a configured source.""" + await async_setup_component(hass, ffmpeg.DOMAIN, {ffmpeg.DOMAIN: {}}) + await async_setup_component( + hass, camera.DOMAIN, {camera.DOMAIN: {"platform": "demo"}} + ) + + entity_id = "camera.demo_camera" + + hass.states.async_set(entity_id, None) + await hass.async_block_till_done() + acc = Camera( + hass, + run_driver, + "Camera", + entity_id, + 2, + { + CONF_STREAM_SOURCE: "/dev/null", + CONF_SUPPORT_AUDIO: True, + CONF_VIDEO_CODEC: VIDEO_CODEC_H264_OMX, + CONF_AUDIO_CODEC: AUDIO_CODEC_COPY, + }, + ) + bridge = HomeBridge("hass", run_driver, "Test Bridge") + bridge.add_accessory(acc) + + await acc.run_handler() + + assert acc.aid == 2 + assert acc.category == 17 # Camera + + await _async_setup_endpoints(hass, acc) + session_info = acc.sessions[MOCK_START_STREAM_SESSION_UUID] + + ffmpeg_with_invalid_pid = _get_exits_after_startup_mock_ffmpeg() + + with patch( + "homeassistant.components.demo.camera.DemoCamera.stream_source", + return_value=None, + ), patch( + "homeassistant.components.homekit.type_cameras.HAFFmpeg", + return_value=ffmpeg_with_invalid_pid, + ): + await _async_start_streaming(hass, acc) + await _async_reconfigure_stream(hass, acc, session_info, {}) + # Should not throw + await _async_stop_stream(hass, acc, {"id": "does_not_exist"}) + await _async_stop_all_streams(hass, acc) + + expected_output = ( + "-map 0:v:0 -an -c:v h264_omx -profile:v high -tune zerolatency -pix_fmt yuv420p -r 30 -b:v 299k " + "-bufsize 1196k -maxrate 299k -payload_type 99 -ssrc {v_ssrc} -f rtp -srtp_out_suite " + "AES_CM_128_HMAC_SHA1_80 -srtp_out_params zdPmNLWeI86DtLJHvVLI6YPvqhVeeiLsNtrAgbgL " + "srtp://192.168.208.5:51246?rtcpport=51246&localrtcpport=51246&pkt_size=1316 -map 0:a:0 " + "-vn -c:a copy -ac 1 -ar 24k -b:a 24k -bufsize 96k -payload_type 110 -ssrc {a_ssrc} " + "-f rtp -srtp_out_suite AES_CM_128_HMAC_SHA1_80 -srtp_out_params " + "shnETgfD+7xUQ8zRdsaytY11wu6CO73IJ+RZVJpU " + "srtp://192.168.208.5:51108?rtcpport=51108&localrtcpport=51108&pkt_size=188" + ) + + ffmpeg_with_invalid_pid.open.assert_called_with( + cmd=[], + input_source="-i /dev/null", + output=expected_output.format(**session_info), + stdout_pipe=False, + )