This commit is contained in:
J. Nick Koston 2023-01-15 16:21:04 -10:00
parent c34e845886
commit 87d781f786

View File

@ -189,6 +189,7 @@ export const subscribeHistory = (
): Promise<() => Promise<void>> => {
const params = {
type: "history/stream",
entity_ids: entityIds,
start_time: startTime.toISOString(),
end_time: endTime.toISOString(),
minimal_response: true,
@ -216,13 +217,11 @@ class HistoryStream {
}
processMessage(streamMessage: HistoryStreamMessage): HistoryStates {
// Put newest ones on top. Reverse works in-place so
// make a copy first.
if (!this.combinedHistory || !this.combinedHistory.length) {
if (!this.combinedHistory || !Object.keys(this.combinedHistory).length) {
this.combinedHistory = streamMessage.states;
return this.combinedHistory;
}
if (!streamMessage.states.length) {
if (!Object.keys(streamMessage.states).length) {
// Empty messages are still sent to
// indicate no more historical events
return this.combinedHistory;
@ -230,27 +229,52 @@ class HistoryStream {
const purgeBeforePythonTime =
(new Date().getTime() - 60 * 60 * this.hoursToShow * 1000) / 1000;
const newHistory: HistoryStates = {};
Object.keys(streamMessage.states).forEach((entityId) => {
newHistory[entityId] = [];
});
Object.keys(this.combinedHistory).forEach((entityId) => {
newHistory[entityId] = [];
});
Object.keys(streamMessage.states).forEach((entityId) => {
newHistory[entityId] = [];
});
Object.keys(newHistory).forEach((entityId) => {
let purgeOld = false;
if (
entityId in this.combinedHistory &&
entityId in streamMessage.states
) {
newHistory[entityId] = this.combinedHistory[entityId]
.concat(streamMessage.states[entityId])
.filter((entry) => entry.lu > purgeBeforePythonTime);
} else if (entityId in this.combinedHistory) {
newHistory[entityId] = this.combinedHistory[entityId].filter(
(entry) => entry.lu > purgeBeforePythonTime
newHistory[entityId] = this.combinedHistory[entityId].concat(
streamMessage.states[entityId]
);
if (
streamMessage.states[entityId][0] > this.combinedHistory[entityId][-1]
) {
// If the history is out of order we have to sort it.
newHistory[entityId] = newHistory[entityId].sort(
(a, b) => a.lu - b.lu
);
}
purgeOld = true;
} else if (entityId in this.combinedHistory) {
newHistory[entityId] = this.combinedHistory[entityId];
purgeOld = true;
} else {
newHistory[entityId] = streamMessage.states[entityId];
}
if (purgeOld) {
const entityHistory = newHistory[entityId];
while (entityHistory[0].lu < purgeBeforePythonTime) {
if (entityHistory.length > 1) {
if (entityHistory[1].lu < purgeBeforePythonTime) {
newHistory[entityId].shift();
continue;
}
}
// Update the first entry to the start time state
// as we need to preserve the start time state and
// only expire the rest of the history as it ages.
entityHistory[0].lu = purgeBeforePythonTime;
break;
}
}
});
this.combinedHistory = newHistory;
return this.combinedHistory;
@ -265,6 +289,7 @@ export const subscribeHistoryStatesWindow = (
): Promise<() => Promise<void>> => {
const params = {
type: "history/stream",
entity_ids: entityIds,
start_time: new Date(
new Date().getTime() - 60 * 60 * hoursToShow * 1000
).toISOString(),