diff --git a/api/src/dooris_api/app.py b/api/src/dooris_api/app.py index 7e2a948..54f97f1 100644 --- a/api/src/dooris_api/app.py +++ b/api/src/dooris_api/app.py @@ -1,7 +1,6 @@ from typing import Optional, List, AsyncIterable import logging import secrets -import sys from datetime import datetime, UTC from fastapi import FastAPI, Request, Response, status from fastapi.responses import RedirectResponse @@ -36,8 +35,7 @@ async def lifespan(app: FastAPI): auth=BasicAuth(app_cfg.ccujack_user, app_cfg.ccujack_password), mqtt_conn=app_cfg.ccujack_mqtt, ) - await app.extra["ccujack"].connect_mqtt() - await app.extra["ccujack"].find_locks() + await app.extra["ccujack"].start() yield diff --git a/api/src/dooris_api/ccujack.py b/api/src/dooris_api/ccujack.py index 1132e63..9edbcdb 100644 --- a/api/src/dooris_api/ccujack.py +++ b/api/src/dooris_api/ccujack.py @@ -72,6 +72,7 @@ class CCUJackClient: locks: LockData param_values: Dict[str, Any] task_process_messages: asyncio.Task + task_find_locks: asyncio.Task data_updated: asyncio.Event def __init__(self, base_uri: str, auth: BasicAuth, mqtt_conn: str): @@ -85,10 +86,16 @@ class CCUJackClient: self.locks = None self.param_values = dict() self.task_process_messages = None + self.task_find_locks = None self.data_updated = asyncio.Event() - async def connect_mqtt(self): + async def start(self): await self.mqtt.connect() + await self.find_locks() + + self.task_find_locks = asyncio.get_running_loop().create_task( + self.cron(), name="ccujack-cron" + ) self.task_process_messages = asyncio.get_running_loop().create_task( self.process_mqt_messages(), name="process-mqtt-messages" ) @@ -116,6 +123,7 @@ class CCUJackClient: # save the result new_locks = [i for i in device_infos if i[0].type == DEVICE_TYPE_LOCK] if new_locks != self.locks: + logger.info("Found new locks, updating state") self.locks = new_locks self.data_updated.set() self.data_updated.clear() @@ -148,6 +156,17 @@ class CCUJackClient: finally: self.data_updated.clear() + async def cron(self): + while True: + try: + + await asyncio.sleep(60 * 60) # 1 hour + logger.info("Running CCUJack cron") + await self.find_locks() + + except Exception as e: + logger.exception(f"Error in CCUJack cron task: {e}") + async def query_param_value(self, address: str) -> CCUValue: if address in self.param_values: return self.param_values[address] diff --git a/api/src/dooris_api/mqtt_client.py b/api/src/dooris_api/mqtt_client.py index 1548e66..03db2cc 100644 --- a/api/src/dooris_api/mqtt_client.py +++ b/api/src/dooris_api/mqtt_client.py @@ -3,7 +3,7 @@ # https://github.com/eclipse-paho/paho.mqtt.python/blob/master/examples/loop_asyncio.py # -from typing import Any, List, Set, Iterable +from typing import Any, Set, Iterable import logging import asyncio import socket @@ -132,6 +132,7 @@ class AsyncMqttClient: self.fut_subscribe = asyncio.get_running_loop().create_future() self.client.subscribe([(i, qos) for i in to_add]) await self.fut_subscribe + self.active_subscriptions.update(to_add) to_remove = self.active_subscriptions.difference(topics) if to_remove: @@ -139,6 +140,7 @@ class AsyncMqttClient: self.fut_unsubscribe = asyncio.get_running_loop().create_future() self.client.unsubscribe(list(to_remove)) await self.fut_unsubscribe + self.active_subscriptions.difference_update(to_remove) async def connect(self): server_host, server_port = self.connection_string.rsplit(":", maxsplit=1)