[RFC] Add Tag integration (#38727)

Co-authored-by: Paulus Schoutsen <paulus@home-assistant.io>
This commit is contained in:
David F. Mulcahey 2020-08-11 05:08:47 -04:00 committed by GitHub
parent bd3f075060
commit 227d7c0a99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 246 additions and 0 deletions

View File

@ -412,6 +412,7 @@ homeassistant/components/synology_dsm/* @ProtoThis @Quentame
homeassistant/components/synology_srm/* @aerialls
homeassistant/components/syslog/* @fabaff
homeassistant/components/tado/* @michaelarnauts @bdraco
homeassistant/components/tag/* @balloob @dmulcahey
homeassistant/components/tahoma/* @philklei
homeassistant/components/tankerkoenig/* @guillempages
homeassistant/components/tautulli/* @ludeeus

View File

@ -16,6 +16,7 @@
"ssdp",
"sun",
"system_health",
"tag",
"updater",
"zeroconf",
"zone",

View File

@ -0,0 +1,124 @@
"""The Tag integration."""
import logging
import typing
import voluptuous as vol
from homeassistant.const import CONF_ID, CONF_NAME
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import collection
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.storage import Store
from homeassistant.loader import bind_hass
import homeassistant.util.dt as dt_util
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
DEVICE_ID = "device_id"
EVENT_TAG_SCANNED = "tag_scanned"
LAST_SCANNED = "last_scanned"
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
TAG_ID = "tag_id"
TAGS = "tags"
CREATE_FIELDS = {
vol.Required(CONF_ID): cv.string,
vol.Optional(TAG_ID): cv.string,
vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
vol.Optional("description"): cv.string,
vol.Optional(LAST_SCANNED): cv.datetime,
}
UPDATE_FIELDS = {
vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
vol.Optional("description"): cv.string,
vol.Optional(LAST_SCANNED): cv.datetime,
}
class TagIDExistsError(HomeAssistantError):
"""Raised when an item is not found."""
def __init__(self, item_id: str):
"""Initialize tag id exists error."""
super().__init__(f"Tag with id: {item_id} already exists.")
self.item_id = item_id
class TagIDManager(collection.IDManager):
"""ID manager for tags."""
def generate_id(self, suggestion: str) -> str:
"""Generate an ID."""
if self.has_id(suggestion):
raise TagIDExistsError(suggestion)
return suggestion
class TagStorageCollection(collection.StorageCollection):
"""Tag collection stored in storage."""
CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)
async def _process_create_data(self, data: typing.Dict) -> typing.Dict:
"""Validate the config is valid."""
if TAG_ID in data:
data[CONF_ID] = data.pop(TAG_ID)
data = self.CREATE_SCHEMA(data)
# make last_scanned JSON serializeable
if LAST_SCANNED in data:
data[LAST_SCANNED] = str(data[LAST_SCANNED])
return data
@callback
def _get_suggested_id(self, info: typing.Dict) -> str:
"""Suggest an ID based on the config."""
return info[CONF_ID]
async def _update_data(self, data: dict, update_data: typing.Dict) -> typing.Dict:
"""Return a new updated data object."""
data = {**data, **self.UPDATE_SCHEMA(update_data)}
# make last_scanned JSON serializeable
if LAST_SCANNED in data:
data[LAST_SCANNED] = str(data[LAST_SCANNED])
return data
async def async_setup(hass: HomeAssistant, config: dict):
"""Set up the Tag component."""
hass.data[DOMAIN] = {}
id_manager = TagIDManager()
hass.data[DOMAIN][TAGS] = storage_collection = TagStorageCollection(
Store(hass, STORAGE_VERSION, STORAGE_KEY),
logging.getLogger(f"{__name__}_storage_collection"),
id_manager,
)
await storage_collection.async_load()
collection.StorageCollectionWebsocket(
storage_collection, DOMAIN, DOMAIN, CREATE_FIELDS, UPDATE_FIELDS
).async_setup(hass)
return True
@bind_hass
async def async_scan_tag(hass, tag_id, device_id, context=None):
"""Handle when a tag is scanned."""
if DOMAIN not in hass.config.components:
raise HomeAssistantError("tag component has not been set up.")
hass.bus.async_fire(
EVENT_TAG_SCANNED, {TAG_ID: tag_id, DEVICE_ID: device_id}, context=context
)
helper = hass.data[DOMAIN][TAGS]
if tag_id in helper.data:
await helper.async_update_item(tag_id, {LAST_SCANNED: dt_util.utcnow()})
else:
await helper.async_create_item(
{CONF_ID: tag_id, LAST_SCANNED: dt_util.utcnow()}
)
_LOGGER.debug("Tag: %s scanned by device: %s", tag_id, device_id)

View File

@ -0,0 +1,3 @@
"""Constants for the Tag integration."""
DOMAIN = "tag"

View File

@ -0,0 +1,12 @@
{
"domain": "tag",
"name": "Tag",
"config_flow": false,
"documentation": "https://www.home-assistant.io/integrations/tag",
"requirements": [],
"ssdp": [],
"zeroconf": [],
"homekit": {},
"dependencies": [],
"codeowners": ["@balloob", "@dmulcahey"]
}

View File

@ -0,0 +1,3 @@
{
"title": "Tag"
}

View File

@ -0,0 +1,3 @@
{
"title": "Tag"
}

View File

@ -0,0 +1 @@
"""Tests for the Tag integration."""

View File

@ -0,0 +1,98 @@
"""Tests for the tag component."""
import logging
import pytest
from homeassistant.components.tag import DOMAIN, TAGS, async_scan_tag
from homeassistant.helpers import collection
from homeassistant.setup import async_setup_component
_LOGGER = logging.getLogger(__name__)
@pytest.fixture
def storage_setup(hass, hass_storage):
"""Storage setup."""
async def _storage(items=None):
if items is None:
hass_storage[DOMAIN] = {
"key": DOMAIN,
"version": 1,
"data": {"items": [{"id": "test tag"}]},
}
else:
hass_storage[DOMAIN] = items
config = {DOMAIN: {}}
return await async_setup_component(hass, DOMAIN, config)
return _storage
async def test_ws_list(hass, hass_ws_client, storage_setup):
"""Test listing tags via WS."""
assert await storage_setup()
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 1
assert "test tag" in result
async def test_tag_scanned(hass, hass_ws_client, storage_setup):
"""Test scanning tags."""
assert await storage_setup()
client = await hass_ws_client(hass)
await client.send_json({"id": 6, "type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 1
assert "test tag" in result
await async_scan_tag(hass, "new tag", "some_scanner")
await client.send_json({"id": 7, "type": f"{DOMAIN}/list"})
resp = await client.receive_json()
assert resp["success"]
result = {item["id"]: item for item in resp["result"]}
assert len(result) == 2
assert "test tag" in result
assert "new tag" in result
assert result["new tag"]["last_scanned"] is not None
def track_changes(coll: collection.ObservableCollection):
"""Create helper to track changes in a collection."""
changes = []
async def listener(*args):
changes.append(args)
coll.async_add_listener(listener)
return changes
async def test_tag_id_exists(hass, hass_ws_client, storage_setup):
"""Test scanning tags."""
assert await storage_setup()
changes = track_changes(hass.data[DOMAIN][TAGS])
client = await hass_ws_client(hass)
await client.send_json({"id": 2, "type": f"{DOMAIN}/create", "tag_id": "test tag"})
response = await client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "unknown_error"
assert len(changes) == 0