# # This whole implementation is adapted from the upstream GitHub example # https://github.com/eclipse-paho/paho.mqtt.python/blob/master/examples/loop_asyncio.py # import logging import asyncio import socket import paho.mqtt.client as mqtt logger = logging.getLogger(__name__) class AsyncLooper: """ Helper class to implement loopgin with asyncio for the underlying mqtt IO """ def __init__(self, loop: asyncio.AbstractEventLoop, client: mqtt.Client): self.loop = loop self.client = client self.client.on_socket_open = self.on_socket_open self.client.on_socket_close = self.on_socket_close self.client.on_socket_register_write = self.on_socket_register_write self.client.on_socket_unregister_write = self.on_socket_unregister_write def on_socket_open(self, client, userdata, sock): logger.debug("mqtt socket opened") def cb(): logger.debug("mqtt socket is readable, calling loop_read()") client.loop_read() self.loop.add_reader(sock, cb) self.task_misc = self.loop.create_task(self.misc_loop()) def on_socket_close(self, client, userdata, sock): logger.debug("mqtt socket closed") self.loop.remove_reader(sock) self.task_misc.cancel() def on_socket_register_write(self, client, userdata, sock): logger.debug("watching mqtt socket for writability") def cb(): logger.debug("mqtt socket ist writable, calling loop_write()") client.loop_write() self.loop.add_writer(sock, cb) def on_socket_unregister_write(self, client, userdata, sock): logger.debug("stopping to watch mqtt socket for writability") self.loop.remove_writer(sock) async def misc_loop(self): logger.debug("mqtt misc_loop() started") while self.client.loop_misc() == mqtt.MQTT_ERR_SUCCESS: try: await asyncio.sleep(1) except asyncio.CancelledError: break logger.debug("mqtt exiting misc_loop()") class AsyncMqttClient: loop: asyncio.AbstractEventLoop def __init__(self): self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id="dooris") self.client.on_connect = self.on_connect self.client.on_message = self.on_message self.client.on_disconnect = self.on_disconnect def on_connect(self, client, userdata, flags, reason_code, properties): logger.debug("mqtt client connected") print("client", type(client), client) print("userdata", type(userdata), userdata) print("flags", type(flags), flags) print("reason_code", type(reason_code), reason_code) print("properties", type(properties), properties) def on_disconnect(self, client, userdata, flags, reason_code, properties): logger.debug("mqtt client disconnected") print("client", type(client), client) print("userdata", type(userdata), userdata) print("flags", type(flags), flags) print("reason_code", type(reason_code), reason_code) print("properties", type(properties), properties) def on_message(self, client, userdata, msg): logger.debug("mqtt client got message") print("client", type(client), client) print("userdata", type(userdata), userdata) print("msg", type(msg), msg) async def connect(self): looper = AsyncLooper(asyncio.get_running_loop(), self.client) self.client.connect("mqtt.eclipseprojects.io", 1883, 60) self.client.socket().setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 2048)