add interface dbus class

This commit is contained in:
Pascal Vizeli 2018-04-23 23:30:21 +02:00
parent 10219a348f
commit 6f770b78af
4 changed files with 50 additions and 10 deletions

View File

@ -1,6 +1,7 @@
"""DBus interface objects.""" """DBus interface objects."""
from .systemd import Systemd from .systemd import Systemd
from .hostname import Hostname
from ..coresys import CoreSysAttributes from ..coresys import CoreSysAttributes
@ -11,12 +12,19 @@ class DBusManager(CoreSysAttributes):
"""Initialize DBus Interface.""" """Initialize DBus Interface."""
self.coresys = coresys self.coresys = coresys
self._systemd = Systemd() self._systemd = Systemd()
self._hostname = Hostname()
@property @property
def systemd(self): def systemd(self):
"""Return Systemd Interface.""" """Return Systemd Interface."""
return self._systemd return self._systemd
@property
def hostname(self):
"""Return hostname Interface."""
return self._hostname
async def load(self): async def load(self):
"""Connect interfaces to dbus.""" """Connect interfaces to dbus."""
await self.systemd.connect() await self.systemd.connect()
await self.hostname.connect()

View File

@ -1 +1,23 @@
"""DBus interface for hostname.""" """DBus interface for hostname."""
import logging
from .interface import DBusInterface
from .utils import dbus_connected
from ..exceptions import DBusError
from ..utils.gdbus import DBus
_LOGGER = logging.getLogger(__name__)
DBUS_NAME = 'org.freedesktop.hostname1'
DBUS_OBJECT = '/org/freedesktop/hostname1'
class Hostname(DBusInterface):
"""Handle DBus interface for hostname/system."""
async def connect(self):
"""Connect do bus."""
try:
self.dbus = await DBus.connect(DBUS_NAME, DBUS_OBJECT)
except DBusError:
_LOGGER.warning("Can't connect to hostname")

18
hassio/dbus/interface.py Normal file
View File

@ -0,0 +1,18 @@
"""Interface class for dbus wrappers."""
class DBusInterface:
"""Handle DBus interface for hostname/system."""
def __init__(self):
"""Initialize systemd."""
self.dbus = None
@property
def is_connected(self):
"""Return True, if they is connected to dbus."""
return self.dbus is not None
async def connect(self):
"""Connect do bus."""
raise NotImplementedError()

View File

@ -1,6 +1,7 @@
"""Interface to Systemd over dbus.""" """Interface to Systemd over dbus."""
import logging import logging
from .interface import DBusInterface
from .utils import dbus_connected from .utils import dbus_connected
from ..exceptions import DBusError from ..exceptions import DBusError
from ..utils.gdbus import DBus from ..utils.gdbus import DBus
@ -11,18 +12,9 @@ DBUS_NAME = 'org.freedesktop.systemd1'
DBUS_OBJECT = '/org/freedesktop/systemd1' DBUS_OBJECT = '/org/freedesktop/systemd1'
class Systemd: class Systemd(DBusInterface):
"""Systemd function handler.""" """Systemd function handler."""
def __init__(self):
"""Initialize systemd."""
self.dbus = None
@property
def is_connected(self):
"""Return True, if they is connected to dbus."""
return self.dbus is not None
async def connect(self): async def connect(self):
"""Connect do bus.""" """Connect do bus."""
try: try: