diff --git a/tests/components/stream/test_worker.py b/tests/components/stream/test_worker.py index d5f5ef653c6..ffbeb44d79e 100644 --- a/tests/components/stream/test_worker.py +++ b/tests/components/stream/test_worker.py @@ -149,6 +149,7 @@ class FakePyAvBuffer: self.segments = [] self.audio_packets = [] self.video_packets = [] + self.memory_file: io.BytesIO | None = None def add_stream(self, template=None): """Create an output buffer that captures packets for test to examine.""" @@ -171,10 +172,13 @@ class FakePyAvBuffer: """Capture a packet for tests to examine.""" # Forward to appropriate FakeStream packet.stream.mux(packet) + # Make new init/part data available to the worker + self.memory_file.write(b"0") def close(self): """Close the buffer.""" - return + # Make the final segment data available to the worker + self.memory_file.write(b"0") def capture_output_segment(self, segment): """Capture the output segment for tests to inspect.""" @@ -201,23 +205,11 @@ class MockPyAv: def open(self, stream_source, *args, **kwargs): """Return a stream or buffer depending on args.""" if isinstance(stream_source, io.BytesIO): + self.capture_buffer.memory_file = stream_source return self.capture_buffer return self.container -class MockFlushPart: - """Class to hold a wrapper function for check_flush_part.""" - - # Wrap this method with a preceding write so the BytesIO pointer moves - check_flush_part = SegmentBuffer.check_flush_part - - @classmethod - def wrapped_check_flush_part(cls, segment_buffer, packet): - """Wrap check_flush_part to also advance the memory_file pointer.""" - segment_buffer._memory_file.write(b"0") - return cls.check_flush_part(segment_buffer, packet) - - async def async_decode_stream(hass, packets, py_av=None): """Start a stream worker that decodes incoming stream packets into output segments.""" stream = Stream(hass, STREAM_SOURCE, {}) @@ -230,10 +222,6 @@ async def async_decode_stream(hass, packets, py_av=None): with patch("av.open", new=py_av.open), patch( "homeassistant.components.stream.core.StreamOutput.put", side_effect=py_av.capture_buffer.capture_output_segment, - ), patch( - "homeassistant.components.stream.worker.SegmentBuffer.check_flush_part", - side_effect=MockFlushPart.wrapped_check_flush_part, - autospec=True, ): segment_buffer = SegmentBuffer(stream.outputs) stream_worker(STREAM_SOURCE, {}, segment_buffer, threading.Event()) @@ -612,11 +600,7 @@ async def test_update_stream_source(hass): worker_wake.wait() return py_av.open(stream_source, args, kwargs) - with patch("av.open", new=blocking_open), patch( - "homeassistant.components.stream.worker.SegmentBuffer.check_flush_part", - side_effect=MockFlushPart.wrapped_check_flush_part, - autospec=True, - ): + with patch("av.open", new=blocking_open): stream.start() assert worker_open.wait(TIMEOUT) assert last_stream_source == STREAM_SOURCE