
Power monitoring with Home Assistant, MQTT, Rust and a Raspberry Pi


About a year ago, I started to get into Home Assistant. I began automating devices in my home and tracking various sensors, collecting different kinds of data like energy consumption of specific devices, temperature, humidity, etc. I was also interested in the Rust programming language at the time, having done a bit of Rust programming over the past few years. By luck, I found a project that would allow me to combine these two interests.

In France, we have smart meters called Linky that provide detailed data about household electricity consumption. I wanted to get that data into Home Assistant, to be able to track it over time and get some nice graphs.

Extracting data from the Linky meter #

The energy provider has a public HTTP API to get the consumption data; however, it is restricted to companies. There are ways to use this API with Home Assistant, such as ha-linky. Still, the data is not real-time: it is only updated once per day.

Fortunately, the Linky meter has a serial port that outputs real-time data about the electricity consumption! This protocol is called “Téléinformation Client” (TIC) and is transmitted over the serial port in a simple format. It is made to be read by the customer, so this is not a “hack” or anything, it’s a feature of the meter. Pretty cool!

Off the shelf devices #

The French home-automation community has developed a few devices that read the TIC data and send it over various protocols (MQTT + Wi-Fi, Zigbee). The TIC port of the meter has two pins for data, and a third one for power. The devices can be powered by the TIC port itself, and don’t need an external power source.

Examples of such devices are:

However they retail at around 50€… Can we make something cheaper?

The pitinfo shield #

Some cool guy named Charles made the PiTInfo shield, a little PCB shield that converts the serial data from the meter to GPIO pins. With shipping, it cost me about 13€.

A raspberry pi zero 2 W and the pitinfo shield
The PiTInfo shield and a Raspberry Pi Zero 2 W

Then, we can use any board that has GPIO pins, such as a Raspberry Pi or an ESP32. I wasn’t familiar with the ESP32 yet, so I went with the Raspberry Pi. I chose a Raspberry Pi Zero 2 W, because it’s cheap, tiny, powerful enough and has Wi-Fi: I don’t have an Ethernet port near my meter.

This Raspberry Pi is supposed to be $15, however supply is scarce, so with taxes and shipping, it cost me about 29€.

So the total cost of the project was about 42€… Not that much cheaper 😅. But with a cheaper deal on the RPi or an ESP32, it can be less than 30€, or 20€ even. Anyway, it’s more fun to do it ourselves!

Linky setup
You might not see it, but there’s my terrible soldering job on the PiTInfo shield

teleinfo2mqtt & Home Assistant #

I needed something to get the data from the GPIO pins into Home Assistant. I found a project called teleinfo2mqtt. This is a Node.js program that reads the TIC data from the serial port, parses it, and sends it to an MQTT broker. It also supports Home Assistant MQTT discovery, which is a feature that allows Home Assistant to automatically detect new devices on the MQTT broker and create entities for them.

It worked perfectly! However, Node.js is heavy on a Raspberry Pi, not so much at runtime but because of node_modules and my poor SD card. Not that I would need to redeploy it often… Anyway, I’m just trying to find another excuse to rewrite it myself. :D

I wanted to rewrite the project in a compiled language, to have a single binary to deploy and very low resource usage. This is perfect for a low-power device like a Raspberry Pi Zero 2 W.

Building my own #

I decided to start from scratch: not simply “rewrite” the project, but try to understand:

  • how to read the data from the meter
  • how to parse it
  • how to convert it to a format that Home Assistant understands
  • how to send it to the MQTT broker

sequenceDiagram
    participant tic as Linky TIC
    participant pitinfo as PITInfo
    participant rpiz as Raspberry Pi Zero
    box green This project
    participant t2m as teleinfo2mqtt-rs
    end
    participant mqtt as MQTT Broker
    participant ha as Home Assistant
    tic->>pitinfo: IEC 62056-21 serial
    pitinfo->>rpiz: GPIO serial
    rpiz->>t2m: Linux serial port
    t2m->>mqtt: mqtt
    mqtt->>ha: mqtt integration

My day-to-day language is Go, but I wanted some change and I happened to read a bit about Rust at the time. So I decided to write the project in Rust. It’s not my first Rust project, but it’s not a language I’m very familiar with.

This is not meant to be a tutorial, but I will provide code snippets to illustrate the project. The full code is available on GitHub.

Architecture of the program #

The project is architected around Streams from futures-rs.

---
title: Data flow
---
graph TD
    subgraph Main flow
        buffer[Serial buffer] -->|Reader| A
        A[SerialPort ASCII] -->|Stream| B[Teleinfo raw frame]
        B -->|Stream| C[TeleinfoFrame struct]
        C -->|Stream| task
        subgraph task[Tokio async task]
            D[MQTT publisher] -->|MQTT| E[MQTT broker]
        end
    end

Understanding the TIC protocol #

Enedis, the entity that manages the electricity grid in France, actually provides the specs of the TIC protocol.

Let’s see how and what we can read from the meter.

Physical layer #

When the PiTInfo shield is plugged into the Raspberry Pi’s GPIO pins, the data can be read from /dev/ttyS0 as a UART device.

By default, systemd will start a getty service on the serial port, so it needs to be disabled otherwise we can’t access it:

systemctl disable --now serial-getty@ttyS0.service

Now, we know the following:

  • The baud rate is 1200 baud
  • 7 data bits are used to represent each ASCII character

So teleinfo data can be read from the serial port using picocom:

sudo picocom -b 1200 -d 7 /dev/ttyS0

In Rust, I first used the serialport crate to read the serial port, but then switched to rppal as I also needed to control the GPIO pin for the LED (more on that later), and rppal does both (it’s made for the Raspberry Pi).

pub fn serial_stream(serial_device: String) -> impl Stream<Item = Vec<u8>> {
    let baud_rate = 1200;
    let data_bits = 7;
    let parity = Parity::None;
    let stop_bits = 1;

    let mut uart_device = Uart::with_path(serial_device, baud_rate, parity, data_bits, stop_bits)
        .expect("Failed to open UART");
    uart_device
        .set_read_mode(1, Duration::default())
        .expect("Failed to set read mode");

    event!(Level::INFO, ?uart_device, "Opened UART device");

    let mut buffer = [0u8; 1];
    stream! {
        loop {
            match uart_device.read(&mut buffer) {
                Ok(bytes_read) => {
                    if bytes_read > 0 {
                        yield buffer.to_vec();
                    }
                }
                Err(e) => {
                    event!(Level::ERROR, "Error reading from UART: {}", e);
                    break;
                }
            }
        }
    }
}

The TIC sends frames continuously. Each frame is separated by a delay of 16.7 ms < t < 33.4 ms.

  • Each frame is composed of multiple data sets
  • Each data set is composed of a label, a value and a checksum
  • Each frame starts with “Start TeXt” STX (0x02) and ends with “End TeXt” ETX (0x03)

STX<data set><data set>...<data set>ETX

In the code, I read the serial stream and split it into frames. I use a buffer to store the data until I have a full frame, then I yield it to another stream.

pub fn ascii_to_frames<S: Stream<Item = Vec<u8>>>(ascii_stream: S) -> impl Stream<Item = String> {
    let mut ascii_stream = Box::pin(ascii_stream);
    stream! {
        let mut teleinfo_buffer: Vec<Vec<u8>> = Vec::new();
        while let Some(value) = ascii_stream.next().await {
            teleinfo_buffer.push(value.clone());

            // A frame starts with 0x02 (STX) and ends with 0x03 (ETX)
            if value == vec![0x03] {
                if teleinfo_buffer.contains(&vec![0x02]) { // Only yield if we have a full frame
                    yield teleinfo_buffer
                        .iter()
                        .flat_map(|v| v.iter())
                        .map(|b| *b as char)
                        .collect::<String>();
                }

                // We reset the buffer for the next frame
                teleinfo_buffer = Vec::new();
            }
            }
        }
    }
}
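
The framing rule can also be sketched without async streams, which makes it easy to test on a laptop. This is only an illustration, not the project's code: `FrameSplitter` and `split_frames` are hypothetical names, and unlike the function above this version drops the STX/ETX markers and ignores any bytes received before the first STX (for example when the program starts in the middle of a frame).

```rust
/// Start-of-frame and end-of-frame markers used by the TIC protocol.
const STX: u8 = 0x02;
const ETX: u8 = 0x03;

/// Hypothetical helper: accumulates bytes and returns a frame body
/// (without STX/ETX) each time a complete frame has been seen.
#[derive(Default)]
pub struct FrameSplitter {
    buffer: Vec<u8>,
    in_frame: bool,
}

impl FrameSplitter {
    /// Feed one byte; returns `Some(frame)` when ETX closes a frame.
    pub fn push(&mut self, byte: u8) -> Option<String> {
        match byte {
            STX => {
                // A new frame starts: drop anything collected so far.
                self.buffer.clear();
                self.in_frame = true;
                None
            }
            ETX if self.in_frame => {
                self.in_frame = false;
                // TIC data is 7-bit ASCII, so a byte-to-char mapping is enough.
                let frame: String = self.buffer.iter().map(|&b| b as char).collect();
                self.buffer.clear();
                Some(frame)
            }
            _ if self.in_frame => {
                self.buffer.push(byte);
                None
            }
            // Bytes seen before the first STX belong to a truncated frame.
            _ => None,
        }
    }
}

/// Convenience wrapper for byte slices, mostly useful in tests.
pub fn split_frames(bytes: &[u8]) -> Vec<String> {
    let mut splitter = FrameSplitter::default();
    bytes.iter().filter_map(|&b| splitter.push(b)).collect()
}
```

Feeding it a capture that begins mid-frame yields only the complete frames; the trailing, unfinished frame stays in the buffer until its ETX arrives.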

A data set is structured like this:

\n<Label>\t<Value>\t<Checksum>\r

The checksum is (S1 & 0x3F) + 0x20, where S1 is the sum of the ASCII values from the label (included) up to the checksum (excluded).
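
As a rough sketch, the formula could be checked like this. The function names are made up for the example, and it relies on two assumptions: the separator is a single byte, and the separator just before the checksum is not part of the sum. That reading matches the data sets in the frame shown in this post (for instance `ISOUSC 30 9` and `PAPP 00260 )`), but other meter modes may differ.

```rust
/// Computes the checksum byte for the `label + separator + value` part
/// of a data set. Assumption: the separator that precedes the checksum
/// character is not included in the sum.
pub fn checksum(body: &[u8]) -> u8 {
    let sum: u32 = body.iter().map(|&b| u32::from(b)).sum();
    // Keep the 6 low bits, then shift into the printable ASCII range.
    ((sum & 0x3F) + 0x20) as u8
}

/// Checks a whole data set such as "PAPP 00260 )", with the leading
/// line feed and trailing carriage return already stripped.
pub fn is_valid_data_set(data_set: &str) -> bool {
    let bytes = data_set.as_bytes();
    // At least one body byte, one separator and one checksum byte.
    if bytes.len() < 3 {
        return false;
    }
    // tail[0] is the separator, tail[1] is the checksum character.
    let (body, tail) = bytes.split_at(bytes.len() - 2);
    checksum(body) == tail[1]
}
```

A parser could call `is_valid_data_set` on each line and drop the whole frame when any line fails.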

A full frame on my Linky meter looks like this:

0x02
ADCO 012345678912 B
OPTARIF BASE 0
ISOUSC 30 9
BASE 002815235 %
PTEC TH.. $
IINST 001 X
IMAX 090 H
PAPP 00260 )
HHPHC A ,
MOTDETAT 000000 B0x03

It’s a bit cryptic like this, but it’s just a bunch of key-value pairs. I represented it in the code as a struct:

// A teleinfo frame is a set of data sets
// Each data set is a key-value pair + a checksum
pub struct TeleinfoFrame {
    pub adco: String,     // Adresse du compteur (Meter address)
    pub optarif: String,  // Option tarifaire (Price plan)
    pub isousc: String,   // Intensité souscrite, en A (Subscribed max current, in A)
    pub base: String,     // Index option base, en Wh (Cumulative energy consumption, in Wh)
    pub ptec: String,     // Période tarifaire en cours (Price plan period)
    pub iinst: String,    // Intensité instantanée, en A (Instantaneous current, in A)
    pub imax: String,     // Intensité maximale appelée, en A (Max current, in A)
    pub papp: String,     // Puissance apparente, en VA (Apparent power in VA, rounded to nearest 10)
    pub hhphc: String,    // Horaire Heures Pleines Heures Creuses (Full/Off-peak hours)
    pub motdetat: String, // Mot d'état du compteur (Meter status)
}

So the most interesting values are BASE (the cumulative energy consumption in Wh) and PAPP (the current apparent power in VA).

For the parsing, I treated each line as key-value pairs. I have not handled the checksum yet, but I haven’t seen any invalid data so far.

use std::collections::HashMap;
use std::error::Error;

pub fn parse_teleinfo(teleinfo: &str) -> Result<TeleinfoFrame, Box<dyn Error>> {
    let mut teleinfo_map = HashMap::new();
    for line in teleinfo.lines() {
        let mut split = line.split_whitespace();

        // Skip blank lines instead of failing on them
        let Some(key) = split.next() else {
            continue;
        };
        if [0x02, 0x03].contains(&key.as_bytes()[0]) {
            // Skip start and end of frame characters
            continue;
        }
        let value = split.next().ok_or("Missing value")?;
        teleinfo_map.insert(key, value);
    }
    Ok(TeleinfoFrame {
        adco: teleinfo_map.get("ADCO").ok_or("Missing ADCO")?.to_string(),
        optarif: teleinfo_map.get("OPTARIF").ok_or("Missing OPTARIF")?.to_string(),
        isousc: teleinfo_map.get("ISOUSC").ok_or("Missing ISOUSC")?.to_string(),
        base: teleinfo_map.get("BASE").ok_or("Missing BASE")?.to_string(),
        ptec: teleinfo_map.get("PTEC").ok_or("Missing PTEC")?.to_string(),
        iinst: teleinfo_map.get("IINST").ok_or("Missing IINST")?.to_string(),
        imax: teleinfo_map.get("IMAX").ok_or("Missing IMAX")?.to_string(),
        papp: teleinfo_map.get("PAPP").ok_or("Missing PAPP")?.to_string(),
        hhphc: teleinfo_map.get("HHPHC").ok_or("Missing HHPHC")?.to_string(),
        motdetat: teleinfo_map.get("MOTDETAT").ok_or("Missing MOTDETAT")?.to_string(),
    })
}

Sending the data to the MQTT broker #

After receiving the parsed frame in another stream, I simply publish it to the MQTT broker.

while let Some(value) = teleinfo_parsed_frames_stream.next().await {
    match mqtt::publish_teleinfo(&client, &value).await {
        Ok(_) => {}
        Err(e) => {
            event!(Level::ERROR, error = ?e, "Error while publishing teleinfo frame to MQTT");
        }
    }
}

The data is sent as JSON. I didn’t bother with Serde: I just hijacked the fmt::Display trait and used the write! macro. It works :D

impl fmt::Display for TeleinfoFrame {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            r#"{{
"ADCO": {{"raw": "{}", "value": {}}},
"OPTARIF": {{"raw": "{}", "value": "{}"}},
"ISOUSC": {{"raw": "{}", "value": {}}},
"BASE": {{"raw": "{}", "value": {}}},
"PTEC": {{"raw": "{}", "value": "{}"}},
"IINST": {{"raw": "{}", "value": {}}},
"IMAX": {{"raw": "{}", "value": {}}},
"PAPP": {{"raw": "{}", "value": {}}},
"HHPHC": {{"raw": "{}", "value": "{}"}}
}}"#,
            self.adco,
            self.adco.parse::<i64>().unwrap(),
            self.optarif,
            self.optarif,
            self.isousc,
            self.isousc.parse::<i32>().unwrap(),
            self.base,
            self.base.parse::<i64>().unwrap(),
            self.ptec,
            &self.ptec[0..2],
            self.iinst,
            self.iinst.parse::<i32>().unwrap(),
            self.imax,
            self.imax.parse::<i32>().unwrap(),
            self.papp,
            self.papp.parse::<i32>().unwrap(),
            self.hhphc,
            self.hhphc
        )
    }
}

As you can see, the format is pretty simple. The MQTT topic is just teleinfo/<the meter's ID>.

pub async fn publish_teleinfo(client: &Client, value: &TeleinfoFrame) -> Result<(), ClientError> {
    event!(Level::INFO, "Publishing teleinfo frame to MQTT");

    client
        .publish(format!("teleinfo/{}", value.adco), value.to_string())
        .await
}
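
One caveat with the Display-based JSON above: every `unwrap()` panics if the meter ever sends a value that is not an integer. A possible way around it, sketched here with hypothetical helper names, is to emit `null` instead. String escaping is still not handled, so this assumes raw values never contain quotes or backslashes.

```rust
/// Hypothetical helper: renders a raw TIC value as a JSON number,
/// or as `null` when it is not a valid integer.
pub fn json_number_or_null(raw: &str) -> String {
    match raw.trim().parse::<i64>() {
        Ok(n) => n.to_string(),
        Err(_) => "null".to_string(),
    }
}

/// Renders one `"LABEL": {"raw": "...", "value": ...}` member.
/// Assumption: `label` and `raw` need no JSON escaping.
pub fn json_member(label: &str, raw: &str) -> String {
    format!(
        r#""{label}": {{"raw": "{raw}", "value": {}}}"#,
        json_number_or_null(raw)
    )
}
```

The members can then be joined with commas and wrapped in braces, which keeps the output shape identical to the hand-written template.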

And now I have the live data in Home Assistant 🎉

The device in Home Assistant
The kWh sensor graph in Home Assistant
The VA sensor graph in Home Assistant

To debug the MQTT messages, I used the MQTT Explorer add-on in Home Assistant. It’s a great tool to see the messages on the broker:

About every second, a new message is sent, and MQTT Explorer can show the diff with the previous one

It’s also possible to subscribe to the topic with mosquitto_sub:

➜  ~ mosquitto_sub -L mqtt://homeassistant:<password>@<broker-host>:1883/teleinfo/#
{
"ADCO": {"raw": "012345678912", "value": 32164208647},
"OPTARIF": {"raw": "BASE", "value": "BASE"},
"ISOUSC": {"raw": "30", "value": 30},
"BASE": {"raw": "004368115", "value": 4368115},
"PTEC": {"raw": "TH..", "value": "TH"},
"IINST": {"raw": "003", "value": 3},
"IMAX": {"raw": "090", "value": 90},
"PAPP": {"raw": "00610", "value": 610},
"HHPHC": {"raw": "A", "value": "A"}
}
^C

Running the program #

To deploy the binary, I simply scp’d an optimized build to the Raspberry Pi. Then I run it as a systemd service. The config is done via environment variables:

# /etc/systemd/system/teleinfo2mqtt.service

[Unit]
Description=teleinfo2mqtt
Wants=network-online.target
After=network-online.target

[Service]
Type=simple
ExecStart=/home/pi/teleinfo2mqtt-rs
Environment=MQTT_HOST=192.168.1.42
Environment=MQTT_USER=homeassistant
Environment=MQTT_PASS=pass
Environment=SERIAL_PORT=/dev/ttyS0
Restart=on-failure
RestartSec=5s
StandardOutput=null

[Install]
WantedBy=multi-user.target

It uses less than 1% of CPU and ~2 MB of RAM on my Raspberry Pi:

pi@raspberrypiz ~> ps -p $(pgrep teleinfo2mqtt-r) -o %cpu,%mem,rss
%CPU %MEM   RSS
 0.4  0.4  1792

Home Assistant MQTT discovery #

How can Home Assistant know about this new device and create entities for it? It’s called MQTT discovery. For each entity, a configuration message has to be sent to a specific topic, and Home Assistant will create the entities from that.

I was able to piggyback on my existing entities, as teleinfo2mqtt (the JS one) already created them for me.

Here’s how teleinfo2mqtt does it.

For example, the consumption sensor is created like this, on the homeassistant/sensor/teleinfo/<meter_id>_base/config topic:

{
    "unique_id": "BASE",
    "name": "BASE",
    "state_topic": "teleinfo/012345678912",
    "state_class": "total_increasing",
    "device_class": "energy",
    "value_template": "{% if 'BASE' in value_json %}{{ value_json.BASE.value }}{% endif %}",
    "unit_of_measurement": "Wh",
    "device": {
        "identifiers": [
            "012345678912"
        ],
        "manufacturer": "Enedis",
        "model": "teleinfo_012345678912",
        "name": "Teleinfo 012345678912"
    }
}

I haven’t implemented this yet, but it would be needed if someone else wanted to use this project, or if I moved and got a new Linky meter.
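
For what it’s worth, here is roughly what generating that message could look like in Rust. It is a sketch, not part of teleinfo2mqtt-rs: `SensorConfig`, `discovery_topic` and `discovery_payload` are invented for the example, the field values simply mirror the JSON above, and no JSON escaping is done.

```rust
/// Hypothetical description of one sensor to announce to Home Assistant.
pub struct SensorConfig<'a> {
    pub label: &'a str,        // TIC label, e.g. "BASE"
    pub unit: &'a str,         // e.g. "Wh"
    pub device_class: &'a str, // e.g. "energy"
    pub state_class: &'a str,  // e.g. "total_increasing"
}

/// Topic layout copied from the example above.
pub fn discovery_topic(meter_id: &str, label: &str) -> String {
    format!(
        "homeassistant/sensor/teleinfo/{meter_id}_{}/config",
        label.to_lowercase()
    )
}

/// Builds the configuration payload. Doubled braces are `format!` escapes.
pub fn discovery_payload(meter_id: &str, sensor: &SensorConfig) -> String {
    let label = sensor.label;
    // Jinja template that Home Assistant evaluates on each state message.
    let value_template = format!(
        "{{% if '{label}' in value_json %}}{{{{ value_json.{label}.value }}}}{{% endif %}}"
    );
    format!(
        r#"{{
    "unique_id": "{label}",
    "name": "{label}",
    "state_topic": "teleinfo/{meter_id}",
    "state_class": "{state_class}",
    "device_class": "{device_class}",
    "value_template": "{value_template}",
    "unit_of_measurement": "{unit}",
    "device": {{
        "identifiers": ["{meter_id}"],
        "manufacturer": "Enedis",
        "model": "teleinfo_{meter_id}",
        "name": "Teleinfo {meter_id}"
    }}
}}"#,
        state_class = sensor.state_class,
        device_class = sensor.device_class,
        unit = sensor.unit,
    )
}
```

Publishing `discovery_payload(...)` on `discovery_topic(...)` once at startup, for each label, would be enough for Home Assistant to create the entities.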

There are two LEDs on the PiTInfo shield. I couldn’t control the blue one, which is always on (and pretty bright). I think it’s on when the shield is successfully connected to the meter. However, the green LED is controllable via GPIO pin 4.

On the command line, it is controllable with raspi-gpio or pinctrl:

pinctrl set 4 op # Set pin 4 as output
pinctrl set 4 dl # Set pin 4 to low (off)
pinctrl set 4 dh # Set pin 4 to high (on)

To know if the program is running correctly, I made the LED blink when data is successfully sent to the MQTT broker. I set it on for 10ms, then off.

while let Some(value) = teleinfo_parsed_frames_stream.next().await {
    match mqtt::publish_teleinfo(&client, &value).await {
        Ok(_) => {
            let mut pin = Gpio::new()
                .unwrap()
                .get(GPIO_PITINFO_GREEN_LED)
                .unwrap()
                .into_output();

            pin.set_high();
            thread::sleep(Duration::from_millis(10));
            pin.set_low();
        }
        Err(e) => {
            event!(Level::ERROR, error = ?e, "Error while publishing teleinfo frame to MQTT");
        }
    }
}

Here is what it looks like:

Basic monitoring :D

Bonus: writing my own MQTT client in Rust #

To keep the fun going, I started looking into the MQTT protocol, and I decided to write my own MQTT client in Rust.

It’s called aimeqtt, and it basically only supports what I need for this project:

  • MQTT 3
  • QoS 0 (at most once)
  • Control packets:
    • Connect to an MQTT broker (CONNECT, CONNACK)
      • Username/password support
    • Send messages (PUBLISH)
    • Receive messages (SUBSCRIBE, SUBACK, PUBLISH)
    • Keep alive (PINGREQ, PINGRESP)

Luckily, it’s not a very complex protocol. I was able to find most of the information I needed in the specs, and some LLMs helped me with the rest.

I can’t say I’m very proud of the code, but it works, and it’s been working for a year now without issues. It has no purpose other than being used in this project. I don’t consider it a reference implementation by any means.

Here is how the client is used:

#[tokio::main]
async fn main() {
    let broker_host = "127.0.0.1";
    let broker_port = 1883;

    let mqtt_client_options = client::ClientOptions::new()
        .with_broker_host(broker_host.to_string())
        .with_broker_port(broker_port)
        .with_keep_alive(60)
        .with_callback_handler(callback_handler);

    let mut mqtt_client = client::new(mqtt_client_options).await;

    mqtt_client
        .subscribe("a/b".to_string())
        .expect("Failed to subscribe to topic.");

    loop {
        match mqtt_client
            .publish("a/b".to_string(), "msg".to_string())
            .await
        {
            Ok(_) => println!("Message published successfully"),
            Err(e) => println!("Failed to publish message: {:?}", e),
        }

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

fn callback_handler(payload: String) {
    println!("Received message: {payload}");
}

I use the builder pattern for the client options. There is a single callback handler for all received messages, which might not be ideal for all use cases, but it’s enough for me. I actually don’t use it in teleinfo2mqtt-rs; it’s just for the example.

Crafting the TCP packets #

MQTT is a binary protocol, so we need to craft the content of the TCP packets ourselves. I don’t work with binary protocols often, so it was a nice change. To be honest, reading the spec was not enough, so I often resorted to Wireshark to see what the packets I sent looked like.

I used mosquitto_pub and mosquitto_sub as reference implementations to compare and reverse-engineer, in a way, the packets, to understand what I didn’t get from the specs.

Wireshark helping debug MQTT control packets

MQTT packets are called control packets. There are a few of them, such as CONNECT, CONNACK, PUBLISH, SUBSCRIBE, SUBACK, PINGREQ, PINGRESP, etc. This is how the client and the broker communicate.

An MQTT control packet is composed of 3 parts:

struct RawPacket {
    fixed_header: Vec<u8>,
    variable_header: Vec<u8>, // optional, depends on the packet type
    payload: Vec<u8>, // optional, depends on the packet type
}

impl RawPacket {
    fn to_bytes(&self) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&self.fixed_header);
        bytes.extend_from_slice(&self.variable_header);
        bytes.extend_from_slice(&self.payload);
        bytes
    }
}

Crafting the CONNECT packet has been the most complex. But for example, the PUBLISH packet is pretty simple. We need to:

  • Set the packet type to PUBLISH in the fixed header
  • Set the topic name in the variable header
  • Set the message in the payload
  • Set the remaining length field in the fixed header (basically the length of the variable header and the payload)

impl Packet {
    fn to_raw_packet(&self) -> RawPacket {
        let mut packet = RawPacket {
            fixed_header: Vec::new(),
            variable_header: Vec::new(),
            payload: Vec::new(),
        };

        packet
            .fixed_header
            .push((self.packet_type.clone() as u8) << 4); // Packet Type

        match self.packet_type {
            PacketType::PUBLISH => {
                let topic = self.topic.as_ref().unwrap();
                // Topic name length as a big-endian u16 (MSB, then LSB),
                // so topics longer than 255 bytes are not truncated
                packet
                    .variable_header
                    .extend_from_slice(&(topic.len() as u16).to_be_bytes());
                packet
                    .variable_header
                    .extend_from_slice(topic.as_bytes()); // Topic Name

                packet
                    .payload
                    .extend_from_slice(self.message.as_ref().unwrap().as_bytes());

        [...]
        // more stuff, like computing the remaining length field

pub fn craft_publish_packet(topic: String, payload: String) -> Vec<u8> {
    Packet::new(PacketType::PUBLISH)
        .with_client_id("rust".to_string())
        .with_topic(topic)
        .with_message(payload)
        .to_raw_packet()
        .to_bytes()
}
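
The remaining length computation that is elided above deserves a note: MQTT stores it as a variable-length integer, 7 bits per byte, with the high bit set when another byte follows. Here is a standalone sketch of that encoding together with a complete QoS 0 PUBLISH packet. It is not aimeqtt’s code; the function names are invented for the example, and the 4-byte limit of the spec (268,435,455) is not enforced.

```rust
/// Encodes the "remaining length" field using MQTT's variable-length
/// scheme: 7 bits of data per byte, high bit set when more bytes follow.
pub fn encode_remaining_length(mut len: usize) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let mut byte = (len % 128) as u8;
        len /= 128;
        if len > 0 {
            byte |= 0x80; // continuation bit
        }
        out.push(byte);
        if len == 0 {
            return out;
        }
    }
}

/// Builds a QoS 0 PUBLISH packet (no flags, no packet identifier).
/// Assumption: the topic is shorter than 65,536 bytes.
pub fn publish_packet(topic: &str, message: &str) -> Vec<u8> {
    // Variable header (length-prefixed topic) followed by the payload.
    let mut body = Vec::new();
    body.extend_from_slice(&(topic.len() as u16).to_be_bytes());
    body.extend_from_slice(topic.as_bytes());
    body.extend_from_slice(message.as_bytes());

    // Fixed header: packet type 3 (PUBLISH) in the high nibble.
    let mut packet: Vec<u8> = vec![0x03 << 4];
    packet.extend(encode_remaining_length(body.len()));
    packet.extend(body);
    packet
}
```

For a topic `a/b` and a message `msg`, the remaining length is 2 + 3 + 3 = 8, so it fits in a single byte. Payloads of 128 bytes or more need a second length byte, which is easy to miss when testing with short messages only.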

The event loop #

Now, the internals of the client are a bit more complex. I ended up using an event loop pattern to handle the connection to the broker, sending packets, and reading packets from the broker.

Here is a diagram that tries to explain how it works:

graph TB
    Start[Start] --> ConnectBroker{Connect to MQTT broker}
    ConnectBroker -->|Success| SendConnect[Send CONNECT message]
    ConnectBroker -->|Failure| Retry[Wait 5 seconds and retry]
    Retry --> ConnectBroker
    SendConnect --> EventLoop{Event Loop}
    EventLoop -->|Ping tick for keep alive| SendPingReq[Send PINGREQ packet to TCP channel]
    EventLoop -->|Received PUBLISH message from channel| SendPublish[Send PUBLISH packet to TCP channel]
    EventLoop -->|Received packet from TCP channel| SendRawPacket[Send TCP packet to TCP stream]
    EventLoop -->|TCP stream ready to read| ReadResponse{Read broker response}
    SendPingReq --> EventLoop
    SendPublish --> EventLoop
    SendRawPacket --> EventLoop
    ReadResponse -->|Success| ParsePacket[Parse packet and handle accordingly]
    ReadResponse -->|Failure| Reconnect[Break and reconnect]
    ParsePacket --> EventLoop
    Reconnect --> ConnectBroker

The client will send a CONNECT packet to the broker, then enter an event loop. It will send a PINGREQ packet every half keep-alive period to keep the connection alive.

Internally, the event loop is run in a tokio::task, and it uses tokio::sync::mpsc (multi-producer, single-consumer) to enqueue things to do for the event loop. On each execution of the event loop, it will try to:

  • Send a PINGREQ packet to the raw packet MPSC channel if half of the defined keep alive has passed
  • Send a PUBLISH packet to the raw packet MPSC channel if there is a message in the publish MPSC channel
  • Write a raw packet to the TCP stream if there is a packet in the raw packet MPSC channel
  • Read from the TCP stream if it’s ready to read
    • Parse it, and call the callback handler if it’s a PUBLISH packet

Here’s what it looks like in the code. The connection to the broker is made in connect_to_broker(), which the event loop calls, and the handle_connection() function is the main loop.

impl Client {
    async fn event_loop(&mut self) {
        let mut publish_channel_receiver = self.publish_channel_receiver.take().unwrap();

        loop {
            match self.connect_to_broker().await {
                Ok(mut stream) => {
                    if let Err(e) = self
                        .handle_connection(&mut stream, &mut publish_channel_receiver)
                        .await
                    {
                        event!(Level::ERROR, "Connection error: {}", e);
                    }
                    // Retry connection after error
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
                Err(e) => {
                    event!(Level::ERROR, "Failed to connect to MQTT broker: {}", e);
                    tokio::time::sleep(Duration::from_secs(5)).await;
                }
            }
        }
    }

    async fn connect_to_broker(&self) -> std::io::Result<TcpStream> {
        let mut stream = TcpStream::connect(self.broker_address.clone()).await?;
        event!(Level::DEBUG, "Connected to MQTT broker successfully.");

        let connect_packet =
            crate::packet::craft_connect_packet(self.username.clone(), self.password.clone());

        stream.write_all(&connect_packet).await?;
        event!(Level::DEBUG, "CONNECT message sent successfully.");

        Ok(stream)
    }

    async fn handle_connection(
        &mut self,
        stream: &mut TcpStream,
        publish_channel_receiver: &mut mpsc::UnboundedReceiver<PublishRequest>,
    ) -> std::io::Result<()> {
        let mut ping_interval = time::interval(Duration::from_secs(self.keep_alive as u64) / 2);

        loop {
            tokio::select! {
                _ = ping_interval.tick() => {
                    self.handle_ping().await?;
                }
                Some(publish_req) = publish_channel_receiver.recv() => {
                    self.handle_publish(publish_req).await?;
                }
                Some(packet) = self.raw_tcp_channel_receiver.as_mut().unwrap().recv() => {
                    self.handle_raw_packet(stream, packet).await?;
                }
                _ = stream.ready(Interest::READABLE) => {
                    match self.handle_incoming_packet(stream).await {
                        Ok(()) => continue,
                        Err(MqttError::ConnectionClosed) => {
                            event!(Level::INFO, "Broker closed connection");
                            return Err(std::io::Error::new(std::io::ErrorKind::ConnectionAborted, "Broker closed connection"));
                        }
                        Err(e) => {
                            event!(Level::ERROR, "Failed to handle incoming packet: {}", e);
                            return Err(e.into());
                        }
                    }
                }
            }
        }
    }
}

I also use tokio::sync::oneshot channels when publishing a message, to be able to await an error from the event loop:

mqtt_client
    .publish("a/b".to_string(), "msg".to_string())
    .await
    .expect("Failed to send message to client");


// internally:

pub async fn publish(&self, topic: String, payload: String) -> Result<(), ClientError> {
    let (resp_tx, resp_rx) = oneshot::channel();

    self.publish_channel_sender
        .send(PublishRequest {
            topic,
            payload,
            responder: resp_tx,
        })
        .map_err(|e| {
            event!(
                Level::ERROR,
                "Failed to send PUBLISH request to event loop: {}",
                e
            );
            ClientError::InternalError
        })?;

    resp_rx
        .await
        .map_err(|e| {
            event!(
                Level::ERROR,
                "Failed to receive response from event loop: {}",
                e
            );
            ClientError::InternalError
        })?
        .map_err(|e| {
            event!(Level::ERROR, "Failed to publish message: {}", e);
            ClientError::InternalError
        })?;

    Ok(())
}
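
The “request carrying its own reply channel” idea is independent of tokio. Here is a minimal sketch of the same pattern using only the standard library, with a thread standing in for the event loop. `Request`, `spawn_worker` and the “empty payload” rule are made up for the illustration; `std::sync::mpsc` replaces both the tokio mpsc and oneshot channels.

```rust
use std::sync::mpsc;
use std::thread;

/// A request that carries its own reply channel, mirroring the
/// `responder` field of `PublishRequest`.
pub struct Request {
    pub payload: String,
    pub responder: mpsc::Sender<Result<(), String>>,
}

/// Spawns a worker thread that plays the role of the event loop.
pub fn spawn_worker() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        // The loop ends when every sender has been dropped.
        for req in rx {
            // Stand-in for "write the packet to the TCP stream".
            let result = if req.payload.is_empty() {
                Err("empty payload".to_string())
            } else {
                Ok(())
            };
            // The caller may have stopped waiting; ignore send errors.
            let _ = req.responder.send(result);
        }
    });
    tx
}

/// Sends a request and blocks until the worker reports the outcome.
pub fn publish(worker: &mpsc::Sender<Request>, payload: &str) -> Result<(), String> {
    let (resp_tx, resp_rx) = mpsc::channel();
    worker
        .send(Request { payload: payload.to_string(), responder: resp_tx })
        .map_err(|_| "event loop is gone".to_string())?;
    resp_rx
        .recv()
        .map_err(|_| "event loop dropped the request".to_string())?
}
```

The two error layers match the two `map_err` calls in the async version: one for the channel itself failing, one for the operation failing inside the loop.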

I did implement basic subscription support, more to test the client with itself than to use for teleinfo2mqtt-rs. When the event loop reads the TCP stream, it will call the callback handler if it’s a PUBLISH packet, which runs in a tokio task to not block the event loop.

async fn handle_incoming_packet(&self, stream: &mut TcpStream) -> Result<(), MqttError> {
    let mut response = [0; 128];

    match stream.try_read(&mut response) {
        Ok(0) => Err(MqttError::ConnectionClosed),
        Ok(n) => {
            if n >= response.len() {
                return Err(MqttError::PacketTooLarge);
            }
            let packet = &response[0..n];
            self.process_packet(packet);
            Ok(())
        }
        Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(()),
        Err(e) => Err(e.into()),
    }
}

fn process_packet(&self, packet: &[u8]) {
    let packet_type = packet[0] >> 4;
    match crate::packet::PacketType::from(packet_type) {
        crate::packet::PacketType::CONNACK => crate::packet::parse_connack_packet(packet),
        crate::packet::PacketType::PUBLISH => {
            let (_, payload) = crate::packet::parse_publish_packet(packet);
            if let Some(callback_handler) = self.callback_handler {
                tokio::spawn(async move {
                    callback_handler(payload);
                });
            }
        }
        _ => event!(Level::DEBUG, "Unsupported packet type: {}", packet_type),
    }
}
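
One limitation of reading into a fixed buffer and treating it as one packet: TCP is a byte stream, so a single read can contain several control packets, or only part of one. A possible way to handle this, sketched below with hypothetical function names, is to decode the remaining length field and slice the buffer packet by packet. A malformed length field simply stops the split here; a real client would probably close the connection instead.

```rust
/// Decodes the remaining-length field that starts at `bytes[0]`.
/// Returns `(value, bytes_used)`, or `None` if the field is incomplete
/// or longer than the 4 bytes the MQTT spec allows.
pub fn decode_remaining_length(bytes: &[u8]) -> Option<(usize, usize)> {
    let mut value = 0usize;
    for (i, &byte) in bytes.iter().take(4).enumerate() {
        value |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Some((value, i + 1));
        }
    }
    None
}

/// Splits a receive buffer into complete packets. Returns the packets
/// and the number of bytes consumed; the rest is an incomplete packet
/// that should be kept until more data arrives.
pub fn split_packets(buffer: &[u8]) -> (Vec<&[u8]>, usize) {
    let mut packets = Vec::new();
    let mut offset = 0;
    while offset < buffer.len() {
        // Byte 0 holds type and flags; the length field starts at byte 1.
        let Some((remaining, used)) = decode_remaining_length(&buffer[offset + 1..]) else {
            break;
        };
        let end = offset + 1 + used + remaining;
        if end > buffer.len() {
            break; // the packet is not fully received yet
        }
        packets.push(&buffer[offset..end]);
        offset = end;
    }
    (packets, offset)
}
```

Each returned slice starts with its own fixed header, so it could be passed to a function like `process_packet` unchanged.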

That’s about it! It was really fun. I think the error handling is pretty lacking and I’m not sure about the design, but my environment at home is pretty stable, so it’s been working fine.

Dev experience with Rust #

The dev experience with Rust is nice. rust-analyzer has been the default language server for a while now, I think, so not much new on that front: it’s solid. Coming from Go, the Result type is convenient, and so are the iterators. That being said, the borrow checker is still a pain.

I was reading the book while writing this project, which helped make sense of some things.

Developing a lib #

My MQTT library is made for this project, so I didn’t publish it on crates.io. I just added it as a git dependency in the Cargo.toml of teleinfo2mqtt-rs. Locally, I just added a path to the lib to test it with my project.

[dependencies]
aimeqtt = { git = "https://github.com/angristan/aimeqtt", rev = "dcfce13" }
# or, locally
aimeqtt = { path = "../aimeqtt" }

Cross-compilation #

The rppal crate uses C bindings and needs libudev to compile. I don’t want to compile on the Raspberry Pi (especially not Rust lol), and I didn’t figure out how to cross-compile from my MacBook (if there’s even a way).

The last option is Docker, and I decided to use a devcontainer with VS Code, so that I could also make the language server, rust-analyzer, work again. Microsoft provides a Rust devcontainer, so it’s pretty straightforward:

// .devcontainer/devcontainer.json
{
 "name": "teleinfo2mqtt-rs",
 "build": {
  "dockerfile": "Dockerfile"
 }
}

# .devcontainer/Dockerfile
FROM mcr.microsoft.com/devcontainers/rust:1-1-bullseye

RUN apt-get update && apt-get install -y build-essential pkg-config libudev-dev

Conclusion 🦀 #

This was a fun project. It’s a great reminder to me that sometimes, things are simpler than they look. I can just ask myself “how does this work?” and figure it out.

I learned a bit about UART/Serial interfaces, my meter’s TIC protocol, MQTT, Rust and tokio. Writing the MQTT client was unnecessary, but also the most fun part of the project. And it’s been working for a year now without issues!

I reached my objective: I have the data from my Linky meter in Home Assistant. It’s pretty useful to have insights over time of my energy consumption.

There are some improvements to be made, like MQTT discovery, or improving the MQTT client. The code can probably be improved as well, I’m sure there are some bad practices in there, but to be fair, I was 50% trying to make it work and 50% just begging the compiler to leave me alone. :D Next time I’ll stick to Go, probably.

I’ve also been experimenting with running teleinfo2mqtt-rs on an ESP32. It’s possible using esp-rs. I think it would make more sense than a Raspberry Pi to reduce the maintenance burden, as this would remove the need to run a full Linux system.

The energy graph in Home Assistant
The energy graph in Home Assistant, powered by teleinfo2mqtt-rs!