Building my own MQTT client in Rust


In my previous post, Power monitoring with Home Assistant, MQTT, Rust and a Raspberry Pi, I explained how I built a Rust program to read data from my power meter and send it to Home Assistant via MQTT.

To keep the fun going, I started looking into the MQTT protocol, and I decided to write my own MQTT client in Rust.

It’s called aimeqtt, and it basically only supports what I need for this project:

  • MQTT 3
  • QoS 0 (at most once)
  • Control packets:
    • Connect to an MQTT broker (CONNECT, CONNACK)
      • Username/password support
    • Send messages (PUBLISH)
    • Receive messages (SUBSCRIBE, SUBACK, PUBLISH)
    • Keep alive (PINGREQ, PINGRESP)

Luckily, it’s not a very complex protocol: I was able to find most of the information I needed in the specs, and some LLMs helped me with the rest.

I can’t say I’m very proud of the code, but it works, and it’s been working for a year now without issues. It has no purpose other than being used in my project. I don’t consider it a reference implementation by any means.

Here is how the client is used:

use std::time::Duration;

// Assuming the crate exposes the client module at its root.
use aimeqtt::client;

#[tokio::main]
async fn main() {
    let broker_host = "127.0.0.1";
    let broker_port = 1883;

    let mqtt_client_options = client::ClientOptions::new()
        .with_broker_host(broker_host.to_string())
        .with_broker_port(broker_port)
        .with_keep_alive(60)
        .with_callback_handler(callback_handler);

    let mut mqtt_client = client::new(mqtt_client_options).await;

    mqtt_client
        .subscribe("a/b".to_string())
        .expect("Failed to subscribe to topic.");

    loop {
        match mqtt_client
            .publish("a/b".to_string(), "msg".to_string())
            .await
        {
            Ok(_) => println!("Message published successfully"),
            Err(e) => println!("Failed to publish message: {:?}", e),
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn callback_handler(payload: String) {
    println!("Received message: {payload}");
}

I use the builder pattern for the client options. There is a single callback handler for all received messages, which might not be ideal for all use cases, but it’s enough for me. I actually don’t use it in teleinfo2mqtt-rs; it’s only there for the example.
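
As a rough sketch, the with_* methods can look something like this (the field names and defaults here are illustrative assumptions, not the exact aimeqtt code):

pub struct ClientOptions {
    broker_host: String,
    broker_port: u16,
    keep_alive: u16,
    callback_handler: Option<fn(String)>,
}

impl ClientOptions {
    pub fn new() -> Self {
        // Hypothetical defaults; the actual aimeqtt defaults may differ.
        ClientOptions {
            broker_host: "localhost".to_string(),
            broker_port: 1883,
            keep_alive: 60,
            callback_handler: None,
        }
    }

    // Each with_* method takes ownership of self and returns it,
    // which is what makes the chained calls above possible.
    pub fn with_broker_host(mut self, host: String) -> Self {
        self.broker_host = host;
        self
    }

    pub fn with_keep_alive(mut self, keep_alive: u16) -> Self {
        self.keep_alive = keep_alive;
        self
    }

    pub fn with_callback_handler(mut self, handler: fn(String)) -> Self {
        self.callback_handler = Some(handler);
        self
    }
}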

Crafting the TCP packets #

MQTT is a binary protocol, so we need to craft the content of the TCP packets ourselves. I don’t work with binary protocols often, so it was a nice change. To be honest, reading the spec was not enough, and I often resorted to Wireshark to see what the packets I sent actually looked like.

I used mosquitto_pub and mosquitto_sub as reference implementations, comparing their packets with mine to reverse-engineer, in a way, what I didn’t get from the specs.

[Screenshot: Wireshark helping debug MQTT control packets]

MQTT packets are called control packets. There are a few of them, such as CONNECT, CONNACK, PUBLISH, SUBSCRIBE, SUBACK, PINGREQ, PINGRESP, etc. All communication between the client and the broker happens through them.
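
The spec assigns each control packet type a 4-bit value (CONNECT is 1, CONNACK is 2, PUBLISH is 3, and so on), which ends up in the high nibble of the packet’s first byte. Here is a sketch of a matching PacketType enum; the real one in aimeqtt may differ:

// Discriminants follow the 4-bit control packet type values
// from the MQTT 3.1.1 spec.
#[derive(Clone, Debug)]
enum PacketType {
    CONNECT = 1,
    CONNACK = 2,
    PUBLISH = 3,
    SUBSCRIBE = 8,
    SUBACK = 9,
    PINGREQ = 12,
    PINGRESP = 13,
}

impl From<u8> for PacketType {
    fn from(value: u8) -> Self {
        match value {
            1 => PacketType::CONNECT,
            2 => PacketType::CONNACK,
            3 => PacketType::PUBLISH,
            8 => PacketType::SUBSCRIBE,
            9 => PacketType::SUBACK,
            12 => PacketType::PINGREQ,
            13 => PacketType::PINGRESP,
            // The real code presumably handles this more gracefully.
            _ => panic!("unsupported packet type: {value}"),
        }
    }
}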

An MQTT control packet is composed of 3 parts:

struct RawPacket {
    fixed_header: Vec<u8>,
    variable_header: Vec<u8>, // optional, depends on the packet type
    payload: Vec<u8>, // optional, depends on the packet type
}

impl RawPacket {
    fn to_bytes(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&self.fixed_header);
        bytes.extend_from_slice(&self.variable_header);
        bytes.extend_from_slice(&self.payload);
        bytes
    }
}

Crafting the CONNECT packet was the most complex part. The PUBLISH packet, on the other hand, is pretty simple. We need to:

  • Set the packet type to PUBLISH in the fixed header
  • Set the topic name in the variable header
  • Set the message in the payload
  • Set the remaining length field in the fixed header (basically the combined length of the variable header and the payload)

impl Packet {
    fn to_raw_packet(&self) -> RawPacket {
        let mut packet = RawPacket {
            fixed_header: Vec::new(),
            variable_header: Vec::new(),
            payload: Vec::new(),
        };

        packet
            .fixed_header
            .push((self.packet_type.clone() as u8) << 4); // Packet Type

        match self.packet_type {
            PacketType::PUBLISH => {
                packet.variable_header.push(0x00); // Topic name Length MSB
                packet
                    .variable_header
                    .push(self.topic.as_ref().unwrap().len() as u8); // Topic name Length LSB
                packet
                    .variable_header
                    .extend_from_slice(self.topic.as_ref().unwrap().as_bytes()); // Topic Name

                packet
                    .payload
                    .extend_from_slice(self.message.as_ref().unwrap().as_bytes());

        [...]
        // more stuff, like computing the remaining length field

pub fn craft_publish_packet(topic: String, payload: String) -> Vec<u8> {
    Packet::new(PacketType::PUBLISH)
        .with_client_id("rust".to_string())
        .with_topic(topic)
        .with_message(payload)
        .to_raw_packet()
        .to_bytes()
}
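
The remaining length field elided above is a variable-length integer defined by the spec: each byte carries 7 bits of the value, and the high bit signals that another byte follows. Here is a minimal encoder sketch (not the exact aimeqtt code):

fn encode_remaining_length(mut length: usize) -> Vec<u8> {
    let mut encoded = Vec::new();
    loop {
        // Take the lowest 7 bits of the value.
        let mut byte = (length % 128) as u8;
        length /= 128;
        if length > 0 {
            // More bytes follow: set the continuation bit.
            byte |= 0x80;
        }
        encoded.push(byte);
        if length == 0 {
            break;
        }
    }
    encoded
}

For example, a remaining length of 321 encodes to the two bytes 0xC1 0x02.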

The event loop #

Now, the internals of the client are a bit more complex. I ended up using an event loop pattern to handle the connection to the broker, sending packets, and reading packets from the broker.

Here is a diagram that tries to explain how it works:

graph TB
    Start[Start] --> ConnectBroker{Connect to MQTT broker}
    ConnectBroker -->|Success| SendConnect[Send CONNECT message]
    ConnectBroker -->|Failure| Retry[Wait 5 seconds and retry]
    Retry --> ConnectBroker
    SendConnect --> EventLoop{Event Loop}
    EventLoop -->|Ping tick for keep alive| SendPingReq[Send PINGREQ packet to TCP channel]
    EventLoop -->|Received PUBLISH message from channel| SendPublish[Send PUBLISH packet to TCP channel]
    EventLoop -->|Received packet from TCP channel| SendRawPacket[Send TCP packet to TCP stream]
    EventLoop -->|TCP stream ready to read| ReadResponse{Read broker response}
    SendPingReq --> EventLoop
    SendPublish --> EventLoop
    SendRawPacket --> EventLoop
    ReadResponse -->|Success| ParsePacket[Parse packet and handle accordingly]
    ReadResponse -->|Failure| Reconnect[Break and reconnect]
    ParsePacket --> EventLoop
    Reconnect --> ConnectBroker

The client will send a CONNECT packet to the broker, then enter an event loop. It will send a PINGREQ packet at a regular interval (half the keep-alive duration, as described below) to keep the connection alive.

Internally, the event loop runs in a tokio task and uses tokio::sync::mpsc (multi-producer, single-consumer) channels to enqueue work for it. On each iteration, the event loop tries to:

  • Send a PINGREQ packet to the raw packet MPSC channel if half of the defined keep alive duration has passed
  • Send a PUBLISH packet to the raw packet MPSC channel if there is a message in the publish MPSC channel
  • Write a raw packet to the TCP stream if there is a packet in the raw packet MPSC channel
  • Read from the TCP stream if it’s ready to read
    • Parse it, and call the callback handler if it’s a PUBLISH packet

Here’s what it looks like in the code. The connection to the broker is established in connect_to_broker(), called from the event loop, and handle_connection() is the main loop.

impl Client {
    async fn event_loop(&mut self) {
        let mut publish_channel_receiver = self.publish_channel_receiver.take().unwrap();

        loop {
            match self.connect_to_broker().await {
                Ok(mut stream) => {
                    if let Err(e) = self
                        .handle_connection(&mut stream, &mut publish_channel_receiver)
                        .await
                    {
                        event!(Level::ERROR, "Connection error: {}", e);
                    }
                    // Retry connection after error
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
                Err(e) => {
                    event!(Level::ERROR, "Failed to connect to MQTT broker: {}", e);
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
        }
    }

    async fn connect_to_broker(&self) -> std::io::Result<TcpStream> {
        let mut stream = TcpStream::connect(self.broker_address.clone()).await?;
        event!(Level::DEBUG, "Connected to MQTT broker successfully.");

        let connect_packet =
            crate::packet::craft_connect_packet(self.username.clone(), self.password.clone());

        stream.write_all(&connect_packet).await?;
        event!(Level::DEBUG, "CONNECT message sent successfully.");

        Ok(stream)
    }

    async fn handle_connection(
        &mut self,
        stream: &mut TcpStream,
        publish_channel_receiver: &mut mpsc::UnboundedReceiver<PublishRequest>,
    ) -> std::io::Result<()> {
        let mut ping_interval = time::interval(Duration::from_secs(self.keep_alive as u64) / 2);

        loop {
            tokio::select! {
                _ = ping_interval.tick() => {
                    self.handle_ping().await?;
                }
                Some(publish_req) = publish_channel_receiver.recv() => {
                    self.handle_publish(publish_req).await?;
                }
                Some(packet) = self.raw_tcp_channel_receiver.as_mut().unwrap().recv() => {
                    self.handle_raw_packet(stream, packet).await?;
                }
                _ = stream.ready(Interest::READABLE) => {
                    match self.handle_incoming_packet(stream).await {
                        Ok(()) => continue,
                        Err(MqttError::ConnectionClosed) => {
                            event!(Level::INFO, "Broker closed connection");
                            return Err(std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "Broker closed connection"));
                        }
                        Err(e) => {
                            event!(Level::ERROR, "Failed to handle incoming packet: {}", e);
                            return Err(e.into());
                        }
                    }
                }
            }
        }
    }
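    // ...plus handle_ping(), handle_publish(), handle_raw_packet(), and
    // handle_incoming_packet(), some of which are shown below.
}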

I also use tokio::sync::oneshot channels when publishing a message, to be able to await an error from the event loop:

mqtt_client
    .publish("a/b".to_string(), "msg".to_string())
    .await
    .expect("Failed to send message to client");


// internally:

pub async fn publish(&self, topic: String, payload: String) -> Result<(), ClientError> {
    let (resp_tx, resp_rx) = oneshot::channel();

    self.publish_channel_sender
        .send(PublishRequest {
            topic,
            payload,
            responder: resp_tx,
        })
        .map_err(|e| {
            event!(
                Level::ERROR,
                "Failed to send PUBLISH request to event loop: {}",
                e
            );
            ClientError::InternalError
        })?;

    resp_rx
        .await
        .map_err(|e| {
            event!(
                Level::ERROR,
                "Failed to receive response from event loop: {}",
                e
            );
            ClientError::InternalError
        })?
        .map_err(|e| {
            event!(Level::ERROR, "Failed to publish message: {}", e);
            ClientError::InternalError
        })?;

    Ok(())
}
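
On the event loop side, handle_publish() (called from the select loop above but not shown) is what answers through that responder. A minimal sketch, assuming a raw_tcp_channel_sender field mirroring the receiver used in the select loop, and a responder carrying a Result<(), String>:

async fn handle_publish(&self, publish_req: PublishRequest) -> std::io::Result<()> {
    // Craft the PUBLISH packet and queue it on the raw packet channel;
    // the select loop will pick it up and write it to the TCP stream.
    let packet = crate::packet::craft_publish_packet(publish_req.topic, publish_req.payload);
    let result = self
        .raw_tcp_channel_sender
        .send(packet)
        .map_err(|e| format!("failed to queue PUBLISH packet: {e}"));

    // Report the outcome back to the caller awaiting on the oneshot channel.
    // If the caller dropped the receiver, there is nobody left to notify.
    let _ = publish_req.responder.send(result);
    Ok(())
}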

I did implement basic subscription support, more to test the client against itself than for use in teleinfo2mqtt-rs. When the event loop reads a PUBLISH packet from the TCP stream, it calls the callback handler in a tokio task so it doesn’t block the event loop.

async fn handle_incoming_packet(&self, stream: &mut TcpStream) -> Result<(), MqttError> {
    let mut response = [0; 128];

    match stream.try_read(&mut response) {
        Ok(0) => Err(MqttError::ConnectionClosed),
        Ok(n) => {
            if n >= response.len() {
                return Err(MqttError::PacketTooLarge);
            }
            let packet = &response[0..n];
            self.process_packet(packet);
            Ok(())
        }
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
        Err(e) => Err(e.into()),
    }
}

fn process_packet(&self, packet: &[u8]) {
    let packet_type = packet[0] >> 4;
    match crate::packet::PacketType::from(packet_type) {
        crate::packet::PacketType::CONNACK => crate::packet::parse_connack_packet(packet),
        crate::packet::PacketType::PUBLISH => {
            let (_, payload) = crate::packet::parse_publish_packet(packet);
            if let Some(callback_handler) = self.callback_handler {
                tokio::spawn(async move {
                    callback_handler(payload);
                });
            }
        }
        _ => event!(Level::DEBUG, "Unsupported packet type: {}", packet_type),
    }
}
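
parse_publish_packet() isn’t shown either; here is a rough sketch of parsing a QoS 0 PUBLISH packet, assuming a single-byte remaining length (enough for packets under 128 bytes, which matches the 128-byte read buffer above):

fn parse_publish_packet(packet: &[u8]) -> (String, String) {
    // Byte 0 is the packet type and flags; byte 1 is the remaining length
    // (a single byte here, per the assumption above).
    let remaining_length = packet[1] as usize;

    // The variable header starts with the length-prefixed topic name (MSB, LSB).
    let topic_len = ((packet[2] as usize) << 8) | packet[3] as usize;
    let topic = String::from_utf8_lossy(&packet[4..4 + topic_len]).to_string();

    // With QoS 0 there is no packet identifier, so the payload runs from the
    // end of the topic to the end of the remaining length.
    let payload =
        String::from_utf8_lossy(&packet[4 + topic_len..2 + remaining_length]).to_string();

    (topic, payload)
}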

That’s about it! It was really fun. I think the error handling is pretty lacking and I’m not sure about the design, but my environment at home is pretty stable, so it’s been working fine.