Add diagnostics to stream's Stream objects (#68020)

* Add diagnostics to stream's Stream objects

Add diagnostics key/value pair to the Stream object. Diagnostics support
in camera integration will be added in a follow up and will access the
diagnostics on the Stream object, similar to the examples in the unit
test.

* Rename to audio/video codec

* Fix test codec names

* Update tests/components/stream/test_worker.py

Co-authored-by: uvjustin <46082645+uvjustin@users.noreply.github.com>

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
Co-authored-by: uvjustin <46082645+uvjustin@users.noreply.github.com>
This commit is contained in:
Allen Porter 2022-03-18 07:40:09 -07:00 committed by GitHub
parent 2686be921c
commit 41a032e3e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 76 additions and 3 deletions

View File

@ -23,7 +23,7 @@ import secrets
import threading import threading
import time import time
from types import MappingProxyType from types import MappingProxyType
from typing import cast from typing import Any, cast
import voluptuous as vol import voluptuous as vol
@ -51,6 +51,7 @@ from .const import (
TARGET_SEGMENT_DURATION_NON_LL_HLS, TARGET_SEGMENT_DURATION_NON_LL_HLS,
) )
from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings from .core import PROVIDERS, IdleTimer, KeyFrameConverter, StreamOutput, StreamSettings
from .diagnostics import Diagnostics
from .hls import HlsStreamOutput, async_setup_hls from .hls import HlsStreamOutput, async_setup_hls
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -225,6 +226,7 @@ class Stream:
if stream_label if stream_label
else _LOGGER else _LOGGER
) )
self._diagnostics = Diagnostics()
def endpoint_url(self, fmt: str) -> str: def endpoint_url(self, fmt: str) -> str:
"""Start the stream and returns a url for the output format.""" """Start the stream and returns a url for the output format."""
@ -259,6 +261,7 @@ class Stream:
self.hass, IdleTimer(self.hass, timeout, idle_callback) self.hass, IdleTimer(self.hass, timeout, idle_callback)
) )
self._outputs[fmt] = provider self._outputs[fmt] = provider
return provider return provider
def remove_provider(self, provider: StreamOutput) -> None: def remove_provider(self, provider: StreamOutput) -> None:
@ -310,6 +313,7 @@ class Stream:
def update_source(self, new_source: str) -> None: def update_source(self, new_source: str) -> None:
"""Restart the stream with a new stream source.""" """Restart the stream with a new stream source."""
self._diagnostics.increment("update_source")
self._logger.debug( self._logger.debug(
"Updating stream source %s", redact_credentials(str(new_source)) "Updating stream source %s", redact_credentials(str(new_source))
) )
@ -323,11 +327,13 @@ class Stream:
# pylint: disable=import-outside-toplevel # pylint: disable=import-outside-toplevel
from .worker import StreamState, StreamWorkerError, stream_worker from .worker import StreamState, StreamWorkerError, stream_worker
stream_state = StreamState(self.hass, self.outputs) stream_state = StreamState(self.hass, self.outputs, self._diagnostics)
wait_timeout = 0 wait_timeout = 0
while not self._thread_quit.wait(timeout=wait_timeout): while not self._thread_quit.wait(timeout=wait_timeout):
start_time = time.time() start_time = time.time()
self.hass.add_job(self._async_update_state, True) self.hass.add_job(self._async_update_state, True)
self._diagnostics.set_value("keepalive", self.keepalive)
self._diagnostics.increment("start_worker")
try: try:
stream_worker( stream_worker(
self.source, self.source,
@ -337,6 +343,7 @@ class Stream:
self._thread_quit, self._thread_quit,
) )
except StreamWorkerError as err: except StreamWorkerError as err:
self._diagnostics.increment("worker_error")
self._logger.error("Error from stream worker: %s", str(err)) self._logger.error("Error from stream worker: %s", str(err))
stream_state.discontinuity() stream_state.discontinuity()
@ -358,6 +365,7 @@ class Stream:
if time.time() - start_time > STREAM_RESTART_RESET_TIME: if time.time() - start_time > STREAM_RESTART_RESET_TIME:
wait_timeout = 0 wait_timeout = 0
wait_timeout += STREAM_RESTART_INCREMENT wait_timeout += STREAM_RESTART_INCREMENT
self._diagnostics.set_value("retry_timeout", wait_timeout)
self._logger.debug( self._logger.debug(
"Restarting stream worker in %d seconds: %s", "Restarting stream worker in %d seconds: %s",
wait_timeout, wait_timeout,
@ -447,6 +455,10 @@ class Stream:
width=width, height=height width=width, height=height
) )
def get_diagnostics(self) -> dict[str, Any]:
"""Return diagnostics information for the stream."""
return self._diagnostics.as_dict()
def _should_retry() -> bool: def _should_retry() -> bool:
"""Return true if worker failures should be retried, for disabling during tests.""" """Return true if worker failures should be retried, for disabling during tests."""

View File

@ -0,0 +1,33 @@
"""Diagnostics for debugging.
The stream component does not have config entries itself, and all diagnostics
information is managed by dependent components (e.g. camera)
"""
from __future__ import annotations
from collections import Counter
from typing import Any
class Diagnostics:
"""Holds diagnostics counters and key/values."""
def __init__(self) -> None:
"""Initialize Diagnostics."""
self._counter: Counter = Counter()
self._values: dict[str, Any] = {}
def increment(self, key: str) -> None:
"""Increment a counter for the spcified key/event."""
self._counter.update(Counter({key: 1}))
def set_value(self, key: str, value: Any) -> None:
"""Update a key/value pair."""
self._values[key] = value
def as_dict(self) -> dict[str, Any]:
"""Return diagnostics as a debug dictionary."""
result = {k: self._counter[k] for k in self._counter}
result.update(self._values)
return result

View File

@ -27,6 +27,7 @@ from .const import (
SOURCE_TIMEOUT, SOURCE_TIMEOUT,
) )
from .core import KeyFrameConverter, Part, Segment, StreamOutput, StreamSettings from .core import KeyFrameConverter, Part, Segment, StreamOutput, StreamSettings
from .diagnostics import Diagnostics
from .hls import HlsStreamOutput from .hls import HlsStreamOutput
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -52,6 +53,7 @@ class StreamState:
self, self,
hass: HomeAssistant, hass: HomeAssistant,
outputs_callback: Callable[[], Mapping[str, StreamOutput]], outputs_callback: Callable[[], Mapping[str, StreamOutput]],
diagnostics: Diagnostics,
) -> None: ) -> None:
"""Initialize StreamState.""" """Initialize StreamState."""
self._stream_id: int = 0 self._stream_id: int = 0
@ -62,6 +64,7 @@ class StreamState:
# sequence gets incremented before the first segment so the first segment # sequence gets incremented before the first segment so the first segment
# has a sequence number of 0. # has a sequence number of 0.
self._sequence = -1 self._sequence = -1
self._diagnostics = diagnostics
@property @property
def sequence(self) -> int: def sequence(self) -> int:
@ -93,6 +96,11 @@ class StreamState:
"""Return the active stream outputs.""" """Return the active stream outputs."""
return list(self._outputs_callback().values()) return list(self._outputs_callback().values())
@property
def diagnostics(self) -> Diagnostics:
"""Return diagnostics object."""
return self._diagnostics
class StreamMuxer: class StreamMuxer:
"""StreamMuxer re-packages video/audio packets for output.""" """StreamMuxer re-packages video/audio packets for output."""
@ -468,6 +476,10 @@ def stream_worker(
# Some audio streams do not have a profile and throw errors when remuxing # Some audio streams do not have a profile and throw errors when remuxing
if audio_stream and audio_stream.profile is None: if audio_stream and audio_stream.profile is None:
audio_stream = None audio_stream = None
stream_state.diagnostics.set_value("container_format", container.format.name)
stream_state.diagnostics.set_value("video_codec", video_stream.name)
if audio_stream:
stream_state.diagnostics.set_value("audio_codec", audio_stream.name)
dts_validator = TimestampValidator() dts_validator = TimestampValidator()
container_packets = PeekIterator( container_packets = PeekIterator(

View File

@ -177,6 +177,14 @@ async def test_hls_stream(
fail_response = await hls_client.get() fail_response = await hls_client.get()
assert fail_response.status == HTTPStatus.NOT_FOUND assert fail_response.status == HTTPStatus.NOT_FOUND
assert stream.get_diagnostics() == {
"container_format": "mov,mp4,m4a,3gp,3g2,mj2",
"keepalive": False,
"start_worker": 1,
"video_codec": "h264",
"worker_error": 1,
}
async def test_stream_timeout( async def test_stream_timeout(
hass, hass_client, setup_component, stream_worker_sync, h264_video hass, hass_client, setup_component, stream_worker_sync, h264_video

View File

@ -270,7 +270,7 @@ class MockPyAv:
def run_worker(hass, stream, stream_source): def run_worker(hass, stream, stream_source):
"""Run the stream worker under test.""" """Run the stream worker under test."""
stream_state = StreamState(hass, stream.outputs) stream_state = StreamState(hass, stream.outputs, stream._diagnostics)
stream_worker( stream_worker(
stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event() stream_source, {}, stream_state, KeyFrameConverter(hass), threading.Event()
) )
@ -873,6 +873,14 @@ async def test_h265_video_is_hvc1(hass, record_worker_sync):
stream.stop() stream.stop()
assert stream.get_diagnostics() == {
"container_format": "mov,mp4,m4a,3gp,3g2,mj2",
"keepalive": False,
"start_worker": 1,
"video_codec": "hevc",
"worker_error": 1,
}
async def test_get_image(hass, record_worker_sync): async def test_get_image(hass, record_worker_sync):
"""Test that the has_keyframe metadata matches the media.""" """Test that the has_keyframe metadata matches the media."""