diff --git a/src/data/history.ts b/src/data/history.ts index c8c0c9f95d..7a5bd11542 100644 --- a/src/data/history.ts +++ b/src/data/history.ts @@ -189,6 +189,7 @@ export const subscribeHistory = ( ): Promise<() => Promise> => { const params = { type: "history/stream", + entity_ids: entityIds, start_time: startTime.toISOString(), end_time: endTime.toISOString(), minimal_response: true, @@ -216,13 +217,11 @@ class HistoryStream { } processMessage(streamMessage: HistoryStreamMessage): HistoryStates { - // Put newest ones on top. Reverse works in-place so - // make a copy first. - if (!this.combinedHistory || !this.combinedHistory.length) { + if (!this.combinedHistory || !Object.keys(this.combinedHistory).length) { this.combinedHistory = streamMessage.states; return this.combinedHistory; } - if (!streamMessage.states.length) { + if (!Object.keys(streamMessage.states).length) { // Empty messages are still sent to // indicate no more historical events return this.combinedHistory; @@ -230,27 +229,52 @@ class HistoryStream { const purgeBeforePythonTime = (new Date().getTime() - 60 * 60 * this.hoursToShow * 1000) / 1000; const newHistory: HistoryStates = {}; - Object.keys(streamMessage.states).forEach((entityId) => { - newHistory[entityId] = []; - }); Object.keys(this.combinedHistory).forEach((entityId) => { newHistory[entityId] = []; }); + Object.keys(streamMessage.states).forEach((entityId) => { + newHistory[entityId] = []; + }); Object.keys(newHistory).forEach((entityId) => { + let purgeOld = false; if ( entityId in this.combinedHistory && entityId in streamMessage.states ) { - newHistory[entityId] = this.combinedHistory[entityId] - .concat(streamMessage.states[entityId]) - .filter((entry) => entry.lu > purgeBeforePythonTime); - } else if (entityId in this.combinedHistory) { - newHistory[entityId] = this.combinedHistory[entityId].filter( - (entry) => entry.lu > purgeBeforePythonTime + newHistory[entityId] = this.combinedHistory[entityId].concat( + streamMessage.states[entityId] ); + if ( + streamMessage.states[entityId][0] > this.combinedHistory[entityId][-1] + ) { + // If the history is out of order we have to sort it. + newHistory[entityId] = newHistory[entityId].sort( + (a, b) => a.lu - b.lu + ); + } + purgeOld = true; + } else if (entityId in this.combinedHistory) { + newHistory[entityId] = this.combinedHistory[entityId]; + purgeOld = true; } else { newHistory[entityId] = streamMessage.states[entityId]; } + if (purgeOld) { + const entityHistory = newHistory[entityId]; + while (entityHistory[0].lu < purgeBeforePythonTime) { + if (entityHistory.length > 1) { + if (entityHistory[1].lu < purgeBeforePythonTime) { + newHistory[entityId].shift(); + continue; + } + } + // Update the first entry to the start time state + // as we need to preserve the start time state and + // only expire the rest of the history as it ages. + entityHistory[0].lu = purgeBeforePythonTime; + break; + } + } }); this.combinedHistory = newHistory; return this.combinedHistory; @@ -265,6 +289,7 @@ export const subscribeHistoryStatesWindow = ( ): Promise<() => Promise> => { const params = { type: "history/stream", + entity_ids: entityIds, start_time: new Date( new Date().getTime() - 60 * 60 * hoursToShow * 1000 ).toISOString(),