From b75288881283c679ab593f9d3c7f6001e41dae95 Mon Sep 17 00:00:00 2001 From: lilly Date: Sun, 31 May 2026 20:53:14 +0200 Subject: [PATCH] api: fix mqtt reconnect logic errors --- api/src/dooris_api/ccujack.py | 9 ++++++--- api/src/dooris_api/mqtt_client.py | 26 +++++++++++++++++--------- 2 files changed, 23 insertions(+), 12 deletions(-) diff --git a/api/src/dooris_api/ccujack.py b/api/src/dooris_api/ccujack.py index 03d0480..a6b0640 100644 --- a/api/src/dooris_api/ccujack.py +++ b/api/src/dooris_api/ccujack.py @@ -162,15 +162,18 @@ class CCUJackClient: while True: try: - await asyncio.sleep(15 * 60) # 15 minutes logger.info("Running CCUJack cron") - await self.find_locks() - if not self.mqtt.is_connected(): + if not self.mqtt.is_connected: logger.warning("MQTT client was discovered to be disconnected; reconnecting now") await self.mqtt.connect() + await self.find_locks() + except Exception as e: logger.exception(f"Error in CCUJack cron task: {e}") + finally: + await asyncio.sleep(15 * 60) # 15 minutes + async def query_param_value(self, address: str) -> CCUValue: if address in self.param_values: diff --git a/api/src/dooris_api/mqtt_client.py b/api/src/dooris_api/mqtt_client.py index 3df1f19..a090c5f 100644 --- a/api/src/dooris_api/mqtt_client.py +++ b/api/src/dooris_api/mqtt_client.py @@ -128,19 +128,21 @@ class AsyncMqttClient: to_add = topics.difference(self.active_subscriptions) if to_add: - logger.info(f"mqtt client subscribing to topics {', '.join(to_add)}") - self.fut_subscribe = asyncio.get_running_loop().create_future() - self.client.subscribe([(i, qos) for i in to_add]) - await self.fut_subscribe self.active_subscriptions.update(to_add) + if self.is_connected: + logger.info(f"mqtt client subscribing to topics {', '.join(to_add)}") + self.fut_subscribe = asyncio.get_running_loop().create_future() + self.client.subscribe([(i, qos) for i in to_add]) + await self.fut_subscribe to_remove = self.active_subscriptions.difference(topics) if to_remove: - logger.info(f"mqtt client unsubscribing from topics {','.join(to_remove)}") - self.fut_unsubscribe = asyncio.get_running_loop().create_future() - self.client.unsubscribe(list(to_remove)) - await self.fut_unsubscribe self.active_subscriptions.difference_update(to_remove) + if self.is_connected: + logger.info(f"mqtt client unsubscribing from topics {','.join(to_remove)}") + self.fut_unsubscribe = asyncio.get_running_loop().create_future() + self.client.unsubscribe(list(to_remove)) + await self.fut_unsubscribe async def connect(self): server_host, server_port = self.connection_string.rsplit(":", maxsplit=1) @@ -157,13 +159,19 @@ class AsyncMqttClient: # re-establish all supposed mqtt subscriptions if len(self.active_subscriptions) > 0: qos = 1 - await self.client.subscribe((i, qos) for i in self.active_subscriptions) + self.fut_subscribe = asyncio.get_running_loop().create_future() + self.client.subscribe([(i, qos) for i in self.active_subscriptions]) + await self.fut_subscribe async def disconnect(self): + if not self.is_connected: + return + logger.info("Disconnecting mqtt client from broker") self.fut_disconnect = asyncio.get_running_loop().create_future() self.client.disconnect() await self.fut_disconnect + @property def is_connected(self) -> bool: return self.client.is_connected()