Add support for streaming history (#15112)

* Add support for streaming history

* Add support for streaming history

* Add support for streaming history

* Add support for streaming history

* fixes

* cleanup

* redraw

* naming is hard

* drop cached history

* backport

* Update src/data/history.ts

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>

* Update src/data/history.ts

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>

* review

* review

* review

* review

* review

* review

* review

* review

* adjust

* Revert "adjust"

This reverts commit 6ba31da4a5a619a0da1bfbcfe18723de595e19aa.

Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
This commit is contained in:
J. Nick Koston 2023-01-21 17:41:41 -10:00 committed by GitHub
parent fd22afedd0
commit 815d4c165d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 296 additions and 275 deletions

View File

@ -1,200 +0,0 @@
import { LocalizeFunc } from "../common/translations/localize";
import { HomeAssistant } from "../types";
import {
computeHistory,
HistoryStates,
HistoryResult,
LineChartUnit,
TimelineEntity,
entityIdHistoryNeedsAttributes,
fetchRecentWS,
} from "./history";
export interface CacheConfig {
cacheKey: string;
hoursToShow: number;
}
interface CachedResults {
prom: Promise<HistoryResult>;
startTime: Date;
endTime: Date;
language: string;
data: HistoryResult;
}
const stateHistoryCache: { [cacheKey: string]: CachedResults } = {};
// Cache type 2 functionality
function getEmptyCache(
language: string,
startTime: Date,
endTime: Date
): CachedResults {
return {
prom: Promise.resolve({ line: [], timeline: [] }),
language,
startTime,
endTime,
data: { line: [], timeline: [] },
};
}
export const getRecentWithCache = (
hass: HomeAssistant,
entityIds: string[],
cacheConfig: CacheConfig,
localize: LocalizeFunc,
language: string
) => {
const cacheKey = cacheConfig.cacheKey;
const fullCacheKey = cacheKey + `_${cacheConfig.hoursToShow}`;
const endTime = new Date();
const startTime = new Date(endTime);
startTime.setHours(startTime.getHours() - cacheConfig.hoursToShow);
let toFetchStartTime = startTime;
let appendingToCache = false;
let cache = stateHistoryCache[fullCacheKey];
if (
cache &&
toFetchStartTime >= cache.startTime &&
toFetchStartTime <= cache.endTime &&
cache.language === language
) {
toFetchStartTime = cache.endTime;
appendingToCache = true;
// This pretty much never happens as endTime is usually set to now
if (endTime <= cache.endTime) {
return cache.prom;
}
} else {
cache = stateHistoryCache[fullCacheKey] = getEmptyCache(
language,
startTime,
endTime
);
}
const curCacheProm = cache.prom;
const noAttributes = !entityIds.some((entityId) =>
entityIdHistoryNeedsAttributes(hass, entityId)
);
const genProm = async () => {
let fetchedHistory: HistoryStates;
try {
const results = await Promise.all([
curCacheProm,
fetchRecentWS(
hass,
entityIds,
toFetchStartTime,
endTime,
appendingToCache,
undefined,
true,
noAttributes
),
]);
fetchedHistory = results[1];
} catch (err: any) {
delete stateHistoryCache[fullCacheKey];
throw err;
}
const stateHistory = computeHistory(hass, fetchedHistory, localize);
if (appendingToCache) {
if (stateHistory.line.length) {
mergeLine(stateHistory.line, cache.data.line);
}
if (stateHistory.timeline.length) {
mergeTimeline(stateHistory.timeline, cache.data.timeline);
// Replace the timeline array to force an update
cache.data.timeline = [...cache.data.timeline];
}
pruneStartTime(startTime, cache.data);
} else {
cache.data = stateHistory;
}
return cache.data;
};
cache.prom = genProm();
cache.startTime = startTime;
cache.endTime = endTime;
return cache.prom;
};
const mergeLine = (
historyLines: LineChartUnit[],
cacheLines: LineChartUnit[]
) => {
historyLines.forEach((line) => {
const unit = line.unit;
const oldLine = cacheLines.find((cacheLine) => cacheLine.unit === unit);
if (oldLine) {
line.data.forEach((entity) => {
const oldEntity = oldLine.data.find(
(cacheEntity) => entity.entity_id === cacheEntity.entity_id
);
if (oldEntity) {
oldEntity.states = oldEntity.states.concat(entity.states);
} else {
oldLine.data.push(entity);
}
});
// Replace the cached line data to force an update
oldLine.data = [...oldLine.data];
} else {
cacheLines.push(line);
}
});
};
const mergeTimeline = (
historyTimelines: TimelineEntity[],
cacheTimelines: TimelineEntity[]
) => {
historyTimelines.forEach((timeline) => {
const oldTimeline = cacheTimelines.find(
(cacheTimeline) => cacheTimeline.entity_id === timeline.entity_id
);
if (oldTimeline) {
oldTimeline.data = oldTimeline.data.concat(timeline.data);
} else {
cacheTimelines.push(timeline);
}
});
};
const pruneArray = (originalStartTime: Date, arr) => {
if (arr.length === 0) {
return arr;
}
const changedAfterStartTime = arr.findIndex(
(state) => new Date(state.last_changed) > originalStartTime
);
if (changedAfterStartTime === 0) {
// If all changes happened after originalStartTime then we are done.
return arr;
}
// If all changes happened at or before originalStartTime. Use last index.
const updateIndex =
changedAfterStartTime === -1 ? arr.length - 1 : changedAfterStartTime - 1;
arr[updateIndex].last_changed = originalStartTime;
return arr.slice(updateIndex);
};
const pruneStartTime = (originalStartTime: Date, cacheData: HistoryResult) => {
cacheData.line.forEach((line) => {
line.data.forEach((entity) => {
entity.states = pruneArray(originalStartTime, entity.states);
});
});
cacheData.timeline.forEach((timeline) => {
timeline.data = pruneArray(originalStartTime, timeline.data);
});
};

View File

@ -17,6 +17,8 @@ const NEED_ATTRIBUTE_DOMAINS = [
"input_datetime",
"thermostat",
"water_heater",
"person",
"device_tracker",
];
const LINE_ATTRIBUTES_TO_KEEP = [
"temperature",
@ -68,7 +70,7 @@ export interface HistoryStates {
[entityId: string]: EntityHistoryState[];
}
interface EntityHistoryState {
export interface EntityHistoryState {
/** state */
s: string;
/** attributes */
@ -79,6 +81,12 @@ interface EntityHistoryState {
lu: number;
}
export interface HistoryStreamMessage {
states: HistoryStates;
start_time?: number; // Start time of this historical chunk
end_time?: number; // End time of this historical chunk
}
export const entityIdHistoryNeedsAttributes = (
hass: HomeAssistant,
entityId: string
@ -174,6 +182,135 @@ export const fetchDateWS = (
return hass.callWS<HistoryStates>(params);
};
export const subscribeHistory = (
hass: HomeAssistant,
callbackFunction: (message: HistoryStreamMessage) => void,
startTime: Date,
endTime: Date,
entityIds: string[]
): Promise<() => Promise<void>> => {
const params = {
type: "history/stream",
entity_ids: entityIds,
start_time: startTime.toISOString(),
end_time: endTime.toISOString(),
minimal_response: true,
no_attributes: !entityIds.some((entityId) =>
entityIdHistoryNeedsAttributes(hass, entityId)
),
};
return hass.connection.subscribeMessage<HistoryStreamMessage>(
(message) => callbackFunction(message),
params
);
};
class HistoryStream {
hass: HomeAssistant;
hoursToShow: number;
combinedHistory: HistoryStates;
constructor(hass: HomeAssistant, hoursToShow: number) {
this.hass = hass;
this.hoursToShow = hoursToShow;
this.combinedHistory = {};
}
processMessage(streamMessage: HistoryStreamMessage): HistoryStates {
if (!this.combinedHistory || !Object.keys(this.combinedHistory).length) {
this.combinedHistory = streamMessage.states;
return this.combinedHistory;
}
if (!Object.keys(streamMessage.states).length) {
// Empty messages are still sent to
// indicate no more historical events
return this.combinedHistory;
}
const purgeBeforePythonTime =
(new Date().getTime() - 60 * 60 * this.hoursToShow * 1000) / 1000;
const newHistory: HistoryStates = {};
for (const entityId of Object.keys(this.combinedHistory)) {
newHistory[entityId] = [];
}
for (const entityId of Object.keys(streamMessage.states)) {
newHistory[entityId] = [];
}
for (const entityId of Object.keys(newHistory)) {
if (
entityId in this.combinedHistory &&
entityId in streamMessage.states
) {
const entityCombinedHistory = this.combinedHistory[entityId];
const lastEntityCombinedHistory =
entityCombinedHistory[entityCombinedHistory.length - 1];
newHistory[entityId] = entityCombinedHistory.concat(
streamMessage.states[entityId]
);
if (
streamMessage.states[entityId][0].lu < lastEntityCombinedHistory.lu
) {
// If the history is out of order we have to sort it.
newHistory[entityId] = newHistory[entityId].sort(
(a, b) => a.lu - b.lu
);
}
} else if (entityId in this.combinedHistory) {
newHistory[entityId] = this.combinedHistory[entityId];
} else {
newHistory[entityId] = streamMessage.states[entityId];
}
// Remove old history
if (entityId in this.combinedHistory) {
const entityHistory = newHistory[entityId];
while (entityHistory[0].lu < purgeBeforePythonTime) {
if (entityHistory.length > 1) {
if (entityHistory[1].lu < purgeBeforePythonTime) {
newHistory[entityId].shift();
continue;
}
}
// Update the first entry to the start time state
// as we need to preserve the start time state and
// only expire the rest of the history as it ages.
entityHistory[0].lu = purgeBeforePythonTime;
break;
}
}
}
this.combinedHistory = newHistory;
return this.combinedHistory;
}
}
export const subscribeHistoryStatesTimeWindow = (
hass: HomeAssistant,
callbackFunction: (data: HistoryStates) => void,
hoursToShow: number,
entityIds: string[],
minimalResponse = true,
significantChangesOnly = true
): Promise<() => Promise<void>> => {
const params = {
type: "history/stream",
entity_ids: entityIds,
start_time: new Date(
new Date().getTime() - 60 * 60 * hoursToShow * 1000
).toISOString(),
minimal_response: minimalResponse,
significant_changes_only: significantChangesOnly,
no_attributes: !entityIds.some((entityId) =>
entityIdHistoryNeedsAttributes(hass, entityId)
),
};
const stream = new HistoryStream(hass, hoursToShow);
return hass.connection.subscribeMessage<HistoryStreamMessage>(
(message) => callbackFunction(stream.processMessage(message)),
params
);
};
const equalState = (obj1: LineChartState, obj2: LineChartState) =>
obj1.state === obj2.state &&
// Only compare attributes if both states have an attributes object.

View File

@ -3,10 +3,12 @@ import { css, html, LitElement, PropertyValues, TemplateResult } from "lit";
import { customElement, property, state } from "lit/decorators";
import { isComponentLoaded } from "../../common/config/is_component_loaded";
import { fireEvent } from "../../common/dom/fire_event";
import { throttle } from "../../common/util/throttle";
import "../../components/chart/state-history-charts";
import { getRecentWithCache } from "../../data/cached-history";
import { HistoryResult } from "../../data/history";
import {
HistoryResult,
subscribeHistoryStatesTimeWindow,
computeHistory,
} from "../../data/history";
import {
fetchStatistics,
getStatisticMetadata,
@ -39,9 +41,11 @@ export class MoreInfoHistory extends LitElement {
private _statNames?: Record<string, string>;
private _throttleGetStateHistory = throttle(() => {
this._getStateHistory();
}, 10000);
private _interval?: number;
private _subscribed?: Promise<(() => Promise<void>) | void>;
private _error?: string;
protected render(): TemplateResult {
if (!this.entityId) {
@ -59,7 +63,9 @@ export class MoreInfoHistory extends LitElement {
)}</a
>
</div>
${this._statistics
${this._error
? html`<div class="errors">${this._error}</div>`
: this._statistics
? html`<statistics-chart
.hass=${this.hass}
.isLoadingData=${!this._statistics}
@ -94,24 +100,45 @@ export class MoreInfoHistory extends LitElement {
this.entityId
}&start_date=${startOfYesterday().toISOString()}`;
this._throttleGetStateHistory();
this._getStateHistory();
}
}
public connectedCallback() {
super.connectedCallback();
if (this.hasUpdated && this.entityId) {
this._getStateHistory();
}
}
public disconnectedCallback() {
super.disconnectedCallback();
this._unsubscribeHistoryTimeWindow();
}
private _unsubscribeHistoryTimeWindow() {
if (!this._subscribed) {
return;
}
clearInterval(this._interval);
this._subscribed.then((unsubscribe) => {
if (unsubscribe) {
unsubscribe();
}
this._subscribed = undefined;
});
}
if (this._statistics || !this.entityId || !changedProps.has("hass")) {
// Don't update statistics on a state update, as they only update every 5 minutes.
return;
private _redrawGraph() {
if (this._stateHistory) {
this._stateHistory = { ...this._stateHistory };
}
}
const oldHass = changedProps.get("hass") as HomeAssistant | undefined;
if (
oldHass &&
this.hass.states[this.entityId] !== oldHass?.states[this.entityId]
) {
// wait for commit of data (we only account for the default setting of 1 sec)
setTimeout(this._throttleGetStateHistory, 1000);
}
private _setRedrawTimer() {
// redraw the graph every minute to update the time axis
clearInterval(this._interval);
this._interval = window.setInterval(() => this._redrawGraph(), 1000 * 60);
}
private async _getStateHistory(): Promise<void> {
@ -134,19 +161,32 @@ export class MoreInfoHistory extends LitElement {
return;
}
}
if (!isComponentLoaded(this.hass, "history")) {
if (!isComponentLoaded(this.hass, "history") || this._subscribed) {
return;
}
this._stateHistory = await getRecentWithCache(
if (this._subscribed) {
this._unsubscribeHistoryTimeWindow();
}
this._subscribed = subscribeHistoryStatesTimeWindow(
this.hass!,
[this.entityId],
{
cacheKey: `more_info.${this.entityId}`,
hoursToShow: 24,
(combinedHistory) => {
if (!this._subscribed) {
// Message came in before we had a chance to unload
return;
}
this._stateHistory = computeHistory(
this.hass!,
combinedHistory,
this.hass!.localize
);
},
this.hass!.localize,
this.hass!.language
);
24,
[this.entityId]
).catch((err) => {
this._subscribed = undefined;
this._error = err;
});
this._setRedrawTimer();
}
private _close(): void {

View File

@ -8,11 +8,14 @@ import {
} from "lit";
import { customElement, property, state } from "lit/decorators";
import { classMap } from "lit/directives/class-map";
import { throttle } from "../../../common/util/throttle";
import "../../../components/ha-card";
import "../../../components/chart/state-history-charts";
import { CacheConfig, getRecentWithCache } from "../../../data/cached-history";
import { HistoryResult } from "../../../data/history";
import { isComponentLoaded } from "../../../common/config/is_component_loaded";
import {
HistoryResult,
subscribeHistoryStatesTimeWindow,
computeHistory,
} from "../../../data/history";
import { HomeAssistant } from "../../../types";
import { hasConfigOrEntitiesChanged } from "../common/has-changed";
import { processConfigEntities } from "../common/process-config-entities";
@ -42,11 +45,15 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
private _names: Record<string, string> = {};
private _cacheConfig?: CacheConfig;
private _entityIds: string[] = [];
private _fetching = false;
private _hoursToShow = 24;
private _throttleGetStateHistory?: () => void;
private _error?: string;
private _interval?: number;
private _subscribed?: Promise<(() => Promise<void>) | void>;
public getCardSize(): number {
return this._config?.title
@ -67,27 +74,81 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
? processConfigEntities(config.entities)
: [];
const _entities: string[] = [];
this._configEntities.forEach((entity) => {
_entities.push(entity.entity);
this._entityIds.push(entity.entity);
if (entity.name) {
this._names[entity.entity] = entity.name;
}
});
this._throttleGetStateHistory = throttle(() => {
this._getStateHistory();
}, config.refresh_interval || 10 * 1000);
this._cacheConfig = {
cacheKey: _entities.join(),
hoursToShow: config.hours_to_show || 24,
};
this._hoursToShow = config.hours_to_show || 24;
this._config = config;
}
public connectedCallback() {
super.connectedCallback();
if (this.hasUpdated) {
this._subscribeHistoryTimeWindow();
}
}
public disconnectedCallback() {
super.disconnectedCallback();
this._unsubscribeHistoryTimeWindow();
}
private _subscribeHistoryTimeWindow() {
if (!isComponentLoaded(this.hass!, "history") || this._subscribed) {
return;
}
this._subscribed = subscribeHistoryStatesTimeWindow(
this.hass!,
(combinedHistory) => {
if (!this._subscribed) {
// Message came in before we had a chance to unload
return;
}
this._stateHistory = computeHistory(
this.hass!,
combinedHistory,
this.hass!.localize
);
},
this._hoursToShow,
this._entityIds
).catch((err) => {
this._subscribed = undefined;
this._error = err;
});
this._setRedrawTimer();
}
private _redrawGraph() {
if (this._stateHistory) {
this._stateHistory = { ...this._stateHistory };
}
}
private _setRedrawTimer() {
// redraw the graph every minute to update the time axis
clearInterval(this._interval);
this._interval = window.setInterval(() => this._redrawGraph(), 1000 * 60);
}
private _unsubscribeHistoryTimeWindow() {
if (!this._subscribed) {
return;
}
clearInterval(this._interval);
this._subscribed.then((unsubscribe) => {
if (unsubscribe) {
unsubscribe();
}
this._subscribed = undefined;
});
}
protected shouldUpdate(changedProps: PropertyValues): boolean {
if (changedProps.has("_stateHistory")) {
return true;
@ -100,8 +161,8 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
if (
!this._config ||
!this.hass ||
!this._throttleGetStateHistory ||
!this._cacheConfig
!this._hoursToShow ||
!this._entityIds.length
) {
return;
}
@ -116,13 +177,12 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
if (
changedProps.has("_config") &&
(oldConfig?.entities !== this._config.entities ||
oldConfig?.hours_to_show !== this._config.hours_to_show)
(!this._subscribed ||
oldConfig?.entities !== this._config.entities ||
oldConfig?.hours_to_show !== this._hoursToShow)
) {
this._throttleGetStateHistory();
} else if (changedProps.has("hass")) {
// wait for commit of data (we only account for the default setting of 1 sec)
setTimeout(this._throttleGetStateHistory, 1000);
this._unsubscribeHistoryTimeWindow();
this._subscribeHistoryTimeWindow();
}
}
@ -131,6 +191,10 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
return html``;
}
if (this._error) {
return html`<div class="errors">${this._error}</div>`;
}
return html`
<ha-card .header=${this._config.title}>
<div
@ -153,26 +217,6 @@ export class HuiHistoryGraphCard extends LitElement implements LovelaceCard {
`;
}
private async _getStateHistory(): Promise<void> {
if (this._fetching) {
return;
}
this._fetching = true;
try {
this._stateHistory = {
...(await getRecentWithCache(
this.hass!,
this._configEntities!.map((config) => config.entity),
this._cacheConfig!,
this.hass!.localize,
this.hass!.language
)),
};
} finally {
this._fetching = false;
}
}
static get styles(): CSSResultGroup {
return css`
ha-card {