import logging
import os
import secrets
from contextlib import asynccontextmanager
from typing import Optional

from fastapi import FastAPI, Request, Response
from fastapi.responses import RedirectResponse
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse

from dooris_api.models import UserStatus

logger = logging.getLogger(__name__)

# Lifetime (in seconds) of the short-lived cookies that carry login-flow state.
_AUTH_FLOW_COOKIE_MAX_AGE = 60 * 10


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Create the OpenID Connect client on startup and store it on the app.

    The client is built with ``OpenidClient.from_issuer_url`` and stored under
    ``app.extra["oidc_client"]`` so request handlers can reach it through
    ``req.app.extra``.

    Every setting can be overridden through an environment variable; when a
    variable is unset the previous hard-coded value is used, so existing
    deployments behave as before:

    * ``DOORIS_OIDC_ISSUER_URL``
    * ``DOORIS_OIDC_REDIRECT_URI``
    * ``DOORIS_OIDC_CLIENT_ID``
    * ``DOORIS_OIDC_CLIENT_SECRET``
    """
    app.extra["oidc_client"] = OpenidClient.from_issuer_url(
        url=os.environ.get(
            "DOORIS_OIDC_ISSUER_URL",
            "https://id.hamburg.ccc.de/realms/test/",
        ),
        authentication_redirect_uri=os.environ.get(
            "DOORIS_OIDC_REDIRECT_URI",
            "http://localhost:8000/auth/login-callback",
        ),
        client_id=os.environ.get("DOORIS_OIDC_CLIENT_ID", "dooris"),
        # NOTE(review): this fallback secret is committed to source control.
        # It should be rotated and supplied through the environment only.
        client_secret=os.environ.get(
            "DOORIS_OIDC_CLIENT_SECRET",
            "dp9HhnvUhAtKm3pRnxfGA7q8Nwrd1td8",
        ),
    )
    yield


app = FastAPI(
    title="Dooris",
    summary="API for interacting with HomeMatic door locks in CCCHH",
    docs_url="/api/docs",
    lifespan=lifespan,
)


@app.get("/api/user-info/")
async def get_user_info() -> UserStatus:
    """Return information about the current user.

    NOTE(review): the payload below looks like a placeholder and presumably
    does not match the ``UserStatus`` model -- confirm and replace.
    """
    return { "bla": "blub" }


def _set_auth_flow_cookie(resp: Response, key: str, value: str) -> None:
    """Store a short-lived login-flow value in a cookie on ``resp``."""
    resp.set_cookie(
        key,
        value,
        max_age=_AUTH_FLOW_COOKIE_MAX_AGE,
        httponly=True,
        secure=True,
        # "lax" instead of "strict": the redirect back from the identity
        # provider is a cross-site top-level navigation, and browsers do not
        # send SameSite=Strict cookies on it. With "strict" the callback
        # would not see the state/nonce cookies it has to verify.
        samesite="lax",
    )


@app.get("/auth/login", response_class=RedirectResponse)
async def login_init(req: Request, resp: Response, next: Optional[str] = "") -> str:
    """Start the OpenID Connect authorization code flow.

    Args:
        req: Incoming request; used to reach the OIDC client stored on the app.
        resp: Response object on which the login-flow cookies are set.
        next: Optional ``?next`` URL to remember for redirection after login.

    Returns:
        The URL produced by ``start_authentication``; it is delivered to the
        browser as a redirect because of ``response_class=RedirectResponse``.
    """
    # save the ?next url for later redirection if the user requested that
    if next:
        _set_auth_flow_cookie(resp, "auth_next", next)

    # save the login state into a cookie to prevent CSRF attacks
    # ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
    state = secrets.token_urlsafe(32)
    _set_auth_flow_cookie(resp, "auth_state", state)

    # prevent replay attacks by generating and specifying a nonce
    # ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
    nonce = secrets.token_urlsafe(32)
    _set_auth_flow_cookie(resp, "auth_nonce", nonce)

    # assemble the response
    oidc_client: OpenidClient = req.app.extra["oidc_client"]
    return oidc_client.authorization_code_flow.start_authentication(state=state, nonce=nonce)
@app.get("/auth/login-callback")
async def login_callback(req: Request, resp: Response):
    """Finish the OpenID Connect authorization code flow.

    Reads the ``auth_state`` and ``auth_nonce`` cookies from the request,
    hands the callback URL to the OIDC client and, on success, stores the
    issued tokens in cookies.

    Args:
        req: Incoming callback request carrying the flow cookies and the
            authentication result in its URL.
        resp: Response object on which the token cookies are set.

    Returns:
        ``{"authenticated": True}`` on success, otherwise
        ``{"authenticated": False, "error": <token error response>}``.

    Raises:
        HTTPException: 400 if the state or nonce cookie is missing, i.e. no
            login is in progress or it has expired.
    """
    # Imported locally so this handler is self-contained with respect to the
    # module-level import block.
    from fastapi import HTTPException

    try:
        state = req.cookies["auth_state"]
        nonce = req.cookies["auth_nonce"]
    except KeyError:
        # A missing cookie is a client-side condition, so report it as 400
        # rather than letting a bare exception surface as a 500.
        raise HTTPException(
            status_code=400,
            detail="user is currently not authenticating or the authentication expired. try again",
        ) from None

    oidc_client: OpenidClient = req.app.extra["oidc_client"]
    auth_result = oidc_client.authorization_code_flow.handle_authentication_result(
        # req.url is a URL object; the library expects the URL as a string
        current_url=str(req.url),
        state=state,
    )

    if not isinstance(auth_result, TokenSuccessResponse):
        return {"authenticated": False, "error": auth_result}

    # decode the ID token once now to validate its authenticity; the decoded
    # value itself is not needed here
    oidc_client.decode_id_token(auth_result.id_token, nonce=nonce)

    resp.set_cookie("access_token", auth_result.access_token, httponly=True, secure=True, samesite="strict")
    # httponly=False leaves the ID token readable by client-side scripts
    # (presumably so the frontend can inspect it -- confirm)
    resp.set_cookie("id_token", auth_result.id_token, httponly=False, secure=True, samesite="strict")
    return {"authenticated": True}