import struct
from typing import Optional, Tuple
import serial
class SDS011:
    """
    Provides functionality to read from a SDS011 air particulate density sensor using
    `UART <https://www.circuitbasics.com/basics-uart-communication/>`_ serial communication.

    Commands sent to the sensor are 19 bytes long: ``HEAD``, ``CMD_ID``, 13 payload bytes,
    two device-ID bytes, a checksum and ``TAIL``. Replies are 10 bytes long (see
    :meth:`_process_frame` for the layout).

    The instance can be used as a context manager so that the serial port is closed on exit.
    """

    # Frame delimiters and the command ID that starts every outgoing command.
    HEAD = b'\xaa'
    TAIL = b'\xab'
    CMD_ID = b'\xb4'

    # The sent command is a read or a write
    READ = b"\x00"
    WRITE = b"\x01"

    # The report mode command ID and its two mode bytes
    REPORT_MODE_CMD = b"\x02"
    ACTIVE = b"\x00"
    PASSIVE = b"\x01"

    # The query (request one measurement) command ID
    QUERY_CMD = b"\x04"

    # The sleep command ID
    SLEEP_CMD = b"\x06"
    # Sleep and work byte
    SLEEP = b"\x00"
    WORK = b"\x01"

    # The work period command ID
    WORK_PERIOD_CMD = b'\x08'

    # Command byte (second byte) of a measurement frame, and the reply frame size.
    _DATA_FRAME_ID = b"\xc0"
    _FRAME_LENGTH = 10

    def __init__(self, serial_port: str, baudrate: int = 9600, timeout: int = 2,
                 use_query_mode: bool = True):
        """
        Initialise and open serial port.

        Args:
            serial_port (str): The serial port (device path) on which the sensor is mounted. Ex: "/dev/ttyUSB0"
            baudrate (int): The rate at which information is transferred in a communication channel. See
                `baudrate <https://www.setra.com/blog/what-is-baud-rate-and-what-cable-length-is-required-1>`_.
            timeout (int): The number of seconds to wait for a response from the sensor.
            use_query_mode (bool): Whether to use the sensor's query mode. This should always be set to true for
                our applications.
        """
        self.ser = serial.Serial(port=serial_port,
                                 baudrate=baudrate,
                                 timeout=timeout)
        self.ser.flush()
        # Query mode is the sensor's "passive" report mode, hence the negation.
        self.set_report_mode(active=not use_query_mode)

    def __enter__(self):
        """Return the sensor itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port when leaving the ``with`` block."""
        self.close()

    def close(self):
        """Close the underlying serial port."""
        self.ser.close()

    def _execute(self, cmd_bytes: bytes):
        """
        Writes a byte sequence to the serial port (device).

        Args:
            cmd_bytes (bytes): The bytes that should be sent.
        """
        self.ser.write(cmd_bytes)

    def _get_reply(self) -> Optional[bytes]:
        """
        Read reply from device.

        Returns:
            The 10 bytes consisting of the device's response, or None when fewer than 10 bytes
            arrived before the timeout or when the checksum (byte 8) does not match the sum of
            bytes 2-7 modulo 256.
        """
        raw = self.ser.read(size=self._FRAME_LENGTH)
        # A timed-out read returns a short (possibly empty) result; indexing raw[8] on it
        # would raise IndexError, so reject anything shorter than a full frame first.
        if len(raw) < self._FRAME_LENGTH:
            return None
        if sum(raw[2:8]) % 256 != raw[8]:
            return None
        return raw

    def _transact(self, payload: bytes) -> Optional[bytes]:
        """
        Wrap a 13-byte payload into a full command, send it and read the reply.

        Args:
            payload (bytes): Command ID byte followed by its data bytes (13 bytes in total).

        Returns:
            The validated 10-byte reply, or None if no valid reply was received.
        """
        self._execute(self._finish_cmd(self.cmd_begin() + payload))
        return self._get_reply()

    def cmd_begin(self) -> bytes:
        """
        Get command header and command ID bytes.

        Returns:
            The sequence of bytes corresponding to the HEAD and CMD_ID bytes as identified in the device's data sheet.
        """
        return self.HEAD + self.CMD_ID

    def set_report_mode(self, read: bool = False, active: bool = False) -> Optional[bytes]:
        """
        Query or set the report mode of the device.

        Args:
            read (bool): Boolean flag indicating whether we should query or set the report mode of the device.
            active (bool): Boolean flag indicating whether we should set the report mode of the device
                to active (True) or query (False) modes.

        Returns:
            The raw 10-byte reply from the device, or None if no valid reply was received.
        """
        payload = (self.REPORT_MODE_CMD
                   + (self.READ if read else self.WRITE)
                   + (self.ACTIVE if active else self.PASSIVE)
                   + b"\x00" * 10)
        return self._transact(payload)

    def query(self) -> Tuple[float, float]:
        """
        Query the device and read the data.

        Returns:
            Air particulate density in micrograms per cubic meter in the form of a tuple of floats (pm25, pm10).
            Both values are NaN when no valid reply was received.
        """
        raw = self._transact(self.QUERY_CMD + b"\x00" * 12)
        if raw is None:
            return float("NaN"), float("NaN")
        return self._process_frame(raw)

    def sleep(self, read: bool = False, sleep: bool = True) -> Optional[bytes]:
        """
        Sleep/Wake up the sensor.

        Args:
            read (bool): Boolean flag indicating whether this should query the device or modify its behavior.
            sleep (bool): Boolean flag indicating whether the device should sleep or work.

        Returns:
            The raw 10-byte reply from the device, or None if no valid reply was received.
        """
        payload = (self.SLEEP_CMD
                   + (self.READ if read else self.WRITE)
                   + (self.SLEEP if sleep else self.WORK)
                   + b"\x00" * 10)
        return self._transact(payload)

    def set_work_period(self, read: bool = False, work_time: int = 0) -> Optional[bytes]:
        """
        Query or set the working period of the device.

        Args:
            read (bool): Boolean flag indicating whether this should query the device or modify its behavior.
            work_time (int): The working period, between 0 and 30 inclusive. According to the sensor's
                control protocol document, 0 selects continuous operation and 1-30 is the cycle length
                in minutes.

        Returns:
            The raw 10-byte reply from the device, or None if no valid reply was received.

        Raises:
            ValueError: If ``work_time`` is outside the range 0-30.
        """
        # Explicit check instead of ``assert`` so validation survives ``python -O``.
        if not 0 <= work_time <= 30:
            raise ValueError("work_time must be between 0 and 30, got %r" % (work_time,))
        payload = (self.WORK_PERIOD_CMD
                   + (self.READ if read else self.WRITE)
                   + bytes([work_time])
                   + b"\x00" * 10)
        return self._transact(payload)

    def _finish_cmd(self, cmd: bytes, id1: bytes = b"\xff", id2: bytes = b"\xff") -> bytes:
        """
        Add device ID, checksum and tail bytes.

        Args:
            cmd (bytes): The partial command bytes to which we should append the device id, checksum, and tail bytes.
            id1 (bytes): The first id byte of the destination device (default all)
            id2 (bytes): The second id byte of the destination device (default all)

        Returns:
            The command cmd after the device ID, checksum, and tail bytes have been appended.
        """
        cmd += id1 + id2
        # The checksum covers everything after HEAD and CMD_ID, including the ID bytes.
        checksum = sum(cmd[2:]) % 256
        return cmd + bytes([checksum]) + self.TAIL

    def _process_frame(self, data: bytes) -> Tuple[float, float]:
        """
        Processes a SDS011 data frame.

        Byte positions:
            0 - Header
            1 - Command No.
            2,3 - PM2.5 low/high byte
            4,5 - PM10 low/high
            6,7 - ID bytes
            8 - Checksum - sum of bytes 2-7
            9 - Tail

        Args:
            data (bytes): The data bytes that should be processed. Only bytes 0-8 are inspected, so
                trailing bytes after the checksum are ignored.

        Returns:
            A tuple containing the pm25 and pm10 values queried from the device in the form (pm25, pm10).
            Both values are NaN when the frame is too short or its checksum does not match.
        """
        nan = float("NaN")
        if len(data) < 9:
            return nan, nan
        if sum(data[2:8]) % 256 != data[8]:
            return nan, nan
        # Two little-endian unsigned 16-bit values; the sensor reports tenths of a unit.
        pm25_raw, pm10_raw = struct.unpack_from('<HH', data, 2)
        return pm25_raw / 10.0, pm10_raw / 10.0

    def read(self) -> Tuple[float, float]:
        """
        Read sensor data.

        Scans the incoming byte stream for the next measurement frame (header byte followed by
        the 0xC0 command byte) and decodes it. Frames with a different command byte are skipped.

        Returns:
            PM25 and PM10 concentration in micrograms per cubic meter in the form (pm25, pm10).
            Both values are NaN when the serial read times out before a complete frame arrives
            or when the frame's checksum is invalid.
        """
        nan = float("NaN")
        remaining = self._FRAME_LENGTH - 1
        while True:
            byte = self.ser.read(size=1)
            if not byte:
                # Timed out with nothing received; give up instead of spinning forever.
                return nan, nan
            if byte != self.HEAD:
                continue
            # Read exactly the rest of this frame so the next frame's header stays in the buffer.
            rest = self.ser.read(size=remaining)
            if len(rest) < remaining:
                return nan, nan
            if rest[0:1] != self._DATA_FRAME_ID:
                continue
            return self._process_frame(byte + rest)