All checks were successful
docker-image / docker (push) Successful in 1m24s
128 lines
4.3 KiB
Python
128 lines
4.3 KiB
Python
from ipaddress import ip_network
|
|
from typing import Callable
|
|
|
|
from BottleOIDC import BottleOIDC
|
|
from bottle import request, abort
|
|
|
|
|
|
class BottleHelpers:
|
|
def __init__(self, auth: BottleOIDC, allowed=None, group=None):
|
|
if allowed is None:
|
|
self.allowed = []
|
|
else:
|
|
self.allowed = [ip_network(a) for a in allowed]
|
|
self.auth = auth
|
|
self.group = group
|
|
|
|
def test_attrs(self, challenge, standard):
|
|
"""Compare list or val the standard."""
|
|
|
|
stand_list = standard if type(standard) is list else [standard]
|
|
chal_list = challenge if type(challenge) is list else [challenge]
|
|
|
|
for chal in chal_list:
|
|
if chal in stand_list:
|
|
return True
|
|
return False
|
|
|
|
def require_login(self, func: Callable) -> Callable:
|
|
"""
|
|
Check if user is logged in and redirect to OIDC provider if not.
|
|
If a group has been defined, check the group membership and abort with a 401 if the user is not a member.
|
|
:param func: decorator target
|
|
:return: decorated function
|
|
"""
|
|
if self.group is not None:
|
|
return self.auth.require_login(self.require_attribute('groups', self.group)(func))
|
|
else:
|
|
return self.auth.require_login(func)
|
|
|
|
def require_attribute(self, attr, value):
|
|
""" Decorator requires specific attribute value. """
|
|
|
|
def _outer_wrapper(f):
|
|
def _wrapper(*args, **kwargs):
|
|
if self.has_attribute(attr, value):
|
|
return f(*args, **kwargs)
|
|
abort(401, 'Not Authorized: Not In Group')
|
|
|
|
_wrapper.__name__ = f.__name__
|
|
return _wrapper
|
|
|
|
return _outer_wrapper
|
|
|
|
def require_authz(self, func: Callable) -> Callable:
|
|
"""
|
|
If a group has been defined, check the group membership; else check if user is logged in. Abort with 401 if not.
|
|
:param func: decorator target
|
|
:return: decorated function
|
|
"""
|
|
if self.group is not None:
|
|
return self.require_attribute('groups', self.group)(func)
|
|
else:
|
|
def _outer_wrapper(f):
|
|
def _wrapper(*args, **kwargs):
|
|
if self.is_logged_in():
|
|
return f(*args, **kwargs)
|
|
abort(401, 'Not Authorized: Not logged in')
|
|
return None
|
|
|
|
_wrapper.__name__ = f.__name__
|
|
return _wrapper
|
|
|
|
return _outer_wrapper(func)
|
|
|
|
def require_sourceip(self, func: Callable) -> Callable:
|
|
"""
|
|
Check that the user is coming from an allowed IP address. Abort with 401 if not.
|
|
:param func: decorator target
|
|
:return: decorated function
|
|
"""
|
|
if self.allowed is None or len(self.allowed) == 0:
|
|
return func
|
|
|
|
def _outer_wrapper(f):
|
|
def _wrapper(*args, **kwargs):
|
|
if self.is_authorized_ip():
|
|
return f(*args, **kwargs)
|
|
abort(401, 'Not Authorized: Wrong IP')
|
|
return None
|
|
|
|
_wrapper.__name__ = f.__name__
|
|
return _wrapper
|
|
|
|
return _outer_wrapper(func)
|
|
|
|
def has_attribute(self, attr, value):
|
|
if attr in self.auth.my_attrs:
|
|
resource = request.session[self.auth.sess_attr][attr]
|
|
return self.test_attrs(resource, value)
|
|
return False
|
|
|
|
def is_logged_in(self):
|
|
return self.auth.my_username is not None
|
|
|
|
def is_in_group(self):
|
|
"""
|
|
Returns True if user is in configured group, or no group has been configured.
|
|
:return: True if user is in configured group
|
|
"""
|
|
if self.group is not None:
|
|
return self.has_attribute('groups', self.group)
|
|
return True
|
|
|
|
def is_authorized_ip(self) -> bool:
|
|
if len(self.allowed) == 0:
|
|
return True
|
|
addr = ip_network(request.remote_addr)
|
|
for allowed in self.allowed:
|
|
if addr.overlaps(allowed):
|
|
return True
|
|
return False
|
|
|
|
def is_authorized(self) -> bool:
|
|
"""
|
|
Returns True if the user is coming from the right IP address, is logged in and belongs to the right group.
|
|
:return:
|
|
"""
|
|
return self.is_authorized_ip() and self.is_logged_in() and self.is_in_group()
|