api: implement ccu querying

This commit is contained in:
lilly 2026-05-09 21:16:43 +02:00
commit c1a78e4dc9
Signed by: lilly
SSH key fingerprint: SHA256:y9T5GFw2A20WVklhetIxG1+kcg/Ce0shnQmbu1LQ37g
6 changed files with 628 additions and 22 deletions

View file

@ -1,16 +1,20 @@
from typing import Optional, List
from typing import Optional, List, Any
import logging
import secrets
import sys
import os
from datetime import datetime, UTC
from fastapi import FastAPI, Request, Response, HTTPException, status
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import RedirectResponse
from contextlib import asynccontextmanager
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse
from cachetools import TTLCache
from aiohttp import BasicAuth
import asyncio
from dooris_api import deps, models, exceptions
from dooris_api.ccujack import CCUJackClient
logger = logging.getLogger(__name__)
@ -32,6 +36,8 @@ async def lifespan(app: FastAPI):
)
app.extra["cache"] = TTLCache(maxsize=64, ttl=30 * 60)
app.extra["ccujack"] = CCUJackClient("https://hmdooris-ccu.ccchh.net:2122", auth=BasicAuth("dooris", os.environ["HMDOORIS_PW"]))
yield
@ -106,19 +112,64 @@ async def login_callback(req: Request, resp: Response, oidc_client: deps.OpenidC
logger.debug("successfully authenticated user")
return str(req.url_for("get-user-info"))
else:
logger.debu("could not authenticate user because of OIDC error; redirecting to error page with error messages intact")
logger.debug("could not authenticate user because of OIDC error; redirecting to error page with error messages intact")
return f"/auth/login-error?{req.query_params}"
@app.get("/api/doors/", tags=["doors"], responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}})
async def list_doors(cache: deps.Cache, _user: deps.CurrentUser) -> List[models.Door]:
return [
models.Door(name="door1", description="A static door for testing", status="unknown"),
models.Door(name="door2", description="another static door for testing", status="unknown"),
]
@app.get("/api/locks/", tags=["locks"], responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}})
async def list_locks(ccujack: deps.CCUJackClient) -> List[models.Lock]:
locks = await ccujack.find_locks()
# TODO: Properly associate parameters with their values
# values = await asyncio.gather(*[
# ccujack.query_param_value(f"{i_lock.address}/{i_channel.index}/{i_param.id}/~pv")
# for i_lock, lock_channels in locks
# for i_channel, channel_params in lock_channels
# for i_param in channel_params
# ])
# assemble result objects
result = []
for i_lock, lock_channels in locks:
status_data = dict()
for i_channel, channel_params in lock_channels:
for i_param in channel_params:
value = await ccujack.query_param_value(f"{i_lock.address}/{i_channel.index}/{i_param.id}")
match i_param.id:
case "LOCK_STATE":
match value.v:
case 0:
status_data["lock_state"] = "unknown"
case 1:
status_data["lock_state"] = "locked"
case 2:
status_data["lock_state"] = "unlocked"
case "ACTIVITY_STATE":
match value.v:
case 1:
status_data["activity_state"] = "unlocking"
case 2:
status_data["activity_state"] = "locking"
case 3:
status_data["activity_state"] = "stable"
case "LOCK_TARGET_LEVEL":
match value.v:
case 0:
status_data["lock_target_level"] = "locked"
case 1:
status_data["lock_target_level"] = "unlocked"
case 2:
status_data["lock_target_level"] = "open"
case "LOW_BAT":
status_data["is_low_battery"] = value.v
case "ERROR_JAMMED":
status_data["is_error_jammed"] = value.v
case "UNREACH":
status_data["is_unreachable"] = value.v
result.append(models.Lock(name=i_lock.title, status=models.LockStatus(**status_data)))
return result
@app.get("/api/doors/{name}", tags=["doors"], responses={status.HTTP_404_NOT_FOUND: {"model": models.HttpProblemDetail}, status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}})
async def get_door(name: str, req: Request, cache: deps.Cache, _user: deps.CurrentUser) -> Optional[models.Door]:
raise exceptions.HttpProblemException(models.HttpProblemDetail.new_door_not_found(name, req.url))

View file

@ -0,0 +1,127 @@
from typing import List, Tuple, Optional, Any
from aiohttp import ClientSession, BasicAuth, TCPConnector
import logging
import asyncio
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
DEVICE_TYPE_LOCK = "HmIP-DLD"
CHANNEL_TYPES_RELEVANT = [ "DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE" ]
PARAM_ADDRESSES_RELEVANT = [ "ACTIVITY_STATE", "LOCK_STATE", "LOCK_TARGET_LEVEL", "ERROR_JAMMED", "LOW_BAT", "UNREACH" ]
class CCURef(BaseModel):
href: str
rel: str
title: str
class CCUDeviceList(BaseModel):
description: str
identifier: str
title: str
links: List[CCURef] = Field(alias="~links")
class CCUDeviceInfo(BaseModel):
address: str
identifier: str
title: str
type: str
links: List[CCURef] = Field(alias="~links")
class CCUChannelInfo(BaseModel):
address: Optional[str] = None
index: Optional[int] = None
title: str
description: str = Field(default="")
type: Optional[str] = Field(default=None)
links: List[CCURef] = Field(alias="~links")
class CCUParamInfo(BaseModel):
id: str
mqttStatusTopic: str
links: List[CCURef] = Field(alias="~links")
class CCUValue(BaseModel):
s: int
ts: int
v: Any
class CCUJackClient:
base_uri: str
def __init__(self, base_uri: str, auth: BasicAuth):
self.http = ClientSession(base_url=base_uri, auth=auth, raise_for_status=True, connector=TCPConnector(ssl=False))
async def find_locks(self) -> List[Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]]:
logger.debug("Inspecting lock devices present in CCUJack")
async with self.http.get("/device") as resp:
devices = CCUDeviceList.model_validate(await resp.json())
device_infos = await asyncio.gather(*[
self._inspect_ccu_device(i)
for i in devices.links
if i.rel == "device"
])
return [
i
for i in device_infos
if i[0].type == DEVICE_TYPE_LOCK
]
async def query_param_value(self, address):
logger.debug("Querying parameter value from '%s'", address)
async with self.http.get(f"/device/{address}/~pv") as resp:
return CCUValue.model_validate(await resp.json())
# async def toggle_lock(self, lock_id, new_state):
# pass
async def _inspect_ccu_device(self, device_ref: CCURef) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]:
logger.debug("Inspecting device '%s' (%s)", device_ref.href, device_ref.title)
async with self.http.get(f"/device/{device_ref.href}") as resp:
device_info = CCUDeviceInfo.model_validate(await resp.json())
if device_info.type != DEVICE_TYPE_LOCK:
return device_info, []
channel_infos = await asyncio.gather(*[
self._inspect_ccu_channel(device_ref, i)
for i in device_info.links
if i.rel == "channel"
])
return device_info, [
i
for i in channel_infos
if i[0].type in CHANNEL_TYPES_RELEVANT
]
async def _inspect_ccu_channel(self, device_ref: CCURef, channel_ref: CCURef) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]:
logger.debug("Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href)
async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}") as resp:
channel_info = CCUChannelInfo.model_validate(await resp.json())
param_infos = await asyncio.gather(*[
self._inspect_ccu_param(device_ref, channel_ref, i)
for i in channel_info.links
if i.rel == "parameter" and i.href in PARAM_ADDRESSES_RELEVANT
])
return channel_info, param_infos
async def _inspect_ccu_param(self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef) -> CCUParamInfo:
logger.debug("Inspecting device parameter '%s/%s/%s'", device_ref.href, channel_ref.href, param_ref.href)
async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}") as resp:
return CCUParamInfo.model_validate(await resp.json())

View file

@ -7,6 +7,7 @@ from simple_openid_connect.client import OpenidClient
from cachetools import Cache
from dooris_api import models, exceptions
from dooris_api.ccujack import CCUJackClient
logger = logging.getLogger(__name__)
@ -77,3 +78,10 @@ def get_cache(req: Request) -> Cache:
Cache = Annotated[Cache, Depends(get_cache)]
def get_ccujack(req: Request) -> CCUJackClient:
return req.app.extra["ccujack"]
CCUJackClient = Annotated[CCUJackClient, Depends(get_ccujack)]

View file

@ -1,4 +1,4 @@
from typing import Optional, Self
from typing import Optional, Self, Mapping, Any, Literal
from datetime import datetime
from pydantic import BaseModel, HttpUrl
from enum import Enum
@ -48,14 +48,16 @@ class UserStatus(BaseModel):
user_info: Optional[UserInfo]
class DoorStatus(Enum):
OPEN = "open"
CLOSED = "closed"
UNKNOWN = "unknown"
class LockStatus(BaseModel):
is_unreachable: bool
is_low_battery: bool
is_error_jammed: bool
lock_target_level: Literal["locked", "unlocked", "open"]
lock_state: Literal["unknown", "locked", "unlocked"]
activity_state: Literal["unknown", "locking", "unlocking", "stable"]
class Door(BaseModel):
class Lock(BaseModel):
name: str
description: str
status: DoorStatus
status: LockStatus