Better manage of nested lists (#65176)

This commit is contained in:
Robert Svensson 2022-01-30 22:20:59 +01:00 committed by GitHub
parent 62fd31a1e7
commit ac1b30a78d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -31,24 +31,42 @@ REDACT_WLANS = {"bc_filter_list", "x_passphrase"}
@callback @callback
def async_replace_data(data: Mapping, to_replace: dict[str, str]) -> dict[str, Any]: def async_replace_dict_data(
"""Replace sensitive data in a dict.""" data: Mapping, to_replace: dict[str, str]
if not isinstance(data, (Mapping, list, set, tuple)): ) -> dict[str, Any]:
return to_replace.get(data, data) """Redact sensitive data in a dict."""
redacted = {**data} redacted = {**data}
for key, value in data.items():
for key, value in redacted.items():
if isinstance(value, dict): if isinstance(value, dict):
redacted[key] = async_replace_data(value, to_replace) redacted[key] = async_replace_dict_data(value, to_replace)
elif isinstance(value, (list, set, tuple)): elif isinstance(value, (list, set, tuple)):
redacted[key] = [async_replace_data(item, to_replace) for item in value] redacted[key] = async_replace_list_data(value, to_replace)
elif isinstance(value, str): elif isinstance(value, str):
if value in to_replace: if value in to_replace:
redacted[key] = to_replace[value] redacted[key] = to_replace[value]
elif value.count(":") == 5: elif value.count(":") == 5:
redacted[key] = REDACTED redacted[key] = REDACTED
return redacted
@callback
def async_replace_list_data(
data: list | set | tuple, to_replace: dict[str, str]
) -> list[Any]:
"""Redact sensitive data in a list."""
redacted = []
for item in data:
new_value = None
if isinstance(item, (list, set, tuple)):
new_value = async_replace_list_data(item, to_replace)
elif isinstance(item, Mapping):
new_value = async_replace_dict_data(item, to_replace)
elif isinstance(item, str):
if item in to_replace:
new_value = to_replace[item]
elif item.count(":") == 5:
new_value = REDACTED
redacted.append(new_value or item)
return redacted return redacted
@ -73,26 +91,28 @@ async def async_get_config_entry_diagnostics(
counter += 1 counter += 1
diag["config"] = async_redact_data( diag["config"] = async_redact_data(
async_replace_data(config_entry.as_dict(), macs_to_redact), REDACT_CONFIG async_replace_dict_data(config_entry.as_dict(), macs_to_redact), REDACT_CONFIG
) )
diag["site_role"] = controller.site_role diag["site_role"] = controller.site_role
diag["entities"] = async_replace_data(controller.entities, macs_to_redact) diag["entities"] = async_replace_dict_data(controller.entities, macs_to_redact)
diag["clients"] = { diag["clients"] = {
macs_to_redact[k]: async_redact_data( macs_to_redact[k]: async_redact_data(
async_replace_data(v.raw, macs_to_redact), REDACT_CLIENTS async_replace_dict_data(v.raw, macs_to_redact), REDACT_CLIENTS
) )
for k, v in controller.api.clients.items() for k, v in controller.api.clients.items()
} }
diag["devices"] = { diag["devices"] = {
macs_to_redact[k]: async_redact_data( macs_to_redact[k]: async_redact_data(
async_replace_data(v.raw, macs_to_redact), REDACT_DEVICES async_replace_dict_data(v.raw, macs_to_redact), REDACT_DEVICES
) )
for k, v in controller.api.devices.items() for k, v in controller.api.devices.items()
} }
diag["dpi_apps"] = {k: v.raw for k, v in controller.api.dpi_apps.items()} diag["dpi_apps"] = {k: v.raw for k, v in controller.api.dpi_apps.items()}
diag["dpi_groups"] = {k: v.raw for k, v in controller.api.dpi_groups.items()} diag["dpi_groups"] = {k: v.raw for k, v in controller.api.dpi_groups.items()}
diag["wlans"] = { diag["wlans"] = {
k: async_redact_data(async_replace_data(v.raw, macs_to_redact), REDACT_WLANS) k: async_redact_data(
async_replace_dict_data(v.raw, macs_to_redact), REDACT_WLANS
)
for k, v in controller.api.wlans.items() for k, v in controller.api.wlans.items()
} }