Add support for streaming history

This commit is contained in:
J. Nick Koston 2023-01-15 15:06:47 -10:00
parent 5b3a13f8d4
commit 412587a457

View File

@ -79,12 +79,8 @@ interface EntityHistoryState {
lu: number;
}
interface EntityStateDict<T> {
[entity_id: string]: T;
}
export interface HistoryStreamMessage {
states: EntityStateDict<EntityHistoryState[]>;
states: HistoryStates;
start_time?: number; // Start time of this historical chunk
end_time?: number; // End time of this historical chunk
}
@ -207,6 +203,87 @@ export const subscribeHistory = (
);
};
class HistoryStream {
hass: HomeAssistant;
startTime: Date;
endTime: Date;
combinedHistory: HistoryStates;
constructor(hass: HomeAssistant, startTime: Date, endTime: Date) {
this.hass = hass;
this.startTime = startTime;
this.endTime = endTime;
this.combinedHistory = {};
}
processMessage(streamMessage: HistoryStreamMessage): HistoryStates {
// Put newest ones on top. Reverse works in-place so
// make a copy first.
if (!this.combinedHistory || !this.combinedHistory.length) {
this.combinedHistory = streamMessage.states;
return this.combinedHistory;
}
if (!streamMessage.states.length) {
// Empty messages are still sent to
// indicate no more historical events
return this.combinedHistory;
}
const purgeBeforePythonTime = this.startTime.getTime() / 1000;
const newHistory: HistoryStates = {};
Object.keys(streamMessage.states).forEach((entityId) => {
newHistory[entityId] = [];
});
Object.keys(this.combinedHistory).forEach((entityId) => {
newHistory[entityId] = [];
});
Object.keys(newHistory).forEach((entityId) => {
if (
entityId in this.combinedHistory &&
entityId in streamMessage.states
) {
newHistory[entityId] = this.combinedHistory[entityId]
.concat(streamMessage.states[entityId])
.filter((entry) => entry.lu > purgeBeforePythonTime);
} else if (entityId in this.combinedHistory) {
newHistory[entityId] = this.combinedHistory[entityId].filter(
(entry) => entry.lu > purgeBeforePythonTime
);
} else {
newHistory[entityId] = streamMessage.states[entityId];
}
});
this.combinedHistory = newHistory;
return this.combinedHistory;
}
}
export const subscribeHistoryStates = (
hass: HomeAssistant,
callbackFunction: (data: HistoryStates) => void,
startTime: Date,
endTime: Date,
entityIds: string[]
): Promise<() => Promise<void>> => {
// If all specified filters are empty lists, we can return an empty list.
const params = {
type: "history/history_during_period",
start_time: startTime.toISOString(),
end_time: endTime.toISOString(),
minimal_response: true,
no_attributes: !entityIds.some((entityId) =>
entityIdHistoryNeedsAttributes(hass, entityId)
),
};
const stream = new HistoryStream(hass, startTime, endTime);
return hass.connection.subscribeMessage<HistoryStreamMessage>(
(message) => callbackFunction(stream.processMessage(message)),
params
);
};
const equalState = (obj1: LineChartState, obj2: LineChartState) =>
obj1.state === obj2.state &&
// Only compare attributes if both states have an attributes object.