Refactor roomba to set vacuums in vacuum file (#132102)

This commit is contained in:
G Johansson 2024-12-03 21:18:54 +01:00 committed by GitHub
parent ab83ec61e0
commit b9e4855e05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 387 additions and 405 deletions

View File

@ -1,128 +0,0 @@
"""Class for Braava devices."""
import logging
from homeassistant.components.vacuum import VacuumEntityFeature
from .entity import SUPPORT_IROBOT, IRobotVacuum
_LOGGER = logging.getLogger(__name__)
ATTR_DETECTED_PAD = "detected_pad"
ATTR_LID_CLOSED = "lid_closed"
ATTR_TANK_PRESENT = "tank_present"
ATTR_TANK_LEVEL = "tank_level"
ATTR_PAD_WETNESS = "spray_amount"
OVERLAP_STANDARD = 67
OVERLAP_DEEP = 85
OVERLAP_EXTENDED = 25
MOP_STANDARD = "Standard"
MOP_DEEP = "Deep"
MOP_EXTENDED = "Extended"
BRAAVA_MOP_BEHAVIORS = [MOP_STANDARD, MOP_DEEP, MOP_EXTENDED]
BRAAVA_SPRAY_AMOUNT = [1, 2, 3]
# Braava Jets can set mopping behavior through fanspeed
SUPPORT_BRAAVA = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED
class BraavaJet(IRobotVacuum): # pylint: disable=hass-enforce-class-module
"""Braava Jet."""
_attr_supported_features = SUPPORT_BRAAVA
def __init__(self, roomba, blid):
"""Initialize the Roomba handler."""
super().__init__(roomba, blid)
# Initialize fan speed list
self._attr_fan_speed_list = [
f"{behavior}-{spray}"
for behavior in BRAAVA_MOP_BEHAVIORS
for spray in BRAAVA_SPRAY_AMOUNT
]
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
# Mopping behavior and spray amount as fan speed
rank_overlap = self.vacuum_state.get("rankOverlap", {})
behavior = None
if rank_overlap == OVERLAP_STANDARD:
behavior = MOP_STANDARD
elif rank_overlap == OVERLAP_DEEP:
behavior = MOP_DEEP
elif rank_overlap == OVERLAP_EXTENDED:
behavior = MOP_EXTENDED
pad_wetness = self.vacuum_state.get("padWetness", {})
# "disposable" and "reusable" values are always the same
pad_wetness_value = pad_wetness.get("disposable")
return f"{behavior}-{pad_wetness_value}"
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
try:
split = fan_speed.split("-", 1)
behavior = split[0]
spray = int(split[1])
if behavior.capitalize() in BRAAVA_MOP_BEHAVIORS:
behavior = behavior.capitalize()
except IndexError:
_LOGGER.error(
"Fan speed error: expected {behavior}-{spray_amount}, got '%s'",
fan_speed,
)
return
except ValueError:
_LOGGER.error("Spray amount error: expected integer, got '%s'", split[1])
return
if behavior not in BRAAVA_MOP_BEHAVIORS:
_LOGGER.error(
"Mop behavior error: expected one of %s, got '%s'",
str(BRAAVA_MOP_BEHAVIORS),
behavior,
)
return
if spray not in BRAAVA_SPRAY_AMOUNT:
_LOGGER.error(
"Spray amount error: expected one of %s, got '%d'",
str(BRAAVA_SPRAY_AMOUNT),
spray,
)
return
overlap = 0
if behavior == MOP_STANDARD:
overlap = OVERLAP_STANDARD
elif behavior == MOP_DEEP:
overlap = OVERLAP_DEEP
else:
overlap = OVERLAP_EXTENDED
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "rankOverlap", overlap
)
await self.hass.async_add_executor_job(
self.vacuum.set_preference,
"padWetness",
{"disposable": spray, "reusable": spray},
)
@property
def extra_state_attributes(self):
"""Return the state attributes of the device."""
state_attrs = super().extra_state_attributes
# Get Braava state
state = self.vacuum_state
detected_pad = state.get("detectedPad")
mop_ready = state.get("mopReady", {})
lid_closed = mop_ready.get("lidClosed")
tank_present = mop_ready.get("tankPresent")
tank_level = state.get("tankLvl")
state_attrs[ATTR_DETECTED_PAD] = detected_pad
state_attrs[ATTR_LID_CLOSED] = lid_closed
state_attrs[ATTR_TANK_PRESENT] = tank_present
state_attrs[ATTR_TANK_LEVEL] = tank_level
return state_attrs

View File

@ -2,62 +2,15 @@
from __future__ import annotations from __future__ import annotations
import asyncio from homeassistant.const import ATTR_CONNECTIONS
import logging
from homeassistant.components.vacuum import (
ATTR_STATUS,
STATE_CLEANING,
STATE_DOCKED,
STATE_ERROR,
STATE_RETURNING,
StateVacuumEntity,
VacuumEntityFeature,
)
from homeassistant.const import ATTR_CONNECTIONS, STATE_IDLE, STATE_PAUSED
import homeassistant.helpers.device_registry as dr import homeassistant.helpers.device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity from homeassistant.helpers.entity import Entity
import homeassistant.util.dt as dt_util import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM
from . import roomba_reported_state from . import roomba_reported_state
from .const import DOMAIN from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
ATTR_CLEANING_TIME = "cleaning_time"
ATTR_CLEANED_AREA = "cleaned_area"
ATTR_ERROR = "error"
ATTR_ERROR_CODE = "error_code"
ATTR_POSITION = "position"
ATTR_SOFTWARE_VERSION = "software_version"
# Commonly supported features
SUPPORT_IROBOT = (
VacuumEntityFeature.BATTERY
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.SEND_COMMAND
| VacuumEntityFeature.START
| VacuumEntityFeature.STATE
| VacuumEntityFeature.STOP
| VacuumEntityFeature.LOCATE
)
STATE_MAP = {
"": STATE_IDLE,
"charge": STATE_DOCKED,
"evac": STATE_RETURNING, # Emptying at cleanbase
"hmMidMsn": STATE_CLEANING, # Recharging at the middle of a cycle
"hmPostMsn": STATE_RETURNING, # Cycle finished
"hmUsrDock": STATE_RETURNING,
"pause": STATE_PAUSED,
"run": STATE_CLEANING,
"stop": STATE_IDLE,
"stuck": STATE_ERROR,
}
class IRobotEntity(Entity): class IRobotEntity(Entity):
"""Base class for iRobot Entities.""" """Base class for iRobot Entities."""
@ -65,7 +18,7 @@ class IRobotEntity(Entity):
_attr_should_poll = False _attr_should_poll = False
_attr_has_entity_name = True _attr_has_entity_name = True
def __init__(self, roomba, blid): def __init__(self, roomba, blid) -> None:
"""Initialize the iRobot handler.""" """Initialize the iRobot handler."""
self.vacuum = roomba self.vacuum = roomba
self._blid = blid self._blid = blid
@ -127,20 +80,6 @@ class IRobotEntity(Entity):
return None return None
return dt_util.utc_from_timestamp(ts) return dt_util.utc_from_timestamp(ts)
@property
def _robot_state(self):
"""Return the state of the vacuum cleaner."""
clean_mission_status = self.vacuum_state.get("cleanMissionStatus", {})
cycle = clean_mission_status.get("cycle")
phase = clean_mission_status.get("phase")
try:
state = STATE_MAP[phase]
except KeyError:
return STATE_ERROR
if cycle != "none" and state in (STATE_IDLE, STATE_DOCKED):
state = STATE_PAUSED
return state
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Register callback function.""" """Register callback function."""
self.vacuum.register_on_message_callback(self.on_message) self.vacuum.register_on_message_callback(self.on_message)
@ -154,125 +93,3 @@ class IRobotEntity(Entity):
state = json_data.get("state", {}).get("reported", {}) state = json_data.get("state", {}).get("reported", {})
if self.new_state_filter(state): if self.new_state_filter(state):
self.schedule_update_ha_state() self.schedule_update_ha_state()
class IRobotVacuum(IRobotEntity, StateVacuumEntity): # pylint: disable=hass-enforce-class-module
"""Base class for iRobot robots."""
_attr_name = None
_attr_supported_features = SUPPORT_IROBOT
_attr_available = True # Always available, otherwise setup will fail
def __init__(self, roomba, blid):
"""Initialize the iRobot handler."""
super().__init__(roomba, blid)
self._cap_position = self.vacuum_state.get("cap", {}).get("pose") == 1
@property
def state(self):
"""Return the state of the vacuum cleaner."""
return self._robot_state
@property
def extra_state_attributes(self):
"""Return the state attributes of the device."""
state = self.vacuum_state
# Roomba software version
software_version = state.get("softwareVer")
# Set properties that are to appear in the GUI
state_attrs = {ATTR_SOFTWARE_VERSION: software_version}
# Set legacy status to avoid break changes
state_attrs[ATTR_STATUS] = self.vacuum.current_state
# Only add cleaning time and cleaned area attrs when the vacuum is
# currently on
if self.state == STATE_CLEANING:
# Get clean mission status
(
state_attrs[ATTR_CLEANING_TIME],
state_attrs[ATTR_CLEANED_AREA],
) = self.get_cleaning_status(state)
# Error
if self.vacuum.error_code != 0:
state_attrs[ATTR_ERROR] = self.vacuum.error_message
state_attrs[ATTR_ERROR_CODE] = self.vacuum.error_code
# Not all Roombas expose position data
# https://github.com/koalazak/dorita980/issues/48
if self._cap_position:
pos_state = state.get("pose", {})
position = None
pos_x = pos_state.get("point", {}).get("x")
pos_y = pos_state.get("point", {}).get("y")
theta = pos_state.get("theta")
if all(item is not None for item in (pos_x, pos_y, theta)):
position = f"({pos_x}, {pos_y}, {theta})"
state_attrs[ATTR_POSITION] = position
return state_attrs
def get_cleaning_status(self, state) -> tuple[int, int]:
"""Return the cleaning time and cleaned area from the device."""
if not (mission_state := state.get("cleanMissionStatus")):
return (0, 0)
if cleaning_time := mission_state.get("mssnM", 0):
pass
elif start_time := mission_state.get("mssnStrtTm"):
now = dt_util.as_timestamp(dt_util.utcnow())
if now > start_time:
cleaning_time = (now - start_time) // 60
if cleaned_area := mission_state.get("sqft", 0): # Imperial
# Convert to m2 if the unit_system is set to metric
if self.hass.config.units is METRIC_SYSTEM:
cleaned_area = round(cleaned_area * 0.0929)
return (cleaning_time, cleaned_area)
def on_message(self, json_data):
"""Update state on message change."""
state = json_data.get("state", {}).get("reported", {})
if self.new_state_filter(state):
_LOGGER.debug("Got new state from the vacuum: %s", json_data)
self.schedule_update_ha_state()
async def async_start(self):
"""Start or resume the cleaning task."""
if self.state == STATE_PAUSED:
await self.hass.async_add_executor_job(self.vacuum.send_command, "resume")
else:
await self.hass.async_add_executor_job(self.vacuum.send_command, "start")
async def async_stop(self, **kwargs):
"""Stop the vacuum cleaner."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "stop")
async def async_pause(self):
"""Pause the cleaning cycle."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "pause")
async def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
if self.state == STATE_CLEANING:
await self.async_pause()
for _ in range(10):
if self.state == STATE_PAUSED:
break
await asyncio.sleep(1)
await self.hass.async_add_executor_job(self.vacuum.send_command, "dock")
async def async_locate(self, **kwargs):
"""Located vacuum."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "find")
async def async_send_command(self, command, params=None, **kwargs):
"""Send raw command."""
_LOGGER.debug("async_send_command %s (%s), %s", command, params, kwargs)
await self.hass.async_add_executor_job(
self.vacuum.send_command, command, params
)

View File

@ -1,89 +0,0 @@
"""Class for Roomba devices."""
import logging
from homeassistant.components.vacuum import VacuumEntityFeature
from .entity import SUPPORT_IROBOT, IRobotVacuum
_LOGGER = logging.getLogger(__name__)
ATTR_BIN_FULL = "bin_full"
ATTR_BIN_PRESENT = "bin_present"
FAN_SPEED_AUTOMATIC = "Automatic"
FAN_SPEED_ECO = "Eco"
FAN_SPEED_PERFORMANCE = "Performance"
FAN_SPEEDS = [FAN_SPEED_AUTOMATIC, FAN_SPEED_ECO, FAN_SPEED_PERFORMANCE]
# Only Roombas with CarpetBost can set their fanspeed
SUPPORT_ROOMBA_CARPET_BOOST = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED
class RoombaVacuum(IRobotVacuum): # pylint: disable=hass-enforce-class-module
"""Basic Roomba robot (without carpet boost)."""
@property
def extra_state_attributes(self):
"""Return the state attributes of the device."""
state_attrs = super().extra_state_attributes
# Get bin state
bin_raw_state = self.vacuum_state.get("bin", {})
bin_state = {}
if bin_raw_state.get("present") is not None:
bin_state[ATTR_BIN_PRESENT] = bin_raw_state.get("present")
if bin_raw_state.get("full") is not None:
bin_state[ATTR_BIN_FULL] = bin_raw_state.get("full")
state_attrs.update(bin_state)
return state_attrs
class RoombaVacuumCarpetBoost(RoombaVacuum): # pylint: disable=hass-enforce-class-module
"""Roomba robot with carpet boost."""
_attr_fan_speed_list = FAN_SPEEDS
_attr_supported_features = SUPPORT_ROOMBA_CARPET_BOOST
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
fan_speed = None
carpet_boost = self.vacuum_state.get("carpetBoost")
high_perf = self.vacuum_state.get("vacHigh")
if carpet_boost is not None and high_perf is not None:
if carpet_boost:
fan_speed = FAN_SPEED_AUTOMATIC
elif high_perf:
fan_speed = FAN_SPEED_PERFORMANCE
else: # carpet_boost and high_perf are False
fan_speed = FAN_SPEED_ECO
return fan_speed
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
if fan_speed.capitalize() in FAN_SPEEDS:
fan_speed = fan_speed.capitalize()
_LOGGER.debug("Set fan speed to: %s", fan_speed)
high_perf = None
carpet_boost = None
if fan_speed == FAN_SPEED_AUTOMATIC:
high_perf = False
carpet_boost = True
elif fan_speed == FAN_SPEED_ECO:
high_perf = False
carpet_boost = False
elif fan_speed == FAN_SPEED_PERFORMANCE:
high_perf = True
carpet_boost = False
else:
_LOGGER.error("No such fan speed available: %s", fan_speed)
return
# The set_preference method does only accept string values
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "carpetBoost", str(carpet_boost)
)
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "vacHigh", str(high_perf)
)

View File

@ -2,16 +2,92 @@
from __future__ import annotations from __future__ import annotations
import asyncio
import logging
from typing import Any
from homeassistant.components.vacuum import (
ATTR_STATUS,
STATE_CLEANING,
STATE_DOCKED,
STATE_ERROR,
STATE_RETURNING,
StateVacuumEntity,
VacuumEntityFeature,
)
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.const import STATE_IDLE, STATE_PAUSED
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.util import dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM
from . import roomba_reported_state from . import roomba_reported_state
from .braava import BraavaJet
from .const import DOMAIN from .const import DOMAIN
from .entity import IRobotVacuum from .entity import IRobotEntity
from .models import RoombaData from .models import RoombaData
from .roomba import RoombaVacuum, RoombaVacuumCarpetBoost
SUPPORT_IROBOT = (
VacuumEntityFeature.BATTERY
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.SEND_COMMAND
| VacuumEntityFeature.START
| VacuumEntityFeature.STATE
| VacuumEntityFeature.STOP
| VacuumEntityFeature.LOCATE
)
STATE_MAP = {
"": STATE_IDLE,
"charge": STATE_DOCKED,
"evac": STATE_RETURNING, # Emptying at cleanbase
"hmMidMsn": STATE_CLEANING, # Recharging at the middle of a cycle
"hmPostMsn": STATE_RETURNING, # Cycle finished
"hmUsrDock": STATE_RETURNING,
"pause": STATE_PAUSED,
"run": STATE_CLEANING,
"stop": STATE_IDLE,
"stuck": STATE_ERROR,
}
_LOGGER = logging.getLogger(__name__)
ATTR_SOFTWARE_VERSION = "software_version"
ATTR_CLEANING_TIME = "cleaning_time"
ATTR_CLEANED_AREA = "cleaned_area"
ATTR_ERROR = "error"
ATTR_ERROR_CODE = "error_code"
ATTR_POSITION = "position"
ATTR_SOFTWARE_VERSION = "software_version"
ATTR_BIN_FULL = "bin_full"
ATTR_BIN_PRESENT = "bin_present"
FAN_SPEED_AUTOMATIC = "Automatic"
FAN_SPEED_ECO = "Eco"
FAN_SPEED_PERFORMANCE = "Performance"
FAN_SPEEDS = [FAN_SPEED_AUTOMATIC, FAN_SPEED_ECO, FAN_SPEED_PERFORMANCE]
# Only Roombas with CarpetBost can set their fanspeed
SUPPORT_ROOMBA_CARPET_BOOST = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED
ATTR_DETECTED_PAD = "detected_pad"
ATTR_LID_CLOSED = "lid_closed"
ATTR_TANK_PRESENT = "tank_present"
ATTR_TANK_LEVEL = "tank_level"
ATTR_PAD_WETNESS = "spray_amount"
OVERLAP_STANDARD = 67
OVERLAP_DEEP = 85
OVERLAP_EXTENDED = 25
MOP_STANDARD = "Standard"
MOP_DEEP = "Deep"
MOP_EXTENDED = "Extended"
BRAAVA_MOP_BEHAVIORS = [MOP_STANDARD, MOP_DEEP, MOP_EXTENDED]
BRAAVA_SPRAY_AMOUNT = [1, 2, 3]
# Braava Jets can set mopping behavior through fanspeed
SUPPORT_BRAAVA = SUPPORT_IROBOT | VacuumEntityFeature.FAN_SPEED
async def async_setup_entry( async def async_setup_entry(
@ -39,3 +115,309 @@ async def async_setup_entry(
roomba_vac = constructor(roomba, blid) roomba_vac = constructor(roomba, blid)
async_add_entities([roomba_vac]) async_add_entities([roomba_vac])
class IRobotVacuum(IRobotEntity, StateVacuumEntity):
"""Base class for iRobot robots."""
_attr_name = None
_attr_supported_features = SUPPORT_IROBOT
_attr_available = True # Always available, otherwise setup will fail
def __init__(self, roomba, blid) -> None:
"""Initialize the iRobot handler."""
super().__init__(roomba, blid)
self._cap_position = self.vacuum_state.get("cap", {}).get("pose") == 1
@property
def _robot_state(self):
"""Return the state of the vacuum cleaner."""
clean_mission_status = self.vacuum_state.get("cleanMissionStatus", {})
cycle = clean_mission_status.get("cycle")
phase = clean_mission_status.get("phase")
try:
state = STATE_MAP[phase]
except KeyError:
return STATE_ERROR
if cycle != "none" and state in (STATE_IDLE, STATE_DOCKED):
state = STATE_PAUSED
return state
@property
def state(self) -> str:
"""Return the state of the vacuum cleaner."""
return self._robot_state
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the device."""
state = self.vacuum_state
# Roomba software version
software_version = state.get("softwareVer")
# Set properties that are to appear in the GUI
state_attrs = {ATTR_SOFTWARE_VERSION: software_version}
# Set legacy status to avoid break changes
state_attrs[ATTR_STATUS] = self.vacuum.current_state
# Only add cleaning time and cleaned area attrs when the vacuum is
# currently on
if self.state == STATE_CLEANING:
# Get clean mission status
(
state_attrs[ATTR_CLEANING_TIME],
state_attrs[ATTR_CLEANED_AREA],
) = self.get_cleaning_status(state)
# Error
if self.vacuum.error_code != 0:
state_attrs[ATTR_ERROR] = self.vacuum.error_message
state_attrs[ATTR_ERROR_CODE] = self.vacuum.error_code
# Not all Roombas expose position data
# https://github.com/koalazak/dorita980/issues/48
if self._cap_position:
pos_state = state.get("pose", {})
position = None
pos_x = pos_state.get("point", {}).get("x")
pos_y = pos_state.get("point", {}).get("y")
theta = pos_state.get("theta")
if all(item is not None for item in (pos_x, pos_y, theta)):
position = f"({pos_x}, {pos_y}, {theta})"
state_attrs[ATTR_POSITION] = position
return state_attrs
def get_cleaning_status(self, state) -> tuple[int, int]:
"""Return the cleaning time and cleaned area from the device."""
if not (mission_state := state.get("cleanMissionStatus")):
return (0, 0)
if cleaning_time := mission_state.get("mssnM", 0):
pass
elif start_time := mission_state.get("mssnStrtTm"):
now = dt_util.as_timestamp(dt_util.utcnow())
if now > start_time:
cleaning_time = (now - start_time) // 60
if cleaned_area := mission_state.get("sqft", 0): # Imperial
# Convert to m2 if the unit_system is set to metric
if self.hass.config.units is METRIC_SYSTEM:
cleaned_area = round(cleaned_area * 0.0929)
return (cleaning_time, cleaned_area)
def on_message(self, json_data):
"""Update state on message change."""
state = json_data.get("state", {}).get("reported", {})
if self.new_state_filter(state):
_LOGGER.debug("Got new state from the vacuum: %s", json_data)
self.schedule_update_ha_state()
async def async_start(self) -> None:
"""Start or resume the cleaning task."""
if self.state == STATE_PAUSED:
await self.hass.async_add_executor_job(self.vacuum.send_command, "resume")
else:
await self.hass.async_add_executor_job(self.vacuum.send_command, "start")
async def async_stop(self, **kwargs):
"""Stop the vacuum cleaner."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "stop")
async def async_pause(self) -> None:
"""Pause the cleaning cycle."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "pause")
async def async_return_to_base(self, **kwargs):
"""Set the vacuum cleaner to return to the dock."""
if self.state == STATE_CLEANING:
await self.async_pause()
for _ in range(10):
if self.state == STATE_PAUSED:
break
await asyncio.sleep(1)
await self.hass.async_add_executor_job(self.vacuum.send_command, "dock")
async def async_locate(self, **kwargs):
"""Located vacuum."""
await self.hass.async_add_executor_job(self.vacuum.send_command, "find")
async def async_send_command(self, command, params=None, **kwargs):
"""Send raw command."""
_LOGGER.debug("async_send_command %s (%s), %s", command, params, kwargs)
await self.hass.async_add_executor_job(
self.vacuum.send_command, command, params
)
class RoombaVacuum(IRobotVacuum):
"""Basic Roomba robot (without carpet boost)."""
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the device."""
state_attrs = super().extra_state_attributes
# Get bin state
bin_raw_state = self.vacuum_state.get("bin", {})
bin_state = {}
if bin_raw_state.get("present") is not None:
bin_state[ATTR_BIN_PRESENT] = bin_raw_state.get("present")
if bin_raw_state.get("full") is not None:
bin_state[ATTR_BIN_FULL] = bin_raw_state.get("full")
state_attrs.update(bin_state)
return state_attrs
class RoombaVacuumCarpetBoost(RoombaVacuum):
"""Roomba robot with carpet boost."""
_attr_fan_speed_list = FAN_SPEEDS
_attr_supported_features = SUPPORT_ROOMBA_CARPET_BOOST
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
fan_speed = None
carpet_boost = self.vacuum_state.get("carpetBoost")
high_perf = self.vacuum_state.get("vacHigh")
if carpet_boost is not None and high_perf is not None:
if carpet_boost:
fan_speed = FAN_SPEED_AUTOMATIC
elif high_perf:
fan_speed = FAN_SPEED_PERFORMANCE
else: # carpet_boost and high_perf are False
fan_speed = FAN_SPEED_ECO
return fan_speed
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
if fan_speed.capitalize() in FAN_SPEEDS:
fan_speed = fan_speed.capitalize()
_LOGGER.debug("Set fan speed to: %s", fan_speed)
high_perf = None
carpet_boost = None
if fan_speed == FAN_SPEED_AUTOMATIC:
high_perf = False
carpet_boost = True
elif fan_speed == FAN_SPEED_ECO:
high_perf = False
carpet_boost = False
elif fan_speed == FAN_SPEED_PERFORMANCE:
high_perf = True
carpet_boost = False
else:
_LOGGER.error("No such fan speed available: %s", fan_speed)
return
# The set_preference method does only accept string values
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "carpetBoost", str(carpet_boost)
)
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "vacHigh", str(high_perf)
)
class BraavaJet(IRobotVacuum):
"""Braava Jet."""
_attr_supported_features = SUPPORT_BRAAVA
def __init__(self, roomba, blid) -> None:
"""Initialize the Roomba handler."""
super().__init__(roomba, blid)
# Initialize fan speed list
self._attr_fan_speed_list = [
f"{behavior}-{spray}"
for behavior in BRAAVA_MOP_BEHAVIORS
for spray in BRAAVA_SPRAY_AMOUNT
]
@property
def fan_speed(self):
"""Return the fan speed of the vacuum cleaner."""
# Mopping behavior and spray amount as fan speed
rank_overlap = self.vacuum_state.get("rankOverlap", {})
behavior = None
if rank_overlap == OVERLAP_STANDARD:
behavior = MOP_STANDARD
elif rank_overlap == OVERLAP_DEEP:
behavior = MOP_DEEP
elif rank_overlap == OVERLAP_EXTENDED:
behavior = MOP_EXTENDED
pad_wetness = self.vacuum_state.get("padWetness", {})
# "disposable" and "reusable" values are always the same
pad_wetness_value = pad_wetness.get("disposable")
return f"{behavior}-{pad_wetness_value}"
async def async_set_fan_speed(self, fan_speed, **kwargs):
"""Set fan speed."""
try:
split = fan_speed.split("-", 1)
behavior = split[0]
spray = int(split[1])
if behavior.capitalize() in BRAAVA_MOP_BEHAVIORS:
behavior = behavior.capitalize()
except IndexError:
_LOGGER.error(
"Fan speed error: expected {behavior}-{spray_amount}, got '%s'",
fan_speed,
)
return
except ValueError:
_LOGGER.error("Spray amount error: expected integer, got '%s'", split[1])
return
if behavior not in BRAAVA_MOP_BEHAVIORS:
_LOGGER.error(
"Mop behavior error: expected one of %s, got '%s'",
str(BRAAVA_MOP_BEHAVIORS),
behavior,
)
return
if spray not in BRAAVA_SPRAY_AMOUNT:
_LOGGER.error(
"Spray amount error: expected one of %s, got '%d'",
str(BRAAVA_SPRAY_AMOUNT),
spray,
)
return
overlap = 0
if behavior == MOP_STANDARD:
overlap = OVERLAP_STANDARD
elif behavior == MOP_DEEP:
overlap = OVERLAP_DEEP
else:
overlap = OVERLAP_EXTENDED
await self.hass.async_add_executor_job(
self.vacuum.set_preference, "rankOverlap", overlap
)
await self.hass.async_add_executor_job(
self.vacuum.set_preference,
"padWetness",
{"disposable": spray, "reusable": spray},
)
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes of the device."""
state_attrs = super().extra_state_attributes
# Get Braava state
state = self.vacuum_state
detected_pad = state.get("detectedPad")
mop_ready = state.get("mopReady", {})
lid_closed = mop_ready.get("lidClosed")
tank_present = mop_ready.get("tankPresent")
tank_level = state.get("tankLvl")
state_attrs[ATTR_DETECTED_PAD] = detected_pad
state_attrs[ATTR_LID_CLOSED] = lid_closed
state_attrs[ATTR_TANK_PRESENT] = tank_present
state_attrs[ATTR_TANK_LEVEL] = tank_level
return state_attrs