Building my own MQTT client in Rust

Table of Contents
In my previous post, Power monitoring with Home Assistant, MQTT, Rust and a Raspberry Pi, I explained how I built a Rust program to read data from my power meter and send it to Home Assistant via MQTT.
To keep the fun going, I started looking into the MQTT protocol, and I decided to write my own MQTT client in Rust.
It’s called aimeqtt, and it basically only supports what I need for this project:
- MQTT 3
- QoS 0 (at most once)
- Control packets:
- Connect to a MQTT broker (
CONNECT
,CONNACK
)- Username/password support
- Send messages (
PUBLISH
) - Receive messages (
SUBSCRIBE
,SUBACK
,PUBLISH
) - Keep alive (
PINGREQ
,PINGRESP
)
- Connect to a MQTT broker (
Luckily, it’s not a very complex protocol, I was able to find most of the information I needed in the specs, and some LLMs were able to help me for the rest.
I can’t say I’m very proud of the code, but it works, and it’s been working for a year now without issues. It has no purpose other than being used in my project. I don’t consider it a reference implementation by any means.
Here is how the client is used:
#[tokio::main]
async fn main() {
let broker_host = "127.0.0.1";
let broker_port = 1883;
let mqtt_client_options = client::ClientOptions::new()
.with_broker_host(broker_host.to_string())
.with_broker_port(broker_port)
.with_keep_alive(60)
.with_callback_handler(callback_handler);
let mut mqtt_client = client::new(mqtt_client_options).await;
mqtt_client
.subscribe("a/b".to_string())
.expect("Failed to subscribe to topic.");
loop {
match mqtt_client
.publish("a/b".to_string(), "msg".to_string())
.await
{
Ok(_) => println!("Message published successfully"),
Err(e) => println!("Failed to publish message: {:?}", e),
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
fn callback_handler(payload: String) {
println!("Received message: {payload}");
}
I use the builder pattern for the client options. There is a single callback handler for all received messages, which might not be ideal for all use cases, but it’s enough for me, I actually don’t use it in teleinfo2mqtt-rs
, it’s just for the example.
Crafting the TCP packets #
MQTT is a binary protocol, so we need to craft the content of the TCP packets. I’m not working with binary protocols often, so it was a nice change. To be honest, reading the spec was not enough, so I often resorted to Wireshark to see what the packets I sent looked like.
I usedmosquitto_pub
andmosquitto_sub
as reference implementations to compare and reverse-engineer, in a way, the packets, to understand what I didn’t get from the specs.
MQTT Packets are called control packets. There are a few of them such as CONNECT
, CONNACK
, PUBLISH
, SUBSCRIBE
, SUBACK
, PINGREQ
, PINGRESP
, etc. This is how the communication between the client and the broker is done.
A MQTT control packet is composed of 3 parts:
struct RawPacket {
fixed_header: Vec<u8>,
variable_header: Vec<u8>, // optional, depends on the packet type
payload: Vec<u8>, // optional, depends on the packet type
}
impl RawPacket {
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(&self.fixed_header);
bytes.extend_from_slice(&self.variable_header);
bytes.extend_from_slice(&self.payload);
bytes
}
}
Crafting the CONNECT
packet has been the most complex. But for example, the PUBLISH
packet is pretty simple. We need to:
- Set the packet type to
PUBLISH
in the fixed header - Set the topic name in the variable header
- Set the message in the payload
- Set the remaining length field in the fixed header (basically the length of the variable header and the payload)
impl Packet {
fn to_raw_packet(&self) -> RawPacket {
let mut packet = RawPacket {
fixed_header: Vec::new(),
variable_header: Vec::new(),
payload: Vec::new(),
};
packet
.fixed_header
.push((self.packet_type.clone() as u8) << 4); // Packet Type
match self.packet_type {
PacketType::PUBLISH => {
packet.variable_header.push(0x00); // Topic name Length MSB
packet
.variable_header
.push(self.topic.as_ref().unwrap().len() as u8); // Topic name Length LSB
packet
.variable_header
.extend_from_slice(self.topic.as_ref().unwrap().as_bytes()); // Topic Name
packet
.payload
.extend_from_slice(self.message.as_ref().unwrap().as_bytes());
[...]
// more sutff, like computing the remaining length field
pub fn craft_publish_packet(topic: String, payload: String) -> Vec<u8> {
Packet::new(PacketType::PUBLISH)
.with_client_id("rust".to_string())
.with_topic(topic)
.with_message(payload)
.to_raw_packet()
.to_bytes()
}
The event loop #
Now, the internals of the client are a bit more complex. I ended up using an event loop pattern to handle the connection to the broker, sending packets, and reading packets from the broker.
Here is a diagram that tries to explain how it works:
The client will send a CONNECT
packet to the broker, then enter an event loop. It will send a PINGREQ
packet every x seconds to keep the connection alive.
Internally, the event loop is run in a tokio::task
, and it uses tokio::sync::mpsc
(multi-producer, single-consumer) to enqueue things to do for the event loop. On each execution of the event loop, it will try to:
- Send a
PINGREQ
packet to the raw packet MPSC channel if half of the defined keep alive duration has passed - Send a
PUBLISH
packet to the raw packet MPSC channel if there is a message in the publish MPSC channel - Write a raw packet to the TCP stream if there is a packet in the raw packet MPSC channel
- Read from the TCP stream if it’s ready to read
- Parse it, and call the callback handler if it’s a
PUBLISH
packet
- Parse it, and call the callback handler if it’s a
Here’s how it looks like in the code. The connection to the broker is done in connect_to_broker()
in the event loop and the handle_connection()
function is the main loop.
impl Client {
async fn event_loop(&mut self) {
let mut publish_channel_receiver = self.publish_channel_receiver.take().unwrap();
loop {
match self.connect_to_broker().await {
Ok(mut stream) => {
if let Err(e) = self
.handle_connection(&mut stream, &mut publish_channel_receiver)
.await
{
event!(Level::ERROR, "Connection error: {}", e);
}
// Retry connection after error
tokio::time::sleep(Duration::from_secs(5)).await;
}
Err(e) => {
event!(Level::ERROR, "Failed to connect to MQTT broker: {}", e);
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
}
}
async fn connect_to_broker(&self) -> std::io::Result<TcpStream> {
let mut stream = TcpStream::connect(self.broker_address.clone()).await?;
event!(Level::DEBUG, "Connected to MQTT broker successfully.");
let connect_packet =
crate::packet::craft_connect_packet(self.username.clone(), self.password.clone());
stream.write_all(&connect_packet).await?;
event!(Level::DEBUG, "CONNECT message sent successfully.");
Ok(stream)
}
async fn handle_connection(
&mut self,
stream: &mut TcpStream,
publish_channel_receiver: &mut mpsc::UnboundedReceiver<PublishRequest>,
) -> std::io::Result<()> {
let mut ping_interval = time::interval(Duration::from_secs(self.keep_alive as u64) / 2);
loop {
tokio::select! {
_ = ping_interval.tick() => {
self.handle_ping().await?;
}
Some(publish_req) = publish_channel_receiver.recv() => {
self.handle_publish(publish_req).await?;
}
Some(packet) = self.raw_tcp_channel_receiver.as_mut().unwrap().recv() => {
self.handle_raw_packet(stream, packet).await?;
}
_ = stream.ready(Interest::READABLE) => {
match self.handle_incoming_packet(stream).await {
Ok(()) => continue,
Err(MqttError::ConnectionClosed) => {
event!(Level::INFO, "Broker closed connection");
return Err(std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "Broker closed connection"));
}
Err(e) => {
event!(Level::ERROR, "Failed to handle incoming packet: {}", e);
return Err(e.into());
}
}
}
}
}
}
I also use tokio::sync::oneshot
channels when publishing a message, to be able to await an error from the event loop:
mqtt_client
.publish("a/b".to_string(), "msg".to_string())
.await
.expect("Failed to send message to client");
// internally:
pub async fn publish(&self, topic: String, payload: String) -> Result<(), ClientError> {
let (resp_tx, resp_rx) = oneshot::channel();
self.publish_channel_sender
.send(PublishRequest {
topic,
payload,
responder: resp_tx,
})
.map_err(|e| {
event!(
Level::ERROR,
"Failed to send PUBLISH request to event loop: {}",
e
);
ClientError::InternalError
})?;
resp_rx
.await
.map_err(|e| {
event!(
Level::ERROR,
"Failed to receive response from event loop: {}",
e
);
ClientError::InternalError
})?
.map_err(|e| {
event!(Level::ERROR, "Failed to publish message: {}", e);
ClientError::InternalError
})?;
Ok(())
}
I did implement basic subscription support, more to test the client with itself than to use for teleinfo2mqtt-rs
. When the event loop reads the TCP stream, it will call the callback handler if it’s a PUBLISH
packet, which runs in a tokio task to not block the event loop.
async fn handle_incoming_packet(&self, stream: &mut TcpStream) -> Result<(), MqttError> {
let mut response = [0; 128];
match stream.try_read(&mut response) {
Ok(0) => Err(MqttError::ConnectionClosed),
Ok(n) => {
if n >= response.len() {
return Err(MqttError::PacketTooLarge);
}
let packet = &response[0..n];
self.process_packet(packet);
Ok(())
}
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
Err(e) => Err(e.into()),
}
}
fn process_packet(&self, packet: &[u8]) {
let packet_type = packet[0] >> 4;
match crate::packet::PacketType::from(packet_type) {
crate::packet::PacketType::CONNACK => crate::packet::parse_connack_packet(packet),
crate::packet::PacketType::PUBLISH => {
let (_, payload) = crate::packet::parse_publish_packet(packet);
if let Some(callback_handler) = self.callback_handler {
tokio::spawn(async move {
callback_handler(payload);
});
}
}
_ => event!(Level::DEBUG, "Unsupported packet type: {}", packet_type),
}
}
That’s about it! It was really fun, I think the error handling is pretty lacking and I’m not sure about the design, but at home my environment is pretty stable, so it’s been working fine.