1
0
mirror of https://github.com/home-assistant/core.git synced 2025-08-11 06:20:01 +00:00
Files
.devcontainer
.github
.vscode
docs
homeassistant
machine
pylint
rootfs
script
tests
auth
backports
components
abode
accuweather
acmeda
adax
adguard
advantage_air
aemet
agent_dvr
air_quality
airly
airnow
airthings
airtouch4
airvisual
alarm_control_panel
alarmdecoder
alert
alexa
almond
ambee
amberelectric
ambiclimate
ambient_station
analytics
androidtv
apache_kafka
api
api_streams
apple_tv
apprise
aprs
arcam_fmj
arlo
aseko_pool_live
asuswrt
atag
august
aurora
aurora_abb_powerone
aussie_broadband
auth
automation
awair
aws
axis
azure_devops
azure_event_hub
balboa
bayesian
binary_sensor
blackbird
blebox
blink
blueprint
bluetooth_le_tracker
bmw_connected_drive
bond
bosch_shc
braviatv
broadlink
brother
brunt
bsblan
buienradar
button
caldav
calendar
camera
canary
cast
cert_expiry
climacell
climate
cloud
cloudflare
co2signal
coinbase
color_extractor
comfoconnect
command_line
compensation
config
configurator
control4
conversation
coolmaster
coronavirus
counter
cover
cpuspeed
crownstone
daikin
darksky
datadog
debugpy
deconz
default_config
demo
denonavr
derivative
device_automation
device_sun_light_trigger
device_tracker
devolo_home_control
devolo_home_network
dexcom
dhcp
diagnostics
dialogflow
directv
discovery
dlna_dmr
dnsip
doorbird
dsmr
__init__.py
conftest.py
test_config_flow.py
test_sensor.py
dte_energy_bridge
duckdns
dunehd
dynalite
eafm
ecobee
econet
efergy
elgato
elkm1
elmax
emonitor
emulated_hue
emulated_kasa
emulated_roku
energy
enocean
enphase_envoy
environment_canada
epson
esphome
everlights
evil_genius_labs
ezviz
faa_delays
facebook
facebox
fail2ban
fan
feedreader
ffmpeg
fido
file
filesize
filter
fireservicerota
firmata
fivem
fjaraskupan
flic
flick_electric
flipr
flo
flume
flunearyou
flux
flux_led
folder
folder_watcher
foobot
forecast_solar
forked_daapd
foscam
freebox
freedns
freedompro
fritz
fritzbox
fritzbox_callmonitor
fronius
frontend
garages_amsterdam
gdacs
generic
generic_hygrostat
generic_thermostat
geo_json_events
geo_location
geo_rss_events
geofency
geonetnz_quakes
geonetnz_volcano
gios
github
glances
goalzero
gogogate2
goodwe
google
google_assistant
google_domains
google_pubsub
google_translate
google_travel_time
google_wifi
gpslogger
graphite
gree
greeneye_monitor
group
growatt_server
guardian
habitica
hangouts
harmony
hassio
hddtemp
heos
here_travel_time
hisense_aehw4a1
history
history_stats
hive
hlk_sw16
home_connect
home_plus_control
homeassistant
homekit
homekit_controller
homematic
homematicip_cloud
homewizard
honeywell
html5
http
huawei_lte
hue
huisbaasje
humidifier
hunterdouglas_powerview
hvv_departures
hyperion
ialarm
iaqualink
icloud
ifttt
ign_sismologia
image
image_processing
imap_email_content
influxdb
input_boolean
input_button
input_datetime
input_number
input_select
input_text
insteon
integration
intellifire
intent
intent_script
ios
iotawatt
ipma
ipp
iqvia
islamic_prayer_times
iss
isy994
izone
jellyfin
jewish_calendar
juicenet
keenetic_ndms2
kira
kmtronic
knx
kodi
konnected
kostal_plenticore
kraken
kulersky
lastfm
launch_library
lcn
light
litejet
litterrobot
local_file
local_ip
locative
lock
logbook
logentries
logger
logi_circle
london_air
lookin
lovelace
luftdaten
lutron_caseta
lyric
mailbox
mailgun
manual
manual_mqtt
marytts
maxcube
mazda
media_player
media_source
melcloud
melissa
meraki
met
met_eireann
meteo_france
meteoclimatic
metoffice
mfi
mhz19
microsoft_face
microsoft_face_detect
microsoft_face_identify
mikrotik
mill
min_max
minecraft_server
minio
mjpeg
mobile_app
mochad
modbus
modem_callerid
modern_forms
moehlenhoff_alpha2
mold_indicator
monoprice
moon
motion_blinds
motioneye
mqtt
mqtt_eventstream
mqtt_json
mqtt_room
mqtt_statestream
mullvad
mutesync
my
myq
mysensors
mythicbeastsdns
nam
namecheapdns
nanoleaf
neato
ness_alarm
nest
netatmo
netgear
network
nexia
nextbus
nfandroidtv
nightscout
nina
nmap_tracker
no_ip
notify
notify_events
notion
nsw_fuel_station
nsw_rural_fire_service_feed
nuheat
nuki
numato
number
nut
nws
nx584
nzbget
octoprint
omnilogic
onboarding
oncue
ondilo_ico
onewire
onvif
open_meteo
openalpr_cloud
openalpr_local
openerz
opengarage
openhardwaremonitor
opentherm_gw
openuv
openweathermap
opnsense
overkiz
ovo_energy
owntracks
ozw
p1_monitor
panasonic_viera
panel_custom
panel_iframe
persistent_notification
person
philips_js
pi_hole
picnic
pilight
ping
plaato
plant
plex
plugwise
plum_lightpad
point
poolsense
powerwall
profiler
progettihwsw
prometheus
prosegur
proximity
ps4
push
pushbullet
pvoutput
pvpc_hourly_pricing
python_script
qld_bushfire
qwikswitch
rachio
radarr
rainforest_eagle
rainmachine
random
rdw
recollect_waste
recorder
reddit
remember_the_milk
remote
renault
rest
rest_command
rflink
rfxtrx
ridwell
ring
risco
rituals_perfume_genie
rmvtransport
roku
roomba
roon
rpi_power
rss_feed_template
rtsp_to_webrtc
ruckus_unleashed
safe_mode
samsungtv
scene
scrape
screenlogic
script
search
season
select
sense
senseme
sensibo
sensor
sentry
seventeentrack
sharkiq
shell_command
shelly
shopping_list
sia
sigfox
sighthound
signal_messenger
simplisafe
simulated
siren
slack
sleepiq
sma
smappee
smart_meter_texas
smarthab
smartthings
smarttub
smhi
smtp
snips
solaredge
solarlog
solax
soma
somfy
somfy_mylink
sonarr
songpal
sonos
soundtouch
spaceapi
spc
speedtestdotnet
spider
spotify
sql
squeezebox
srp_energy
ssdp
starline
startca
statistics
statsd
steamist
stookalert
stream
stt
subaru
sun
surepetcare
switch
switchbot
switcher_kis
syncthing
syncthru
synology_dsm
system_bridge
system_health
system_log
tado
tag
tailscale
tasmota
tcp
telegram
tellduslive
template
tesla_wall_connector
threshold
tibber
tile
time_date
timer
tod
tolo
tomato
toon
totalconnect
tplink
traccar
trace
tractive
tradfri
trafikverket_weatherstation
transmission
transport_nsw
trend
tts
tuya
twentemilieu
twilio
twinkly
twitch
uk_transport
unifi
unifi_direct
unifiprotect
universal
upb
upcloud
updater
upnp
uptime
uptimerobot
usb
usgs_earthquakes_feed
utility_meter
uvc
vacuum
vallox
velbus
venstar
vera
verisure
version
vesync
vicare
vilfo
vizio
vlc_telnet
voicerss
volumio
vultr
wake_on_lan
wallbox
water_heater
watttime
waze_travel_time
weather
webhook
webostv
websocket_api
wemo
whirlpool
whois
wiffi
wilight
withings
wiz
wled
wolflink
workday
worldclock
wsdot
xbox
xiaomi
xiaomi_aqara
xiaomi_miio
yale_smart_alarm
yamaha
yamaha_musiccast
yandex_transport
yandextts
yeelight
youless
zeroconf
zerproc
zha
zodiac
zone
zwave
zwave_js
zwave_me
__init__.py
conftest.py
fixtures
hassfest
helpers
mock
pylint
resources
scripts
test_util
testing_config
util
__init__.py
bandit.yaml
common.py
conftest.py
ignore_uncaught_exceptions.py
test_bootstrap.py
test_config.py
test_config_entries.py
test_core.py
test_data_entry_flow.py
test_exceptions.py
test_loader.py
test_main.py
test_requirements.py
test_runner.py
test_setup.py
test_test_fixtures.py
.core_files.yaml
.coveragerc
.dockerignore
.gitattributes
.gitignore
.hadolint.yaml
.ignore
.pre-commit-config.yaml
.prettierignore
.readthedocs.yml
.strict-typing
.yamllint
CLA.md
CODEOWNERS
CODE_OF_CONDUCT.md
CONTRIBUTING.md
Dockerfile
Dockerfile.dev
LICENSE.md
MANIFEST.in
README.rst
build.yaml
codecov.yml
mypy.ini
pyproject.toml
requirements.txt
requirements_all.txt
requirements_docs.txt
requirements_test.txt
requirements_test_all.txt
requirements_test_pre_commit.txt
setup.cfg
setup.py
tox.ini
core/tests/components/dsmr/conftest.py
Gunnar Klauberg 0f2e2aef2f Add DSMR config options for EasyMeter/Q3D ()
Co-authored-by: Rob Bierbooms <mail@robbierbooms.nl>
2022-01-10 13:08:41 +01:00

98 lines
3.1 KiB
Python

"""Common test tools."""
import asyncio
from unittest.mock import MagicMock, patch
from dsmr_parser.clients.protocol import DSMRProtocol
from dsmr_parser.obis_references import (
EQUIPMENT_IDENTIFIER,
EQUIPMENT_IDENTIFIER_GAS,
LUXEMBOURG_EQUIPMENT_IDENTIFIER,
P1_MESSAGE_TIMESTAMP,
Q3D_EQUIPMENT_IDENTIFIER,
)
from dsmr_parser.objects import CosemObject
import pytest
@pytest.fixture
async def dsmr_connection_fixture(hass):
"""Fixture that mocks serial connection."""
transport = MagicMock(spec=asyncio.Transport)
protocol = MagicMock(spec=DSMRProtocol)
async def connection_factory(*args, **kwargs):
"""Return mocked out Asyncio classes."""
return (transport, protocol)
connection_factory = MagicMock(wraps=connection_factory)
with patch(
"homeassistant.components.dsmr.sensor.create_dsmr_reader", connection_factory
), patch(
"homeassistant.components.dsmr.sensor.create_tcp_dsmr_reader",
connection_factory,
):
yield (connection_factory, transport, protocol)
@pytest.fixture
async def dsmr_connection_send_validate_fixture(hass):
"""Fixture that mocks serial connection."""
transport = MagicMock(spec=asyncio.Transport)
protocol = MagicMock(spec=DSMRProtocol)
protocol.telegram = {
EQUIPMENT_IDENTIFIER: CosemObject([{"value": "12345678", "unit": ""}]),
EQUIPMENT_IDENTIFIER_GAS: CosemObject([{"value": "123456789", "unit": ""}]),
P1_MESSAGE_TIMESTAMP: CosemObject([{"value": "12345678", "unit": ""}]),
}
async def connection_factory(*args, **kwargs):
"""Return mocked out Asyncio classes."""
if args[1] == "5L":
protocol.telegram = {
LUXEMBOURG_EQUIPMENT_IDENTIFIER: CosemObject(
[{"value": "12345678", "unit": ""}]
),
EQUIPMENT_IDENTIFIER_GAS: CosemObject(
[{"value": "123456789", "unit": ""}]
),
}
if args[1] == "5S":
protocol.telegram = {
P1_MESSAGE_TIMESTAMP: CosemObject([{"value": "12345678", "unit": ""}]),
}
if args[1] == "Q3D":
protocol.telegram = {
Q3D_EQUIPMENT_IDENTIFIER: CosemObject(
[{"value": "12345678", "unit": ""}]
),
}
return (transport, protocol)
connection_factory = MagicMock(wraps=connection_factory)
async def wait_closed():
if isinstance(connection_factory.call_args_list[0][0][2], str):
# TCP
telegram_callback = connection_factory.call_args_list[0][0][3]
else:
# Serial
telegram_callback = connection_factory.call_args_list[0][0][2]
telegram_callback(protocol.telegram)
protocol.wait_closed = wait_closed
with patch(
"homeassistant.components.dsmr.config_flow.create_dsmr_reader",
connection_factory,
), patch(
"homeassistant.components.dsmr.config_flow.create_tcp_dsmr_reader",
connection_factory,
):
yield (connection_factory, transport, protocol)