api: restructure exception generation to be more ergonomic in the code

This commit is contained in:
lilly 2026-05-14 16:23:52 +02:00
commit 2f991a6b02
Signed by: lilly
SSH key fingerprint: SHA256:y9T5GFw2A20WVklhetIxG1+kcg/Ce0shnQmbu1LQ37g
5 changed files with 153 additions and 81 deletions

View file

@ -1,4 +1,4 @@
from typing import Optional, List, Any from typing import Optional, List
import logging import logging
import secrets import secrets
import sys import sys
@ -75,7 +75,7 @@ async def get_user_info(
else: else:
return models.UserStatus( return models.UserStatus(
is_logged_in=True, is_logged_in=True,
is_authorized=True, is_authorized=current_user.may_operate_locks,
guaranteed_session_until=datetime.fromtimestamp( guaranteed_session_until=datetime.fromtimestamp(
current_user.id_token.exp, UTC current_user.id_token.exp, UTC
), ),
@ -188,9 +188,7 @@ async def logout(
tags=["locks"], tags=["locks"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}}, responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
) )
async def list_locks( async def list_locks(ccujack: deps.CCUJackClient) -> List[models.Lock]:
ccujack: deps.CCUJackClient
) -> List[models.Lock]:
# assemble result objects # assemble result objects
result = [] result = []
for i_lock, lock_channels in ccujack.locks: for i_lock, lock_channels in ccujack.locks:
@ -233,7 +231,11 @@ async def list_locks(
status_data["is_unreachable"] = value.v status_data["is_unreachable"] = value.v
result.append( result.append(
models.Lock(id=i_lock.identifier, name=i_lock.title, status=models.LockStatus(**status_data)) models.Lock(
id=i_lock.identifier,
name=i_lock.title,
status=models.LockStatus(**status_data),
)
) )
return result return result
@ -244,10 +246,19 @@ async def list_locks(
tags=["locks"], tags=["locks"],
responses={ responses={
status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}, status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail},
status.HTTP_403_FORBIDDEN: {"models": models.HttpProblemDetail},
status.HTTP_404_NOT_FOUND: {"model": models.HttpProblemDetail}, status.HTTP_404_NOT_FOUND: {"model": models.HttpProblemDetail},
}, },
) )
async def operate_lock(req: Request, lock_id: str, requested_op: models.LockOperation, ccujack: deps.CCUJackClient, _current_user: deps.CurrentUser) -> None: async def operate_lock(
req: Request,
lock_id: str,
requested_op: models.LockOperation,
ccujack: deps.CCUJackClient,
current_user: deps.CurrentUser,
) -> None:
if not current_user.may_operate_locks:
raise exceptions.HttpProblemException.forbidden_to_operate(req.url)
# TODO: Validate that the user is authorized # TODO: Validate that the user is authorized
# find appropriate lock from ccujack # find appropriate lock from ccujack
for i_lock, lock_channels in ccujack.locks: for i_lock, lock_channels in ccujack.locks:
@ -268,9 +279,6 @@ async def operate_lock(req: Request, lock_id: str, requested_op: models.LockOper
# write to ccujack # write to ccujack
await ccujack.set_param_value(addr, ccujack_value) await ccujack.set_param_value(addr, ccujack_value)
return return
else: else:
raise exceptions.HttpProblemException( raise exceptions.HttpProblemException.new_lock_not_found(lock_id, req.url)
models.HttpProblemDetail.new_lock_not_found(lock_id, req.url)
)

View file

@ -9,8 +9,15 @@ logger = logging.getLogger(__name__)
DEVICE_TYPE_LOCK = "HmIP-DLD" DEVICE_TYPE_LOCK = "HmIP-DLD"
CHANNEL_TYPES_RELEVANT = [ "DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE" ] CHANNEL_TYPES_RELEVANT = ["DOOR_LOCK_STATE_TRANSMITTER", "MAINTENANCE"]
PARAM_ADDRESSES_RELEVANT = [ "ACTIVITY_STATE", "LOCK_STATE", "LOCK_TARGET_LEVEL", "ERROR_JAMMED", "LOW_BAT", "UNREACH" ] PARAM_ADDRESSES_RELEVANT = [
"ACTIVITY_STATE",
"LOCK_STATE",
"LOCK_TARGET_LEVEL",
"ERROR_JAMMED",
"LOW_BAT",
"UNREACH",
]
class CCURef(BaseModel): class CCURef(BaseModel):
@ -57,12 +64,18 @@ class CCUValue(BaseModel):
LockData = List[Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]] LockData = List[Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]]
class CCUJackClient: class CCUJackClient:
base_uri: str base_uri: str
locks: LockData locks: LockData
def __init__(self, base_uri: str, auth: BasicAuth): def __init__(self, base_uri: str, auth: BasicAuth):
self.http = ClientSession(base_url=base_uri, auth=auth, raise_for_status=True, connector=TCPConnector(ssl=False)) self.http = ClientSession(
base_url=base_uri,
auth=auth,
raise_for_status=True,
connector=TCPConnector(ssl=False),
)
self.locks = None self.locks = None
async def find_locks(self): async def find_locks(self):
@ -70,18 +83,15 @@ class CCUJackClient:
async with self.http.get("/device") as resp: async with self.http.get("/device") as resp:
devices = CCUDeviceList.model_validate(await resp.json()) devices = CCUDeviceList.model_validate(await resp.json())
device_infos = await asyncio.gather(*[ device_infos = await asyncio.gather(
self._inspect_ccu_device(i) *[
for i in devices.links self._inspect_ccu_device(i)
if i.rel == "device" for i in devices.links
]) if i.rel == "device"
]
self.locks = [ )
i
for i in device_infos self.locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK]
if i[0].type == DEVICE_TYPE_LOCK
]
async def query_param_value(self, address: str): async def query_param_value(self, address: str):
logger.debug("Querying parameter value from '%s'", address) logger.debug("Querying parameter value from '%s'", address)
@ -92,7 +102,9 @@ class CCUJackClient:
logger.debug("Writing parameter value '%s' to '%s'", value, address) logger.debug("Writing parameter value '%s' to '%s'", value, address)
await self.http.put(f"/device/{address}/~pv", json={"v": value}) await self.http.put(f"/device/{address}/~pv", json={"v": value})
async def _inspect_ccu_device(self, device_ref: CCURef) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]: async def _inspect_ccu_device(
self, device_ref: CCURef
) -> Tuple[CCUDeviceInfo, List[Tuple[CCUChannelInfo, List[CCUParamInfo]]]]:
logger.debug("Inspecting device '%s' (%s)", device_ref.href, device_ref.title) logger.debug("Inspecting device '%s' (%s)", device_ref.href, device_ref.title)
async with self.http.get(f"/device/{device_ref.href}") as resp: async with self.http.get(f"/device/{device_ref.href}") as resp:
device_info = CCUDeviceInfo.model_validate(await resp.json()) device_info = CCUDeviceInfo.model_validate(await resp.json())
@ -100,33 +112,49 @@ class CCUJackClient:
if device_info.type != DEVICE_TYPE_LOCK: if device_info.type != DEVICE_TYPE_LOCK:
return device_info, [] return device_info, []
channel_infos = await asyncio.gather(*[ channel_infos = await asyncio.gather(
self._inspect_ccu_channel(device_ref, i) *[
for i in device_info.links self._inspect_ccu_channel(device_ref, i)
if i.rel == "channel" for i in device_info.links
]) if i.rel == "channel"
]
)
return device_info, [ return device_info, [
i i for i in channel_infos if i[0].type in CHANNEL_TYPES_RELEVANT
for i in channel_infos
if i[0].type in CHANNEL_TYPES_RELEVANT
] ]
async def _inspect_ccu_channel(self, device_ref: CCURef, channel_ref: CCURef) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]: async def _inspect_ccu_channel(
logger.debug("Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href) self, device_ref: CCURef, channel_ref: CCURef
async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}") as resp: ) -> Tuple[CCUChannelInfo, List[CCUParamInfo]]:
logger.debug(
"Inspecting device channel '%s/%s'", device_ref.href, channel_ref.href
)
async with self.http.get(
f"/device/{device_ref.href}/{channel_ref.href}"
) as resp:
channel_info = CCUChannelInfo.model_validate(await resp.json()) channel_info = CCUChannelInfo.model_validate(await resp.json())
param_infos = await asyncio.gather(*[ param_infos = await asyncio.gather(
self._inspect_ccu_param(device_ref, channel_ref, i) *[
for i in channel_info.links self._inspect_ccu_param(device_ref, channel_ref, i)
if i.rel == "parameter" and i.href in PARAM_ADDRESSES_RELEVANT for i in channel_info.links
]) if i.rel == "parameter" and i.href in PARAM_ADDRESSES_RELEVANT
]
)
return channel_info, param_infos return channel_info, param_infos
async def _inspect_ccu_param(self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef) -> CCUParamInfo: async def _inspect_ccu_param(
logger.debug("Inspecting device parameter '%s/%s/%s'", device_ref.href, channel_ref.href, param_ref.href) self, device_ref: CCURef, channel_ref: CCURef, param_ref: CCURef
async with self.http.get(f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}") as resp: ) -> CCUParamInfo:
logger.debug(
"Inspecting device parameter '%s/%s/%s'",
device_ref.href,
channel_ref.href,
param_ref.href,
)
async with self.http.get(
f"/device/{device_ref.href}/{channel_ref.href}/{param_ref.href}"
) as resp:
return CCUParamInfo.model_validate(await resp.json()) return CCUParamInfo.model_validate(await resp.json())

View file

@ -31,7 +31,9 @@ async def get_current_user(
id_token = oidc_client.decode_id_token( id_token = oidc_client.decode_id_token(
req.cookies["id_token"], nonce=req.cookies["auth_nonce"] req.cookies["id_token"], nonce=req.cookies["auth_nonce"]
) )
return models.CurrentUser(id_token=id_token, raw_id_token=req.cookies["id_token"]) return models.CurrentUser(
id_token=id_token, raw_id_token=req.cookies["id_token"]
)
# if we have a refresh token, try to get new tokens # if we have a refresh token, try to get new tokens
if all(i in req.cookies for i in ("refresh_token", "auth_nonce")): if all(i in req.cookies for i in ("refresh_token", "auth_nonce")):
@ -45,13 +47,13 @@ async def get_current_user(
# return the newly gotten info # return the newly gotten info
id_token = oidc_client.decode_id_token(token_resp.id_token) id_token = oidc_client.decode_id_token(token_resp.id_token)
return models.CurrentUser(id_token=id_token, raw_id_token=token_resp.id_token) return models.CurrentUser(
id_token=id_token, raw_id_token=token_resp.id_token
)
# otherwise we can't meaningfully recover any user information or the user is simply not authenticated # otherwise we can't meaningfully recover any user information or the user is simply not authenticated
logger.debug("no currently authenticated user") logger.debug("no currently authenticated user")
raise exceptions.HttpProblemException( raise exceptions.HttpProblemException.unauthorized(req.url)
models.HttpProblemDetail.new_unauthorized(req.url)
)
def persist_auth_state( def persist_auth_state(

View file

@ -1,8 +1,11 @@
from typing import Mapping, Optional from typing import Mapping, Optional, Self
from fastapi import HTTPException, Request, Response from fastapi import HTTPException, Request, Response
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from fastapi.utils import is_body_allowed_for_status_code from fastapi.utils import is_body_allowed_for_status_code
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi import status
from pydantic import HttpUrl
from starlette.datastructures import URL
from dooris_api import models from dooris_api import models
@ -11,16 +14,60 @@ class HttpProblemException(HTTPException):
problem: models.HttpProblemDetail problem: models.HttpProblemDetail
headers: Optional[Mapping[str, str]] headers: Optional[Mapping[str, str]]
def __init__(self, problem: models.HttpProblemDetail, headers: Optional[Mapping[str, str]] = None): def __init__(
self,
problem: models.HttpProblemDetail,
headers: Optional[Mapping[str, str]] = None,
):
self.problem = problem self.problem = problem
super().__init__(status_code=problem.status, headers=headers) super().__init__(status_code=problem.status, headers=headers)
def __str__(self) -> str: def __str__(self) -> str:
return str(self.problem) return str(self.problem)
@classmethod
def unauthorized(cls, request_uri: str | URL) -> Self:
return cls(
problem=models.HttpProblemDetail(
type=models.HttpProblemType.UNAUTHORIZED,
status=status.HTTP_401_UNAUTHORIZED,
title="Unauthorized",
detail="You tried to access a ressource which requires authentication but you are not authenticated",
instance=HttpUrl(str(request_uri)),
)
)
async def problem_exception_handler(request: Request, exc: HttpProblemException) -> Response: @classmethod
def lock_not_found(cls, requested_lock: str, request_uri: str | URL) -> Self:
return cls(
problem=models.HttpProblemDetail(
type=models.HttpProblemType.LOCK_NOT_FOUND,
status=status.HTTP_404_NOT_FOUND,
title="Lock not found",
detail=f"You tried to interact with lock {requested_lock!r} that is not known to dooris",
instance=str(request_uri),
)
)
@classmethod
def forbidden_to_operate(cls, request_uri: str | URL) -> Self:
return cls(
problem=models.HttpProblemDetail(
type=models.HttpProblemType.FORBIDDEN_TO_OPERATE,
status=status.HTTP_403_FORBIDDEN,
title="Forbidden to operate locks",
detail="You are not allowed to operate locks",
instance=str(request_uri),
)
)
async def problem_exception_handler(
request: Request, exc: HttpProblemException
) -> Response:
headers = exc.headers headers = exc.headers
if not is_body_allowed_for_status_code(exc.problem.status): if not is_body_allowed_for_status_code(exc.problem.status):
return Response(status_code=exc.status_code, headers=headers) return Response(status_code=exc.status_code, headers=headers)
return JSONResponse(jsonable_encoder(exc.problem), status_code=exc.status_code, headers=exc.headers) return JSONResponse(
jsonable_encoder(exc.problem), status_code=exc.status_code, headers=exc.headers
)

View file

@ -1,10 +1,8 @@
from typing import Optional, Self, Literal from typing import Optional, Literal, List
from datetime import datetime from datetime import datetime
from pydantic import BaseModel, HttpUrl from pydantic import BaseModel, HttpUrl
from enum import Enum from enum import Enum
from simple_openid_connect.data import IdToken from simple_openid_connect.data import IdToken
from starlette.datastructures import URL
from fastapi import status
class HttpProblemType(Enum): class HttpProblemType(Enum):
@ -13,6 +11,7 @@ class HttpProblemType(Enum):
""" """
UNAUTHORIZED = "type:noc@hamburg.ccc.de,2026:UNAUTHORIZED" UNAUTHORIZED = "type:noc@hamburg.ccc.de,2026:UNAUTHORIZED"
FORBIDDEN_TO_OPERATE = "type:noc@hamburg.ccc.de,2026,FORBIDDEN_TO_OPERATE"
LOCK_NOT_FOUND = "type:noc@hamburg.ccc.de,2026:LOCK_NOT_FOUND" LOCK_NOT_FOUND = "type:noc@hamburg.ccc.de,2026:LOCK_NOT_FOUND"
@ -27,31 +26,19 @@ class HttpProblemDetail(BaseModel):
detail: str detail: str
instance: Optional[HttpUrl] instance: Optional[HttpUrl]
@classmethod
def new_unauthorized(cls, request_uri: str | URL) -> Self:
return cls(
type=HttpProblemType.UNAUTHORIZED,
status=status.HTTP_401_UNAUTHORIZED,
title="Unauthorized",
detail="You tried to access a ressource which requires authentication but you are not authenticated",
instance=HttpUrl(str(request_uri)),
)
@classmethod
def new_lock_not_found(cls, requested_lock: str, request_uri: str | URL) -> Self:
return cls(
type=HttpProblemType.LOCK_NOT_FOUND,
status=status.HTTP_404_NOT_FOUND,
title="Lock not found",
detail=f"You tried to interact with lock {requested_lock!r} that is not known to dooris",
instance=str(request_uri),
)
class CurrentUser(BaseModel): class CurrentUser(BaseModel):
id_token: IdToken id_token: IdToken
raw_id_token: str raw_id_token: str
@property
def ccchh_roles(self) -> List[str]:
return []
@property
def may_operate_locks(self) -> bool:
return True
class UserStatus(BaseModel): class UserStatus(BaseModel):
is_logged_in: bool is_logged_in: bool