from typing import Optional
import logging
import secrets
import sys
from datetime import datetime, UTC

from fastapi import FastAPI, Request, Response
from fastapi.responses import RedirectResponse
from contextlib import asynccontextmanager
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse

from dooris_api import deps
from dooris_api.models import UserStatus, UserInfo

logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Set up logging and the OIDC client before the application starts serving.

    The root logger is set to INFO and writes to stderr, while the
    ``dooris_api`` logger is set to DEBUG.  The OpenID Connect client is
    stored in ``app.extra["oidc_client"]``.
    """
    root_logger = logging.getLogger("")
    root_logger.setLevel(logging.INFO)
    root_logger.addHandler(logging.StreamHandler(sys.stderr))
    app_logger = logging.getLogger("dooris_api")
    app_logger.setLevel(logging.DEBUG)

    # NOTE(review): issuer URL, redirect URI and client secret are hard-coded
    # here; the secret in particular should come from configuration rather
    # than live in source control.
    app.extra["oidc_client"] = OpenidClient.from_issuer_url(
        url="https://id.hamburg.ccc.de/realms/test/",
        authentication_redirect_uri="http://localhost:8000/auth/login-callback",
        client_id="dooris",
        client_secret="dp9HhnvUhAtKm3pRnxfGA7q8Nwrd1td8",
    )

    yield


app = FastAPI(
    title="Dooris",
    summary="API for interacting with HomeMatic door locks in CCCHH",
    docs_url="/api/docs",
    lifespan=lifespan,
)


@app.get("/api/user-info/", name="get-user-info")
async def get_user_info(req: Request, current_user: deps.CurrentUser) -> UserStatus:
    """Report whether the caller is logged in and, if so, their username.

    Returns a ``UserStatus`` with ``is_logged_in=False`` and no user info when
    ``current_user`` is ``None``; otherwise the username is taken from the
    ``preferred_username`` claim of the user's ID token.
    """
    if current_user is None:
        return UserStatus(is_logged_in=False, user_info=None)
    else:
        return UserStatus(
            is_logged_in=True,
            user_info=UserInfo(
                username=current_user.id_token.preferred_username,
            ),
        )


@app.get("/auth/login", response_class=RedirectResponse, status_code=302)
async def login_init(req: Request, resp: Response, oidc_client: deps.OpenidClient, next: Optional[str] = "") -> str:
    """Start the OIDC authorization code flow and redirect to the identity provider.

    Stores the login ``state``, ``nonce`` and start time in short-lived
    (10 minute) cookies so that ``login_callback()`` can validate the result.
    If ``next`` is given it is remembered in the ``auth_next`` cookie.

    Returns the URL of the identity provider's authentication endpoint, which
    is sent to the browser as a redirect.
    """
    logger.debug("starting user authentication with upstream identity provider")

    # save the ?next url for later redirection if the user requested that
    if next:
        resp.set_cookie("auth_next", next, max_age=60 * 10, httponly=True, secure=True)

    # save the login state into a cookie to prevent CSRF attacks
    # ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
    state = secrets.token_urlsafe(32)
    resp.set_cookie("auth_state", state, max_age=60 * 10, httponly=True, secure=True)
    # cookie values are strings, so serialise the timestamp explicitly
    resp.set_cookie(
        "auth_start_time",
        str(datetime.now(UTC).timestamp()),
        max_age=60 * 10,
        httponly=True,
        secure=True,
    )

    # prevent replay attacks by generating and specifying a nonce
    # ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html
    nonce = secrets.token_urlsafe(32)

    # assemble the response
    resp.set_cookie("auth_nonce", nonce, max_age=60 * 10, httponly=True, secure=True)
    return oidc_client.authorization_code_flow.start_authentication(state=state, nonce=nonce)


@app.get("/auth/login-callback", response_class=RedirectResponse, status_code=302)
async def login_callback(req: Request, resp: Response, oidc_client: deps.OpenidClient):
    """Finish the OIDC authorization code flow after the identity provider redirects back.

    Requires the ``auth_state``, ``auth_nonce`` and ``auth_start_time`` cookies
    set by ``login_init()``.  On success the authentication result is persisted
    via ``deps.persist_auth_state()`` and the user is redirected to the
    user-info endpoint; otherwise the user is redirected to the login error
    view.

    Returns the redirect target URL as a string.
    """
    # check that the user is currently in an authenticating state
    # these cookies are set by the login_init() view
    if "auth_state" not in req.cookies or "auth_nonce" not in req.cookies or "auth_start_time" not in req.cookies:
        logger.debug("user tried to log in but cookies indicate they are in a wrong state; redirecting to error view")
        return "/auth/login-error?error=todo"

    # ensure cookies are always cleared in the response
    # NOTE(review): "auth_nonce" and "auth_next" are not cleared here — confirm
    # whether that is intentional.
    resp.set_cookie("auth_state", "", max_age=0)
    resp.set_cookie("auth_start_time", "", max_age=0)

    # perform the actual authentication via OIDC token exchange
    # the library expects the current URL as a plain string
    auth_result = oidc_client.authorization_code_flow.handle_authentication_result(
        current_url=str(req.url),
        state=req.cookies["auth_state"],
    )

    # save the authentication result for later reuse
    if isinstance(auth_result, TokenSuccessResponse):
        # NOTE(review): a tampered, non-numeric cookie raises ValueError here
        auth_start_time = datetime.fromtimestamp(float(req.cookies["auth_start_time"]), UTC)
        deps.persist_auth_state(oidc_client, resp, auth_result, auth_start_time, req.cookies["auth_nonce"])

        # redirect the user to the page they wanted to visit
        # TODO: respect "auth_next" cookie to redirect the user to a specific url
        logger.debug("successfully authenticated user")
        return str(req.url_for("get-user-info"))
    else:
        logger.debug("could not authenticate user because of OIDC error; redirecting to error page with error messages intact")
        return f"/auth/login-error?{req.query_params}"