Source code for src.utils.sds011

import struct
from typing import Optional, Tuple
import serial


class SDS011:
    """
    Provides functionality to read from a SDS011 air particulate density sensor using
    `UART <https://www.circuitbasics.com/basics-uart-communication/>`_ serial communication.

    Commands sent to the sensor are 19 bytes long::

        HEAD, CMD_ID, <13 payload bytes>, ID1, ID2, CHECKSUM, TAIL

    Replies from the sensor are 10 bytes long::

        HEAD, <reply id>, <4 data bytes>, ID1, ID2, CHECKSUM, TAIL

    In both directions the checksum is the low byte of the sum of every byte
    located between the command/reply ID and the checksum itself.
    """

    # Frame delimiters and the command ID shared by every host -> sensor command.
    HEAD = b'\xaa'
    TAIL = b'\xab'
    CMD_ID = b'\xb4'

    # Second byte of a sensor -> host frame that carries a measurement.
    DATA_REPLY_ID = b'\xc0'
    # Total length, in bytes, of a sensor -> host frame.
    FRAME_LENGTH = 10

    # The sent command is a read or a write
    READ = b"\x00"
    WRITE = b"\x01"

    # The report mode command ID and its two possible modes
    REPORT_MODE_CMD = b"\x02"
    ACTIVE = b"\x00"
    PASSIVE = b"\x01"

    # The query command ID
    QUERY_CMD = b"\x04"

    # The sleep command ID
    SLEEP_CMD = b"\x06"
    # Sleep and work byte
    SLEEP = b"\x00"
    WORK = b"\x01"

    # The work period command ID
    WORK_PERIOD_CMD = b'\x08'

    def __init__(self, serial_port: str, baudrate: int = 9600, timeout: int = 2,
                 use_query_mode: bool = True):
        """
        Initialise and open serial port.

        Args:
            serial_port (str): The serial port (device path) on which the sensor is mounted.
                Ex: "/dev/ttyUSB0"
            baudrate (int): The rate at which information is transferred in a communication
                channel. See `baudrate
                <https://www.setra.com/blog/what-is-baud-rate-and-what-cable-length-is-required-1>`_.
            timeout (int): The number of seconds to wait for a response from the sensor.
            use_query_mode (bool): Whether to use the sensor's query mode. This should always
                be set to true for our applications.
        """
        self.ser = serial.Serial(port=serial_port, baudrate=baudrate, timeout=timeout)
        self.ser.flush()
        self.set_report_mode(active=not use_query_mode)

    def __enter__(self) -> "SDS011":
        """Allow ``with SDS011(...) as sensor:`` so the port is always released."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the serial port when leaving a ``with`` block."""
        self.close()

    def close(self) -> None:
        """Close the underlying serial port."""
        self.ser.close()

    @staticmethod
    def _nan_reading() -> Tuple[float, float]:
        """Return the ``(NaN, NaN)`` pair used to signal a missing or invalid reading."""
        return float("NaN"), float("NaN")

    def _execute(self, cmd_bytes: bytes):
        """
        Writes a byte sequence to the serial port (device).

        Args:
            cmd_bytes (bytes): The bytes that should be sent.
        """
        self.ser.write(cmd_bytes)

    def _get_reply(self) -> Optional[bytes]:
        """
        Read reply from device.

        Returns:
            The 10 bytes consisting of the device's response, or ``None`` when the
            response is incomplete (e.g. the read timed out), is not delimited by the
            expected head/tail bytes, or fails its checksum.
        """
        raw = self.ser.read(size=self.FRAME_LENGTH)
        # A timed-out read yields fewer bytes than requested; indexing such a
        # short buffer would raise, so reject it up front.
        if len(raw) < self.FRAME_LENGTH:
            return None
        if raw[0:1] != self.HEAD or raw[9:10] != self.TAIL:
            return None
        if (sum(raw[2:8]) & 255) != raw[8]:
            return None
        return raw

    def cmd_begin(self) -> bytes:
        """
        Get command header and command ID bytes.

        Returns:
            The sequence of bytes corresponding to the HEAD and CMD_ID bytes as identified
            in the device's data sheet.
        """
        return self.HEAD + self.CMD_ID

    def set_report_mode(self, read: bool = False, active: bool = False):
        """
        Query or set the report mode of the device.

        Args:
            read (bool): Boolean flag indicating whether we should query or set the report
                mode of the device.
            active (bool): Boolean flag indicating whether we should set the report mode of
                the device to active (True) or query (False) modes.
        """
        cmd = self.cmd_begin()
        cmd += (self.REPORT_MODE_CMD
                + (self.READ if read else self.WRITE)
                + (self.ACTIVE if active else self.PASSIVE)
                + b"\x00" * 10)
        cmd = self._finish_cmd(cmd)
        self._execute(cmd)
        # The reply is consumed so it does not linger in the input buffer.
        self._get_reply()

    def query(self) -> Tuple[float, float]:
        """
        Query the device and read the data.

        Returns:
            Air particulate density in micrograms per cubic meter in the form of a tuple of
            floats (pm25, pm10). ``(NaN, NaN)`` is returned when no valid data frame was
            received.
        """
        cmd = self.cmd_begin()
        cmd += (self.QUERY_CMD + b"\x00" * 12)
        cmd = self._finish_cmd(cmd)
        self._execute(cmd)

        raw = self._get_reply()
        # Only a data frame carries a measurement; any other reply type must
        # not be decoded as PM values.
        if raw is None or raw[1:2] != self.DATA_REPLY_ID:
            return self._nan_reading()
        pm25_raw, pm10_raw = struct.unpack_from('<HH', raw, 2)
        return pm25_raw / 10.0, pm10_raw / 10.0

    def sleep(self, read: bool = False, sleep: bool = True):
        """
        Sleep/Wake up the sensor.

        Args:
            read (bool): Boolean flag indicating whether this should query the device or
                modify its behavior.
            sleep (bool): Boolean flag indicating whether the device should sleep or work.
        """
        cmd = self.cmd_begin()
        cmd += (self.SLEEP_CMD
                + (self.READ if read else self.WRITE)
                + (self.SLEEP if sleep else self.WORK)
                + b"\x00" * 10)
        cmd = self._finish_cmd(cmd)
        self._execute(cmd)
        self._get_reply()

    def set_work_period(self, read: bool = False, work_time: int = 0):
        """
        Query or set the work period of the device.

        Args:
            read (bool): Boolean flag indicating whether this should query the device or
                modify its behavior.
            work_time (int): The working period of the device, from 0 to 30 inclusive.
                The SDS011 control protocol expresses this value in minutes, with 0
                meaning continuous operation.

        Raises:
            AssertionError: If ``work_time`` is outside the range 0..30.
        """
        # Raised explicitly (rather than via ``assert``) so the check is still
        # enforced under ``python -O``; the exception type is kept for callers.
        if not 0 <= work_time <= 30:
            raise AssertionError("work_time must be between 0 and 30 inclusive")
        cmd = self.cmd_begin()
        cmd += (self.WORK_PERIOD_CMD
                + (self.READ if read else self.WRITE)
                + bytes([work_time])
                + b"\x00" * 10)
        cmd = self._finish_cmd(cmd)
        self._execute(cmd)
        self._get_reply()

    def _finish_cmd(self, cmd: bytes, id1: bytes = b"\xff", id2: bytes = b"\xff") -> bytes:
        """
        Add device ID, checksum and tail bytes.

        Args:
            cmd (bytes): The partial command bytes to which we should append the device id,
                checksum, and tail bytes.
            id1 (bytes): The first id byte of the destination device (default all)
            id2 (bytes): The second id byte of the destination device (default all)

        Returns:
            The command cmd after the device ID, checksum, and tail bytes have been appended.
        """
        cmd += id1 + id2
        # The checksum skips the two leading bytes (HEAD and CMD_ID).
        checksum = sum(cmd[2:]) % 256
        cmd += bytes([checksum]) + self.TAIL
        return cmd

    def _process_frame(self, data: bytes) -> Tuple[float, float]:
        """
        Processes a SDS011 data frame.

        Byte positions:
            0 - Header
            1 - Command No.
            2,3 - PM2.5 low/high byte
            4,5 - PM10 low/high
            6,7 - ID bytes
            8 - Checksum - sum of bytes 2-7
            9 - Tail

        Args:
            data (bytes): The data bytes that should be processed. Bytes past the
                checksum (index 8) are ignored.

        Returns:
            A tuple containing the pm25 and pm10 values queried from the device in the
            form (pm25, pm10), or ``(NaN, NaN)`` if the frame is too short or its
            checksum does not match.
        """
        # Everything up to and including the checksum byte must be present.
        if len(data) < 9:
            return self._nan_reading()
        checksum = sum(data[2:8]) % 256
        if checksum != data[8]:
            return self._nan_reading()
        pm25_raw, pm10_raw = struct.unpack_from('<HH', data, 2)
        return pm25_raw / 10.0, pm10_raw / 10.0

    def read(self) -> Tuple[float, float]:
        """
        Read sensor data.

        Bytes are discarded until a frame header is seen; the remainder of that frame
        is then read and decoded.

        Returns:
            PM25 and PM10 concentration in micrograms per cubic meter in the form
            (pm25, pm10). ``(NaN, NaN)`` is returned when the port times out before a
            complete frame arrives or when the frame is not a valid data frame.
        """
        # Synchronise on the header byte. An empty read means the port timed
        # out, so give up instead of spinning forever.
        while True:
            byte = self.ser.read(size=1)
            if not byte:
                return self._nan_reading()
            if byte == self.HEAD:
                break

        # Read exactly the rest of this frame so the next frame's header is
        # left in the buffer for the following call.
        rest = self.ser.read(size=self.FRAME_LENGTH - 1)
        if (len(rest) != self.FRAME_LENGTH - 1
                or rest[0:1] != self.DATA_REPLY_ID
                or rest[-1:] != self.TAIL):
            return self._nan_reading()
        return self._process_frame(byte + rest)