From 2f991a6b02b3979a7ea661c4026cff516cea1df6 Mon Sep 17 00:00:00 2001
From: lilly
Date: Thu, 14 May 2026 16:23:52 +0200
Subject: [PATCH] api: restructure exception generation to be more ergonomic in
the code
---
api/src/dooris_api/app.py | 32 ++++++----
api/src/dooris_api/ccujack.py | 102 ++++++++++++++++++++-----------
api/src/dooris_api/deps.py | 12 ++--
api/src/dooris_api/exceptions.py | 55 +++++++++++++++--
api/src/dooris_api/models.py | 33 +++-------
5 files changed, 153 insertions(+), 81 deletions(-)
diff --git a/api/src/dooris_api/app.py b/api/src/dooris_api/app.py
index 96af9e4..8fc9dad 100644
--- a/api/src/dooris_api/app.py
+++ b/api/src/dooris_api/app.py
@@ -1,4 +1,4 @@
-from typing import Optional, List, Any
+from typing import Optional, List
import logging
import secrets
import sys
@@ -75,7 +75,7 @@ async def get_user_info(
else:
return models.UserStatus(
is_logged_in=True,
- is_authorized=True,
+ is_authorized=current_user.may_operate_locks,
guaranteed_session_until=datetime.fromtimestamp(
current_user.id_token.exp, UTC
),
@@ -188,9 +188,7 @@ async def logout(
tags=["locks"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
-async def list_locks(
- ccujack: deps.CCUJackClient
-) -> List[models.Lock]:
+async def list_locks(ccujack: deps.CCUJackClient) -> List[models.Lock]:
# assemble result objects
result = []
for i_lock, lock_channels in ccujack.locks:
@@ -233,7 +231,11 @@ async def list_locks(
status_data["is_unreachable"] = value.v
result.append(
- models.Lock(id=i_lock.identifier, name=i_lock.title, status=models.LockStatus(**status_data))
+ models.Lock(
+ id=i_lock.identifier,
+ name=i_lock.title,
+ status=models.LockStatus(**status_data),
+ )
)
return result
@@ -244,10 +246,19 @@ async def list_locks(
tags=["locks"],
responses={
status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail},
+ status.HTTP_403_FORBIDDEN: {"models": models.HttpProblemDetail},
status.HTTP_404_NOT_FOUND: {"model": models.HttpProblemDetail},
},
)
-async def operate_lock(req: Request, lock_id: str, requested_op: models.LockOperation, ccujack: deps.CCUJackClient, _current_user: deps.CurrentUser) -> None:
+async def operate_lock(
+ req: Request,
+ lock_id: str,
+ requested_op: models.LockOperation,
+ ccujack: deps.CCUJackClient,
+ current_user: deps.CurrentUser,
+) -> None:
+ if not current_user.may_operate_locks:
+ raise exceptions.HttpProblemException.forbidden_to_operate(req.url)
# TODO: Validate that the user is authorized
# find appropriate lock from ccujack
for i_lock, lock_channels in ccujack.locks:
@@ -268,9 +279,6 @@ async def operate_lock(req: Request, lock_id: str, requested_op: models.LockOper
# write to ccujack
await ccujack.set_param_value(addr, ccujack_value)
return
-
-
+
else:
- raise exceptions.HttpProblemException(
- models.HttpProblemDetail.new_lock_not_found(lock_id, req.url)
- )
+ raise exceptions.HttpProblemException.new_lock_not_found(lock_id, req.url)
diff --git a/api/src/dooris_api/ccujack.py b/api/src/dooris_api/ccujack.py
index 9925aee..5d5eca0 100644
--- a/api/src/dooris_api/ccujack.py
+++ b/api/src/dooris_api/ccujack.py
@@ -9,8 +9,15 @@ logger = logging.getLogger(__name__)
DEVICE_TYPE_LOCK = "HmIP-DLD"
-CHANNEL_TYPES_RELEVANT = [ "DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE" ]
-PARAM_ADDRESSES_RELEVANT = [ "ACTIVITY_STATE", "LOCK_STATE", "LOCK_TARGET_LEVEL", "ERROR_JAMMED", "LOW_BAT", "UNREACH" ]
+CHANNEL_TYPES_RELEVANT = ["DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE"]
+PARAM_ADDRESSES_RELEVANT = [
+ "ACTIVITY_STATE",
+ "LOCK_STATE",
+ "LOCK_TARGET_LEVEL",
+ "ERROR_JAMMED",
+ "LOW_BAT",
+ "UNREACH",
+]
class CCURef(BaseModel):
@@ -57,12 +64,18 @@ class CCUValue(BaseModel):
LockData = List[Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]]
+
class CCUJackClient:
base_uri: str
locks: LockData
-
+
def __init__(self, base_uri: str, auth: BasicAuth):
- self.http = ClientSession(base_url=base_uri, auth=auth, raise_for_status=True, connector=TCPConnector(ssl=False))
+ self.http = ClientSession(
+ base_url=base_uri,
+ auth=auth,
+ raise_for_status=True,
+ connector=TCPConnector(ssl=False),
+ )
self.locks = None
async def find_locks(self):
@@ -70,18 +83,15 @@ class CCUJackClient:
async with self.http.get("/device") as resp:
devices = CCUDeviceList.model_validate(await resp.json())
- device_infos = await asyncio.gather(*[
- self._inspect_ccu_device(i)
- for i in devices.links
- if i.rel == "device"
- ])
-
- self.locks = [
- i
- for i in device_infos
- if i[0].type == DEVICE_TYPE_LOCK
- ]
-
+ device_infos = await asyncio.gather(
+ *[
+ self._inspect_ccu_device(i)
+ for i in devices.links
+ if i.rel == "device"
+ ]
+ )
+
+ self.locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK]
async def query_param_value(self, address: str):
logger.debug("Querying parameter value from '%s'", address)
@@ -92,7 +102,9 @@ class CCUJackClient:
logger.debug("Writing parameter value '%s' to '%s'", value, address)
await self.http.put(f"/device/{address}/~pv", json={"v": value})
- async def _inspect_ccu_device(self, device_ref: CCURef) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]:
+ async def _inspect_ccu_device(
+ self, device_ref: CCURef
+ ) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]:
logger.debug("Inspecting device '%s' (%s)", device_ref.href, device_ref.title)
async with self.http.get(f"/device/{device_ref.href}") as resp:
device_info = CCUDeviceInfo.model_validate(await resp.json())
@@ -100,33 +112,49 @@ class CCUJackClient:
if device_info.type != DEVICE_TYPE_LOCK:
return device_info, []
- channel_infos = await asyncio.gather(*[
- self._inspect_ccu_channel(device_ref, i)
- for i in device_info.links
- if i.rel == "channel"
- ])
+ channel_infos = await asyncio.gather(
+ *[
+ self._inspect_ccu_channel(device_ref, i)
+ for i in device_info.links
+ if i.rel == "channel"
+ ]
+ )
return device_info, [
- i
- for i in channel_infos
- if i[0].type in CHANNEL_TYPES_RELEVANT
+ i for i in channel_infos if i[0].type in CHANNEL_TYPES_RELEVANT
]
- async def _inspect_ccu_channel(self, device_ref: CCURef, channel_ref: CCURef) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]:
- logger.debug("Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href)
- async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}") as resp:
+ async def _inspect_ccu_channel(
+ self, device_ref: CCURef, channel_ref: CCURef
+ ) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]:
+ logger.debug(
+ "Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href
+ )
+ async with self.http.get(
+ f"/device/{device_ref.href}/{channel_ref.href}"
+ ) as resp:
channel_info = CCUChannelInfo.model_validate(await resp.json())
- param_infos = await asyncio.gather(*[
- self._inspect_ccu_param(device_ref, channel_ref, i)
- for i in channel_info.links
- if i.rel == "parameter" and i.href in PARAM_ADDRESSES_RELEVANT
- ])
+ param_infos = await asyncio.gather(
+ *[
+ self._inspect_ccu_param(device_ref, channel_ref, i)
+ for i in channel_info.links
+ if i.rel == "parameter" and i.href in PARAM_ADDRESSES_RELEVANT
+ ]
+ )
return channel_info, param_infos
- async def _inspect_ccu_param(self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef) -> CCUParamInfo:
- logger.debug("Inspecting device parameter '%s/%s/%s'", device_ref.href, channel_ref.href, param_ref.href)
- async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}") as resp:
+ async def _inspect_ccu_param(
+ self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef
+ ) -> CCUParamInfo:
+ logger.debug(
+ "Inspecting device parameter '%s/%s/%s'",
+ device_ref.href,
+ channel_ref.href,
+ param_ref.href,
+ )
+ async with self.http.get(
+ f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}"
+ ) as resp:
return CCUParamInfo.model_validate(await resp.json())
-
diff --git a/api/src/dooris_api/deps.py b/api/src/dooris_api/deps.py
index ed7625c..c1df75a 100644
--- a/api/src/dooris_api/deps.py
+++ b/api/src/dooris_api/deps.py
@@ -31,7 +31,9 @@ async def get_current_user(
id_token = oidc_client.decode_id_token(
req.cookies["id_token"], nonce=req.cookies["auth_nonce"]
)
- return models.CurrentUser(id_token=id_token, raw_id_token=req.cookies["id_token"])
+ return models.CurrentUser(
+ id_token=id_token, raw_id_token=req.cookies["id_token"]
+ )
# if we have a refresh token, try to get new tokens
if all(i in req.cookies for i in ("refresh_token", "auth_nonce")):
@@ -45,13 +47,13 @@ async def get_current_user(
# return the newly gotten info
id_token = oidc_client.decode_id_token(token_resp.id_token)
- return models.CurrentUser(id_token=id_token, raw_id_token=token_resp.id_token)
+ return models.CurrentUser(
+ id_token=id_token, raw_id_token=token_resp.id_token
+ )
# otherwise we can't meaningfully recover any user information or the user is simply not authenticated
logger.debug("no currently authenticated user")
- raise exceptions.HttpProblemException(
- models.HttpProblemDetail.new_unauthorized(req.url)
- )
+ raise exceptions.HttpProblemException.unauthorized(req.url)
def persist_auth_state(
diff --git a/api/src/dooris_api/exceptions.py b/api/src/dooris_api/exceptions.py
index a372082..c88b07b 100644
--- a/api/src/dooris_api/exceptions.py
+++ b/api/src/dooris_api/exceptions.py
@@ -1,8 +1,11 @@
-from typing import Mapping, Optional
+from typing import Mapping, Optional, Self
from fastapi import HTTPException, Request, Response
from fastapi.responses import JSONResponse
from fastapi.utils import is_body_allowed_for_status_code
from fastapi.encoders import jsonable_encoder
+from fastapi import status
+from pydantic import HttpUrl
+from starlette.datastructures import URL
from dooris_api import models
@@ -11,16 +14,60 @@ class HttpProblemException(HTTPException):
problem: models.HttpProblemDetail
headers: Optional[Mapping[str, str]]
- def __init__(self, problem: models.HttpProblemDetail, headers: Optional[Mapping[str, str]] = None):
+ def __init__(
+ self,
+ problem: models.HttpProblemDetail,
+ headers: Optional[Mapping[str, str]] = None,
+ ):
self.problem = problem
super().__init__(status_code=problem.status, headers=headers)
def __str__(self) -> str:
return str(self.problem)
+ @classmethod
+ def unauthorized(cls, request_uri: str | URL) -> Self:
+ return cls(
+ problem=models.HttpProblemDetail(
+ type=models.HttpProblemType.UNAUTHORIZED,
+ status=status.HTTP_401_UNAUTHORIZED,
+ title="Unauthorized",
+ detail="You tried to access a ressource which requires authentication but you are not authenticated",
+ instance=HttpUrl(str(request_uri)),
+ )
+ )
-async def problem_exception_handler(request: Request, exc: HttpProblemException) -> Response:
+ @classmethod
+ def lock_not_found(cls, requested_lock: str, request_uri: str | URL) -> Self:
+ return cls(
+ problem=models.HttpProblemDetail(
+ type=models.HttpProblemType.LOCK_NOT_FOUND,
+ status=status.HTTP_404_NOT_FOUND,
+ title="Lock not found",
+ detail=f"You tried to interact with lock {requested_lock!r} that is not known to dooris",
+ instance=str(request_uri),
+ )
+ )
+
+ @classmethod
+ def forbidden_to_operate(cls, request_uri: str | URL) -> Self:
+ return cls(
+ problem=models.HttpProblemDetail(
+ type=models.HttpProblemType.FORBIDDEN_TO_OPERATE,
+ status=status.HTTP_403_FORBIDDEN,
+ title="Forbidden to operate locks",
+ detail="You are not allowed to operate locks",
+ instance=str(request_uri),
+ )
+ )
+
+
+async def problem_exception_handler(
+ request: Request, exc: HttpProblemException
+) -> Response:
headers = exc.headers
if not is_body_allowed_for_status_code(exc.problem.status):
return Response(status_code=exc.status_code, headers=headers)
- return JSONResponse(jsonable_encoder(exc.problem), status_code=exc.status_code, headers=exc.headers)
+ return JSONResponse(
+ jsonable_encoder(exc.problem), status_code=exc.status_code, headers=exc.headers
+ )
diff --git a/api/src/dooris_api/models.py b/api/src/dooris_api/models.py
index 5328d4a..cab53f2 100644
--- a/api/src/dooris_api/models.py
+++ b/api/src/dooris_api/models.py
@@ -1,10 +1,8 @@
-from typing import Optional, Self, Literal
+from typing import Optional, Literal, List
from datetime import datetime
from pydantic import BaseModel, HttpUrl
from enum import Enum
from simple_openid_connect.data import IdToken
-from starlette.datastructures import URL
-from fastapi import status
class HttpProblemType(Enum):
@@ -13,6 +11,7 @@ class HttpProblemType(Enum):
"""
UNAUTHORIZED = "type:noc@hamburg.ccc.de,2026:UNAUTHORIZED"
+ FORBIDDEN_TO_OPERATE = "type:noc@hamburg.ccc.de,2026,FORBIDDEN_TO_OPERATE"
LOCK_NOT_FOUND = "type:noc@hamburg.ccc.de,2026:LOCK_NOT_FOUND"
@@ -27,31 +26,19 @@ class HttpProblemDetail(BaseModel):
detail: str
instance: Optional[HttpUrl]
- @classmethod
- def new_unauthorized(cls, request_uri: str | URL) -> Self:
- return cls(
- type=HttpProblemType.UNAUTHORIZED,
- status=status.HTTP_401_UNAUTHORIZED,
- title="Unauthorized",
- detail="You tried to access a ressource which requires authentication but you are not authenticated",
- instance=HttpUrl(str(request_uri)),
- )
-
- @classmethod
- def new_lock_not_found(cls, requested_lock: str, request_uri: str | URL) -> Self:
- return cls(
- type=HttpProblemType.LOCK_NOT_FOUND,
- status=status.HTTP_404_NOT_FOUND,
- title="Lock not found",
- detail=f"You tried to interact with lock {requested_lock!r} that is not known to dooris",
- instance=str(request_uri),
- )
-
class CurrentUser(BaseModel):
id_token: IdToken
raw_id_token: str
+ @property
+ def ccchh_roles(self) -> List[str]:
+ return []
+
+ @property
+ def may_operate_locks(self) -> bool:
+ return True
+
class UserStatus(BaseModel):
is_logged_in: bool