Skip to content

IoT Devices

This example demonstrates using PDC Struct to decode data from I2C sensors on a Raspberry Pi. We'll read from a BME280 temperature/humidity/pressure sensor and parse its binary register data.

BME280 Sensor Overview

The BME280 is a popular environmental sensor that communicates over I2C or SPI. It returns raw ADC values in a packed binary format that must be decoded using calibration data.

Key registers:

Register Address Size Content
Calibration 0x88-0xA1 26 bytes Temperature/pressure calibration
Calibration 0xE1-0xE7 7 bytes Humidity calibration
Data 0xF7-0xFE 8 bytes Raw ADC readings

Defining the Register Structures

Calibration Data

The BME280 stores factory calibration values that we need to decode the raw readings:

from pydantic import Field
from pdc_struct import StructModel, StructConfig, StructMode, ByteOrder
from pdc_struct.c_types import Int8, UInt8, Int16, UInt16


class BME280CalibrationTP(StructModel):
    """Temperature and Pressure calibration data (registers 0x88-0xA1)."""

    # Temperature calibration
    dig_T1: UInt16 = Field(description="Temperature coefficient 1")
    dig_T2: Int16 = Field(description="Temperature coefficient 2")
    dig_T3: Int16 = Field(description="Temperature coefficient 3")

    # Pressure calibration
    dig_P1: UInt16 = Field(description="Pressure coefficient 1")
    dig_P2: Int16 = Field(description="Pressure coefficient 2")
    dig_P3: Int16 = Field(description="Pressure coefficient 3")
    dig_P4: Int16 = Field(description="Pressure coefficient 4")
    dig_P5: Int16 = Field(description="Pressure coefficient 5")
    dig_P6: Int16 = Field(description="Pressure coefficient 6")
    dig_P7: Int16 = Field(description="Pressure coefficient 7")
    dig_P8: Int16 = Field(description="Pressure coefficient 8")
    dig_P9: Int16 = Field(description="Pressure coefficient 9")

    struct_config = StructConfig(
        mode=StructMode.C_COMPATIBLE,
        byte_order=ByteOrder.LITTLE_ENDIAN  # BME280 uses little-endian
    )


class BME280CalibrationH(StructModel):
    """Humidity calibration data (registers 0xE1-0xE7)."""

    dig_H2: Int16 = Field(description="Humidity coefficient 2")
    dig_H3: UInt8 = Field(description="Humidity coefficient 3")
    # H4 and H5 share bytes and need special handling
    dig_H4_H5_raw: bytes = Field(
        json_schema_extra={"struct_length": 3},
        description="Raw bytes for H4/H5 (needs bit manipulation)"
    )
    dig_H6: Int8 = Field(description="Humidity coefficient 6")

    struct_config = StructConfig(
        mode=StructMode.C_COMPATIBLE,
        byte_order=ByteOrder.LITTLE_ENDIAN
    )

    @property
    def dig_H4(self) -> int:
        """Extract H4 from shared bytes (12-bit value)."""
        raw = self.dig_H4_H5_raw
        return (raw[0] << 4) | (raw[1] & 0x0F)

    @property
    def dig_H5(self) -> int:
        """Extract H5 from shared bytes (12-bit value)."""
        raw = self.dig_H4_H5_raw
        return (raw[2] << 4) | ((raw[1] >> 4) & 0x0F)

Raw Sensor Data

The sensor returns 8 bytes of raw ADC data:

class BME280RawData(StructModel):
    """Raw ADC data from registers 0xF7-0xFE.

    Data is packed as:
    - Pressure: 20-bit unsigned (3 bytes)
    - Temperature: 20-bit unsigned (3 bytes)
    - Humidity: 16-bit unsigned (2 bytes)
    """

    press_msb: UInt8 = Field(description="Pressure MSB [19:12]")
    press_lsb: UInt8 = Field(description="Pressure LSB [11:4]")
    press_xlsb: UInt8 = Field(description="Pressure XLSB [3:0] in upper nibble")
    temp_msb: UInt8 = Field(description="Temperature MSB [19:12]")
    temp_lsb: UInt8 = Field(description="Temperature LSB [11:4]")
    temp_xlsb: UInt8 = Field(description="Temperature XLSB [3:0] in upper nibble")
    hum_msb: UInt8 = Field(description="Humidity MSB [15:8]")
    hum_lsb: UInt8 = Field(description="Humidity LSB [7:0]")

    struct_config = StructConfig(
        mode=StructMode.C_COMPATIBLE,
        byte_order=ByteOrder.BIG_ENDIAN  # Data registers are big-endian
    )

    @property
    def raw_pressure(self) -> int:
        """Get 20-bit raw pressure value."""
        return (self.press_msb << 12) | (self.press_lsb << 4) | (self.press_xlsb >> 4)

    @property
    def raw_temperature(self) -> int:
        """Get 20-bit raw temperature value."""
        return (self.temp_msb << 12) | (self.temp_lsb << 4) | (self.temp_xlsb >> 4)

    @property
    def raw_humidity(self) -> int:
        """Get 16-bit raw humidity value."""
        return (self.hum_msb << 8) | self.hum_lsb

Reading from the Sensor

Using smbus2 on Raspberry Pi

import smbus2

BME280_ADDRESS = 0x76  # or 0x77 depending on SDO pin


class BME280:
    """BME280 sensor driver using PDC Struct for data parsing."""

    def __init__(self, bus_number: int = 1, address: int = BME280_ADDRESS):
        self.bus = smbus2.SMBus(bus_number)
        self.address = address
        self._load_calibration()

    def _read_registers(self, start: int, length: int) -> bytes:
        """Read multiple registers from the sensor."""
        return bytes(self.bus.read_i2c_block_data(
            self.address, start, length
        ))

    def _load_calibration(self):
        """Load calibration data from sensor."""
        # Read temperature/pressure calibration (0x88-0xA1, 26 bytes)
        # Note: We only need 24 bytes for our struct
        tp_data = self._read_registers(0x88, 24)
        self.calib_tp = BME280CalibrationTP.from_bytes(tp_data)

        # Read H1 separately (single byte at 0xA1)
        self.dig_H1 = self._read_registers(0xA1, 1)[0]

        # Read humidity calibration (0xE1-0xE7, 7 bytes)
        h_data = self._read_registers(0xE1, 7)
        self.calib_h = BME280CalibrationH.from_bytes(h_data)

        print(f"Calibration loaded:")
        print(f"  T1={self.calib_tp.dig_T1}, T2={self.calib_tp.dig_T2}, T3={self.calib_tp.dig_T3}")

    def read_raw(self) -> BME280RawData:
        """Read raw ADC values from sensor."""
        data = self._read_registers(0xF7, 8)
        return BME280RawData.from_bytes(data)

    def read(self) -> tuple[float, float, float]:
        """Read compensated temperature, pressure, and humidity.

        Returns:
            Tuple of (temperature_celsius, pressure_pa, humidity_percent)
        """
        raw = self.read_raw()

        # Apply compensation formulas (from BME280 datasheet)
        temp, t_fine = self._compensate_temperature(raw.raw_temperature)
        pressure = self._compensate_pressure(raw.raw_pressure, t_fine)
        humidity = self._compensate_humidity(raw.raw_humidity, t_fine)

        return temp, pressure, humidity

    def _compensate_temperature(self, adc_T: int) -> tuple[float, int]:
        """Compensate raw temperature reading."""
        c = self.calib_tp

        var1 = (adc_T / 16384.0 - c.dig_T1 / 1024.0) * c.dig_T2
        var2 = ((adc_T / 131072.0 - c.dig_T1 / 8192.0) ** 2) * c.dig_T3
        t_fine = int(var1 + var2)
        temperature = t_fine / 5120.0

        return temperature, t_fine

    def _compensate_pressure(self, adc_P: int, t_fine: int) -> float:
        """Compensate raw pressure reading."""
        c = self.calib_tp

        var1 = t_fine / 2.0 - 64000.0
        var2 = var1 * var1 * c.dig_P6 / 32768.0
        var2 = var2 + var1 * c.dig_P5 * 2.0
        var2 = var2 / 4.0 + c.dig_P4 * 65536.0
        var1 = (c.dig_P3 * var1 * var1 / 524288.0 + c.dig_P2 * var1) / 524288.0
        var1 = (1.0 + var1 / 32768.0) * c.dig_P1

        if var1 == 0:
            return 0

        pressure = 1048576.0 - adc_P
        pressure = ((pressure - var2 / 4096.0) * 6250.0) / var1
        var1 = c.dig_P9 * pressure * pressure / 2147483648.0
        var2 = pressure * c.dig_P8 / 32768.0
        pressure = pressure + (var1 + var2 + c.dig_P7) / 16.0

        return pressure

    def _compensate_humidity(self, adc_H: int, t_fine: int) -> float:
        """Compensate raw humidity reading."""
        h = self.calib_h

        var_H = t_fine - 76800.0
        if var_H == 0:
            return 0

        var_H = (adc_H - (h.dig_H4 * 64.0 + h.dig_H5 / 16384.0 * var_H)) * \
                (h.dig_H2 / 65536.0 * (1.0 + h.dig_H6 / 67108864.0 * var_H * \
                (1.0 + h.dig_H3 / 67108864.0 * var_H)))

        var_H = var_H * (1.0 - self.dig_H1 * var_H / 524288.0)

        return max(0.0, min(100.0, var_H))

Example Usage

def main():
    # Initialize sensor
    sensor = BME280()

    print("\nReading sensor data...")
    print(f"Raw data struct size: {BME280RawData.struct_size()} bytes")
    print(f"Raw data format: {BME280RawData.struct_format_string()}\n")

    # Read and display values
    for i in range(5):
        temp, pressure, humidity = sensor.read()

        print(f"Reading {i + 1}:")
        print(f"  Temperature: {temp:.2f}°C")
        print(f"  Pressure: {pressure / 100:.2f} hPa")
        print(f"  Humidity: {humidity:.2f}%")
        print()

        time.sleep(1)


if __name__ == "__main__":
    import time
    main()

Output

Calibration loaded:
  T1=27504, T2=26435, T3=-1000

Reading sensor data...
Raw data struct size: 8 bytes
Raw data format: >BBBBBBBB

Reading 1:
  Temperature: 23.45°C
  Pressure: 1013.25 hPa
  Humidity: 45.32%

Why PDC Struct for IoT?

Without PDC Struct

# Manual byte unpacking (error-prone)
def read_calibration_manual(data: bytes):
    dig_T1 = int.from_bytes(data[0:2], 'little', signed=False)
    dig_T2 = int.from_bytes(data[2:4], 'little', signed=True)
    dig_T3 = int.from_bytes(data[4:6], 'little', signed=True)
    # ... 20+ more fields
    return dig_T1, dig_T2, dig_T3, ...

With PDC Struct

  • Self-documenting - Field names and descriptions in the model
  • Type-safe - Proper signed/unsigned handling via Int16/UInt16
  • Validated - Pydantic validates all values automatically
  • Maintainable - Easy to update when sensor registers change
  • Testable - Create mock sensor data easily for testing

Testing Without Hardware

You can test your parsing logic without the actual sensor:

def test_raw_data_parsing():
    """Test parsing of raw sensor data."""
    # Simulated raw data bytes
    test_data = bytes([
        0x50, 0x00, 0x00,  # Pressure (20-bit)
        0x80, 0x00, 0x00,  # Temperature (20-bit)
        0x80, 0x00,        # Humidity (16-bit)
    ])

    raw = BME280RawData.from_bytes(test_data)

    assert raw.raw_pressure == 0x50000
    assert raw.raw_temperature == 0x80000
    assert raw.raw_humidity == 0x8000

    print("✓ Raw data parsing test passed")


def test_calibration_parsing():
    """Test parsing of calibration data."""
    # Create known calibration values
    calib = BME280CalibrationTP(
        dig_T1=27504,
        dig_T2=26435,
        dig_T3=-1000,
        dig_P1=36477,
        dig_P2=-10685,
        dig_P3=3024,
        dig_P4=2855,
        dig_P5=140,
        dig_P6=-7,
        dig_P7=15500,
        dig_P8=-14600,
        dig_P9=6000,
    )

    # Round-trip through bytes
    data = calib.to_bytes()
    restored = BME280CalibrationTP.from_bytes(data)

    assert restored.dig_T1 == calib.dig_T1
    assert restored.dig_T2 == calib.dig_T2
    assert restored.dig_P9 == calib.dig_P9

    print("✓ Calibration parsing test passed")


if __name__ == "__main__":
    test_raw_data_parsing()
    test_calibration_parsing()

Other I2C Sensors

The same pattern works for other I2C devices:

Sensor Data Size Use Case
BME280 8 bytes Temperature, humidity, pressure
MPU6050 14 bytes Accelerometer, gyroscope
ADS1115 2 bytes ADC readings
SHT31 6 bytes Temperature, humidity
BMP280 6 bytes Temperature, pressure

Requirements

# On Raspberry Pi
pip install pdc-struct smbus2

# Enable I2C
sudo raspi-config  # Interface Options → I2C → Enable

Hardware Setup

Connect the BME280 to your Raspberry Pi:

BME280 Pin Raspberry Pi Pin
VCC 3.3V (Pin 1)
GND Ground (Pin 6)
SDA GPIO 2 / SDA (Pin 3)
SCL GPIO 3 / SCL (Pin 5)

Verify the sensor is detected:

i2cdetect -y 1
# Should show 76 or 77