api: restructure user authentication modelling to support other user backends
All checks were successful
Build Container / Build Container (push) Successful in 1m26s

This commit is contained in:
lilly 2026-05-19 19:28:12 +02:00
commit 63a9485209
Signed by: lilly
SSH key fingerprint: SHA256:y9T5GFw2A20WVklhetIxG1+kcg/Ce0shnQmbu1LQ37g
4 changed files with 99 additions and 58 deletions

View file

@ -1,5 +1,4 @@
import os
import sys
import logging
from contextvars import ContextVar
from argparse import ArgumentParser, Namespace
@ -51,7 +50,9 @@ def main():
argp.add_argument(
"--ccujack-url",
required=False,
default=os.environ.get("DOORIS_CCUJACK_URL", "https://hmdooris-ccu.ccchh.net:2122"),
default=os.environ.get(
"DOORIS_CCUJACK_URL", "https://hmdooris-ccu.ccchh.net:2122"
),
help="The URL under which a CCUJACK instance is hosted that actually operates the locks",
)
argp.add_argument(
@ -70,12 +71,21 @@ def main():
"--ccujack-password",
required="DOORIS_CCUJACK_PASSWORD" not in os.environ,
default=os.environ.get("DOORIS_CCUJACK_PASSWORD", None),
help="The password used to authenticate against the CCUJACK"
help="The password used to authenticate against the CCUJACK",
)
argp.add_argument(
"--static-api-tokens",
required=False,
nargs=1,
action="append",
default=[],
)
args = argp.parse_args()
# setup logging
logging.basicConfig(level=logging.DEBUG, format="[%(levelname)s] %(filename)s: %(message)s")
logging.basicConfig(
level=logging.DEBUG, format="[%(levelname)s] %(filename)s: %(message)s"
)
# setup app
app_config.set(args)
@ -84,8 +94,11 @@ def main():
if args.serve_static:
from fastapi.staticfiles import StaticFiles
app.mount("/", StaticFiles(directory=args.serve_static, html=True), name="static")
app.mount(
"/", StaticFiles(directory=args.serve_static, html=True), name="static"
)
# start webserver
config = uvicorn.Config(app, port=8000, log_level="debug")
server = uvicorn.Server(config)

View file

@ -58,17 +58,9 @@ app.add_exception_handler(
"/api/user-info/",
name="get-user-info",
tags=["auth"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def get_user_info(
req: Request, current_user: deps.CurrentUser
) -> models.UserStatus:
return models.UserStatus(
is_authorized=current_user.may_operate_locks,
guaranteed_session_until=datetime.fromtimestamp(current_user.id_token.exp, UTC),
username=current_user.id_token.preferred_username,
ccchh_roles=current_user.ccchh_roles,
)
async def get_user_info(req: Request, current_user: deps.ApiUser) -> models.ApiUser:
return current_user
@app.get("/auth/login", tags=["auth"], response_class=RedirectResponse, status_code=302)
@ -143,7 +135,7 @@ async def login_callback(
auth_start_time = datetime.fromtimestamp(
float(req.cookies["auth_start_time"]), UTC
)
deps.persist_auth_state(
deps.persist_oidc_auth_state(
oidc_client, resp, auth_result, auth_start_time, req.cookies["auth_nonce"]
)
logger.debug("successfully authenticated user")
@ -165,9 +157,9 @@ async def login_callback(
"/auth/logout", tags=["auth"], response_class=RedirectResponse, status_code=302
)
async def logout(
resp: Response, oidc_client: deps.OpenidClient, current_user: deps.CurrentUser
resp: Response, oidc_client: deps.OpenidClient, current_user: deps.AuthenticatedUser
) -> str:
deps.clear_auth_state(resp)
deps.clear_oidc_auth_state(resp)
return oidc_client.initiate_logout(
RpInitiatedLogoutRequest(
id_token_hint=current_user.raw_id_token,
@ -244,7 +236,7 @@ async def watch_locks(ccujack: deps.CCUJackClient) -> AsyncIterable[List[models.
while True:
yield await list_locks(ccujack)
await ccujack.data_updated.wait()
await asyncio.sleep(0.1) # debounce multiple mqtt parameter updates
await asyncio.sleep(0.1) # debounce multiple mqtt parameter updates
@app.patch(
@ -261,7 +253,7 @@ async def operate_lock(
lock_id: str,
requested_op: models.LockOperation,
ccujack: deps.CCUJackClient,
current_user: deps.CurrentUser,
current_user: deps.AuthenticatedUser,
) -> None:
if not current_user.may_operate_locks:
raise exceptions.HttpProblemException.forbidden_to_operate(req.url)

View file

@ -1,4 +1,4 @@
from typing import Annotated, Optional
from typing import Annotated, Optional, Tuple
import logging
from datetime import datetime, UTC, timedelta
from fastapi import Request, Depends, Response
@ -20,9 +20,9 @@ async def get_oidc_client(req: Request) -> OpenidClient:
OpenidClient = Annotated[OpenidClient, Depends(get_oidc_client)]
async def get_current_user(
async def get_logged_in_oidc_user(
req: Request, resp: Response, oidc_client: OpenidClient
) -> Optional[models.CurrentUser]:
) -> Optional[models.ApiUser]:
# easiest case: we still have an access token (which is the most fleeting component)
# everything else should still be valid so we can just use it
if all(i in req.cookies for i in ("access_token", "id_token")):
@ -30,11 +30,10 @@ async def get_current_user(
"user is fully authenticated, returning current user from existing id_token"
)
id_token = oidc_client.decode_id_token(
req.cookies["id_token"], nonce=req.cookies.get("auth_nonce", None),
)
return models.CurrentUser(
id_token=id_token, raw_id_token=req.cookies["id_token"]
req.cookies["id_token"],
nonce=req.cookies.get("auth_nonce", None),
)
return models.ApiUser.from_id_token(id_token, req.cookies["id_token"])
# if we have a refresh token, try to get new tokens
elif all(i in req.cookies for i in ("refresh_token",)):
@ -45,24 +44,26 @@ async def get_current_user(
token_resp = oidc_client.exchange_refresh_token(req.cookies["refresh_token"])
if isinstance(token_resp, TokenSuccessResponse):
logger.debug("successfully got new tokens from refresh token")
persist_auth_state(oidc_client, resp, token_resp, auth_start_time, None)
persist_oidc_auth_state(
oidc_client, resp, token_resp, auth_start_time, None
)
# return the newly gotten info
id_token = oidc_client.decode_id_token(token_resp.id_token)
return models.CurrentUser(
id_token=id_token, raw_id_token=token_resp.id_token
)
return models.ApiUser.from_id_token(id_token, token_resp.id_token)
else:
logger.debug("failed to exchange refresh token for new access token: %s", token_resp)
logger.debug(
"failed to exchange refresh token for new access token: %s", token_resp
)
# otherwise we can't meaningfully recover any user information or the user is simply not authenticated
else:
logger.debug("no currently authenticated user")
logger.debug("no currently authenticated oidc user")
raise exceptions.HttpProblemException.unauthorized(req.url)
return None
def persist_auth_state(
def persist_oidc_auth_state(
oidc_client: OpenidClient,
resp: Response,
tokens: TokenSuccessResponse,
@ -118,7 +119,7 @@ def persist_auth_state(
)
def clear_auth_state(resp: Response):
def clear_oidc_auth_state(resp: Response):
resp.set_cookie("access_token", "", max_age=0)
resp.set_cookie("refresh_token", "", max_age=0)
resp.set_cookie("id_token", "", max_age=0)
@ -128,7 +129,39 @@ def clear_auth_state(resp: Response):
resp.set_cookie("auth_start_time", "", max_age=0)
CurrentUser = Annotated[Optional[models.CurrentUser], Depends(get_current_user)]
async def get_api_user(
req: Request, resp: Response, oidc_client: OpenidClient
) -> models.ApiUser:
oidc_user = await get_logged_in_oidc_user(req, resp, oidc_client)
# TODO: Implement API user based on static tokens
if oidc_user is not None:
return oidc_user
else:
return models.ApiUser(
is_anonymous=True,
is_ccchh_user=False,
is_token_user=False,
may_operate_locks=False,
username="anonymous",
guaranteed_session_until=None,
raw_id_token=None,
)
ApiUser = Annotated[models.ApiUser, Depends(get_api_user)]
async def get_authenticated_user(
req: Request, resp: Response, oidc_client: OpenidClient
) -> models.ApiUser:
user = await get_api_user(req, resp, oidc_client)
if user.is_anonymous:
raise exceptions.HttpProblemException.unauthorized(req.url)
else:
return user
AuthenticatedUser = Annotated[models.ApiUser, Depends(get_authenticated_user)]
def get_ccujack(req: Request) -> CCUJackClient:

View file

@ -1,6 +1,6 @@
from typing import Optional, Literal, List
from datetime import datetime
from pydantic import BaseModel, HttpUrl
from typing import Optional, Literal, Self
from datetime import datetime, UTC
from pydantic import BaseModel, HttpUrl, Field
from enum import Enum
from simple_openid_connect.data import IdToken
@ -27,24 +27,27 @@ class HttpProblemDetail(BaseModel):
instance: Optional[HttpUrl]
class CurrentUser(BaseModel):
id_token: IdToken
raw_id_token: str
@property
def ccchh_roles(self) -> List[str]:
return getattr(self.id_token, "ccchh-roles", [])
@property
def may_operate_locks(self) -> bool:
return "intern@" in self.ccchh_roles
class UserStatus(BaseModel):
is_authorized: bool
guaranteed_session_until: datetime
class ApiUser(BaseModel):
is_anonymous: bool
is_ccchh_user: bool
is_token_user: bool
may_operate_locks: bool
username: str
ccchh_roles: List[str]
guaranteed_session_until: Optional[datetime]
raw_id_token: Optional[str] = Field(exclude=True)
@classmethod
def from_id_token(cls, id_token: IdToken, raw_id_token: str) -> Self:
return cls(
is_anonymous=False,
is_ccchh_user=True,
is_token_user=False,
may_operate_locks="intern@" in getattr(id_token, "ccchh-roles", []),
username=id_token.preferred_username,
guaranteed_session_until=datetime.fromtimestamp(id_token.exp, UTC),
raw_id_token=raw_id_token,
)
class LockStatus(BaseModel):