"""Async client for discovering and controlling door locks through CCUJack."""

import asyncio
import logging
from typing import Any, List, Optional, Tuple

from aiohttp import BasicAuth, ClientSession, TCPConnector
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

# Device type string identifying a door lock drive.
DEVICE_TYPE_LOCK = "HmIP-DLD"
# Only channels of these types are kept when inspecting a lock device.
CHANNEL_TYPES_RELEVANT = ["DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE"]
# Only parameters with these hrefs are fetched when inspecting a channel.
PARAM_ADDRESSES_RELEVANT = [
    "ACTIVITY_STATE",
    "LOCK_STATE",
    "LOCK_TARGET_LEVEL",
    "ERROR_JAMMED",
    "LOW_BAT",
    "UNREACH",
]


class CCURef(BaseModel):
    """One entry of a ``~links`` array: a relative href, its relation and title."""

    href: str
    rel: str
    title: str


class CCUDeviceList(BaseModel):
    """Payload of ``GET /device``; ``links`` is read from the ``~links`` key."""

    description: str
    identifier: str
    title: str
    links: List[CCURef] = Field(alias="~links")


class CCUDeviceInfo(BaseModel):
    """Payload of ``GET /device/<device>``; ``links`` references its channels."""

    address: str
    identifier: str
    title: str
    type: str
    links: List[CCURef] = Field(alias="~links")


class CCUChannelInfo(BaseModel):
    """Payload of ``GET /device/<device>/<channel>``.

    ``address``, ``index`` and ``type`` are optional in the response;
    ``description`` defaults to an empty string.
    """

    address: Optional[str] = None
    index: Optional[int] = None
    title: str
    description: str = Field(default="")
    type: Optional[str] = Field(default=None)
    links: List[CCURef] = Field(alias="~links")


class CCUParamInfo(BaseModel):
    """Payload of ``GET /device/<device>/<channel>/<parameter>``."""

    id: str
    mqttStatusTopic: str
    links: List[CCURef] = Field(alias="~links")


class CCUValue(BaseModel):
    """Payload of a ``~pv`` (process value) read.

    ``v`` carries the value itself (writes send ``{"v": ...}``).
    NOTE(review): ``s`` and ``ts`` are presumably a status code and a
    timestamp -- confirm against the CCUJack API documentation.
    """

    s: int
    ts: int
    v: Any


# One entry per device: (device info, [(channel info, [parameter infos])]).
LockData = List[Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]]


class CCUJackClient:
    """HTTP client that discovers lock devices and reads/writes their parameters.

    The client owns an ``aiohttp.ClientSession``. Release it with
    ``await client.close()`` or by using the client as an async context
    manager (``async with CCUJackClient(...) as client:``).
    """

    base_uri: str
    # Populated by ``find_locks()``; ``None`` until discovery has run.
    locks: Optional[LockData]

    def __init__(self, base_uri: str, auth: BasicAuth):
        """Create the underlying HTTP session.

        Args:
            base_uri: Base URL that all request paths are resolved against.
            auth: HTTP basic-auth credentials sent with every request.
        """
        self.base_uri = base_uri
        self.http = ClientSession(
            base_url=base_uri,
            auth=auth,
            # Any HTTP error status is surfaced as an exception.
            raise_for_status=True,
            # NOTE(review): ssl=False disables TLS certificate verification.
            # Kept as-is to preserve behaviour (e.g. self-signed certificates),
            # but credentials are exposed to man-in-the-middle attacks.
            connector=TCPConnector(ssl=False),
        )
        self.locks = None

    async def close(self) -> None:
        """Close the underlying HTTP session and its connections."""
        await self.http.close()

    async def __aenter__(self) -> "CCUJackClient":
        """Return the client itself for use in ``async with``."""
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        """Close the HTTP session when leaving the ``async with`` block."""
        await self.close()

    async def find_locks(self) -> None:
        """Inspect all devices and store the lock devices in ``self.locks``.

        Every device listed under ``/device`` is inspected concurrently; only
        those whose type equals ``DEVICE_TYPE_LOCK`` are kept.
        """
        logger.debug("Inspecting lock devices present in CCUJack")
        async with self.http.get("/device") as resp:
            devices = CCUDeviceList.model_validate(await resp.json())

        device_infos = await asyncio.gather(
            *[
                self._inspect_ccu_device(ref)
                for ref in devices.links
                if ref.rel == "device"
            ]
        )
        self.locks = [
            entry for entry in device_infos if entry[0].type == DEVICE_TYPE_LOCK
        ]

    async def query_param_value(self, address: str) -> CCUValue:
        """Read the current process value of the parameter at ``address``.

        Args:
            address: Path of the parameter below ``/device/``.

        Returns:
            The validated ``~pv`` payload.
        """
        logger.debug("Querying parameter value from '%s'", address)
        async with self.http.get(f"/device/{address}/~pv") as resp:
            return CCUValue.model_validate(await resp.json())

    async def set_param_value(self, address: str, value: Any) -> None:
        """Write ``value`` to the parameter at ``address``.

        Args:
            address: Path of the parameter below ``/device/``.
            value: JSON-serialisable value sent as ``{"v": value}``.
        """
        logger.debug("Writing parameter value '%s' to '%s'", value, address)
        # Use the context manager so the response is released back to the
        # connection pool instead of lingering until garbage collection.
        async with self.http.put(f"/device/{address}/~pv", json={"v": value}):
            pass

    async def _inspect_ccu_device(
        self, device_ref: CCURef
    ) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]:
        """Fetch a device and, if it is a lock, its relevant channels.

        Non-lock devices are returned with an empty channel list so that no
        further requests are spent on them.
        """
        logger.debug("Inspecting device '%s' (%s)", device_ref.href, device_ref.title)
        async with self.http.get(f"/device/{device_ref.href}") as resp:
            device_info = CCUDeviceInfo.model_validate(await resp.json())

        if device_info.type != DEVICE_TYPE_LOCK:
            return device_info, []

        channel_infos = await asyncio.gather(
            *[
                self._inspect_ccu_channel(device_ref, ref)
                for ref in device_info.links
                if ref.rel == "channel"
            ]
        )
        return device_info, [
            entry for entry in channel_infos if entry[0].type in CHANNEL_TYPES_RELEVANT
        ]

    async def _inspect_ccu_channel(
        self, device_ref: CCURef, channel_ref: CCURef
    ) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]:
        """Fetch a channel and the parameters listed in ``PARAM_ADDRESSES_RELEVANT``."""
        logger.debug(
            "Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href
        )
        async with self.http.get(
            f"/device/{device_ref.href}/{channel_ref.href}"
        ) as resp:
            channel_info = CCUChannelInfo.model_validate(await resp.json())

        param_infos = await asyncio.gather(
            *[
                self._inspect_ccu_param(device_ref, channel_ref, ref)
                for ref in channel_info.links
                if ref.rel == "parameter" and ref.href in PARAM_ADDRESSES_RELEVANT
            ]
        )
        return channel_info, param_infos

    async def _inspect_ccu_param(
        self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef
    ) -> CCUParamInfo:
        """Fetch the description of a single channel parameter."""
        logger.debug(
            "Inspecting device parameter '%s/%s/%s'",
            device_ref.href,
            channel_ref.href,
            param_ref.href,
        )
        async with self.http.get(
            f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}"
        ) as resp:
            return CCUParamInfo.model_validate(await resp.json())