api: fix mqtt reconnect logic errors
This commit is contained in:
parent
2e1742279d
commit
b752888812
2 changed files with 23 additions and 12 deletions
|
|
@ -162,15 +162,18 @@ class CCUJackClient:
|
|||
while True:
|
||||
try:
|
||||
|
||||
await asyncio.sleep(15 * 60) # 15 minutes
|
||||
logger.info("Running CCUJack cron")
|
||||
await self.find_locks()
|
||||
if not self.mqtt.is_connected():
|
||||
if not self.mqtt.is_connected:
|
||||
logger.warning("MQTT client was discovered to be disconnected; reconnecting now")
|
||||
await self.mqtt.connect()
|
||||
|
||||
await self.find_locks()
|
||||
|
||||
except Exception as e:
|
||||
logger.exception(f"Error in CCUJack cron task: {e}")
|
||||
finally:
|
||||
await asyncio.sleep(15 * 60) # 15 minutes
|
||||
|
||||
|
||||
async def query_param_value(self, address: str) -> CCUValue:
|
||||
if address in self.param_values:
|
||||
|
|
|
|||
|
|
@ -128,19 +128,21 @@ class AsyncMqttClient:
|
|||
|
||||
to_add = topics.difference(self.active_subscriptions)
|
||||
if to_add:
|
||||
self.active_subscriptions.update(to_add)
|
||||
if self.is_connected:
|
||||
logger.info(f"mqtt client subscribing to topics {', '.join(to_add)}")
|
||||
self.fut_subscribe = asyncio.get_running_loop().create_future()
|
||||
self.client.subscribe([(i, qos) for i in to_add])
|
||||
await self.fut_subscribe
|
||||
self.active_subscriptions.update(to_add)
|
||||
|
||||
to_remove = self.active_subscriptions.difference(topics)
|
||||
if to_remove:
|
||||
self.active_subscriptions.difference_update(to_remove)
|
||||
if self.is_connected:
|
||||
logger.info(f"mqtt client unsubscribing from topics {','.join(to_remove)}")
|
||||
self.fut_unsubscribe = asyncio.get_running_loop().create_future()
|
||||
self.client.unsubscribe(list(to_remove))
|
||||
await self.fut_unsubscribe
|
||||
self.active_subscriptions.difference_update(to_remove)
|
||||
|
||||
async def connect(self):
|
||||
server_host, server_port = self.connection_string.rsplit(":", maxsplit=1)
|
||||
|
|
@ -157,13 +159,19 @@ class AsyncMqttClient:
|
|||
# re-establish all supposed mqtt subscriptions
|
||||
if len(self.active_subscriptions) > 0:
|
||||
qos = 1
|
||||
await self.client.subscribe((i, qos) for i in self.active_subscriptions)
|
||||
self.fut_subscribe = asyncio.get_running_loop().create_future()
|
||||
self.client.subscribe([(i, qos) for i in self.active_subscriptions])
|
||||
await self.fut_subscribe
|
||||
|
||||
async def disconnect(self):
|
||||
if not self.is_connected:
|
||||
return
|
||||
|
||||
logger.info("Disconnecting mqtt client from broker")
|
||||
self.fut_disconnect = asyncio.get_running_loop().create_future()
|
||||
self.client.disconnect()
|
||||
await self.fut_disconnect
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return self.client.is_connected()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue