From 7ac0a4106cdc621f7b62dab7317d7c0c6317fb9b Mon Sep 17 00:00:00 2001 From: lilly Date: Tue, 19 May 2026 14:37:55 +0200 Subject: [PATCH] api: implement Server-Sent-Events on top of mqtt parameters --- api/src/dooris_api/app.py | 15 ++++++++++++++- api/src/dooris_api/ccujack.py | 34 ++++++++++++++++++++++------------ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/api/src/dooris_api/app.py b/api/src/dooris_api/app.py index 32d626d..edb338f 100644 --- a/api/src/dooris_api/app.py +++ b/api/src/dooris_api/app.py @@ -1,10 +1,11 @@ -from typing import Optional, List +from typing import Optional, List, AsyncIterable import logging import secrets import sys from datetime import datetime, UTC from fastapi import FastAPI, Request, Response, status from fastapi.responses import RedirectResponse +from fastapi.sse import EventSourceResponse from contextlib import asynccontextmanager from simple_openid_connect.client import OpenidClient from simple_openid_connect.data import TokenSuccessResponse, RpInitiatedLogoutRequest @@ -241,6 +242,18 @@ async def list_locks(ccujack: deps.CCUJackClient) -> List[models.Lock]: return result +@app.get( + "/api/locks/stream", + tags=["locks"], + responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}}, + response_class=EventSourceResponse, +) +async def watch_locks(ccujack: deps.CCUJackClient) -> AsyncIterable[List[models.Lock]]: + while True: + yield await list_locks(ccujack) + await ccujack.data_updated.wait() + + @app.patch( "/api/locks/{lock_id}", tags=["locks"], diff --git a/api/src/dooris_api/ccujack.py b/api/src/dooris_api/ccujack.py index 112fbc1..1132e63 100644 --- a/api/src/dooris_api/ccujack.py +++ b/api/src/dooris_api/ccujack.py @@ -72,6 +72,7 @@ class CCUJackClient: locks: LockData param_values: Dict[str, Any] task_process_messages: asyncio.Task + data_updated: asyncio.Event def __init__(self, base_uri: str, auth: BasicAuth, mqtt_conn: str): self.http = ClientSession( @@ -84,6 +85,7 @@ class CCUJackClient: self.locks = None self.param_values = dict() self.task_process_messages = None + self.data_updated = asyncio.Event() async def connect_mqtt(self): await self.mqtt.connect() @@ -99,10 +101,10 @@ class CCUJackClient: async def find_locks(self): logger.debug("Inspecting lock devices present in CCUJack") - # iterate through the CCUJACK API to find all devices async with self.http.get("/device") as resp: devices = CCUDeviceList.model_validate(await resp.json()) + # inspect CCUJACK for locks device_infos = await asyncio.gather( *[ self._inspect_ccu_device(i) @@ -111,17 +113,22 @@ class CCUJackClient: ] ) - self.locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK] - - # update active mqtt subscriptions - mqtt_topics = set() - for i_lock, lock_channels in self.locks: - for i_channel, channel_params in lock_channels: - for i_param in channel_params: - mqtt_topics.add( - f"device/status/{i_lock.address}/{i_channel.index}/{i_param.id}" - ) - await self.mqtt.update_subscriptions(mqtt_topics) + # save the result + new_locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK] + if new_locks != self.locks: + self.locks = new_locks + self.data_updated.set() + self.data_updated.clear() + + # update active mqtt subscriptions based on newly discovered devices + mqtt_topics = set() + for i_lock, lock_channels in self.locks: + for i_channel, channel_params in lock_channels: + for i_param in channel_params: + mqtt_topics.add( + f"device/status/{i_lock.address}/{i_channel.index}/{i_param.id}" + ) + await self.mqtt.update_subscriptions(mqtt_topics) async def process_mqt_messages(self): while True: @@ -134,9 +141,12 @@ class CCUJackClient: f"Got new value from MQTT for parameter {param_name}: {param_value}" ) self.param_values[param_name] = param_value + self.data_updated.set() except Exception as e: logger.exception(f"could not process incoming mqtt message: {e}") + finally: + self.data_updated.clear() async def query_param_value(self, address: str) -> CCUValue: if address in self.param_values: