mirror of
https://github.com/esphome/esphome.git
synced 2025-07-28 14:16:40 +00:00
commit
4a1eec567f
@ -375,10 +375,12 @@ def upload_program(config, args, host):
|
||||
password = ota_conf.get(CONF_PASSWORD, "")
|
||||
|
||||
if (
|
||||
not is_ip_address(CORE.address) # pylint: disable=too-many-boolean-expressions
|
||||
and (get_port_type(host) == "MQTT" or config[CONF_MDNS][CONF_DISABLED])
|
||||
and CONF_MQTT in config
|
||||
CONF_MQTT in config # pylint: disable=too-many-boolean-expressions
|
||||
and (not args.device or args.device in ("MQTT", "OTA"))
|
||||
and (
|
||||
((config[CONF_MDNS][CONF_DISABLED]) and not is_ip_address(CORE.address))
|
||||
or get_port_type(host) == "MQTT"
|
||||
)
|
||||
):
|
||||
from esphome import mqtt
|
||||
|
||||
|
@ -128,7 +128,7 @@ void AM2315C::update() {
|
||||
data[2] = 0x00;
|
||||
if (this->write(data, 3) != i2c::ERROR_OK) {
|
||||
ESP_LOGE(TAG, "Write failed!");
|
||||
this->mark_failed();
|
||||
this->status_set_warning();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -138,12 +138,12 @@ void AM2315C::update() {
|
||||
uint8_t status = 0;
|
||||
if (this->read(&status, 1) != i2c::ERROR_OK) {
|
||||
ESP_LOGE(TAG, "Read failed!");
|
||||
this->mark_failed();
|
||||
this->status_set_warning();
|
||||
return;
|
||||
}
|
||||
if ((status & 0x80) == 0x80) {
|
||||
ESP_LOGE(TAG, "HW still busy!");
|
||||
this->mark_failed();
|
||||
this->status_set_warning();
|
||||
return;
|
||||
}
|
||||
|
||||
@ -151,7 +151,7 @@ void AM2315C::update() {
|
||||
uint8_t data[7];
|
||||
if (this->read(data, 7) != i2c::ERROR_OK) {
|
||||
ESP_LOGE(TAG, "Read failed!");
|
||||
this->mark_failed();
|
||||
this->status_set_warning();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -52,9 +52,8 @@ void Sml::loop() {
|
||||
break;
|
||||
|
||||
// remove start/end sequence
|
||||
this->sml_data_.erase(this->sml_data_.begin(), this->sml_data_.begin() + START_SEQ.size());
|
||||
this->sml_data_.resize(this->sml_data_.size() - 8);
|
||||
this->process_sml_file_(this->sml_data_);
|
||||
this->process_sml_file_(
|
||||
BytesView(this->sml_data_).subview(START_SEQ.size(), this->sml_data_.size() - START_SEQ.size() - 8));
|
||||
}
|
||||
break;
|
||||
};
|
||||
@ -66,8 +65,8 @@ void Sml::add_on_data_callback(std::function<void(std::vector<uint8_t>, bool)> &
|
||||
this->data_callbacks_.add(std::move(callback));
|
||||
}
|
||||
|
||||
void Sml::process_sml_file_(const bytes &sml_data) {
|
||||
SmlFile sml_file = SmlFile(sml_data);
|
||||
void Sml::process_sml_file_(const BytesView &sml_data) {
|
||||
SmlFile sml_file(sml_data);
|
||||
std::vector<ObisInfo> obis_info = sml_file.get_obis_info();
|
||||
this->publish_obis_info_(obis_info);
|
||||
|
||||
@ -75,6 +74,7 @@ void Sml::process_sml_file_(const bytes &sml_data) {
|
||||
}
|
||||
|
||||
void Sml::log_obis_info_(const std::vector<ObisInfo> &obis_info_vec) {
|
||||
#ifdef ESPHOME_LOG_HAS_DEBUG
|
||||
ESP_LOGD(TAG, "OBIS info:");
|
||||
for (auto const &obis_info : obis_info_vec) {
|
||||
std::string info;
|
||||
@ -83,6 +83,7 @@ void Sml::log_obis_info_(const std::vector<ObisInfo> &obis_info_vec) {
|
||||
info += " [0x" + bytes_repr(obis_info.value) + "]";
|
||||
ESP_LOGD(TAG, "%s", info.c_str());
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
void Sml::publish_obis_info_(const std::vector<ObisInfo> &obis_info_vec) {
|
||||
@ -92,10 +93,11 @@ void Sml::publish_obis_info_(const std::vector<ObisInfo> &obis_info_vec) {
|
||||
}
|
||||
|
||||
void Sml::publish_value_(const ObisInfo &obis_info) {
|
||||
const auto obis_code = obis_info.code_repr();
|
||||
for (auto const &sml_listener : sml_listeners_) {
|
||||
if ((!sml_listener->server_id.empty()) && (bytes_repr(obis_info.server_id) != sml_listener->server_id))
|
||||
continue;
|
||||
if (obis_info.code_repr() != sml_listener->obis_code)
|
||||
if (obis_code != sml_listener->obis_code)
|
||||
continue;
|
||||
sml_listener->publish_val(obis_info);
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ class Sml : public Component, public uart::UARTDevice {
|
||||
void add_on_data_callback(std::function<void(std::vector<uint8_t>, bool)> &&callback);
|
||||
|
||||
protected:
|
||||
void process_sml_file_(const bytes &sml_data);
|
||||
void process_sml_file_(const BytesView &sml_data);
|
||||
void log_obis_info_(const std::vector<ObisInfo> &obis_info_vec);
|
||||
void publish_obis_info_(const std::vector<ObisInfo> &obis_info_vec);
|
||||
char check_start_end_bytes_(uint8_t byte);
|
||||
|
@ -5,17 +5,17 @@
|
||||
namespace esphome {
|
||||
namespace sml {
|
||||
|
||||
SmlFile::SmlFile(bytes buffer) : buffer_(std::move(buffer)) {
|
||||
SmlFile::SmlFile(const BytesView &buffer) : buffer_(buffer) {
|
||||
// extract messages
|
||||
this->pos_ = 0;
|
||||
while (this->pos_ < this->buffer_.size()) {
|
||||
if (this->buffer_[this->pos_] == 0x00)
|
||||
break; // EndOfSmlMsg
|
||||
|
||||
SmlNode message = SmlNode();
|
||||
SmlNode message;
|
||||
if (!this->setup_node(&message))
|
||||
break;
|
||||
this->messages.emplace_back(message);
|
||||
this->messages.emplace_back(std::move(message));
|
||||
}
|
||||
}
|
||||
|
||||
@ -62,22 +62,20 @@ bool SmlFile::setup_node(SmlNode *node) {
|
||||
return false;
|
||||
|
||||
node->type = type;
|
||||
node->nodes.clear();
|
||||
node->value_bytes.clear();
|
||||
|
||||
if (type == SML_LIST) {
|
||||
node->nodes.reserve(length);
|
||||
for (size_t i = 0; i != length; i++) {
|
||||
SmlNode child_node = SmlNode();
|
||||
SmlNode child_node;
|
||||
if (!this->setup_node(&child_node))
|
||||
return false;
|
||||
node->nodes.emplace_back(child_node);
|
||||
node->nodes.emplace_back(std::move(child_node));
|
||||
}
|
||||
} else {
|
||||
// Value starts at the current position
|
||||
// Value ends "length" bytes later,
|
||||
// (since the TL field is counted but already subtracted from length)
|
||||
node->value_bytes = bytes(this->buffer_.begin() + this->pos_, this->buffer_.begin() + this->pos_ + length);
|
||||
node->value_bytes = buffer_.subview(this->pos_, length);
|
||||
// Increment the pointer past all consumed bytes
|
||||
this->pos_ += length;
|
||||
}
|
||||
@ -87,14 +85,14 @@ bool SmlFile::setup_node(SmlNode *node) {
|
||||
std::vector<ObisInfo> SmlFile::get_obis_info() {
|
||||
std::vector<ObisInfo> obis_info;
|
||||
for (auto const &message : messages) {
|
||||
SmlNode message_body = message.nodes[3];
|
||||
const auto &message_body = message.nodes[3];
|
||||
uint16_t message_type = bytes_to_uint(message_body.nodes[0].value_bytes);
|
||||
if (message_type != SML_GET_LIST_RES)
|
||||
continue;
|
||||
|
||||
SmlNode get_list_response = message_body.nodes[1];
|
||||
bytes server_id = get_list_response.nodes[1].value_bytes;
|
||||
SmlNode val_list = get_list_response.nodes[4];
|
||||
const auto &get_list_response = message_body.nodes[1];
|
||||
const auto &server_id = get_list_response.nodes[1].value_bytes;
|
||||
const auto &val_list = get_list_response.nodes[4];
|
||||
|
||||
for (auto const &val_list_entry : val_list.nodes) {
|
||||
obis_info.emplace_back(server_id, val_list_entry);
|
||||
@ -103,7 +101,7 @@ std::vector<ObisInfo> SmlFile::get_obis_info() {
|
||||
return obis_info;
|
||||
}
|
||||
|
||||
std::string bytes_repr(const bytes &buffer) {
|
||||
std::string bytes_repr(const BytesView &buffer) {
|
||||
std::string repr;
|
||||
for (auto const value : buffer) {
|
||||
repr += str_sprintf("%02x", value & 0xff);
|
||||
@ -111,7 +109,7 @@ std::string bytes_repr(const bytes &buffer) {
|
||||
return repr;
|
||||
}
|
||||
|
||||
uint64_t bytes_to_uint(const bytes &buffer) {
|
||||
uint64_t bytes_to_uint(const BytesView &buffer) {
|
||||
uint64_t val = 0;
|
||||
for (auto const value : buffer) {
|
||||
val = (val << 8) + value;
|
||||
@ -119,7 +117,7 @@ uint64_t bytes_to_uint(const bytes &buffer) {
|
||||
return val;
|
||||
}
|
||||
|
||||
int64_t bytes_to_int(const bytes &buffer) {
|
||||
int64_t bytes_to_int(const BytesView &buffer) {
|
||||
uint64_t tmp = bytes_to_uint(buffer);
|
||||
int64_t val;
|
||||
|
||||
@ -135,14 +133,14 @@ int64_t bytes_to_int(const bytes &buffer) {
|
||||
return val;
|
||||
}
|
||||
|
||||
std::string bytes_to_string(const bytes &buffer) { return std::string(buffer.begin(), buffer.end()); }
|
||||
std::string bytes_to_string(const BytesView &buffer) { return std::string(buffer.begin(), buffer.end()); }
|
||||
|
||||
ObisInfo::ObisInfo(bytes server_id, SmlNode val_list_entry) : server_id(std::move(server_id)) {
|
||||
ObisInfo::ObisInfo(const BytesView &server_id, const SmlNode &val_list_entry) : server_id(server_id) {
|
||||
this->code = val_list_entry.nodes[0].value_bytes;
|
||||
this->status = val_list_entry.nodes[1].value_bytes;
|
||||
this->unit = bytes_to_uint(val_list_entry.nodes[3].value_bytes);
|
||||
this->scaler = bytes_to_int(val_list_entry.nodes[4].value_bytes);
|
||||
SmlNode value_node = val_list_entry.nodes[5];
|
||||
const auto &value_node = val_list_entry.nodes[5];
|
||||
this->value = value_node.value_bytes;
|
||||
this->value_type = value_node.type;
|
||||
}
|
||||
|
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
@ -11,44 +12,73 @@ namespace sml {
|
||||
|
||||
using bytes = std::vector<uint8_t>;
|
||||
|
||||
class BytesView {
|
||||
public:
|
||||
BytesView() noexcept = default;
|
||||
|
||||
explicit BytesView(const uint8_t *first, size_t count) noexcept : data_{first}, count_{count} {}
|
||||
|
||||
explicit BytesView(const bytes &bytes) noexcept : data_{bytes.data()}, count_{bytes.size()} {}
|
||||
|
||||
size_t size() const noexcept { return count_; }
|
||||
|
||||
uint8_t operator[](size_t index) const noexcept {
|
||||
assert(index < count_);
|
||||
return data_[index];
|
||||
}
|
||||
|
||||
BytesView subview(size_t offset, size_t count) const noexcept {
|
||||
assert(offset + count <= count_);
|
||||
return BytesView{data_ + offset, count};
|
||||
}
|
||||
|
||||
const uint8_t *begin() const noexcept { return data_; }
|
||||
|
||||
const uint8_t *end() const noexcept { return data_ + count_; }
|
||||
|
||||
private:
|
||||
const uint8_t *data_ = nullptr;
|
||||
size_t count_ = 0;
|
||||
};
|
||||
|
||||
class SmlNode {
|
||||
public:
|
||||
uint8_t type;
|
||||
bytes value_bytes;
|
||||
BytesView value_bytes;
|
||||
std::vector<SmlNode> nodes;
|
||||
};
|
||||
|
||||
class ObisInfo {
|
||||
public:
|
||||
ObisInfo(bytes server_id, SmlNode val_list_entry);
|
||||
bytes server_id;
|
||||
bytes code;
|
||||
bytes status;
|
||||
ObisInfo(const BytesView &server_id, const SmlNode &val_list_entry);
|
||||
BytesView server_id;
|
||||
BytesView code;
|
||||
BytesView status;
|
||||
char unit;
|
||||
char scaler;
|
||||
bytes value;
|
||||
BytesView value;
|
||||
uint16_t value_type;
|
||||
std::string code_repr() const;
|
||||
};
|
||||
|
||||
class SmlFile {
|
||||
public:
|
||||
SmlFile(bytes buffer);
|
||||
SmlFile(const BytesView &buffer);
|
||||
bool setup_node(SmlNode *node);
|
||||
std::vector<SmlNode> messages;
|
||||
std::vector<ObisInfo> get_obis_info();
|
||||
|
||||
protected:
|
||||
const bytes buffer_;
|
||||
const BytesView buffer_;
|
||||
size_t pos_;
|
||||
};
|
||||
|
||||
std::string bytes_repr(const bytes &buffer);
|
||||
std::string bytes_repr(const BytesView &buffer);
|
||||
|
||||
uint64_t bytes_to_uint(const bytes &buffer);
|
||||
uint64_t bytes_to_uint(const BytesView &buffer);
|
||||
|
||||
int64_t bytes_to_int(const bytes &buffer);
|
||||
int64_t bytes_to_int(const BytesView &buffer);
|
||||
|
||||
std::string bytes_to_string(const bytes &buffer);
|
||||
std::string bytes_to_string(const BytesView &buffer);
|
||||
} // namespace sml
|
||||
} // namespace esphome
|
||||
|
@ -1499,30 +1499,9 @@ def dimensions(value):
|
||||
|
||||
|
||||
def directory(value):
|
||||
import json
|
||||
|
||||
value = string(value)
|
||||
path = CORE.relative_config_path(value)
|
||||
|
||||
if CORE.vscode and (
|
||||
not CORE.ace or os.path.abspath(path) == os.path.abspath(CORE.config_path)
|
||||
):
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"type": "check_directory_exists",
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
)
|
||||
data = json.loads(input())
|
||||
assert data["type"] == "directory_exists_response"
|
||||
if data["content"]:
|
||||
return value
|
||||
raise Invalid(
|
||||
f"Could not find directory '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
|
||||
)
|
||||
|
||||
if not os.path.exists(path):
|
||||
raise Invalid(
|
||||
f"Could not find directory '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
|
||||
@ -1535,30 +1514,9 @@ def directory(value):
|
||||
|
||||
|
||||
def file_(value):
|
||||
import json
|
||||
|
||||
value = string(value)
|
||||
path = CORE.relative_config_path(value)
|
||||
|
||||
if CORE.vscode and (
|
||||
not CORE.ace or os.path.abspath(path) == os.path.abspath(CORE.config_path)
|
||||
):
|
||||
print(
|
||||
json.dumps(
|
||||
{
|
||||
"type": "check_file_exists",
|
||||
"path": path,
|
||||
}
|
||||
)
|
||||
)
|
||||
data = json.loads(input())
|
||||
assert data["type"] == "file_exists_response"
|
||||
if data["content"]:
|
||||
return value
|
||||
raise Invalid(
|
||||
f"Could not find file '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
|
||||
)
|
||||
|
||||
if not os.path.exists(path):
|
||||
raise Invalid(
|
||||
f"Could not find file '{path}'. Please make sure it exists (full path: {os.path.abspath(path)})."
|
||||
|
@ -1,6 +1,6 @@
|
||||
"""Constants used by esphome."""
|
||||
|
||||
__version__ = "2025.4.0b2"
|
||||
__version__ = "2025.4.0b3"
|
||||
|
||||
ALLOWED_NAME_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789-_"
|
||||
VALID_SUBSTITUTIONS_CHARACTERS = (
|
||||
|
@ -475,7 +475,6 @@ class EsphomeCore:
|
||||
self.dashboard = False
|
||||
# True if command is run from vscode api
|
||||
self.vscode = False
|
||||
self.ace = False
|
||||
# The name of the node
|
||||
self.name: Optional[str] = None
|
||||
# The friendly name of the node
|
||||
|
@ -78,28 +78,47 @@ def _print_file_read_event(path: str) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _request_and_get_stream_on_stdin(fname: str) -> StringIO:
|
||||
_print_file_read_event(fname)
|
||||
raw_yaml_stream = StringIO(_read_file_content_from_json_on_stdin())
|
||||
return raw_yaml_stream
|
||||
|
||||
|
||||
def _vscode_loader(fname: str) -> dict[str, Any]:
|
||||
raw_yaml_stream = _request_and_get_stream_on_stdin(fname)
|
||||
# it is required to set the name on StringIO so document on start_mark
|
||||
# is set properly. Otherwise it is initialized with "<file>"
|
||||
raw_yaml_stream.name = fname
|
||||
return parse_yaml(fname, raw_yaml_stream, _vscode_loader)
|
||||
|
||||
|
||||
def _ace_loader(fname: str) -> dict[str, Any]:
|
||||
raw_yaml_stream = _request_and_get_stream_on_stdin(fname)
|
||||
return parse_yaml(fname, raw_yaml_stream)
|
||||
|
||||
|
||||
def read_config(args):
|
||||
while True:
|
||||
CORE.reset()
|
||||
data = json.loads(input())
|
||||
assert data["type"] == "validate"
|
||||
assert data["type"] == "validate" or data["type"] == "exit"
|
||||
if data["type"] == "exit":
|
||||
return
|
||||
CORE.vscode = True
|
||||
CORE.ace = args.ace
|
||||
f = data["file"]
|
||||
if CORE.ace:
|
||||
CORE.config_path = os.path.join(args.configuration, f)
|
||||
if args.ace: # Running from ESPHome Compiler dashboard, not vscode
|
||||
CORE.config_path = os.path.join(args.configuration, data["file"])
|
||||
loader = _ace_loader
|
||||
else:
|
||||
CORE.config_path = data["file"]
|
||||
loader = _vscode_loader
|
||||
|
||||
file_name = CORE.config_path
|
||||
_print_file_read_event(file_name)
|
||||
raw_yaml = _read_file_content_from_json_on_stdin()
|
||||
command_line_substitutions: dict[str, Any] = (
|
||||
dict(args.substitution) if args.substitution else {}
|
||||
)
|
||||
vs = VSCodeResult()
|
||||
try:
|
||||
config = parse_yaml(file_name, StringIO(raw_yaml))
|
||||
config = loader(file_name)
|
||||
res = validate_config(config, command_line_substitutions)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
vs.add_yaml_error(str(err))
|
||||
|
@ -3,12 +3,12 @@ from __future__ import annotations
|
||||
import fnmatch
|
||||
import functools
|
||||
import inspect
|
||||
from io import TextIOWrapper
|
||||
from io import BytesIO, TextIOBase, TextIOWrapper
|
||||
from ipaddress import _BaseAddress
|
||||
import logging
|
||||
import math
|
||||
import os
|
||||
from typing import Any
|
||||
from typing import Any, Callable
|
||||
import uuid
|
||||
|
||||
import yaml
|
||||
@ -69,7 +69,10 @@ class ESPForceValue:
|
||||
pass
|
||||
|
||||
|
||||
def make_data_base(value, from_database: ESPHomeDataBase = None):
|
||||
def make_data_base(
|
||||
value, from_database: ESPHomeDataBase = None
|
||||
) -> ESPHomeDataBase | Any:
|
||||
"""Wrap a value in a ESPHomeDataBase object."""
|
||||
try:
|
||||
value = add_class_to_obj(value, ESPHomeDataBase)
|
||||
if from_database is not None:
|
||||
@ -102,6 +105,11 @@ def _add_data_ref(fn):
|
||||
class ESPHomeLoaderMixin:
|
||||
"""Loader class that keeps track of line numbers."""
|
||||
|
||||
def __init__(self, name: str, yaml_loader: Callable[[str], dict[str, Any]]) -> None:
|
||||
"""Initialize the loader."""
|
||||
self.name = name
|
||||
self.yaml_loader = yaml_loader
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_int(self, node):
|
||||
return super().construct_yaml_int(node)
|
||||
@ -127,7 +135,7 @@ class ESPHomeLoaderMixin:
|
||||
return super().construct_yaml_seq(node)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_yaml_map(self, node):
|
||||
def construct_yaml_map(self, node: yaml.MappingNode) -> OrderedDict[str, Any]:
|
||||
"""Traverses the given mapping node and returns a list of constructed key-value pairs."""
|
||||
assert isinstance(node, yaml.MappingNode)
|
||||
# A list of key-value pairs we find in the current mapping
|
||||
@ -231,7 +239,7 @@ class ESPHomeLoaderMixin:
|
||||
return OrderedDict(pairs)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_env_var(self, node):
|
||||
def construct_env_var(self, node: yaml.Node) -> str:
|
||||
args = node.value.split()
|
||||
# Check for a default value
|
||||
if len(args) > 1:
|
||||
@ -243,23 +251,23 @@ class ESPHomeLoaderMixin:
|
||||
)
|
||||
|
||||
@property
|
||||
def _directory(self):
|
||||
def _directory(self) -> str:
|
||||
return os.path.dirname(self.name)
|
||||
|
||||
def _rel_path(self, *args):
|
||||
def _rel_path(self, *args: str) -> str:
|
||||
return os.path.join(self._directory, *args)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_secret(self, node):
|
||||
def construct_secret(self, node: yaml.Node) -> str:
|
||||
try:
|
||||
secrets = _load_yaml_internal(self._rel_path(SECRET_YAML))
|
||||
secrets = self.yaml_loader(self._rel_path(SECRET_YAML))
|
||||
except EsphomeError as e:
|
||||
if self.name == CORE.config_path:
|
||||
raise e
|
||||
try:
|
||||
main_config_dir = os.path.dirname(CORE.config_path)
|
||||
main_secret_yml = os.path.join(main_config_dir, SECRET_YAML)
|
||||
secrets = _load_yaml_internal(main_secret_yml)
|
||||
secrets = self.yaml_loader(main_secret_yml)
|
||||
except EsphomeError as er:
|
||||
raise EsphomeError(f"{e}\n{er}") from er
|
||||
|
||||
@ -272,7 +280,9 @@ class ESPHomeLoaderMixin:
|
||||
return val
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include(self, node):
|
||||
def construct_include(
|
||||
self, node: yaml.Node
|
||||
) -> dict[str, Any] | OrderedDict[str, Any]:
|
||||
from esphome.const import CONF_VARS
|
||||
|
||||
def extract_file_vars(node):
|
||||
@ -290,71 +300,93 @@ class ESPHomeLoaderMixin:
|
||||
else:
|
||||
file, vars = node.value, None
|
||||
|
||||
result = _load_yaml_internal(self._rel_path(file))
|
||||
result = self.yaml_loader(self._rel_path(file))
|
||||
if not vars:
|
||||
vars = {}
|
||||
result = substitute_vars(result, vars)
|
||||
return result
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_list(self, node):
|
||||
def construct_include_dir_list(self, node: yaml.Node) -> list[dict[str, Any]]:
|
||||
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
|
||||
return [_load_yaml_internal(f) for f in files]
|
||||
return [self.yaml_loader(f) for f in files]
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_merge_list(self, node):
|
||||
def construct_include_dir_merge_list(self, node: yaml.Node) -> list[dict[str, Any]]:
|
||||
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
|
||||
merged_list = []
|
||||
for fname in files:
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
loaded_yaml = self.yaml_loader(fname)
|
||||
if isinstance(loaded_yaml, list):
|
||||
merged_list.extend(loaded_yaml)
|
||||
return merged_list
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_named(self, node):
|
||||
def construct_include_dir_named(
|
||||
self, node: yaml.Node
|
||||
) -> OrderedDict[str, dict[str, Any]]:
|
||||
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
|
||||
mapping = OrderedDict()
|
||||
for fname in files:
|
||||
filename = os.path.splitext(os.path.basename(fname))[0]
|
||||
mapping[filename] = _load_yaml_internal(fname)
|
||||
mapping[filename] = self.yaml_loader(fname)
|
||||
return mapping
|
||||
|
||||
@_add_data_ref
|
||||
def construct_include_dir_merge_named(self, node):
|
||||
def construct_include_dir_merge_named(
|
||||
self, node: yaml.Node
|
||||
) -> OrderedDict[str, dict[str, Any]]:
|
||||
files = filter_yaml_files(_find_files(self._rel_path(node.value), "*.yaml"))
|
||||
mapping = OrderedDict()
|
||||
for fname in files:
|
||||
loaded_yaml = _load_yaml_internal(fname)
|
||||
loaded_yaml = self.yaml_loader(fname)
|
||||
if isinstance(loaded_yaml, dict):
|
||||
mapping.update(loaded_yaml)
|
||||
return mapping
|
||||
|
||||
@_add_data_ref
|
||||
def construct_lambda(self, node):
|
||||
def construct_lambda(self, node: yaml.Node) -> Lambda:
|
||||
return Lambda(str(node.value))
|
||||
|
||||
@_add_data_ref
|
||||
def construct_force(self, node):
|
||||
def construct_force(self, node: yaml.Node) -> ESPForceValue:
|
||||
obj = self.construct_scalar(node)
|
||||
return add_class_to_obj(obj, ESPForceValue)
|
||||
|
||||
@_add_data_ref
|
||||
def construct_extend(self, node):
|
||||
def construct_extend(self, node: yaml.Node) -> Extend:
|
||||
return Extend(str(node.value))
|
||||
|
||||
@_add_data_ref
|
||||
def construct_remove(self, node):
|
||||
def construct_remove(self, node: yaml.Node) -> Remove:
|
||||
return Remove(str(node.value))
|
||||
|
||||
|
||||
class ESPHomeLoader(ESPHomeLoaderMixin, FastestAvailableSafeLoader):
|
||||
"""Loader class that keeps track of line numbers."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream: TextIOBase | BytesIO,
|
||||
name: str,
|
||||
yaml_loader: Callable[[str], dict[str, Any]],
|
||||
) -> None:
|
||||
FastestAvailableSafeLoader.__init__(self, stream)
|
||||
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
|
||||
|
||||
|
||||
class ESPHomePurePythonLoader(ESPHomeLoaderMixin, PurePythonLoader):
|
||||
"""Loader class that keeps track of line numbers."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
stream: TextIOBase | BytesIO,
|
||||
name: str,
|
||||
yaml_loader: Callable[[str], dict[str, Any]],
|
||||
) -> None:
|
||||
PurePythonLoader.__init__(self, stream)
|
||||
ESPHomeLoaderMixin.__init__(self, name, yaml_loader)
|
||||
|
||||
|
||||
for _loader in (ESPHomeLoader, ESPHomePurePythonLoader):
|
||||
_loader.add_constructor("tag:yaml.org,2002:int", _loader.construct_yaml_int)
|
||||
@ -388,17 +420,30 @@ def load_yaml(fname: str, clear_secrets: bool = True) -> Any:
|
||||
return _load_yaml_internal(fname)
|
||||
|
||||
|
||||
def parse_yaml(file_name: str, file_handle: TextIOWrapper) -> Any:
|
||||
def _load_yaml_internal(fname: str) -> Any:
|
||||
"""Load a YAML file."""
|
||||
try:
|
||||
with open(fname, encoding="utf-8") as f_handle:
|
||||
return parse_yaml(fname, f_handle)
|
||||
except (UnicodeDecodeError, OSError) as err:
|
||||
raise EsphomeError(f"Error reading file {fname}: {err}") from err
|
||||
|
||||
|
||||
def parse_yaml(
|
||||
file_name: str, file_handle: TextIOWrapper, yaml_loader=_load_yaml_internal
|
||||
) -> Any:
|
||||
"""Parse a YAML file."""
|
||||
try:
|
||||
return _load_yaml_internal_with_type(ESPHomeLoader, file_name, file_handle)
|
||||
return _load_yaml_internal_with_type(
|
||||
ESPHomeLoader, file_name, file_handle, yaml_loader
|
||||
)
|
||||
except EsphomeError:
|
||||
# Loading failed, so we now load with the Python loader which has more
|
||||
# readable exceptions
|
||||
# Rewind the stream so we can try again
|
||||
file_handle.seek(0, 0)
|
||||
return _load_yaml_internal_with_type(
|
||||
ESPHomePurePythonLoader, file_name, file_handle
|
||||
ESPHomePurePythonLoader, file_name, file_handle, yaml_loader
|
||||
)
|
||||
|
||||
|
||||
@ -435,23 +480,14 @@ def substitute_vars(config, vars):
|
||||
return result
|
||||
|
||||
|
||||
def _load_yaml_internal(fname: str) -> Any:
|
||||
"""Load a YAML file."""
|
||||
try:
|
||||
with open(fname, encoding="utf-8") as f_handle:
|
||||
return parse_yaml(fname, f_handle)
|
||||
except (UnicodeDecodeError, OSError) as err:
|
||||
raise EsphomeError(f"Error reading file {fname}: {err}") from err
|
||||
|
||||
|
||||
def _load_yaml_internal_with_type(
|
||||
loader_type: type[ESPHomeLoader] | type[ESPHomePurePythonLoader],
|
||||
fname: str,
|
||||
content: TextIOWrapper,
|
||||
yaml_loader: Any,
|
||||
) -> Any:
|
||||
"""Load a YAML file."""
|
||||
loader = loader_type(content)
|
||||
loader.name = fname
|
||||
loader = loader_type(content, fname, yaml_loader)
|
||||
try:
|
||||
return loader.get_single_data() or OrderedDict()
|
||||
except yaml.YAMLError as exc:
|
||||
@ -470,7 +506,7 @@ def dump(dict_, show_secrets=False):
|
||||
)
|
||||
|
||||
|
||||
def _is_file_valid(name):
|
||||
def _is_file_valid(name: str) -> bool:
|
||||
"""Decide if a file is valid."""
|
||||
return not name.startswith(".")
|
||||
|
||||
|
@ -14,7 +14,7 @@ esptool==4.8.1
|
||||
click==8.1.7
|
||||
esphome-dashboard==20250415.0
|
||||
aioesphomeapi==29.10.0
|
||||
zeroconf==0.146.4
|
||||
zeroconf==0.146.5
|
||||
puremagic==1.28
|
||||
ruamel.yaml==0.18.10 # dashboard_import
|
||||
esphome-glyphsets==0.2.0
|
||||
|
125
tests/unit_tests/test_vscode.py
Normal file
125
tests/unit_tests/test_vscode.py
Normal file
@ -0,0 +1,125 @@
|
||||
import json
|
||||
import os
|
||||
from unittest.mock import Mock, patch
|
||||
|
||||
from esphome import vscode
|
||||
|
||||
|
||||
def _run_repl_test(input_data):
|
||||
"""Reusable test function for different input scenarios."""
|
||||
input_data.append(_exit())
|
||||
with (
|
||||
patch("builtins.input", side_effect=input_data),
|
||||
patch("sys.stdout") as mock_stdout,
|
||||
):
|
||||
args = Mock([])
|
||||
args.ace = False
|
||||
args.substitution = None
|
||||
vscode.read_config(args)
|
||||
|
||||
# Capture printed output
|
||||
full_output = "".join(call[0][0] for call in mock_stdout.write.call_args_list)
|
||||
return full_output.strip().split("\n")
|
||||
|
||||
|
||||
def _validate(file_path: str):
|
||||
return json.dumps({"type": "validate", "file": file_path})
|
||||
|
||||
|
||||
def _file_response(data: str):
|
||||
return json.dumps({"type": "file_response", "content": data})
|
||||
|
||||
|
||||
def _read_file(file_path: str):
|
||||
return json.dumps({"type": "read_file", "path": file_path})
|
||||
|
||||
|
||||
def _exit():
|
||||
return json.dumps({"type": "exit"})
|
||||
|
||||
|
||||
RESULT_NO_ERROR = '{"type": "result", "yaml_errors": [], "validation_errors": []}'
|
||||
|
||||
|
||||
def test_multi_file():
|
||||
source_path = os.path.join("dir_path", "x.yaml")
|
||||
output_lines = _run_repl_test(
|
||||
[
|
||||
_validate(source_path),
|
||||
# read_file x.yaml
|
||||
_file_response("""esphome:
|
||||
name: test1
|
||||
esp8266:
|
||||
board: !secret my_secret_board
|
||||
"""),
|
||||
# read_file secrets.yaml
|
||||
_file_response("""my_secret_board: esp1f"""),
|
||||
]
|
||||
)
|
||||
|
||||
expected_lines = [
|
||||
_read_file(source_path),
|
||||
_read_file(os.path.join("dir_path", "secrets.yaml")),
|
||||
RESULT_NO_ERROR,
|
||||
]
|
||||
|
||||
assert output_lines == expected_lines
|
||||
|
||||
|
||||
def test_shows_correct_range_error():
|
||||
source_path = os.path.join("dir_path", "x.yaml")
|
||||
output_lines = _run_repl_test(
|
||||
[
|
||||
_validate(source_path),
|
||||
# read_file x.yaml
|
||||
_file_response("""esphome:
|
||||
name: test1
|
||||
esp8266:
|
||||
broad: !secret my_secret_board # typo here
|
||||
"""),
|
||||
# read_file secrets.yaml
|
||||
_file_response("""my_secret_board: esp1f"""),
|
||||
]
|
||||
)
|
||||
|
||||
assert len(output_lines) == 3
|
||||
error = json.loads(output_lines[2])
|
||||
validation_error = error["validation_errors"][0]
|
||||
assert validation_error["message"].startswith("[broad] is an invalid option for")
|
||||
range = validation_error["range"]
|
||||
assert range["document"] == source_path
|
||||
assert range["start_line"] == 3
|
||||
assert range["start_col"] == 2
|
||||
assert range["end_line"] == 3
|
||||
assert range["end_col"] == 7
|
||||
|
||||
|
||||
def test_shows_correct_loaded_file_error():
|
||||
source_path = os.path.join("dir_path", "x.yaml")
|
||||
output_lines = _run_repl_test(
|
||||
[
|
||||
_validate(source_path),
|
||||
# read_file x.yaml
|
||||
_file_response("""esphome:
|
||||
name: test1
|
||||
|
||||
packages:
|
||||
board: !include .pkg.esp8266.yaml
|
||||
"""),
|
||||
# read_file .pkg.esp8266.yaml
|
||||
_file_response("""esp8266:
|
||||
broad: esp1f # typo here
|
||||
"""),
|
||||
]
|
||||
)
|
||||
|
||||
assert len(output_lines) == 3
|
||||
error = json.loads(output_lines[2])
|
||||
validation_error = error["validation_errors"][0]
|
||||
assert validation_error["message"].startswith("[broad] is an invalid option for")
|
||||
range = validation_error["range"]
|
||||
assert range["document"] == os.path.join("dir_path", ".pkg.esp8266.yaml")
|
||||
assert range["start_line"] == 1
|
||||
assert range["start_col"] == 2
|
||||
assert range["end_line"] == 1
|
||||
assert range["end_col"] == 7
|
@ -42,3 +42,23 @@ def test_loading_a_missing_file(fixture_path):
|
||||
yaml_util.load_yaml(yaml_file)
|
||||
except EsphomeError as err:
|
||||
assert "missing.yaml" in str(err)
|
||||
|
||||
|
||||
def test_parsing_with_custom_loader(fixture_path):
|
||||
"""Test custom loader used for vscode connection
|
||||
Default loader is tested in test_include_with_vars
|
||||
"""
|
||||
yaml_file = fixture_path / "yaml_util" / "includetest.yaml"
|
||||
|
||||
loader_calls = []
|
||||
|
||||
def custom_loader(fname):
|
||||
loader_calls.append(fname)
|
||||
|
||||
with open(yaml_file, encoding="utf-8") as f_handle:
|
||||
yaml_util.parse_yaml(yaml_file, f_handle, custom_loader)
|
||||
|
||||
assert len(loader_calls) == 3
|
||||
assert loader_calls[0].endswith("includes/included.yaml")
|
||||
assert loader_calls[1].endswith("includes/list.yaml")
|
||||
assert loader_calls[2].endswith("includes/scalar.yaml")
|
||||
|
Loading…
x
Reference in New Issue
Block a user