mirror of
				https://github.com/home-assistant/supervisor.git
				synced 2025-11-04 00:19:36 +00:00 
			
		
		
		
	* Init services discovery * extend it * Add mqtt provider * Service support * More protocol stuff * Update validate.py * Update validate.py * Update API.md * Update API.md * update api * add API for services * fix lint * add security middleware * Add discovery layout * update * Finish discovery * improve discovery * fix * Update API * Update api * fix * Fix lint * Update API.md * Update __init__.py * Update API.md * Update interface.py * Update mqtt.py * Update discovery.py * Update const.py * Update validate.py * Update validate.py * Update mqtt.py * Update mqtt.py * Update discovery.py * Update discovery.py * Update discovery.py * Update interface.py * Update mqtt.py * Update mqtt.py * Update services.py * Update discovery.py * Update discovery.py * Update mqtt.py * Update discovery.py * Update services.py * Update discovery.py * Update discovery.py * Update mqtt.py * Update discovery.py * fix aiohttp * test * Update const.py * Update addon.py * Update homeassistant.py * Update const.py * Update addon.py * Update homeassistant.py * Update addon.py * Update security.py * Update const.py * Update validate.py * Update const.py * Update addon.py * Update API.md * Update addons.py * Update addon.py * Update validate.py * Update security.py * Update security.py * Update const.py * Update services.py * Update discovery.py * Update API.md * Update services.py * Update API.md * Update services.py * Update discovery.py * Update discovery.py * Update mqtt.py * Update discovery.py * Update discovery.py * Update __init__.py * Update mqtt.py * Update security.py * fix lint * Update core.py * Update API.md * Update services.py
		
			
				
	
	
		
			56 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			56 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Init file for HassIO network rest api."""
 | 
						|
 | 
						|
from .utils import api_process, api_validate
 | 
						|
from ..const import (
 | 
						|
    ATTR_AVAILABLE, ATTR_PROVIDER, ATTR_SLUG, ATTR_SERVICES, REQUEST_FROM)
 | 
						|
from ..coresys import CoreSysAttributes
 | 
						|
 | 
						|
 | 
						|
class APIServices(CoreSysAttributes):
 | 
						|
    """Handle rest api for services functions."""
 | 
						|
 | 
						|
    def _extract_service(self, request):
 | 
						|
        """Return service and if not exists trow a exception."""
 | 
						|
        service = self._services.get(request.match_info.get('service'))
 | 
						|
        if not service:
 | 
						|
            raise RuntimeError("Service not exists")
 | 
						|
 | 
						|
        return service
 | 
						|
 | 
						|
    @api_process
 | 
						|
    async def list(self, request):
 | 
						|
        """Show register services."""
 | 
						|
        services = []
 | 
						|
        for service in self._services.list_services:
 | 
						|
            services.append({
 | 
						|
                ATTR_SLUG: service.slug,
 | 
						|
                ATTR_AVAILABLE: service.enabled,
 | 
						|
                ATTR_PROVIDER: service.provider,
 | 
						|
            })
 | 
						|
 | 
						|
        return {ATTR_SERVICES: services}
 | 
						|
 | 
						|
    @api_process
 | 
						|
    async def set_service(self, request):
 | 
						|
        """Write data into a service."""
 | 
						|
        service = self._extract_service(request)
 | 
						|
        body = await api_validate(service.schema, request)
 | 
						|
 | 
						|
        return service.set_service_data(request[REQUEST_FROM], body)
 | 
						|
 | 
						|
    @api_process
 | 
						|
    async def get_service(self, request):
 | 
						|
        """Read data into a service."""
 | 
						|
        service = self._extract_service(request)
 | 
						|
 | 
						|
        return {
 | 
						|
            ATTR_AVAILABLE: service.enabled,
 | 
						|
            service.slug: service.get_service_data(),
 | 
						|
        }
 | 
						|
 | 
						|
    @api_process
 | 
						|
    async def del_service(self, request):
 | 
						|
        """Delete data into a service."""
 | 
						|
        service = self._extract_service(request)
 | 
						|
        return service.del_service_data(request[REQUEST_FROM])
 |