From 412587a4572ac2c3eef7cc9e2c77bdd1d426973c Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 15 Jan 2023 15:06:47 -1000 Subject: [PATCH] Add support for streaming history --- src/data/history.ts | 87 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 82 insertions(+), 5 deletions(-) diff --git a/src/data/history.ts b/src/data/history.ts index f0b145a8f0..adab7e4d01 100644 --- a/src/data/history.ts +++ b/src/data/history.ts @@ -79,12 +79,8 @@ interface EntityHistoryState { lu: number; } -interface EntityStateDict { - [entity_id: string]: T; -} - export interface HistoryStreamMessage { - states: EntityStateDict; + states: HistoryStates; start_time?: number; // Start time of this historical chunk end_time?: number; // End time of this historical chunk } @@ -207,6 +203,87 @@ export const subscribeHistory = ( ); }; +class HistoryStream { + hass: HomeAssistant; + + startTime: Date; + + endTime: Date; + + combinedHistory: HistoryStates; + + constructor(hass: HomeAssistant, startTime: Date, endTime: Date) { + this.hass = hass; + this.startTime = startTime; + this.endTime = endTime; + this.combinedHistory = {}; + } + + processMessage(streamMessage: HistoryStreamMessage): HistoryStates { + // Put newest ones on top. Reverse works in-place so + // make a copy first. + if (!this.combinedHistory || !this.combinedHistory.length) { + this.combinedHistory = streamMessage.states; + return this.combinedHistory; + } + if (!streamMessage.states.length) { + // Empty messages are still sent to + // indicate no more historical events + return this.combinedHistory; + } + const purgeBeforePythonTime = this.startTime.getTime() / 1000; + const newHistory: HistoryStates = {}; + Object.keys(streamMessage.states).forEach((entityId) => { + newHistory[entityId] = []; + }); + Object.keys(this.combinedHistory).forEach((entityId) => { + newHistory[entityId] = []; + }); + Object.keys(newHistory).forEach((entityId) => { + if ( + entityId in this.combinedHistory && + entityId in streamMessage.states + ) { + newHistory[entityId] = this.combinedHistory[entityId] + .concat(streamMessage.states[entityId]) + .filter((entry) => entry.lu > purgeBeforePythonTime); + } else if (entityId in this.combinedHistory) { + newHistory[entityId] = this.combinedHistory[entityId].filter( + (entry) => entry.lu > purgeBeforePythonTime + ); + } else { + newHistory[entityId] = streamMessage.states[entityId]; + } + }); + this.combinedHistory = newHistory; + return this.combinedHistory; + } +} + +export const subscribeHistoryStates = ( + hass: HomeAssistant, + callbackFunction: (data: HistoryStates) => void, + startTime: Date, + endTime: Date, + entityIds: string[] +): Promise<() => Promise> => { + // If all specified filters are empty lists, we can return an empty list. + const params = { + type: "history/history_during_period", + start_time: startTime.toISOString(), + end_time: endTime.toISOString(), + minimal_response: true, + no_attributes: !entityIds.some((entityId) => + entityIdHistoryNeedsAttributes(hass, entityId) + ), + }; + const stream = new HistoryStream(hass, startTime, endTime); + return hass.connection.subscribeMessage( + (message) => callbackFunction(stream.processMessage(message)), + params + ); +}; + const equalState = (obj1: LineChartState, obj2: LineChartState) => obj1.state === obj2.state && // Only compare attributes if both states have an attributes object.