Add file name/size sensors to OctoPrint integration (#148636)

This commit is contained in:
Alex Leversen 2025-07-15 06:35:20 -04:00 committed by GitHub
parent 8256401f7f
commit c7aadcdd20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 137 additions and 23 deletions

View File

@ -13,7 +13,7 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import PERCENTAGE, UnitOfTemperature
from homeassistant.const import PERCENTAGE, UnitOfInformation, UnitOfTemperature
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
@ -84,6 +84,8 @@ async def async_setup_entry(
OctoPrintJobPercentageSensor(coordinator, device_id),
OctoPrintEstimatedFinishTimeSensor(coordinator, device_id),
OctoPrintStartTimeSensor(coordinator, device_id),
OctoPrintFileNameSensor(coordinator, device_id),
OctoPrintFileSizeSensor(coordinator, device_id),
]
async_add_entities(entities)
@ -262,3 +264,61 @@ class OctoPrintTemperatureSensor(OctoPrintSensorBase):
def available(self) -> bool:
"""Return if entity is available."""
return self.coordinator.last_update_success and self.coordinator.data["printer"]
class OctoPrintFileNameSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
def __init__(
self,
coordinator: OctoprintDataUpdateCoordinator,
device_id: str,
) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Current File", device_id)
@property
def native_value(self) -> str | None:
"""Return sensor state."""
job: OctoprintJobInfo = self.coordinator.data["job"]
return job.job.file.name or None
@property
def available(self) -> bool:
"""Return if entity is available."""
if not self.coordinator.last_update_success:
return False
job: OctoprintJobInfo = self.coordinator.data["job"]
return job and job.job.file.name
class OctoPrintFileSizeSensor(OctoPrintSensorBase):
"""Representation of an OctoPrint sensor."""
_attr_device_class = SensorDeviceClass.DATA_SIZE
_attr_native_unit_of_measurement = UnitOfInformation.BYTES
_attr_suggested_unit_of_measurement = UnitOfInformation.MEGABYTES
def __init__(
self,
coordinator: OctoprintDataUpdateCoordinator,
device_id: str,
) -> None:
"""Initialize a new OctoPrint sensor."""
super().__init__(coordinator, "Current File Size", device_id)
@property
def native_value(self) -> int | None:
"""Return sensor state."""
job: OctoprintJobInfo = self.coordinator.data["job"]
return job.job.file.size or None
@property
def available(self) -> bool:
"""Return if entity is available."""
if not self.coordinator.last_update_success:
return False
job: OctoprintJobInfo = self.coordinator.data["job"]
return job and job.job.file.size

View File

@ -21,7 +21,21 @@ from homeassistant.helpers.typing import UNDEFINED, UndefinedType
from tests.common import MockConfigEntry
DEFAULT_JOB = {
"job": {"file": {}},
"job": {
"averagePrintTime": None,
"estimatedPrintTime": None,
"filament": None,
"file": {
"date": None,
"display": None,
"name": None,
"origin": None,
"path": None,
"size": None,
},
"lastPrintTime": None,
"user": None,
},
"progress": {"completion": 50},
}

View File

@ -4,6 +4,7 @@ from datetime import UTC, datetime
from freezegun.api import FrozenDateTimeFactory
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN, UnitOfInformation
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
@ -23,11 +24,7 @@ async def test_sensors(
},
"temperature": {"tool1": {"actual": 18.83136, "target": 37.83136}},
}
job = {
"job": {"file": {}},
"progress": {"completion": 50, "printTime": 600, "printTimeLeft": 6000},
"state": "Printing",
}
job = __standard_job()
freezer.move_to(datetime(2020, 2, 20, 9, 10, 13, 543, tzinfo=UTC))
await init_integration(hass, "sensor", printer=printer, job=job)
@ -80,6 +77,21 @@ async def test_sensors(
entry = entity_registry.async_get("sensor.octoprint_estimated_finish_time")
assert entry.unique_id == "Estimated Finish Time-uuid"
state = hass.states.get("sensor.octoprint_current_file")
assert state is not None
assert state.state == "Test_File_Name.gcode"
assert state.name == "OctoPrint Current File"
entry = entity_registry.async_get("sensor.octoprint_current_file")
assert entry.unique_id == "Current File-uuid"
state = hass.states.get("sensor.octoprint_current_file_size")
assert state is not None
assert state.state == "123.456789"
assert state.attributes.get("unit_of_measurement") == UnitOfInformation.MEGABYTES
assert state.name == "OctoPrint Current File Size"
entry = entity_registry.async_get("sensor.octoprint_current_file_size")
assert entry.unique_id == "Current File Size-uuid"
async def test_sensors_no_target_temp(
hass: HomeAssistant,
@ -106,11 +118,25 @@ async def test_sensors_no_target_temp(
state = hass.states.get("sensor.octoprint_target_tool1_temp")
assert state is not None
assert state.state == "unknown"
assert state.state == STATE_UNKNOWN
assert state.name == "OctoPrint target tool1 temp"
entry = entity_registry.async_get("sensor.octoprint_target_tool1_temp")
assert entry.unique_id == "target tool1 temp-uuid"
state = hass.states.get("sensor.octoprint_current_file")
assert state is not None
assert state.state == STATE_UNAVAILABLE
assert state.name == "OctoPrint Current File"
entry = entity_registry.async_get("sensor.octoprint_current_file")
assert entry.unique_id == "Current File-uuid"
state = hass.states.get("sensor.octoprint_current_file_size")
assert state is not None
assert state.state == STATE_UNAVAILABLE
assert state.name == "OctoPrint Current File Size"
entry = entity_registry.async_get("sensor.octoprint_current_file_size")
assert entry.unique_id == "Current File Size-uuid"
async def test_sensors_paused(
hass: HomeAssistant,
@ -125,24 +151,20 @@ async def test_sensors_paused(
},
"temperature": {"tool1": {"actual": 18.83136, "target": None}},
}
job = {
"job": {"file": {}},
"progress": {"completion": 50, "printTime": 600, "printTimeLeft": 6000},
"state": "Paused",
}
job = __standard_job()
freezer.move_to(datetime(2020, 2, 20, 9, 10, 0))
await init_integration(hass, "sensor", printer=printer, job=job)
state = hass.states.get("sensor.octoprint_start_time")
assert state is not None
assert state.state == "unknown"
assert state.state == STATE_UNKNOWN
assert state.name == "OctoPrint Start Time"
entry = entity_registry.async_get("sensor.octoprint_start_time")
assert entry.unique_id == "Start Time-uuid"
state = hass.states.get("sensor.octoprint_estimated_finish_time")
assert state is not None
assert state.state == "unknown"
assert state.state == STATE_UNKNOWN
assert state.name == "OctoPrint Estimated Finish Time"
entry = entity_registry.async_get("sensor.octoprint_estimated_finish_time")
assert entry.unique_id == "Estimated Finish Time-uuid"
@ -154,11 +176,7 @@ async def test_sensors_printer_disconnected(
entity_registry: er.EntityRegistry,
) -> None:
"""Test the underlying sensors."""
job = {
"job": {"file": {}},
"progress": {"completion": 50, "printTime": 600, "printTimeLeft": 6000},
"state": "Paused",
}
job = __standard_job()
freezer.move_to(datetime(2020, 2, 20, 9, 10, 0))
await init_integration(hass, "sensor", printer=None, job=job)
@ -171,21 +189,43 @@ async def test_sensors_printer_disconnected(
state = hass.states.get("sensor.octoprint_current_state")
assert state is not None
assert state.state == "unavailable"
assert state.state == STATE_UNAVAILABLE
assert state.name == "OctoPrint Current State"
entry = entity_registry.async_get("sensor.octoprint_current_state")
assert entry.unique_id == "Current State-uuid"
state = hass.states.get("sensor.octoprint_start_time")
assert state is not None
assert state.state == "unknown"
assert state.state == STATE_UNKNOWN
assert state.name == "OctoPrint Start Time"
entry = entity_registry.async_get("sensor.octoprint_start_time")
assert entry.unique_id == "Start Time-uuid"
state = hass.states.get("sensor.octoprint_estimated_finish_time")
assert state is not None
assert state.state == "unknown"
assert state.state == STATE_UNKNOWN
assert state.name == "OctoPrint Estimated Finish Time"
entry = entity_registry.async_get("sensor.octoprint_estimated_finish_time")
assert entry.unique_id == "Estimated Finish Time-uuid"
def __standard_job():
return {
"job": {
"averagePrintTime": 6500,
"estimatedPrintTime": 6000,
"filament": {"tool0": {"length": 3000, "volume": 7}},
"file": {
"date": 1577836800,
"display": "Test File Name",
"name": "Test_File_Name.gcode",
"origin": "local",
"path": "Folder1/Folder2/Test_File_Name.gcode",
"size": 123456789,
},
"lastPrintTime": 12345.678,
"user": "testUser",
},
"progress": {"completion": 50, "printTime": 600, "printTimeLeft": 6000},
"state": "Printing",
}