Narrow scope of various pylint inline disables (#15364)

* Narrow scope of various pylint inline disables

* Whitespace tweaks
This commit is contained in:
Ville Skyttä 2018-10-10 13:17:11 +03:00 committed by Paulus Schoutsen
parent 78c38749ab
commit 707b7c202d
30 changed files with 52 additions and 75 deletions

View File

@ -49,10 +49,9 @@ def setup(hass, config):
# It doesn't really matter why we're not able to get the status, just that # It doesn't really matter why we're not able to get the status, just that
# we can't. # we can't.
# pylint: disable=broad-except
try: try:
DATA.update(no_throttle=True) DATA.update(no_throttle=True)
except Exception: except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failure while testing APCUPSd status retrieval.") _LOGGER.exception("Failure while testing APCUPSd status retrieval.")
return False return False
return True return True

View File

@ -68,6 +68,7 @@ class GoogleCalendarData:
self.event = None self.event = None
def _prepare_query(self): def _prepare_query(self):
# pylint: disable=import-error
from httplib2 import ServerNotFoundError from httplib2 import ServerNotFoundError
try: try:

View File

@ -53,14 +53,13 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
add_entities(devices) add_entities(devices)
# pylint: disable=import-error
class EQ3BTSmartThermostat(ClimateDevice): class EQ3BTSmartThermostat(ClimateDevice):
"""Representation of an eQ-3 Bluetooth Smart thermostat.""" """Representation of an eQ-3 Bluetooth Smart thermostat."""
def __init__(self, _mac, _name): def __init__(self, _mac, _name):
"""Initialize the thermostat.""" """Initialize the thermostat."""
# We want to avoid name clash with this module. # We want to avoid name clash with this module.
import eq3bt as eq3 import eq3bt as eq3 # pylint: disable=import-error
self.modes = { self.modes = {
eq3.Mode.Open: STATE_ON, eq3.Mode.Open: STATE_ON,
@ -176,7 +175,7 @@ class EQ3BTSmartThermostat(ClimateDevice):
def update(self): def update(self):
"""Update the data from the thermostat.""" """Update the data from the thermostat."""
from bluepy.btle import BTLEException from bluepy.btle import BTLEException # pylint: disable=import-error
try: try:
self._thermostat.update() self._thermostat.update()
except BTLEException as ex: except BTLEException as ex:

View File

@ -4,8 +4,6 @@ Support for Z-Wave cover components.
For more details about this platform, please refer to the documentation For more details about this platform, please refer to the documentation
https://home-assistant.io/components/cover.zwave/ https://home-assistant.io/components/cover.zwave/
""" """
# Because we do not compile openzwave on CI
# pylint: disable=import-error
import logging import logging
from homeassistant.components.cover import ( from homeassistant.components.cover import (
DOMAIN, SUPPORT_OPEN, SUPPORT_CLOSE, ATTR_POSITION) DOMAIN, SUPPORT_OPEN, SUPPORT_CLOSE, ATTR_POSITION)

View File

@ -43,8 +43,7 @@ class FritzBoxScanner(DeviceScanner):
self.password = config[CONF_PASSWORD] self.password = config[CONF_PASSWORD]
self.success_init = True self.success_init = True
# pylint: disable=import-error import fritzconnection as fc # pylint: disable=import-error
import fritzconnection as fc
# Establish a connection to the FRITZ!Box. # Establish a connection to the FRITZ!Box.
try: try:

View File

@ -86,10 +86,9 @@ class UnifiDeviceScanner(DeviceScanner):
def _disconnect(self): def _disconnect(self):
"""Disconnect the current SSH connection.""" """Disconnect the current SSH connection."""
# pylint: disable=broad-except
try: try:
self.ssh.logout() self.ssh.logout()
except Exception: except Exception: # pylint: disable=broad-except
pass pass
finally: finally:
self.ssh = None self.ssh = None

View File

@ -144,8 +144,7 @@ class GraphiteFeeder(threading.Thread):
try: try:
self._report_attributes( self._report_attributes(
event.data['entity_id'], event.data['new_state']) event.data['entity_id'], event.data['new_state'])
# pylint: disable=broad-except except Exception: # pylint: disable=broad-except
except Exception:
# Catch this so we can avoid the thread dying and # Catch this so we can avoid the thread dying and
# make it visible. # make it visible.
_LOGGER.exception("Failed to process STATE_CHANGED event") _LOGGER.exception("Failed to process STATE_CHANGED event")

View File

@ -65,8 +65,7 @@ def homekit_http_send(self, message_body=None, encode_chunked=False):
def get_serial(accessory): def get_serial(accessory):
"""Obtain the serial number of a HomeKit device.""" """Obtain the serial number of a HomeKit device."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
for service in accessory['services']: for service in accessory['services']:
if homekit.ServicesTypes.get_short(service['type']) != \ if homekit.ServicesTypes.get_short(service['type']) != \
'accessory-information': 'accessory-information':
@ -85,8 +84,7 @@ class HKDevice():
def __init__(self, hass, host, port, model, hkid, config_num, config): def __init__(self, hass, host, port, model, hkid, config_num, config):
"""Initialise a generic HomeKit device.""" """Initialise a generic HomeKit device."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
_LOGGER.info("Setting up Homekit device %s", model) _LOGGER.info("Setting up Homekit device %s", model)
self.hass = hass self.hass = hass
@ -132,8 +130,7 @@ class HKDevice():
def accessory_setup(self): def accessory_setup(self):
"""Handle setup of a HomeKit accessory.""" """Handle setup of a HomeKit accessory."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
try: try:
data = self.get_json('/accessories') data = self.get_json('/accessories')
@ -185,8 +182,7 @@ class HKDevice():
def device_config_callback(self, callback_data): def device_config_callback(self, callback_data):
"""Handle initial pairing.""" """Handle initial pairing."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
pairing_id = str(uuid.uuid4()) pairing_id = str(uuid.uuid4())
code = callback_data.get('code').strip() code = callback_data.get('code').strip()
try: try:

View File

@ -778,8 +778,7 @@ class HMDevice(Entity):
# Link events from pyhomematic # Link events from pyhomematic
self._subscribe_homematic_events() self._subscribe_homematic_events()
self._available = not self._hmdevice.UNREACH self._available = not self._hmdevice.UNREACH
# pylint: disable=broad-except except Exception as err: # pylint: disable=broad-except
except Exception as err:
self._connected = False self._connected = False
_LOGGER.error("Exception while linking %s: %s", _LOGGER.error("Exception while linking %s: %s",
self._address, str(err)) self._address, str(err))

View File

@ -58,8 +58,7 @@ class DlibFaceDetectEntity(ImageProcessingFaceEntity):
def process_image(self, image): def process_image(self, image):
"""Process image.""" """Process image."""
# pylint: disable=import-error import face_recognition # pylint: disable=import-error
import face_recognition
fak_file = io.BytesIO(image) fak_file = io.BytesIO(image)
fak_file.name = 'snapshot.jpg' fak_file.name = 'snapshot.jpg'

View File

@ -20,8 +20,7 @@ TAP_KEY_SCHEMA = vol.Schema({})
def setup(hass, config): def setup(hass, config):
"""Listen for keyboard events.""" """Listen for keyboard events."""
# pylint: disable=import-error import pykeyboard # pylint: disable=import-error
import pykeyboard
keyboard = pykeyboard.PyKeyboard() keyboard = pykeyboard.PyKeyboard()
keyboard.special_key_assignment() keyboard.special_key_assignment()

View File

@ -4,7 +4,6 @@ Receive signals from a keyboard and use it as a remote control.
For more details about this platform, please refer to the documentation at For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/keyboard_remote/ https://home-assistant.io/components/keyboard_remote/
""" """
# pylint: disable=import-error
import threading import threading
import logging import logging
import os import os
@ -90,6 +89,7 @@ class KeyboardRemoteThread(threading.Thread):
id_folder = '/dev/input/by-id/' id_folder = '/dev/input/by-id/'
if os.path.isdir(id_folder): if os.path.isdir(id_folder):
# pylint: disable=import-error
from evdev import InputDevice, list_devices from evdev import InputDevice, list_devices
device_names = [InputDevice(file_name).name device_names = [InputDevice(file_name).name
for file_name in list_devices()] for file_name in list_devices()]
@ -104,6 +104,7 @@ class KeyboardRemoteThread(threading.Thread):
def _get_keyboard_device(self): def _get_keyboard_device(self):
"""Get the keyboard device.""" """Get the keyboard device."""
# pylint: disable=import-error
from evdev import InputDevice, list_devices from evdev import InputDevice, list_devices
if self.device_name: if self.device_name:
devices = [InputDevice(file_name) for file_name in list_devices()] devices = [InputDevice(file_name) for file_name in list_devices()]
@ -121,6 +122,7 @@ class KeyboardRemoteThread(threading.Thread):
def run(self): def run(self):
"""Run the loop of the KeyboardRemote.""" """Run the loop of the KeyboardRemote."""
# pylint: disable=import-error
from evdev import categorize, ecodes from evdev import categorize, ecodes
if self.dev is not None: if self.dev is not None:

View File

@ -38,8 +38,7 @@ class HomeKitLight(HomeKitEntity, Light):
def update_characteristics(self, characteristics): def update_characteristics(self, characteristics):
"""Synchronise light state with Home Assistant.""" """Synchronise light state with Home Assistant."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
for characteristic in characteristics: for characteristic in characteristics:
ctype = characteristic['type'] ctype = characteristic['type']

View File

@ -447,16 +447,15 @@ class SonosDevice(MediaPlayerDevice):
"""Set available favorites.""" """Set available favorites."""
# SoCo 0.16 raises a generic Exception on invalid xml in favorites. # SoCo 0.16 raises a generic Exception on invalid xml in favorites.
# Filter those out now so our list is safe to use. # Filter those out now so our list is safe to use.
# pylint: disable=broad-except
try: try:
self._favorites = [] self._favorites = []
for fav in self.soco.music_library.get_sonos_favorites(): for fav in self.soco.music_library.get_sonos_favorites():
try: try:
if fav.reference.get_uri(): if fav.reference.get_uri():
self._favorites.append(fav) self._favorites.append(fav)
except Exception: except Exception: # pylint: disable=broad-except
_LOGGER.debug("Ignoring invalid favorite '%s'", fav.title) _LOGGER.debug("Ignoring invalid favorite '%s'", fav.title)
except Exception: except Exception: # pylint: disable=broad-except
_LOGGER.debug("Ignoring invalid favorite list") _LOGGER.debug("Ignoring invalid favorite list")
def _radio_artwork(self, url): def _radio_artwork(self, url):

View File

@ -4,7 +4,6 @@ Support for controlling raspihats boards.
For more details about this component, please refer to the documentation at For more details about this component, please refer to the documentation at
https://home-assistant.io/components/raspihats/ https://home-assistant.io/components/raspihats/
""" """
# pylint: disable=import-error,no-name-in-module
import logging import logging
import threading import threading
import time import time
@ -125,7 +124,7 @@ class I2CHatsManager(threading.Thread):
with self._lock: with self._lock:
i2c_hat = self._i2c_hats.get(address) i2c_hat = self._i2c_hats.get(address)
if i2c_hat is None: if i2c_hat is None:
# pylint: disable=import-error # pylint: disable=import-error,no-name-in-module
import raspihats.i2c_hats as module import raspihats.i2c_hats as module
constructor = getattr(module, board) constructor = getattr(module, board)
i2c_hat = constructor(address) i2c_hat = constructor(address)
@ -143,6 +142,7 @@ class I2CHatsManager(threading.Thread):
def run(self): def run(self):
"""Keep alive for I2C-HATs.""" """Keep alive for I2C-HATs."""
# pylint: disable=import-error,no-name-in-module
from raspihats.i2c_hats import ResponseException from raspihats.i2c_hats import ResponseException
_LOGGER.info(log_message(self, "starting")) _LOGGER.info(log_message(self, "starting"))
@ -205,6 +205,7 @@ class I2CHatsManager(threading.Thread):
def read_di(self, address, channel): def read_di(self, address, channel):
"""Read a value from a I2C-HAT digital input.""" """Read a value from a I2C-HAT digital input."""
# pylint: disable=import-error,no-name-in-module
from raspihats.i2c_hats import ResponseException from raspihats.i2c_hats import ResponseException
with self._lock: with self._lock:
@ -217,6 +218,7 @@ class I2CHatsManager(threading.Thread):
def write_dq(self, address, channel, value): def write_dq(self, address, channel, value):
"""Write a value to a I2C-HAT digital output.""" """Write a value to a I2C-HAT digital output."""
# pylint: disable=import-error,no-name-in-module
from raspihats.i2c_hats import ResponseException from raspihats.i2c_hats import ResponseException
with self._lock: with self._lock:
@ -228,6 +230,7 @@ class I2CHatsManager(threading.Thread):
def read_dq(self, address, channel): def read_dq(self, address, channel):
"""Read a value from a I2C-HAT digital output.""" """Read a value from a I2C-HAT digital output."""
# pylint: disable=import-error,no-name-in-module
from raspihats.i2c_hats import ResponseException from raspihats.i2c_hats import ResponseException
with self._lock: with self._lock:

View File

@ -4,7 +4,6 @@ Support for controlling GPIO pins of a Raspberry Pi.
For more details about this component, please refer to the documentation at For more details about this component, please refer to the documentation at
https://home-assistant.io/components/rpi_gpio/ https://home-assistant.io/components/rpi_gpio/
""" """
# pylint: disable=import-error
import logging import logging
from homeassistant.const import ( from homeassistant.const import (
@ -19,7 +18,7 @@ DOMAIN = 'rpi_gpio'
def setup(hass, config): def setup(hass, config):
"""Set up the Raspberry PI GPIO component.""" """Set up the Raspberry PI GPIO component."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
def cleanup_gpio(event): def cleanup_gpio(event):
"""Stuff to do before stopping.""" """Stuff to do before stopping."""
@ -36,32 +35,32 @@ def setup(hass, config):
def setup_output(port): def setup_output(port):
"""Set up a GPIO as output.""" """Set up a GPIO as output."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
GPIO.setup(port, GPIO.OUT) GPIO.setup(port, GPIO.OUT)
def setup_input(port, pull_mode): def setup_input(port, pull_mode):
"""Set up a GPIO as input.""" """Set up a GPIO as input."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
GPIO.setup(port, GPIO.IN, GPIO.setup(port, GPIO.IN,
GPIO.PUD_DOWN if pull_mode == 'DOWN' else GPIO.PUD_UP) GPIO.PUD_DOWN if pull_mode == 'DOWN' else GPIO.PUD_UP)
def write_output(port, value): def write_output(port, value):
"""Write a value to a GPIO.""" """Write a value to a GPIO."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
GPIO.output(port, value) GPIO.output(port, value)
def read_input(port): def read_input(port):
"""Read a value from a GPIO.""" """Read a value from a GPIO."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
return GPIO.input(port) return GPIO.input(port)
def edge_detect(port, event_callback, bounce): def edge_detect(port, event_callback, bounce):
"""Add detection for RISING and FALLING events.""" """Add detection for RISING and FALLING events."""
from RPi import GPIO from RPi import GPIO # pylint: disable=import-error
GPIO.add_event_detect( GPIO.add_event_detect(
port, port,
GPIO.BOTH, GPIO.BOTH,

View File

@ -42,11 +42,10 @@ def setup(hass, config):
device = config[DOMAIN][CONF_DEVICE] device = config[DOMAIN][CONF_DEVICE]
global SCSGATE global SCSGATE
# pylint: disable=broad-except
try: try:
SCSGATE = SCSGate(device=device, logger=_LOGGER) SCSGATE = SCSGate(device=device, logger=_LOGGER)
SCSGATE.start() SCSGATE.start()
except Exception as exception: except Exception as exception: # pylint: disable=broad-except
_LOGGER.error("Cannot setup SCSGate component: %s", exception) _LOGGER.error("Cannot setup SCSGate component: %s", exception)
return False return False
@ -101,10 +100,9 @@ class SCSGate:
if new_device_activated: if new_device_activated:
self._activate_next_device() self._activate_next_device()
# pylint: disable=broad-except
try: try:
self._devices[message.entity].process_event(message) self._devices[message.entity].process_event(message)
except Exception as exception: except Exception as exception: # pylint: disable=broad-except
msg = "Exception while processing event: {}".format(exception) msg = "Exception while processing event: {}".format(exception)
self._logger.error(msg) self._logger.error(msg)
else: else:

View File

@ -64,12 +64,11 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
}) })
# pylint: disable=import-error
async def async_setup_platform(hass, config, async_add_entities, async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None): discovery_info=None):
"""Set up the BH1750 sensor.""" """Set up the BH1750 sensor."""
import smbus import smbus # pylint: disable=import-error
from i2csense.bh1750 import BH1750 from i2csense.bh1750 import BH1750 # pylint: disable=import-error
name = config.get(CONF_NAME) name = config.get(CONF_NAME)
bus_number = config.get(CONF_I2C_BUS) bus_number = config.get(CONF_I2C_BUS)

View File

@ -79,12 +79,11 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
}) })
# pylint: disable=import-error
async def async_setup_platform(hass, config, async_add_entities, async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None): discovery_info=None):
"""Set up the BME280 sensor.""" """Set up the BME280 sensor."""
import smbus import smbus # pylint: disable=import-error
from i2csense.bme280 import BME280 from i2csense.bme280 import BME280 # pylint: disable=import-error
SENSOR_TYPES[SENSOR_TEMP][1] = hass.config.units.temperature_unit SENSOR_TYPES[SENSOR_TEMP][1] = hass.config.units.temperature_unit
name = config.get(CONF_NAME) name = config.get(CONF_NAME)

View File

@ -115,15 +115,15 @@ async def async_setup_platform(hass, config, async_add_entities,
return return
# pylint: disable=import-error, no-member
def _setup_bme680(config): def _setup_bme680(config):
"""Set up and configure the BME680 sensor.""" """Set up and configure the BME680 sensor."""
from smbus import SMBus from smbus import SMBus # pylint: disable=import-error
import bme680 import bme680
sensor_handler = None sensor_handler = None
sensor = None sensor = None
try: try:
# pylint: disable=no-member
i2c_address = config.get(CONF_I2C_ADDRESS) i2c_address = config.get(CONF_I2C_ADDRESS)
bus = SMBus(config.get(CONF_I2C_BUS)) bus = SMBus(config.get(CONF_I2C_BUS))
sensor = bme680.BME680(i2c_address, bus) sensor = bme680.BME680(i2c_address, bus)

View File

@ -53,8 +53,7 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
def setup_platform(hass, config, add_entities, discovery_info=None): def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the DHT sensor.""" """Set up the DHT sensor."""
# pylint: disable=import-error import Adafruit_DHT # pylint: disable=import-error
import Adafruit_DHT
SENSOR_TYPES[SENSOR_TEMPERATURE][1] = hass.config.units.temperature_unit SENSOR_TYPES[SENSOR_TEMPERATURE][1] = hass.config.units.temperature_unit
available_sensors = { available_sensors = {

View File

@ -138,8 +138,7 @@ class Monitor:
additional_info['namespace'], additional_info['instance'], additional_info['namespace'], additional_info['instance'],
packet.temperature) packet.temperature)
# pylint: disable=import-error from beacontools import ( # pylint: disable=import-error
from beacontools import (
BeaconScanner, EddystoneFilter, EddystoneTLMFrame) BeaconScanner, EddystoneFilter, EddystoneTLMFrame)
device_filters = [EddystoneFilter(d.namespace, d.instance) device_filters = [EddystoneFilter(d.namespace, d.instance)
for d in devices] for d in devices]

View File

@ -63,7 +63,6 @@ FILTER_SCHEMA = vol.Schema({
default=DEFAULT_PRECISION): vol.Coerce(int), default=DEFAULT_PRECISION): vol.Coerce(int),
}) })
# pylint: disable=redefined-builtin
FILTER_OUTLIER_SCHEMA = FILTER_SCHEMA.extend({ FILTER_OUTLIER_SCHEMA = FILTER_SCHEMA.extend({
vol.Required(CONF_FILTER_NAME): FILTER_NAME_OUTLIER, vol.Required(CONF_FILTER_NAME): FILTER_NAME_OUTLIER,
vol.Optional(CONF_FILTER_WINDOW_SIZE, vol.Optional(CONF_FILTER_WINDOW_SIZE,
@ -446,7 +445,8 @@ class TimeSMAFilter(Filter):
variant (enum): type of argorithm used to connect discrete values variant (enum): type of argorithm used to connect discrete values
""" """
def __init__(self, window_size, precision, entity, type): def __init__(self, window_size, precision, entity,
type): # pylint: disable=redefined-builtin
"""Initialize Filter.""" """Initialize Filter."""
super().__init__(FILTER_NAME_TIME_SMA, window_size, precision, entity) super().__init__(FILTER_NAME_TIME_SMA, window_size, precision, entity)
self._time_window = window_size self._time_window = window_size

View File

@ -239,8 +239,7 @@ class FritzBoxPhonebook:
self.number_dict = None self.number_dict = None
self.prefixes = prefixes or [] self.prefixes = prefixes or []
# pylint: disable=import-error import fritzconnection as fc # pylint: disable=import-error
import fritzconnection as fc
# Establish a connection to the FRITZ!Box. # Establish a connection to the FRITZ!Box.
self.fph = fc.FritzPhonebook( self.fph = fc.FritzPhonebook(
address=self.host, user=self.username, password=self.password) address=self.host, user=self.username, password=self.password)

View File

@ -38,12 +38,11 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
}) })
# pylint: disable=import-error
async def async_setup_platform(hass, config, async_add_entities, async def async_setup_platform(hass, config, async_add_entities,
discovery_info=None): discovery_info=None):
"""Set up the HTU21D sensor.""" """Set up the HTU21D sensor."""
import smbus import smbus # pylint: disable=import-error
from i2csense.htu21d import HTU21D from i2csense.htu21d import HTU21D # pylint: disable=import-error
name = config.get(CONF_NAME) name = config.get(CONF_NAME)
bus_number = config.get(CONF_I2C_BUS) bus_number = config.get(CONF_I2C_BUS)

View File

@ -32,8 +32,7 @@ class HomeKitSwitch(HomeKitEntity, SwitchDevice):
def update_characteristics(self, characteristics): def update_characteristics(self, characteristics):
"""Synchronise the switch state with Home Assistant.""" """Synchronise the switch state with Home Assistant."""
# pylint: disable=import-error import homekit # pylint: disable=import-error
import homekit
for characteristic in characteristics: for characteristic in characteristics:
ctype = characteristic['type'] ctype = characteristic['type']

View File

@ -87,7 +87,6 @@ PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
def get_engine(hass, config): def get_engine(hass, config):
"""Set up Amazon Polly speech component.""" """Set up Amazon Polly speech component."""
# pylint: disable=import-error
output_format = config.get(CONF_OUTPUT_FORMAT) output_format = config.get(CONF_OUTPUT_FORMAT)
sample_rate = config.get(CONF_SAMPLE_RATE, sample_rate = config.get(CONF_SAMPLE_RATE,
DEFAULT_SAMPLE_RATES[output_format]) DEFAULT_SAMPLE_RATES[output_format])

View File

@ -4,9 +4,9 @@ Support to check for available updates.
For more details about this component, please refer to the documentation at For more details about this component, please refer to the documentation at
https://home-assistant.io/components/updater/ https://home-assistant.io/components/updater/
""" """
# pylint: disable=no-name-in-module, import-error
import asyncio import asyncio
from datetime import timedelta from datetime import timedelta
# pylint: disable=import-error,no-name-in-module
from distutils.version import StrictVersion from distutils.version import StrictVersion
import json import json
import logging import logging

View File

@ -25,7 +25,7 @@ from typing import Any
def patch_weakref_tasks() -> None: def patch_weakref_tasks() -> None:
"""Replace weakref.WeakSet to address Python 3 bug.""" """Replace weakref.WeakSet to address Python 3 bug."""
# pylint: disable=no-self-use, protected-access, bare-except # pylint: disable=no-self-use, protected-access
import asyncio.tasks import asyncio.tasks
class IgnoreCalls: class IgnoreCalls:
@ -38,7 +38,7 @@ def patch_weakref_tasks() -> None:
asyncio.tasks.Task._all_tasks = IgnoreCalls() # type: ignore asyncio.tasks.Task._all_tasks = IgnoreCalls() # type: ignore
try: try:
del asyncio.tasks.Task.__del__ del asyncio.tasks.Task.__del__
except: # noqa: E722 except: # noqa: E722 pylint: disable=bare-except
pass pass

View File

@ -145,8 +145,7 @@ def run_coroutine_threadsafe(
"""Handle the call to the coroutine.""" """Handle the call to the coroutine."""
try: try:
_chain_future(ensure_future(coro, loop=loop), future) _chain_future(ensure_future(coro, loop=loop), future)
# pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
except Exception as exc:
if future.set_running_or_notify_cancel(): if future.set_running_or_notify_cancel():
future.set_exception(exc) future.set_exception(exc)
else: else:
@ -194,8 +193,7 @@ def run_callback_threadsafe(loop: AbstractEventLoop, callback: Callable,
"""Run callback and store result.""" """Run callback and store result."""
try: try:
future.set_result(callback(*args)) future.set_result(callback(*args))
# pylint: disable=broad-except except Exception as exc: # pylint: disable=broad-except
except Exception as exc:
if future.set_running_or_notify_cancel(): if future.set_running_or_notify_cancel():
future.set_exception(exc) future.set_exception(exc)
else: else: