mirror of
https://github.com/home-assistant/supervisor.git
synced 2025-10-09 03:39:39 +00:00
![dependabot[bot]](/assets/img/avatar_default.png)
* Bump pyupgrade from 2.26.0 to 2.26.0.post1 Bumps [pyupgrade](https://github.com/asottile/pyupgrade) from 2.26.0 to 2.26.0.post1. - [Release notes](https://github.com/asottile/pyupgrade/releases) - [Commits](https://github.com/asottile/pyupgrade/commits) --- updated-dependencies: - dependency-name: pyupgrade dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com> * Update .pre-commit-config.yaml * Fixes Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Pascal Vizeli <pvizeli@syshack.ch>
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
"""Init file for Supervisor Observer RESTful API."""
|
|
import asyncio
|
|
import logging
|
|
from typing import Any
|
|
|
|
from aiohttp import web
|
|
import voluptuous as vol
|
|
|
|
from ..const import (
|
|
ATTR_BLK_READ,
|
|
ATTR_BLK_WRITE,
|
|
ATTR_CPU_PERCENT,
|
|
ATTR_HOST,
|
|
ATTR_MEMORY_LIMIT,
|
|
ATTR_MEMORY_PERCENT,
|
|
ATTR_MEMORY_USAGE,
|
|
ATTR_NETWORK_RX,
|
|
ATTR_NETWORK_TX,
|
|
ATTR_UPDATE_AVAILABLE,
|
|
ATTR_VERSION,
|
|
ATTR_VERSION_LATEST,
|
|
)
|
|
from ..coresys import CoreSysAttributes
|
|
from ..validate import version_tag
|
|
from .utils import api_process, api_validate
|
|
|
|
_LOGGER: logging.Logger = logging.getLogger(__name__)
|
|
|
|
SCHEMA_VERSION = vol.Schema({vol.Optional(ATTR_VERSION): version_tag})
|
|
|
|
|
|
class APIObserver(CoreSysAttributes):
|
|
"""Handle RESTful API for Observer functions."""
|
|
|
|
@api_process
|
|
async def info(self, request: web.Request) -> dict[str, Any]:
|
|
"""Return HA Observer information."""
|
|
return {
|
|
ATTR_HOST: str(self.sys_docker.network.observer),
|
|
ATTR_VERSION: self.sys_plugins.observer.version,
|
|
ATTR_VERSION_LATEST: self.sys_plugins.observer.latest_version,
|
|
ATTR_UPDATE_AVAILABLE: self.sys_plugins.observer.need_update,
|
|
}
|
|
|
|
@api_process
|
|
async def stats(self, request: web.Request) -> dict[str, Any]:
|
|
"""Return resource information."""
|
|
stats = await self.sys_plugins.observer.stats()
|
|
|
|
return {
|
|
ATTR_CPU_PERCENT: stats.cpu_percent,
|
|
ATTR_MEMORY_USAGE: stats.memory_usage,
|
|
ATTR_MEMORY_LIMIT: stats.memory_limit,
|
|
ATTR_MEMORY_PERCENT: stats.memory_percent,
|
|
ATTR_NETWORK_RX: stats.network_rx,
|
|
ATTR_NETWORK_TX: stats.network_tx,
|
|
ATTR_BLK_READ: stats.blk_read,
|
|
ATTR_BLK_WRITE: stats.blk_write,
|
|
}
|
|
|
|
@api_process
|
|
async def update(self, request: web.Request) -> None:
|
|
"""Update HA observer."""
|
|
body = await api_validate(SCHEMA_VERSION, request)
|
|
version = body.get(ATTR_VERSION, self.sys_plugins.observer.latest_version)
|
|
|
|
await asyncio.shield(self.sys_plugins.observer.update(version))
|