api: implement static dooris tokens
This commit is contained in:
parent
63a9485209
commit
186ab662fb
2 changed files with 37 additions and 6 deletions
|
|
@ -78,9 +78,10 @@ def main():
|
|||
required=False,
|
||||
nargs=1,
|
||||
action="append",
|
||||
default=[],
|
||||
default=[i for i in os.environ.get("DOORIS_STATIC_API_TOKENS", "").split(",") if bool(i)],
|
||||
)
|
||||
args = argp.parse_args()
|
||||
print(args.static_api_tokens)
|
||||
|
||||
# setup logging
|
||||
logging.basicConfig(
|
||||
|
|
|
|||
|
|
@ -1,11 +1,12 @@
|
|||
from typing import Annotated, Optional, Tuple
|
||||
from typing import Annotated, Optional
|
||||
import logging
|
||||
from datetime import datetime, UTC, timedelta
|
||||
from fastapi import Request, Depends, Response
|
||||
from fastapi import Request, Depends, Response, Header
|
||||
from fastapi.security import APIKeyHeader
|
||||
from simple_openid_connect.data import TokenSuccessResponse
|
||||
from simple_openid_connect.client import OpenidClient
|
||||
from simple_openid_connect.exceptions import ValidationError
|
||||
|
||||
from dooris_api import app_config
|
||||
from dooris_api import models, exceptions
|
||||
from dooris_api.ccujack import CCUJackClient
|
||||
|
||||
|
|
@ -13,6 +14,9 @@ from dooris_api.ccujack import CCUJackClient
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
api_key_security_scheme = APIKeyHeader(name="Authorization", scheme_name="Static-Token", auto_error=False)
|
||||
|
||||
|
||||
async def get_oidc_client(req: Request) -> OpenidClient:
|
||||
return req.app.extra["oidc_client"]
|
||||
|
||||
|
|
@ -129,13 +133,39 @@ def clear_oidc_auth_state(resp: Response):
|
|||
resp.set_cookie("auth_start_time", "", max_age=0)
|
||||
|
||||
|
||||
def get_logged_in_token_user(req: Request, token: Optional[str]):
|
||||
if not token or not token.startswith("Static-Token "):
|
||||
logger.debug("No valid API-Token was provided")
|
||||
return None
|
||||
|
||||
token = token.removeprefix("Static-Token ")
|
||||
valid_tokens = app_config.get().static_api_tokens
|
||||
|
||||
if any((i == token for i in valid_tokens)):
|
||||
logger.debug("Successfully authenticated a static API-Token")
|
||||
return models.ApiUser(
|
||||
is_anonymous=False,
|
||||
is_ccchh_user=False,
|
||||
is_token_user=True,
|
||||
may_operate_locks=True,
|
||||
username="static-token",
|
||||
guaranteed_session_until=None,
|
||||
raw_id_token=None,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
async def get_api_user(
|
||||
req: Request, resp: Response, oidc_client: OpenidClient
|
||||
req: Request, resp: Response, oidc_client: OpenidClient, token: Annotated[Optional[str], Depends(api_key_security_scheme)] = None
|
||||
) -> models.ApiUser:
|
||||
oidc_user = await get_logged_in_oidc_user(req, resp, oidc_client)
|
||||
# TODO: Implement API user based on static tokens
|
||||
token_user = get_logged_in_token_user(req, token)
|
||||
|
||||
if oidc_user is not None:
|
||||
return oidc_user
|
||||
elif token_user is not None:
|
||||
return token_user
|
||||
else:
|
||||
return models.ApiUser(
|
||||
is_anonymous=True,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue