Handle HTTPS to CCU Jack
All checks were successful
docker-image / docker (push) Successful in 10m5s

This commit is contained in:
Stefan Bethke 2025-05-29 14:56:08 +02:00
commit 11e5b6e023
5 changed files with 144 additions and 42 deletions

View file

@ -1,5 +1,63 @@
# hmdooris - Dooris via HomeMatic # hmdooris - Dooris via HomeMatic
## Configuration
All configuration is handled through environment variables.
| Name | Default | Description |
|---------------------------------|-------------------------------------------------------------------------|------------------------------------------------------------------------------------------|
| `HMDOORIS_URL` | `http://localhost:3000` | URL of the application, used to construct links to itself |
| `HMDOORIS_DISCOVERY_URL` | `http://localhost:8080/realms/testing/.well-known/openid-configuration` | OIDC configuration discovery URL |
| `HMDOORIS_CLIENT_ID` | `hmdooris` | OIDC client ID |
| `HMDOORIS_CLIENT_SECRET` | - | ODIC client secret for the confidential flow |
| `IDINVITE_OIDC_SCOPE` | `["openid", "email", "profile"]` | JSON list of OIDC scopes to request. The OIDC IDP will need to send the group attribute. |
| `IDINVITE_OIDC_USER_ATTR` | `email` | The attribute to use as the user ID |
| `HMDOORIS_REQUIRES_GROUP` | - | Set to require users to be a member of this groups. |
| `HMDOORIS_CCUJACK_URL` | `https://raspberrymatic:2122` | URL of the CCU Jack server |
| `HMDOORIS_CCU_CERTIFICATE_PATH` | - | File of a private certificate, or `false` |
| `HMDOORIS_CCUJACK_USERNAME` | - | Username in CCU Jack |
| `HMDOORIS_CCUJACK_PASSWORD` | - | Password in CCU Jack |
### Required Group
If you would like to restrict lock operations to members of a particular group, configure the OIDC client to add group
information to the ID token, and set `HMDOORIS_REQUIRES_GROUP` to the name of the group you would like to use.
Otherwise, all users that can authenticate successfully can operate the locks.
### TLS Certificate Configuration
If you'd like to secure access to CCU Jack via TLS, you either need to install a publically trusted certificate on
RaspberryMatic. If you are using a private certificate, you will need to use `HMDOORIS_CCU_CERTIFICATE_PATH` to point
the HTTP client to a suitable CA certificate. Setting the variable to `false` will disable certificate verification.
Alternatively, you can use plain `http`.
## Managing the CCU certificate
If you want to talk to the RaspberryMatic/CCU-Jack and you are using a self-signed certificate (which is the default),
you will need to supply that certificate to `hmdooris`.
1. Create a self-signed certificate:
```shell
openssl req -x509 -newkey rsa:4096 -sha256 -days 3650 \
-nodes -keyout hmdooris-ccu.ccchh.net.key -out hmdooris-ccu.ccchh.net.crt -subj "/CN=hmdooris-ccu.ccchh.net" \
-addext "subjectAltName=DNS:hmdooris-ccu.ccchh.net"
cat hmdooris-ccu.ccchh.net.crt hmdooris-ccu.ccchh.net.key >hmdooris-ccu.ccchh.net.certkey.pem
```
2. Save the certificate to a file:
```shell
echo | \
openssl s_client -servername hmdooris-ccu.ccchh.net -connect hmdooris-ccu.ccchh.net:2122 | \
openssl x509 -text >self-signed.cert
```
2. Start `hmdooris` and pass the path to the file in the environment variable `HMDOORIS_CCU_CERTIFICATE_PATH`.
If you only want to use http, or your CCU has a public certificate (from for example Let's Encrypt), then you don't need
to do anything.
## Local Development Setup with Docker Compose ## Local Development Setup with Docker Compose

View file

@ -1,6 +1,10 @@
import json import json
import logging
import os
import stat
from json import JSONDecodeError from json import JSONDecodeError
from os import getenv, path from os import getenv, path
from pathlib import Path
class AppConfig: class AppConfig:
@ -12,9 +16,7 @@ class AppConfig:
self.debug = getenv("DEBUG", None) self.debug = getenv("DEBUG", None)
self.staticpath = path.join(self.basepath, "static") self.staticpath = path.join(self.basepath, "static")
self.templatepath = path.join(self.basepath, "templates") self.templatepath = path.join(self.basepath, "templates")
self.app_email_base_uri = getenv('APP_EMAIL_BASE_URI', 'http://localhost:3000') self.url = getenv('HMDOORIS_URL', 'http://localhost:3000')
self.app_keycloak_base_uri = getenv('APP_KEYCLOAK_BASE_URI', 'http://localhost:3000')
self.url = getenv('IDINVITE_URL', 'http://localhost:3000')
self.discovery_url = getenv('HMDOORIS_DISCOVERY_URL', self.discovery_url = getenv('HMDOORIS_DISCOVERY_URL',
'http://localhost:8080/realms/testing/.well-known/openid-configuration') 'http://localhost:8080/realms/testing/.well-known/openid-configuration')
self.client_id = getenv('HMDOORIS_CLIENT_ID', 'hmdooris') self.client_id = getenv('HMDOORIS_CLIENT_ID', 'hmdooris')
@ -22,25 +24,28 @@ class AppConfig:
self.oidc_scope = json.loads(getenv('IDINVITE_OIDC_SCOPE', '["openid", "email", "profile"]')) self.oidc_scope = json.loads(getenv('IDINVITE_OIDC_SCOPE', '["openid", "email", "profile"]'))
self.oidc_user_attr = getenv('IDINVITE_OIDC_USER_ATTR', 'email') self.oidc_user_attr = getenv('IDINVITE_OIDC_USER_ATTR', 'email')
self.requires_group = getenv('HMDOORIS_REQUIRES_GROUP', None) self.requires_group = getenv('HMDOORIS_REQUIRES_GROUP', None)
self.ccujack_url = getenv('HMDOORIS_CCUJACK_URL', None) self.ccujack_url = getenv('HMDOORIS_CCUJACK_URL', 'https://raspberrymatic:2122')
self.ccujack_certificate_path = getenv('HMDOORIS_CCU_CERTIFICATE_PATH', None)
self.ccujack_username = getenv('HMDOORIS_CCUJACK_USERNAME', None) self.ccujack_username = getenv('HMDOORIS_CCUJACK_USERNAME', None)
self.ccujack_password = getenv('HMDOORIS_CCUJACK_PASSWORD', None) self.ccujack_password = getenv('HMDOORIS_CCUJACK_PASSWORD', None)
self.log = logging.getLogger(__name__)
if self.debug is not None and self.debug != "0" and self.debug.lower() != "false": if self.debug is not None and self.debug.lower not in ('0', 'f', 'false'):
self.debug = True self.debug = True
else: else:
self.debug = False self.debug = False
try:
self.ccujack_locks = json.loads(getenv('HMDOORIS_CCUJACK_LOCKS', '[]'))
except json.decoder.JSONDecodeError as e:
raise ValueError(f"Unable to decode HMDOORIS_CCUJACK_LOCKS=\"{getenv('HMDOORIS_CCUJACK_LOCKS', '{}')}\"", e)
if self.client_secret is None or self.client_secret == '': if self.client_secret is None or self.client_secret == '':
raise ValueError('You need to provide HMDOORIS_CLIENT_SECRET') raise ValueError('You need to provide HMDOORIS_CLIENT_SECRET')
if self.ccujack_url is None or self.ccujack_url == '': if self.ccujack_url is None or self.ccujack_url == '':
raise ValueError('You need to provide HMDOORIS_CCUJACK_URL') raise ValueError('You need to provide HMDOORIS_CCUJACK_URL')
if len(self.ccujack_locks) == 0: if self.ccujack_certificate_path is not None:
raise ValueError('You need to provide HMDOORIS_CCUJACK_LOCKS as a JSON object of locks') if self.ccujack_certificate_path.lower() in ('0', 'f', 'false'):
self.ccujack_certificate_path = False
else:
p = Path(self.ccujack_certificate_path)
if not p.is_file():
self.log.warning(f'Unable to read certificate file {self.ccujack_certificate_path}, certificate verification might not work')
self.oidc = { self.oidc = {
'client_id': self.client_id, 'client_id': self.client_id,

55
hmdooris/BottleHelpers.py Normal file
View file

@ -0,0 +1,55 @@
from ipaddress import ip_address, IPv4Address, ip_network
from typing import Callable, List
from BottleOIDC import BottleOIDC
from BottleOIDC.bottle_utils import UnauthorizedError
from bottle import request
class BottleHelpers:
def __init__(self, auth: BottleOIDC, allowed=None, group=None):
if allowed is None:
self.allowed = []
else:
self.allowed = [ip_network(a) for a in allowed]
self.auth = auth
self.group = group
def require_login(self, func: Callable) -> Callable:
if self.group is not None:
return self.auth.require_login(auth.require_attribute('groups', self.group)(func))
else:
return self.auth.require_login(func)
def require_authz(self, func: Callable) -> Callable:
if self.group is not None:
return self.auth.require_attribute('groups', self.group)(func)
else:
def _outer_wrapper(f):
def _wrapper(*args, **kwargs):
if self.auth.my_username is not None:
return f(*args, **kwargs)
return UnauthorizedError('Not Authorized')
_wrapper.__name__ = f.__name__
return _wrapper
return _outer_wrapper(func)
def require_sourceip(self, func: Callable) -> Callable:
if self.allowed is None or len(self.allowed) == 0:
return func
def _outer_wrapper(f):
def _wrapper(*args, **kwargs):
addr = ip_network(request.remote_addr)
for allowed in self.allowed:
if addr.overlaps(allowed):
return f(*args, **kwargs)
return UnauthorizedError('Not Authorized')
_wrapper.__name__ = f.__name__
return _wrapper
return _outer_wrapper(func)

View file

@ -14,6 +14,7 @@ from bottle_websocket import websocket, GeventWebSocketServer
from geventwebsocket.websocket import WebSocket from geventwebsocket.websocket import WebSocket
from hmdooris.AppConfig import AppConfig from hmdooris.AppConfig import AppConfig
from hmdooris.BottleHelpers import BottleHelpers
from hmdooris.ccujack import CCUJack from hmdooris.ccujack import CCUJack
from hmdooris.updatepoller import UpdatePoller from hmdooris.updatepoller import UpdatePoller
from hmdooris.websocketcomm import WebSocketClients from hmdooris.websocketcomm import WebSocketClients
@ -22,7 +23,7 @@ config = AppConfig()
if config.debug: if config.debug:
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
ccujack = CCUJack(config.ccujack_url, config.ccujack_username, config.ccujack_password, config.ccujack_locks) ccujack = CCUJack(config.ccujack_url, config.ccujack_certificate_path, config.ccujack_username, config.ccujack_password)
app = Bottle() app = Bottle()
if config.debug: if config.debug:
@ -39,30 +40,9 @@ auth = BottleOIDC(app, config={
}) })
websocket_clients = WebSocketClients() websocket_clients = WebSocketClients()
bottle_helpers = BottleHelpers(auth, config.requires_group)
update_poller = UpdatePoller(websocket_clients, ccujack, 1 if config.debug else 0.1) update_poller = UpdatePoller(websocket_clients, ccujack, 1 if config.debug else 0.1)
def require_login(func: Callable) -> Callable:
if config.requires_group is not None:
return auth.require_login(auth.require_attribute('groups', config.requires_group)(func))
else:
return auth.require_login(func)
def require_authz(func: Callable) -> Callable:
if config.requires_group is not None:
return auth.require_attribute('groups', config.requires_group)(func)
else:
def _outer_wrapper(f):
def _wrapper(*args, **kwargs):
if auth.my_username is not None:
return f(*args, **kwargs)
return UnauthorizedError('Not Authorized')
_wrapper.__name__ = f.__name__
return _wrapper
return _outer_wrapper(func)
@app.route("/static/<filepath>") @app.route("/static/<filepath>")
def server_static(filepath): def server_static(filepath):
@ -71,11 +51,12 @@ def server_static(filepath):
@app.get("/") @app.get("/")
@jinja2_view("home.html.j2") @jinja2_view("home.html.j2")
@bottle_helpers.require_sourceip
def root(): def root():
return {} return {}
@app.get("/operate") @app.get("/operate")
@require_login @bottle_helpers.require_login
@jinja2_view("operate.html.j2") @jinja2_view("operate.html.j2")
def root(): def root():
return {} return {}
@ -103,7 +84,7 @@ def get_api_lock(id):
return update_poller.get_lock(id) return update_poller.get_lock(id)
@app.post('/api/lock/<id>') @app.post('/api/lock/<id>')
@require_authz @bottle_helpers.require_authz
def post_api_lock(id): def post_api_lock(id):
return ccujack.lock_unlock(id, request.json["locking"]) return ccujack.lock_unlock(id, request.json["locking"])

View file

@ -1,5 +1,6 @@
import json import json
import logging import logging
import ssl
from typing import List, Dict from typing import List, Dict
import requests import requests
@ -35,18 +36,20 @@ class CCUJackHmIPDLD:
class CCUJack: class CCUJack:
def __init__(self, url: str, username: str, password: str, locks: Dict): def __init__(self, url: str, certpath: str, username: str, password: str):
self.log = logging.getLogger(__name__) self.log = logging.getLogger(__name__)
self.url = url self.url = url
self.username = username self.kvargs = {}
self.password = password if username is not None and username != '':
self.auth = HTTPBasicAuth(username, password) self.kvargs['auth'] = HTTPBasicAuth(username, password)
if certpath is not None and certpath != '':
self.kvargs['verify'] = certpath
self.locks : Dict[str, CCUJackHmIPDLD] = {} self.locks : Dict[str, CCUJackHmIPDLD] = {}
self.get_all_locks() self.get_all_locks()
def get_json(self, url: str): def get_json(self, url: str):
url = self.url + url url = self.url + url
r = requests.get(url, auth=self.auth) r = requests.get(url, **self.kvargs)
if r.status_code != 200: if r.status_code != 200:
raise AppException(f"Unable to talk to CCU at {url}: {r.status_code}") raise AppException(f"Unable to talk to CCU at {url}: {r.status_code}")
try: try:
@ -57,7 +60,7 @@ class CCUJack:
def put_json(self, url: str, data: dict): def put_json(self, url: str, data: dict):
url = self.url + url url = self.url + url
r = requests.put(url, auth=self.auth, json=data) r = requests.put(url, json=data, **self.kvargs)
if r.status_code != 200: if r.status_code != 200:
raise AppException(f"Unable to talk to CCU at {url}: {r.status_code}") raise AppException(f"Unable to talk to CCU at {url}: {r.status_code}")
try: try: