Write code like this:
async with Client("test.mosquitto.org") as client:
    async with client.filtered_messages("floors/+/humidity") as messages:
        await client.subscribe("floors/#")
        async for message in messages:
            print(message.payload.decode())
asyncio-mqtt combines the stability of the time-proven paho-mqtt library with a modern, asyncio-based interface.
- No more callbacks!
- No more return codes (welcome to the MqttError)
- Graceful disconnection (forget about on_unsubscribe, on_disconnect, etc.)
- Compatible with async code
- Did we mention no more callbacks?
The whole thing is less than 400 lines of code.
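To illustrate what "no more callbacks" means in practice, here is a minimal, standard-library-only sketch of how a callback-based message source can be exposed as an async iterator through an asyncio.Queue. It is an illustration of the general idea, not necessarily how asyncio-mqtt does it internally; CallbackSource, messages_from, and demo are hypothetical names made up for this example.

```python
import asyncio
from typing import AsyncIterator, Callable, List


class CallbackSource:
    """Hypothetical stand-in for a callback-based client (not a real API)."""

    def __init__(self) -> None:
        # By default, incoming payloads are silently dropped
        self.on_message: Callable[[bytes], None] = lambda payload: None

    def emit(self, payload: bytes) -> None:
        # Simulate the arrival of a message by invoking the callback
        self.on_message(payload)


def messages_from(source: CallbackSource) -> AsyncIterator[bytes]:
    """Install a callback that feeds a queue and return an async iterator."""
    queue: "asyncio.Queue[bytes]" = asyncio.Queue()
    source.on_message = queue.put_nowait

    async def iterate() -> AsyncIterator[bytes]:
        while True:
            yield await queue.get()

    return iterate()


async def demo() -> List[str]:
    source = CallbackSource()
    messages = messages_from(source)
    for value in (41, 42, 43):
        source.emit(str(value).encode())
    received: List[str] = []
    async for payload in messages:
        received.append(payload.decode())
        if len(received) == 3:
            break
    # Close the generator explicitly since we left the loop early
    await messages.aclose()
    return received


print(asyncio.run(demo()))
```

The consumer only sees a plain `async for` loop; the callback is an implementation detail hidden behind the queue.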
pip install asyncio-mqtt
Let's make the example from before more interesting:
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager
from random import randrange
from asyncio_mqtt import Client, MqttError

async def advanced_example():
    # We love context managers. Let's create a stack to help
    # us manage them.
    async with AsyncExitStack() as stack:
        # Keep track of the asyncio tasks that we create, so that
        # we can cancel them on exit
        tasks = set()
        stack.push_async_callback(cancel_tasks, tasks)

        # Connect to the MQTT broker
        client = Client("test.mosquitto.org")
        await stack.enter_async_context(client)

        # You can create any number of topic filters
        topic_filters = (
            "floors/+/humidity",
            "floors/rooftop/#",
            # Try to add more filters!
        )
        for topic_filter in topic_filters:
            # Log all messages that match the filter
            manager = client.filtered_messages(topic_filter)
            messages = await stack.enter_async_context(manager)
            template = f'[topic_filter="{topic_filter}"] {{}}'
            task = asyncio.create_task(log_messages(messages, template))
            tasks.add(task)

        # Messages that don't match a filter will get logged here
        messages = await stack.enter_async_context(client.unfiltered_messages())
        task = asyncio.create_task(log_messages(messages, "[unfiltered] {}"))
        tasks.add(task)

        # Subscribe to topic(s)
        # Note that we subscribe *after* starting the message
        # loggers. Otherwise, we may miss retained messages.
        await client.subscribe("floors/#")

        # Publish a random value to each of these topics
        topics = (
            "floors/basement/humidity",
            "floors/rooftop/humidity",
            "floors/rooftop/illuminance",
            # Try to add more topics!
        )
        task = asyncio.create_task(post_to_topics(client, topics))
        tasks.add(task)

        # Wait for everything to complete (or fail due to, e.g., network
        # errors)
        await asyncio.gather(*tasks)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
            message = randrange(100)
            print(f'[topic="{topic}"] Publishing message={message}')
            await client.publish(topic, message, qos=1)
            await asyncio.sleep(2)

async def log_messages(messages, template):
    async for message in messages:
        # Note that we assume that the message payload is a
        # UTF-8-encoded string (hence the `bytes.decode` call).
        print(template.format(message.payload.decode()))

async def cancel_tasks(tasks):
    for task in tasks:
        if task.done():
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main():
    # Run the advanced_example indefinitely. Reconnect automatically
    # if the connection is lost.
    reconnect_interval = 3  # [seconds]
    while True:
        try:
            await advanced_example()
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)

asyncio.run(main())
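The example relies on the two MQTT wildcards: + matches exactly one topic level, and # matches the current level and everything below it. To see which of the published topics each filter would pick up, here is a small standard-library sketch of the matching rules. The topic_matches helper is written for this illustration only and is not part of asyncio-mqtt; it also ignores special cases such as topics that start with $.

```python
from typing import Dict, List


def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT `topic_filter` (simplified)."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and
            # any number of levels below it
            return True
        if index >= len(topic_levels):
            # The filter is more specific than the topic
            return False
        if level != "+" and level != topic_levels[index]:
            # Neither a single-level wildcard nor a literal match
            return False
    # Without a trailing "#", both must have the same number of levels
    return len(filter_levels) == len(topic_levels)


topic_filters = ("floors/+/humidity", "floors/rooftop/#")
topics = (
    "floors/basement/humidity",
    "floors/rooftop/humidity",
    "floors/rooftop/illuminance",
)
matches: Dict[str, List[str]] = {
    topic: [f for f in topic_filters if topic_matches(f, topic)]
    for topic in topics
}
for topic, matched in matches.items():
    print(f"{topic} -> {matched}")
```

Under these rules, floors/rooftop/humidity matches both filters, while the other two topics match one filter each.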
Is asyncio-mqtt not what you are looking for? Try another client:
- hbmqtt - Own protocol implementation. Includes a broker.
- gmqtt - Own protocol implementation. No dependencies.
- aiomqtt - Wrapper around paho-mqtt.
- mqttools - Own protocol implementation. No dependencies.
- aio-mqtt - Own protocol implementation. No dependencies.
- trio-paho-mqtt - Trio-based. Wrapper around paho-mqtt.
This is not an exhaustive list.
Python 3.7 or later.
There is only a single dependency: paho-mqtt.
On Windows, since Python 3.8, the default asyncio event loop is the ProactorEventLoop. That loop doesn't support the add_reader method that asyncio-mqtt requires. To use asyncio-mqtt, switch to an event loop that supports add_reader, such as the built-in SelectorEventLoop. For example:
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
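If the same script also has to run on other platforms, the call can be guarded, since WindowsSelectorEventLoopPolicy is only available on Windows builds of Python. A minimal sketch, assuming it runs once before asyncio.run is called; the helper name use_selector_event_loop_on_windows is made up for this example:

```python
import asyncio
import sys


def use_selector_event_loop_on_windows() -> bool:
    """Switch to the selector event loop policy on Windows only.

    Returns True if the policy was changed, False otherwise.
    """
    if sys.platform != "win32":
        # Other platforms already use a loop that supports add_reader
        return False
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    return True


# Call this once at startup, before asyncio.run(...)
changed = use_selector_event_loop_on_windows()
print(f"Event loop policy changed: {changed}")
```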
Please refer to the CHANGELOG document. It adheres to the principles of Keep a Changelog.
This project adheres to Semantic Versioning.
Expect API changes until we reach version 1.0.0. After 1.0.0, breaking changes will only occur in major releases (e.g., 2.0.0, 3.0.0, etc.).
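As a rough illustration of what this policy means when upgrading, the sketch below flags upgrades that may contain breaking changes. The helper names and the version numbers in the usage lines are hypothetical and chosen only for the example; it assumes plain MAJOR.MINOR.PATCH version strings.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse a plain MAJOR.MINOR.PATCH string into a tuple of ints."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def upgrade_may_break(installed: str, candidate: str) -> bool:
    """Return True if moving from `installed` to `candidate` may break the API."""
    old = parse_version(installed)
    new = parse_version(candidate)
    if old == new:
        return False
    if old[0] == 0:
        # Before 1.0.0, any release may change the API
        return True
    # From 1.0.0 on, only a change of major version may break things
    return new[0] != old[0]


print(upgrade_may_break("0.8.0", "0.9.0"))  # pre-1.0.0 upgrade
print(upgrade_may_break("1.2.0", "1.3.0"))  # minor upgrade after 1.0.0
print(upgrade_may_break("1.9.9", "2.0.0"))  # major upgrade
```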
Note that the underlying paho-mqtt library is dual-licensed. One of the licenses is the so-called Eclipse Distribution License v1.0. It is almost word-for-word identical to the BSD 3-clause License. The only differences are:
- One use of "COPYRIGHT OWNER" (EDL) instead of "COPYRIGHT HOLDER" (BSD)
- One use of "Eclipse Foundation, Inc." (EDL) instead of "copyright holder" (BSD)