api: implement logout endpoint

This commit is contained in:
lilly 2026-05-14 15:27:36 +02:00
commit 07c72c752f
Signed by: lilly
SSH key fingerprint: SHA256:y9T5GFw2A20WVklhetIxG1+kcg/Ce0shnQmbu1LQ37g
3 changed files with 200 additions and 64 deletions

View file

@ -8,11 +8,11 @@ from fastapi import FastAPI, Request, Response, status
from fastapi.responses import RedirectResponse
from contextlib import asynccontextmanager
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse
from simple_openid_connect.data import TokenSuccessResponse, RpInitiatedLogoutRequest
from cachetools import TTLCache
from aiohttp import BasicAuth
from dooris_api import deps, models, exceptions
from dooris_api import deps, models, exceptions, app_config
from dooris_api.ccujack import CCUJackClient
@ -21,54 +21,79 @@ logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
from dooris_api import app_config
app_config = app_config.get()
app_cfg = app_config.get()
root_logger = logging.getLogger("")
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logging.StreamHandler(sys.stderr))
app_logger = logging.getLogger("dooris_api")
app_logger.setLevel(logging.DEBUG)
app.extra["oidc_client"] = OpenidClient.from_issuer_url(
url=app_config.openid_issuer,
authentication_redirect_uri=f"{app_config.base_url}/auth/login-callback",
client_id=app_config.openid_client_id,
client_secret=app_config.openid_client_secret,
scope=app_config.openid_scope,
url=app_cfg.openid_issuer,
authentication_redirect_uri=f"{app_cfg.base_url}/auth/login-callback",
client_id=app_cfg.openid_client_id,
client_secret=app_cfg.openid_client_secret,
scope=app_cfg.openid_scope,
)
app.extra["cache"] = TTLCache(maxsize=64, ttl=30 * 60)
app.extra["ccujack"] = CCUJackClient("https://hmdooris-ccu.ccchh.net:2122", auth=BasicAuth("dooris", os.environ["HMDOORIS_PW"]))
app.extra["ccujack"] = CCUJackClient(
"https://hmdooris-ccu.ccchh.net:2122",
auth=BasicAuth("dooris", os.environ["HMDOORIS_PW"]),
)
yield
app = FastAPI(
title="Dooris", summary="Server to interact with CCCHH doors via locks over CCUJACK over HomeMatic over WiFi", docs_url="/api/docs",
title="Dooris",
summary="Server to interact with CCCHH doors via locks over CCUJACK over HomeMatic over WiFi",
docs_url="/api/docs",
lifespan=lifespan,
)
app.add_exception_handler(exceptions.HttpProblemException, exceptions.problem_exception_handler)
app.add_exception_handler(
exceptions.HttpProblemException, exceptions.problem_exception_handler
)
@app.get("/api/user-info/", name="get-user-info", tags=["auth"], responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}})
async def get_user_info(req: Request, current_user: deps.CurrentUser) -> models.UserStatus:
@app.get(
"/api/user-info/",
name="get-user-info",
tags=["auth"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def get_user_info(
req: Request, current_user: deps.CurrentUser
) -> models.UserStatus:
if current_user is None:
return models.UserStatus(is_logged_in=False, is_authorized=False, username=None, guaranteed_session_until=None)
return models.UserStatus(
is_logged_in=False,
is_authorized=False,
username=None,
guaranteed_session_until=None,
)
else:
return models.UserStatus(
is_logged_in=True,
is_authorized=True,
guaranteed_session_until=datetime.fromtimestamp(current_user.id_token.exp, UTC),
guaranteed_session_until=datetime.fromtimestamp(
current_user.id_token.exp, UTC
),
username=current_user.id_token.preferred_username,
)
@app.get("/auth/login", tags=["auth"], response_class=RedirectResponse, status_code=302)
async def login_init(req: Request, resp: Response, oidc_client: deps.OpenidClient, next: Optional[str] = "") -> str:
async def login_init(
req: Request,
resp: Response,
oidc_client: deps.OpenidClient,
next: Optional[str] = "",
) -> str:
logger.debug("starting user authentication with upstream identity provider")
# save the ?next url for later redirection if the user requested that
if next:
resp.set_cookie("auth_next", next, max_age=60 * 10, httponly=True, secure=True)
@ -77,49 +102,94 @@ async def login_init(req: Request, resp: Response, oidc_client: deps.OpenidClien
# ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
state = secrets.token_urlsafe(32)
resp.set_cookie("auth_state", state, max_age=60 * 10, httponly=True, secure=True)
resp.set_cookie("auth_start_time", datetime.now(UTC).timestamp(), max_age=60 * 10, httponly=True, secure=True)
resp.set_cookie(
"auth_start_time",
datetime.now(UTC).timestamp(),
max_age=60 * 10,
httponly=True,
secure=True,
)
# prevent replay attacks by generating and specifying a nonce
# ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
nonce = secrets.token_urlsafe(32)
# assemble the response
resp.set_cookie("auth_nonce", nonce, max_age=60 * 10, httponly=True, secure=True)
return oidc_client.authorization_code_flow.start_authentication(state=state, nonce=nonce)
return oidc_client.authorization_code_flow.start_authentication(
state=state, nonce=nonce
)
@app.get("/auth/login-callback", tags=["auth"], response_class=RedirectResponse, status_code=302)
@app.get(
"/auth/login-callback",
tags=["auth"],
response_class=RedirectResponse,
status_code=302,
)
async def login_callback(req: Request, resp: Response, oidc_client: deps.OpenidClient):
# check that the user is currently in an authenticating state
# these cookies are set by the login_init() view
if "auth_state" not in req.cookies or "auth_nonce" not in req.cookies or "auth_start_time" not in req.cookies:
logger.debug("user tried to log in but cookies indicate they are in a wrong state; redirecting to error view")
if (
"auth_state" not in req.cookies
or "auth_nonce" not in req.cookies
or "auth_start_time" not in req.cookies
):
logger.debug(
"user tried to log in but cookies indicate they are in a wrong state; redirecting to error view"
)
return "/auth/login-error?error=todo"
# ensure cookies are always cleared in the response
resp.set_cookie("auth_state", "", max_age=0)
resp.set_cookie("auth_start_time", "", max_age=0)
# perform the actual authentication via OIDC token exchange
auth_result = oidc_client.authorization_code_flow.handle_authentication_result(current_url=req.url, state=req.cookies["auth_state"])
auth_result = oidc_client.authorization_code_flow.handle_authentication_result(
current_url=req.url, state=req.cookies["auth_state"]
)
# save the authentication result for later reuse
if isinstance(auth_result, TokenSuccessResponse):
auth_start_time = datetime.fromtimestamp(float(req.cookies["auth_start_time"]), UTC)
deps.persist_auth_state(oidc_client, resp, auth_result, auth_start_time, req.cookies["auth_nonce"])
auth_start_time = datetime.fromtimestamp(
float(req.cookies["auth_start_time"]), UTC
)
deps.persist_auth_state(
oidc_client, resp, auth_result, auth_start_time, req.cookies["auth_nonce"]
)
# redirect the user to the page they wanted to visit
# TODO: respect "auth_next" cookie to redirect the user to a specific url
logger.debug("successfully authenticated user")
return str(req.url_for("get-user-info"))
else:
logger.debug("could not authenticate user because of OIDC error; redirecting to error page with error messages intact")
logger.debug(
"could not authenticate user because of OIDC error; redirecting to error page with error messages intact"
)
return f"/auth/login-error?{req.query_params}"
@app.get("/api/locks/", tags=["locks"], responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}})
async def list_locks(ccujack: deps.CCUJackClient, cache: deps.Cache) -> List[models.Lock]:
@app.get(
"/auth/logout", tags=["auth"], response_class=RedirectResponse, status_code=302
)
async def logout(
resp: Response, oidc_client: deps.OpenidClient, current_user: deps.CurrentUser
):
deps.clear_auth_state(resp)
return oidc_client.initiate_logout(
RpInitiatedLogoutRequest(id_token_hint=current_user.raw_id_token, post_logout_redirect_uri=f"{app_config.get().base_url}/")
)
@app.get(
"/api/locks/",
tags=["locks"],
responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def list_locks(
ccujack: deps.CCUJackClient, cache: deps.Cache
) -> List[models.Lock]:
# discover locks from ccujack
CACHE_KEY = "ccu-find-locks"
if CACHE_KEY in cache:
@ -134,7 +204,9 @@ async def list_locks(ccujack: deps.CCUJackClient, cache: deps.Cache) -> List[mod
status_data = dict()
for i_channel, channel_params in lock_channels:
for i_param in channel_params:
value = await ccujack.query_param_value(f"{i_lock.address}/{i_channel.index}/{i_param.id}")
value = await ccujack.query_param_value(
f"{i_lock.address}/{i_channel.index}/{i_param.id}"
)
match i_param.id:
case "LOCK_STATE":
match value.v:
@ -167,9 +239,8 @@ async def list_locks(ccujack: deps.CCUJackClient, cache: deps.Cache) -> List[mod
case "UNREACH":
status_data["is_unreachable"] = value.v
result.append(models.Lock(name=i_lock.title, status=models.LockStatus(**status_data)))
result.append(
models.Lock(name=i_lock.title, status=models.LockStatus(**status_data))
)
return result