import asyncio
import logging
from typing import Any, List, Optional, Tuple

from aiohttp import BasicAuth, ClientSession, TCPConnector
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

# Device type string used to pick lock devices out of the device list.
DEVICE_TYPE_LOCK = "HmIP-DLD"

# Only channels whose ``type`` is listed here are kept for a lock device.
CHANNEL_TYPES_RELEVANT = [
    "DOOR_LOCK_STATE_TRANSMITTER",
    "MAINTENANCE",
]

# Only parameters whose link ``href`` is listed here are inspected.
PARAM_ADDRESSES_RELEVANT = [
    "ACTIVITY_STATE",
    "LOCK_STATE",
    "LOCK_TARGET_LEVEL",
    "ERROR_JAMMED",
    "LOW_BAT",
    "UNREACH",
]


# A single hyperlink entry as found in the "~links" array of a response.
class CCURef(BaseModel):
    href: str
    rel: str
    title: str


# Payload returned by ``GET /device``; ``links`` is read from the "~links" key.
class CCUDeviceList(BaseModel):
    description: str
    identifier: str
    title: str
    links: List[CCURef] = Field(alias="~links")


# Payload returned by ``GET /device/<device>``.
class CCUDeviceInfo(BaseModel):
    address: str
    identifier: str
    title: str
    type: str
    links: List[CCURef] = Field(alias="~links")


# Payload returned by ``GET /device/<device>/<channel>``.
# ``address``, ``index`` and ``type`` may be absent and then default to None;
# ``description`` defaults to the empty string.
class CCUChannelInfo(BaseModel):
    address: Optional[str] = None
    index: Optional[int] = None
    title: str
    description: str = Field(default="")
    type: Optional[str] = Field(default=None)
    links: List[CCURef] = Field(alias="~links")


# Payload returned by ``GET /device/<device>/<channel>/<parameter>``.
class CCUParamInfo(BaseModel):
    id: str
    mqttStatusTopic: str
    links: List[CCURef] = Field(alias="~links")


# Payload returned by ``GET /device/<address>/~pv``.
# NOTE(review): presumably ``s`` = state/quality, ``ts`` = timestamp and
# ``v`` = value -- confirm against the server's API documentation.
class CCUValue(BaseModel):
    s: int
    ts: int
    v: Any


# A channel together with the relevant parameters found on it.
ChannelData = Tuple[CCUChannelInfo, List[CCUParamInfo]]
# A device together with its relevant channels.
DeviceData = Tuple[CCUDeviceInfo, List[ChannelData]]
# All discovered lock devices.
LockData = List[DeviceData]


class CCUJackClient:
    """Async HTTP client that discovers lock devices below ``/device``.

    The client owns an ``aiohttp.ClientSession``. Call :meth:`close` when
    done, or use the client as an async context manager::

        async with CCUJackClient(base_uri, auth) as client:
            await client.find_locks()

    Because the session is created in ``__init__``, instantiate the client
    from within a running event loop (aiohttp's recommendation for
    ``ClientSession``).
    """

    base_uri: str
    # ``None`` until :meth:`find_locks` has completed successfully.
    locks: Optional[LockData]

    def __init__(self, base_uri: str, auth: BasicAuth):
        """Create the underlying HTTP session.

        Args:
            base_uri: Base URL that all request paths are resolved against.
            auth: HTTP basic-auth credentials sent with every request.
        """
        self.base_uri = base_uri
        # raise_for_status=True: every non-2xx response raises
        # aiohttp.ClientResponseError instead of being returned.
        # NOTE(security): ssl=False disables TLS certificate verification.
        # Kept as in the original; review before using on untrusted networks.
        self.http = ClientSession(
            base_url=base_uri,
            auth=auth,
            raise_for_status=True,
            connector=TCPConnector(ssl=False),
        )
        self.locks = None

    async def close(self) -> None:
        """Close the underlying HTTP session and its connections."""
        await self.http.close()

    async def __aenter__(self) -> "CCUJackClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()

    async def find_locks(self):
        """Populate ``self.locks`` with all devices of type ``DEVICE_TYPE_LOCK``.

        Fetches ``/device``, inspects every linked device concurrently and
        keeps only those whose type matches ``DEVICE_TYPE_LOCK``.
        """
        logger.debug("Inspecting lock devices present in CCUJack")
        async with self.http.get("/device") as resp:
            devices = CCUDeviceList.model_validate(await resp.json())

        device_infos = await asyncio.gather(*[
            self._inspect_ccu_device(ref)
            for ref in devices.links
            if ref.rel == "device"
        ])
        self.locks = [
            entry for entry in device_infos
            if entry[0].type == DEVICE_TYPE_LOCK
        ]

    async def query_param_value(self, address: str) -> CCUValue:
        """Read the current value at ``/device/<address>/~pv``.

        Args:
            address: Path below ``/device`` identifying the parameter.

        Returns:
            The validated ``CCUValue`` payload.
        """
        logger.debug("Querying parameter value from '%s'", address)
        async with self.http.get(f"/device/{address}/~pv") as resp:
            return CCUValue.model_validate(await resp.json())

    async def set_param_value(self, address: str, value: Any):
        """Write ``value`` to ``/device/<address>/~pv`` via HTTP PUT.

        Args:
            address: Path below ``/device`` identifying the parameter.
            value: JSON-serialisable value, sent as ``{"v": value}``.
        """
        logger.debug("Writing parameter value '%s' to '%s'", value, address)
        # Use the response as a context manager so the connection is released
        # back to the pool right away instead of on garbage collection.
        async with self.http.put(f"/device/{address}/~pv", json={"v": value}):
            pass

    async def _inspect_ccu_device(self, device_ref: CCURef) -> DeviceData:
        """Fetch device info and, for lock devices, their relevant channels.

        Devices that are not of type ``DEVICE_TYPE_LOCK`` are returned with an
        empty channel list and without any further requests.
        """
        logger.debug(
            "Inspecting device '%s' (%s)", device_ref.href, device_ref.title
        )
        async with self.http.get(f"/device/{device_ref.href}") as resp:
            device_info = CCUDeviceInfo.model_validate(await resp.json())

        if device_info.type != DEVICE_TYPE_LOCK:
            return device_info, []

        channel_infos = await asyncio.gather(*[
            self._inspect_ccu_channel(device_ref, ref)
            for ref in device_info.links
            if ref.rel == "channel"
        ])
        return device_info, [
            entry for entry in channel_infos
            if entry[0].type in CHANNEL_TYPES_RELEVANT
        ]

    async def _inspect_ccu_channel(
        self, device_ref: CCURef, channel_ref: CCURef
    ) -> ChannelData:
        """Fetch channel info plus the parameters in ``PARAM_ADDRESSES_RELEVANT``."""
        logger.debug(
            "Inspecting device channel '%s/%s'",
            device_ref.href,
            channel_ref.href,
        )
        async with self.http.get(
            f"/device/{device_ref.href}/{channel_ref.href}"
        ) as resp:
            channel_info = CCUChannelInfo.model_validate(await resp.json())

        param_infos = await asyncio.gather(*[
            self._inspect_ccu_param(device_ref, channel_ref, ref)
            for ref in channel_info.links
            if ref.rel == "parameter" and ref.href in PARAM_ADDRESSES_RELEVANT
        ])
        return channel_info, param_infos

    async def _inspect_ccu_param(
        self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef
    ) -> CCUParamInfo:
        """Fetch and validate the description of a single parameter."""
        logger.debug(
            "Inspecting device parameter '%s/%s/%s'",
            device_ref.href,
            channel_ref.href,
            param_ref.href,
        )
        async with self.http.get(
            f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}"
        ) as resp:
            return CCUParamInfo.model_validate(await resp.json())