api: implement Server-Sent-Events on top of mqtt parameters
All checks were successful
Build Container / Build Container (push) Successful in 1m47s

This commit is contained in:
lilly 2026-05-19 14:37:55 +02:00
commit 7ac0a4106c
Signed by: lilly
SSH key fingerprint: SHA256:y9T5GFw2A20WVklhetIxG1+kcg/Ce0shnQmbu1LQ37g
2 changed files with 36 additions and 13 deletions

View file

@ -1,10 +1,11 @@
from typing import Optional, List
from typing import Optional, List, AsyncIterable
import logging
import secrets
import sys
from datetime import datetime, UTC
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import RedirectResponse
from fastapi.sse import EventSourceResponse
from contextlib import asynccontextmanager
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse, RpInitiatedLogoutRequest
@ -241,6 +242,18 @@ async def list_locks(ccujack: deps.CCUJackClient) -> List[models.Lock]:
return result
@app.get(
"/api/locks/stream",
tags=["locks"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
response_class=EventSourceResponse,
)
async def watch_locks(ccujack: deps.CCUJackClient) -> AsyncIterable[List[models.Lock]]:
while True:
yield await list_locks(ccujack)
await ccujack.data_updated.wait()
@app.patch(
"/api/locks/{lock_id}",
tags=["locks"],

View file

@ -72,6 +72,7 @@ class CCUJackClient:
locks: LockData
param_values: Dict[str, Any]
task_process_messages: asyncio.Task
data_updated: asyncio.Event
def __init__(self, base_uri: str, auth: BasicAuth, mqtt_conn: str):
self.http = ClientSession(
@ -84,6 +85,7 @@ class CCUJackClient:
self.locks = None
self.param_values = dict()
self.task_process_messages = None
self.data_updated = asyncio.Event()
async def connect_mqtt(self):
await self.mqtt.connect()
@ -99,10 +101,10 @@ class CCUJackClient:
async def find_locks(self):
logger.debug("Inspecting lock devices present in CCUJack")
# iterate through the CCUJACK API to find all devices
async with self.http.get("/device") as resp:
devices = CCUDeviceList.model_validate(await resp.json())
# inspect CCUJACK for locks
device_infos = await asyncio.gather(
*[
self._inspect_ccu_device(i)
@ -111,17 +113,22 @@ class CCUJackClient:
]
)
self.locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK]
# save the result
new_locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK]
if new_locks != self.locks:
self.locks = new_locks
self.data_updated.set()
self.data_updated.clear()
# update active mqtt subscriptions
mqtt_topics = set()
for i_lock, lock_channels in self.locks:
for i_channel, channel_params in lock_channels:
for i_param in channel_params:
mqtt_topics.add(
f"device/status/{i_lock.address}/{i_channel.index}/{i_param.id}"
)
await self.mqtt.update_subscriptions(mqtt_topics)
# update active mqtt subscriptions based on newly discovered devices
mqtt_topics = set()
for i_lock, lock_channels in self.locks:
for i_channel, channel_params in lock_channels:
for i_param in channel_params:
mqtt_topics.add(
f"device/status/{i_lock.address}/{i_channel.index}/{i_param.id}"
)
await self.mqtt.update_subscriptions(mqtt_topics)
async def process_mqt_messages(self):
while True:
@ -134,9 +141,12 @@ class CCUJackClient:
f"Got new value from MQTT for parameter {param_name}: {param_value}"
)
self.param_values[param_name] = param_value
self.data_updated.set()
except Exception as e:
logger.exception(f"could not process incoming mqtt message: {e}")
finally:
self.data_updated.clear()
async def query_param_value(self, address: str) -> CCUValue:
if address in self.param_values: