dooris/api/src/dooris_api/app.py

248 lines
8.4 KiB
Python

from typing import Optional, List, Any
import logging
import secrets
import sys
import os
from datetime import datetime, UTC
from fastapi import FastAPI, Request, Response, status
from fastapi.responses import RedirectResponse
from contextlib import asynccontextmanager
from simple_openid_connect.client import OpenidClient
from simple_openid_connect.data import TokenSuccessResponse, RpInitiatedLogoutRequest
from aiohttp import BasicAuth
from dooris_api import deps, models, exceptions, app_config
from dooris_api.ccujack import CCUJackClient
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Configure logging and create the shared service clients at startup.

    Everything before the ``yield`` runs when the application starts. No
    statement follows the ``yield``, so nothing is torn down on shutdown.

    The OpenID Connect client and the CCU-Jack client are stored in
    ``app.extra`` under the keys ``"oidc_client"`` and ``"ccujack"``.

    Args:
        app: The FastAPI application whose ``extra`` mapping receives the clients.

    Raises:
        KeyError: If the ``HMDOORIS_PW`` environment variable is not set.
    """
    settings = app_config.get()

    # Root logger: INFO level, with an additional handler writing to stderr.
    base_logger = logging.getLogger("")
    base_logger.setLevel(logging.INFO)
    stderr_handler = logging.StreamHandler(sys.stderr)
    base_logger.addHandler(stderr_handler)

    # The "dooris_api" logger is set to the more verbose DEBUG level.
    logging.getLogger("dooris_api").setLevel(logging.DEBUG)

    app.extra["oidc_client"] = OpenidClient.from_issuer_url(
        url=settings.openid_issuer,
        authentication_redirect_uri=f"{settings.base_url}/auth/login-callback",
        client_id=settings.openid_client_id,
        client_secret=settings.openid_client_secret,
        scope=settings.openid_scope,
    )

    # The CCU password is only read from the environment, never from app_config.
    ccu_credentials = BasicAuth("dooris", os.environ["HMDOORIS_PW"])
    lock_client = CCUJackClient(
        "https://hmdooris-ccu.ccchh.net:2122",
        auth=ccu_credentials,
    )
    app.extra["ccujack"] = lock_client
    # NOTE(review): presumably discovers the locks that list_locks() later
    # iterates over -- confirm against CCUJackClient.find_locks.
    await lock_client.find_locks()

    yield
# The FastAPI application object. `lifespan` (defined above) is passed in as
# the lifespan handler and the docs URL is set to /api/docs.
app = FastAPI(
    title="Dooris",
    summary="Server to interact with CCCHH doors via locks over CCUJACK over HomeMatic over WiFi",
    docs_url="/api/docs",
    lifespan=lifespan,
)
# Register exceptions.problem_exception_handler as the handler for
# exceptions.HttpProblemException.
app.add_exception_handler(
    exceptions.HttpProblemException, exceptions.problem_exception_handler
)
@app.get(
    "/api/user-info/",
    name="get-user-info",
    tags=["auth"],
    responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def get_user_info(
    req: Request, current_user: deps.CurrentUser
) -> models.UserStatus:
    """Describe the login status of the caller.

    Args:
        req: The incoming request (not used by this handler).
        current_user: The resolved user dependency, or ``None`` when there
            is no user.

    Returns:
        When ``current_user`` is ``None``: a ``UserStatus`` with both flags
        false and no username or session expiry. Otherwise: both flags true,
        the ID token's ``preferred_username``, and the ID token's ``exp``
        claim converted to a UTC datetime.
    """
    if current_user is not None:
        id_token = current_user.id_token
        session_end = datetime.fromtimestamp(id_token.exp, UTC)
        return models.UserStatus(
            is_logged_in=True,
            is_authorized=True,
            guaranteed_session_until=session_end,
            username=id_token.preferred_username,
        )

    # no user: report an anonymous, unauthorized status
    return models.UserStatus(
        is_logged_in=False,
        is_authorized=False,
        username=None,
        guaranteed_session_until=None,
    )
@app.get("/auth/login", tags=["auth"], response_class=RedirectResponse, status_code=302)
async def login_init(
    req: Request,
    resp: Response,
    oidc_client: deps.OpenidClient,
    next: Optional[str] = "",
) -> str:
    """Start a login with the upstream identity provider.

    Stores the handshake data in cookies that last ten minutes and are
    marked ``httponly`` and ``secure``:

    * ``auth_next`` -- the requested ``next`` URL (only when one was given),
    * ``auth_state`` -- a random state value guarding against CSRF,
    * ``auth_start_time`` -- the current UTC timestamp,
    * ``auth_nonce`` -- a random nonce guarding against replay attacks.

    ref: https://simple-openid-connect.readthedocs.io/en/stable/nonce_and_state.html

    Args:
        req: The incoming request (not used by this handler).
        resp: Response object on which the cookies are set.
        oidc_client: The OpenID Connect client dependency.
        next: Optional URL the user wants to reach after logging in. The
            name shadows the builtin because it is the parameter name
            exposed by the route.

    Returns:
        Whatever ``start_authentication()`` returns for the generated state
        and nonce; the route is declared as a 302 ``RedirectResponse``.
    """
    logger.debug("starting user authentication with upstream identity provider")

    # attributes shared by every cookie of the login handshake
    handshake_cookie = {"max_age": 60 * 10, "httponly": True, "secure": True}

    # remember where the user wanted to go, if they told us
    if next:
        resp.set_cookie("auth_next", next, **handshake_cookie)

    csrf_state = secrets.token_urlsafe(32)
    resp.set_cookie("auth_state", csrf_state, **handshake_cookie)
    resp.set_cookie(
        "auth_start_time", datetime.now(UTC).timestamp(), **handshake_cookie
    )

    replay_nonce = secrets.token_urlsafe(32)
    resp.set_cookie("auth_nonce", replay_nonce, **handshake_cookie)

    flow = oidc_client.authorization_code_flow
    return flow.start_authentication(state=csrf_state, nonce=replay_nonce)
@app.get(
    "/auth/login-callback",
    tags=["auth"],
    response_class=RedirectResponse,
    status_code=302,
)
async def login_callback(req: Request, resp: Response, oidc_client: deps.OpenidClient):
    """Finish the login that ``login_init()`` started.

    Validates the handshake cookies, exchanges the callback URL for tokens
    through the OIDC client and, on success, persists the authentication
    state via ``deps.persist_auth_state()``.

    Args:
        req: The callback request carrying the handshake cookies and the
            identity provider's query parameters.
        resp: Response object on which cookies are set.
        oidc_client: The OpenID Connect client dependency.

    Returns:
        The redirect target as a string: the ``get-user-info`` URL on
        success, ``/auth/login-error?error=todo`` when the handshake cookies
        are missing or malformed, or ``/auth/login-error?<original query>``
        when the token exchange did not yield a ``TokenSuccessResponse``.
    """
    # The handshake cookies are single-use, so expire all of them on every
    # outcome (including the error redirects below). The values needed later
    # are read from the *request* cookies, which this does not affect.
    # "auth_next" is left alone until the TODO further down is implemented.
    for cookie_name in ("auth_state", "auth_nonce", "auth_start_time"):
        resp.set_cookie(cookie_name, "", max_age=0)

    # check that the user is currently in an authenticating state
    # these cookies are set by the login_init() view
    if (
        "auth_state" not in req.cookies
        or "auth_nonce" not in req.cookies
        or "auth_start_time" not in req.cookies
    ):
        logger.debug(
            "user tried to log in but cookies indicate they are in a wrong state; redirecting to error view"
        )
        return "/auth/login-error?error=todo"

    # Cookies are client-controlled: a malformed timestamp must lead to the
    # error view instead of an unhandled exception.
    try:
        auth_start_time = datetime.fromtimestamp(
            float(req.cookies["auth_start_time"]), UTC
        )
    except (ValueError, OverflowError, OSError):
        logger.debug(
            "user tried to log in with an unparsable auth_start_time cookie; redirecting to error view"
        )
        return "/auth/login-error?error=todo"

    # perform the actual authentication via OIDC token exchange
    # (the client expects the URL as a plain string, not a URL object)
    auth_result = oidc_client.authorization_code_flow.handle_authentication_result(
        current_url=str(req.url), state=req.cookies["auth_state"]
    )

    if not isinstance(auth_result, TokenSuccessResponse):
        logger.debug(
            "could not authenticate user because of OIDC error; redirecting to error page with error messages intact"
        )
        return f"/auth/login-error?{req.query_params}"

    # save the authentication result for later reuse
    deps.persist_auth_state(
        oidc_client, resp, auth_result, auth_start_time, req.cookies["auth_nonce"]
    )

    # redirect the user to the page they wanted to visit
    # TODO: respect "auth_next" cookie to redirect the user to a specific url
    logger.debug("successfully authenticated user")
    return str(req.url_for("get-user-info"))
@app.get(
    "/auth/logout", tags=["auth"], response_class=RedirectResponse, status_code=302
)
async def logout(
    resp: Response, oidc_client: deps.OpenidClient, current_user: deps.CurrentUser
):
    """Log the caller out locally and, when possible, at the identity provider.

    The local authentication state is always cleared through
    ``deps.clear_auth_state()``.

    Args:
        resp: Response object handed to ``deps.clear_auth_state()``.
        oidc_client: The OpenID Connect client dependency.
        current_user: The resolved user dependency, or ``None`` when the
            caller is not logged in.

    Returns:
        The redirect target: the value of ``oidc_client.initiate_logout()``
        for a logged-in user, otherwise the application's base URL.
    """
    deps.clear_auth_state(resp)
    post_logout_uri = f"{app_config.get().base_url}/"

    if current_user is None:
        # Nobody is logged in, so there is no ID token to use as a hint.
        # Go straight to the post-logout destination instead of failing on
        # the attribute access below.
        return post_logout_uri

    return oidc_client.initiate_logout(
        RpInitiatedLogoutRequest(
            id_token_hint=current_user.raw_id_token,
            post_logout_redirect_uri=post_logout_uri,
        )
    )
# Enumerated parameters: (parameter id, status field, ((raw value, label), ...)).
# Raw values that are not listed leave the status field unset.
_ENUM_PARAMS = (
    ("LOCK_STATE", "lock_state", ((0, "unknown"), (1, "locked"), (2, "unlocked"))),
    (
        "ACTIVITY_STATE",
        "activity_state",
        ((1, "unlocking"), (2, "locking"), (3, "stable")),
    ),
    (
        "LOCK_TARGET_LEVEL",
        "lock_target_level",
        ((0, "locked"), (1, "unlocked"), (2, "open")),
    ),
)

# Parameters whose raw value is copied unchanged: (parameter id, status field).
_FLAG_PARAMS = (
    ("LOW_BAT", "is_low_battery"),
    ("ERROR_JAMMED", "is_error_jammed"),
    ("UNREACH", "is_unreachable"),
)


def _apply_param(fields, param_id, reading):
    """Store the status field derived from one parameter reading in ``fields``.

    ``reading.v`` is only accessed when ``param_id`` is a known parameter;
    unknown parameters are ignored.
    """
    for known_id, field, labels in _ENUM_PARAMS:
        if param_id == known_id:
            raw = reading.v
            for code, label in labels:
                if raw == code:
                    fields[field] = label
                    break
            return
    for known_id, field in _FLAG_PARAMS:
        if param_id == known_id:
            fields[field] = reading.v
            return


@app.get(
    "/api/locks/",
    tags=["locks"],
    responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def list_locks(
    ccujack: deps.CCUJackClient
) -> List[models.Lock]:
    """List every known lock together with its current status.

    ``ccujack.locks`` is iterated as ``(lock, channels)`` pairs and each
    ``channels`` as ``(channel, params)`` pairs. Every parameter is queried
    one after another at ``"<lock.address>/<channel.index>/<param.id>"``,
    and the recognised ones are translated into ``LockStatus`` fields.

    NOTE(review): the route documents a 401 response but this handler takes
    no user dependency itself -- confirm that ``deps.CCUJackClient``
    enforces authentication.

    Args:
        ccujack: The CCU-Jack client dependency.

    Returns:
        One ``models.Lock`` per entry of ``ccujack.locks``, named after the
        lock's ``title``.
    """
    locks = []
    for lock, channels in ccujack.locks:
        fields = {}
        for channel, params in channels:
            for param in params:
                reading = await ccujack.query_param_value(
                    f"{lock.address}/{channel.index}/{param.id}"
                )
                _apply_param(fields, param.id, reading)
        lock_status = models.LockStatus(**fields)
        locks.append(models.Lock(name=lock.title, status=lock_status))
    return locks
@app.patch(
    "/api/locks/{name}",
    tags=["locks"],
    responses={status.HTTP_401_UNAUTHORIZED: {"model": models.HttpProblemDetail}},
)
async def operate_lock(name: str):
    """Placeholder for operating the lock called ``name``; currently does nothing.

    Args:
        name: Value of the ``{name}`` path segment.

    Returns:
        Always ``None``.
    """
    # TODO: not implemented yet -- no lock is touched by this endpoint.
    return None