Source code for sensor

import abc
import os
import subprocess
import sys
import tempfile
import time
import hx711_multi as hx711
import RPi.GPIO as GPIO
from multiprocessing import Process, Queue, Event
from queue import Empty
import platform
from libcamera import Transform
import picamera2
from picamera2.encoders import H264Encoder
import adafruit_dht
import board
from typing import Optional, Dict, Callable, Any
from loguru import logger
from datetime import datetime
from src.beemon.mqtt import MQTT
from src.beemon.configuration import Config
from src.utils import importutils, filehandlingutils, datetimeutils
from distutils.util import strtobool
from vcgencmd import Vcgencmd
from src.utils.sds011 import SDS011


[docs] class Sensor(Process, abc.ABC):
[docs] @classmethod def __subclasshook__(cls, subclass: type): """ Called when the built-in method :py:meth:`issubclass` is called on the :class:`Sensor` class. This method inspects vital class properties of :class:`Sensor` to ensure that the provided ``subclass`` is substitutable for the base :class:`Sensor` class. More formally, it is this method's responsibility to enforce the Liskov Substitution Principal (LSP). Args: subclass (type): The potential subclass of :class:`sensor.Sensor` whose membership is to be conclusively determined. Returns: bool: True if the provided ``subclass`` is an LSP-valid subclass of :class:`sensor.Sensor`, False otherwise. """ subclass_has_required_attrs: bool = True required_subclass_attributes: Dict[str, Callable] = { "record": getattr(cls, "record"), # 'start': getattr(cls, 'start'), # 'run': getattr(cls, 'run') # 'stop': getattr(cls, 'stop') } for attr_name, attr in required_subclass_attributes.items(): try: getattr(subclass, attr_name) except AttributeError: subclass_has_required_attrs = False break return subclass_has_required_attrs
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None ): """ Initializer for all ``Sensor`` objects. Concrete subclasses are expected to call the parent classes' initializer. Args: recording_start_time_queue (Queue): A reference/pointer to the :class:`multiprocessing.Queue` instance that houses the :class:`datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. local_output_directory (Optional[str]): The root directory to which files containing data recorded by the sensor should be output. If ``None`` is provided, this value will be determined by the :class:`Config` object provided during instantiation. If no :class:`Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. capture_duration_seconds (Optional[int]): How long the sensor is to record for (in seconds) at the start of each pre-determined recording time in the ``recording_start_time_queue``. If ``None`` is provided, this value will be determined by the :class:`Config` object provided during instantiation. If no :class:`Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. capture_window_start_time (Optional[datetime]): When the first recording (of ``capture_duration_seconds`` in length) should transpire. This :class:`datetime.datetime` is inclusive, if a value of ``0800`` is specified, the first recording will be taken at ``8:00 AM EDT``. If ``None`` is provided, this value will be determined by the :class:`Config` object provided during instantiation. If no :class:`Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. capture_window_end_time (Optional[datetime]): When the last recording (of ``capture_duration_seconds`` in length) should transpire. This :class:`datetime.datetime` is inclusive, if a value of ``2000`` is specified, the last recording will be taken at ``8:00 PM EDT``. If ``None`` is provided, this value will be determined by the :class:`Config` object provided during instantiation. If no :class:`Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. """ if config is None: self._beemon_config: Config = Config() else: self._beemon_config: Config = config # See if the concrete sensor subclass has any overrides in the ``beemon-config.ini`` file: lower_case_concrete_sensor_subclass_name: str = type(self).__name__.lower() ( self._recording_time_zone, self._root_upload_directory, self._local_output_directory, self._capture_window_start_time, self._capture_window_end_time, self._capture_duration_seconds, self._capture_interval_seconds, self._auto_start, ) = importutils.parse_global_sensor_settings_from_config_file( beemon_config=self._beemon_config, sensor_name=lower_case_concrete_sensor_subclass_name, sensor_override=True, ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"local_output_directory": {self._local_output_directory}' ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"capture_window_start_time": {self._capture_window_start_time}' ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"capture_window_end_time": {self._capture_window_end_time}' ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"capture_duration_seconds": {self._capture_duration_seconds}' ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"capture_interval_seconds": {self._capture_interval_seconds}' ) logger.debug( f"Sensor: {lower_case_concrete_sensor_subclass_name} " f'"auto_start": {self._auto_start}' ) # Parse the ThingsBoard/Dashboard configurations from the beemon-config.ini: self._dashboard_host: str = self._beemon_config.dashboard(key="host") self._dashboard_access_token: str = self._beemon_config.dashboard( key="access_token" ) self._mqtt_lib: MQTT = MQTT( host=self._dashboard_host, access_token=self._dashboard_access_token ) # Sentinel value is originally False: self._halt_recording_request_received: Event = Event() # This is the queue which the concrete Sensor subclass will receive recording times from: self._recording_start_time_queue: Queue = recording_start_time_queue # The sensor will be initialized (but not running) until start() is called: self._is_running: Event = Event() super().__init__(args=(recording_start_time_queue,)) # repo_name_lower_case = os.path.abspath(__file__).split('/')[-1].lower() appmais_local_directory = "/".join(self._local_output_directory.split("/")[:-1]) self._things_board_local_root_directory: str = os.path.join( appmais_local_directory, "thingsboard" ) # Cleanup previous dates' local directories. sensor_local_output_dir = os.path.abspath( os.path.join( self._local_output_directory, lower_case_concrete_sensor_subclass_name ) ) filehandlingutils.cleanup_local_sensor_output_directory( sensor_dir=sensor_local_output_dir ) # Create local directories, the uploader watches ``bee_tmp`` and we aggregate for ThingsBoard in the # ``thingsboard`` directory: if not os.path.exists(self._local_output_directory): try: os.makedirs(self._local_output_directory) except OSError as err: logger.critical( f'Failed to create local storage directory: "{self._local_output_directory}". Error: {err}' ) sys.exit(-1) if not os.path.exists(self._things_board_local_root_directory): try: os.makedirs(self._things_board_local_root_directory) except OSError as err: logger.warning( "Failed to create ThingsBoard local storage directory: " f'"{self._things_board_local_root_directory}". Error: {err}' )
@property def beemon_config(self) -> Config: return self._beemon_config @property def local_output_directory(self) -> str: return self._local_output_directory @property def recording_start_time_queue(self) -> Queue: return self._recording_start_time_queue @property def capture_duration_seconds(self) -> int: return self._capture_duration_seconds @property def capture_interval_seconds(self) -> int: return self._capture_interval_seconds @property def capture_window_start_time(self) -> datetime: return self._capture_window_start_time @property def capture_window_end_time(self) -> datetime: return self._capture_window_end_time @property def auto_start(self) -> bool: return self._auto_start @property def is_running(self) -> Event: return self._is_running @property def halt_recording_request_received(self) -> Event: return self._halt_recording_request_received @property def dashboard_host(self) -> str: return self._dashboard_host @property def dashboard_access_token(self) -> str: return self._dashboard_access_token
[docs] @abc.abstractmethod def start(self) -> None: """ This method is called by the :class:`orchestrator.Orchestrator` class (specifically in the :meth:`orchestrator.Orchestrator.start_command` method) when a ``"start <sensor>"`` command is sent to the :class:`server.Server` process. The parent :class:`Sensor` class allows concrete subclasses to optionally override this method in order to perform potentially long-running hardware/firmware sensor initialization routines. Concrete subclasses are expected to call the parent classes' method first. Notes: The abstract version of this method is intended to contain operations common to all concrete subclass :class:`Sensor` objects during initialization. Currently, there are no common initialization operations besides logging. """ logger.info(f"Starting {type(self).__name__.lower()} sensor.") super().start() self._is_running.set() # Now that we have slept, spin-up the thread by delegating back to the concrete subclass... logger.debug( f"Delegating to concrete Sensor subclass: {type(self).__name__} for start()." ) return
# raise NotImplementedError(f"Concrete subclasses of {__class__} must implement {__name__}.")
[docs] def run(self) -> None: """ Houses the main-loop/process to manage recording for all beemon Sensor-like objects. This method runs repeatedly until the sensor's ``recording_start_time_queue`` is empty. This queue is purged by the :class:`orchestrator.Orchestrator` class when a ``"stop <sensor>"`` command is received. When this occurs, this method will return, allowing the garbage collector to terminate the :class:`multiprocessing.Process` instance gracefully. .. todo:: Modify exit condition to be a stop() command received in addition to an empty queue. Notes: This method is invoked automatically on :py:class:`multiprocessing.Process` instances when the :meth:`multiprocessing.Process.start` method is run. The :class:`orchestrator.Orchestrator` instance will call the :meth:`sensor.Sensor.start` method directly, which then invokes this method. """ super().run() logger.info("Starting main Sensor record loop!") while not self._halt_recording_request_received.is_set(): # logger.debug(f"Still recording? {not self._stop_request_received}") try: record_time: datetime = self._recording_start_time_queue.get( block=True, timeout=30 ) epoch_time = record_time.timestamp() * 1000 except Empty: # There is no more data in the queue. logger.info( f"Recording aborted. Sensor: {type(self).__name__} was instructed to abandon " f"recording." ) return seconds_to_sleep: float = datetimeutils.seconds_until_datetime(record_time) if seconds_to_sleep >= 0: logger.info( f"Sensor: {self.__class__.__name__} sleeping {seconds_to_sleep} seconds until the next " f"recording time: {record_time}." ) time.sleep(seconds_to_sleep) # Confirm that the sensor should still record by matching record_times: try: parity_check_record_time: datetime = ( self._recording_start_time_queue.get(block=True, timeout=30) ) except Empty: # A "stop sensor" command was issued while this process was sleeping. logger.info( f"Recording aborted. Sensor: {type(self).__name__} was instructed to abandon " f"recording." ) # .. todo:: Call self.stop()? return if parity_check_record_time == record_time: # No "stop sensor" command was issued while this process was sleeping. logger.info(f"Instructing Sensor: {type(self).__name__} to record.") _platform_name = platform.node() _date_repr = record_time.strftime("%Y-%m-%d") _time_repr = record_time.strftime("%H-%M-%S") sensor_temp_dir = os.path.join( self._local_output_directory, f"{type(self).__name__.lower()}/{_date_repr}", ) if not os.path.exists(sensor_temp_dir): os.makedirs(sensor_temp_dir) filename = f"{_platform_name}@{_date_repr}@{_time_repr}" try: # Instruct the concrete Sensor subclass to record: sensor_recording_kwargs = self.record( filename=filename, uploader_output_dir=sensor_temp_dir ) except Exception as err: logger.error( f"Sensor: {type(self).__name__} failed to record with error: {err}" ) return logger.info(f"Sensor: {type(self).__name__} finished recording!") try: # Pass the resulting recorded data to the concrete subclass to transmit to ThingsBoard: sensor_recording_kwargs = { "ts": epoch_time, "values": sensor_recording_kwargs, } self.transmit(**sensor_recording_kwargs) except Exception as err: logger.error( f"Sensor: {type(self).__name__} failed to transmit data to ThingsBoard. Error: {err}" ) else: # The request to record was aborted during the sensor's sleep cycle. logger.info( f"Recording aborted. Sensor: {type(self).__name__} was instructed to abandon " f"recording." ) # .. todo:: Call self.stop()? return else: # The record window was missed by this Sensor, do nothing. logger.warning( f"Recording time {record_time} missed by sensor: {type(self).__name__}." )
# Block the concrete subclass until it is time to record, then delegate back to the concrete subclass. # raise NotImplementedError(f"Concrete subclasses of {__class__} must implement {__name__}.")
[docs] @abc.abstractmethod def record(self, filename: str, uploader_output_dir: str) -> Dict[str, Any]: """ Concrete :class:`Sensor` subclasses are required to implement this method. It is called every time that a pre-specified recording start time is read from the :attr:`Sensor.recording_start_time_queue`. In the overridden version of this method, concrete subclasses are expected to do the following: 1) Instruct their constituent firmware/hardware to record for a duration of :attr:`Sensor.capture_duration_seconds` 2) Call the :meth:`Sensor.update_health_status` method with the result of the recording attempt. 3) Save the intermittent results of that recording to a temporary file (in ``/tmp``). This is only necessary for sensor recordings that are not instantaneous such as video and audio. Other sensors can record directly to ``uploader_output_dir`` as in step 4 below. 4) Rename the recorded file (when finished) to the provided ``filename`` parameter. 5) Move the final renamed file to the specified ``uploader_output_dir`` parameter (e.g. ``/home/bee/appmais/bee_tmp/``). This will trigger the Uploader watching this directory to transfer the recording via SFTP to the remote server. 6) Return a dictionary of ``key:value`` paired attributes which the concrete :class:`Sensor` subclass wishes to send northbound to the AppMAIS server. Notes: Concrete subclasses are welcome to use this method to reload the beemon configuration file (if changes made prior to the recording session should take effect without ``"stopping"`` and ``"starting"`` the sensor again). This is a safe place to do so, as this method is guaranteed to be called whenever a scheduled recording start time (from the :attr:`Sensor.recording_start_time_queue`) arrives. However, note that any access to the config file may need to be made multiprocessing safe, as multiple concrete subclass object/processes could be reading from the same object concurrently. Args: filename (str): What the file name of the recorded measurement should be. For instance: ``"rpi4-11@2021-11-27@19-30-00"``. Note that it is up to the concrete subclass to designate and append the file type (e.g. ``".wav"``, ``".h264"``) to the provided ``filename``. uploader_output_dir (str): The local output directory which the :class:`uploader.Uploader` :class:`multiprocessing.Process` is watching for file changes. The final (and renamed) recorded file should be output to this directory. For example: ``"home/bee/bee_tmp/video/2021-11-27/"``. Returns: Dict[str, Any] A dictionary of key-value pairs corresponding to the recorded measurement from the sensor. For instance, the temperature sensor may return ``{"Temperature": 45.5, "Humidity": 23.0}`` whereas the scale sensor may return ``{"Weight": 20.00}``, and the audio sensor may return an empty dictionary. Whatever is returned by this method will be passed to :meth:`Sensor.transmit()` as kwargs. You must return a dictionary from this method, although you do not have to override :meth:`Sensor.transmit()`. If :meth:`Sensor.transmit()` is not overridden, the dictionary you return from this method will be un-utilized. If :meth:`Sensor.transmit()` is overridden, the dictionary returned from this method will be passed as ``kwargs`` to the :meth:`Sensor.transmit()` method to facilitate the sending of data northbound to ThingsBoard. """ raise NotImplementedError( f"Concrete subclasses of {__class__} must implement {__name__}." )
[docs] def transmit(self, **kwargs) -> None: """ Transmits the latest sensor measurement (from the concrete subclass) to the ThingsBoard/Dashboard server (at the location specified in the ``beemon-config.ini``) in JSON format over MQTT. Args: **kwargs (Dict[str, Any]): A dictionary of keyword arguments returned by the concrete subclass in :func:`Sensor.record`. Can be any number of key:value pairs. Returns: Optional[str]: An optional error message which will be logged as a logger.error() by the parent Sensor class when this method is invoked. """ logger.info(f"Transmit received: {kwargs}") self._mqtt_lib.send_sensor_data(payload=kwargs) return
[docs] class Audio(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None, audio_sample_frequency_hz: Optional[int] = 48000, sample_format: Optional[int] = 24, channel_count: Optional[int] = 1, gain: Optional[int] = 49, ): """ Initializer for objects of type :class:`Audio`. Notes: This class is dynamically initialized (as with all concrete :class:`Sensor` subclasses) in the :meth:`orchestrator.Orchestrator._dynamically_initialize_sensors` method. Args: recording_start_time_queue (:class:`~multiprocessing.Queue`): A reference/pointer to the :class:`~multiprocessing.Queue` instance that houses the :class:`~datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`~configuration.Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`~configuration.Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. audio_sample_frequency_hz (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. sample_format (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. channel_count (Optional[int]): The number of channels utilized by the microphone during recording. If the microphone is stereo (2-channel) then this class will attempt to convert to a mono signal (1-channel) via the :meth:`make_stereo_file_mono` method. gain (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. """ super().__init__( recording_start_time_queue=recording_start_time_queue, config=config ) self._audio_sample_frequency_hz: int = int( self._beemon_config.audio( key="sampling_frequency", default=audio_sample_frequency_hz ) ) self._sample_format: int = int( self._beemon_config.audio(key="sample_format", default=sample_format) ) self._channel_count: int = int( self._beemon_config.audio(key="channel_count", default=channel_count) ) self._gain: int = int(self._beemon_config.audio(key="gain", default=gain)) self._ecasound: Optional[subprocess] = None
# self._halt_recording_request_received: bool = False
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, int]: """ Instructs ``ecasound`` to record in a sub-process for ``self.capture_duration_seconds`` to a temporary file in `/tmp`. Then this method moves the temporary file to the upload directory (via subprocess) when the recording subprocess has finished. Args: filename (str): What the file name of the recorded measurement should be. For instance: ``"rpi4-11@2021-11-27@19-30-00"``. uploader_output_dir (str): The local output directory which the :class:`uploader.Uploader` :class:`multiprocessing.Process` is watching for file changes. The final (and renamed) recorded file should be output to this directory. For example: ``"home/bee/bee_tmp/video/2021-11-27/"``. Returns: Dict[str, Optional[int]]: Dictionary to upload to thingsboard, {'Audio': 0} if there are no errors and {'Audio': -1} if there is an error. """ logger.info("Started audio recording!") output_file_path = os.path.join(uploader_output_dir, f"{filename}.wav") if self._gain is not None: self.set_mic_gain(gain=self._gain) # record via subprocess for self._capture_duration_seconds while saving the file to the # temporary directory /tmp: file = "/tmp/" + next(tempfile._get_candidate_names()) + ".wav" # Note that ecasound does not work with a pre-created TemporaryFile, hence we only pass the name of the # temporary file to ecasound and allow ecasound to manage the file object itself: cmd = [ "ecasound", "-f:%s,%s,%s" % (self.sample_format, self.channel_count, self._audio_sample_frequency_hz), "-t", str(self._capture_duration_seconds), "-i:alsa,", "-o", "%s" % file, "-q", ] cmd_str = " ".join(cmd) self._ecasound: subprocess = subprocess.Popen(cmd) logger.debug("ecasound command: %s" % cmd_str) self._ecasound.wait() heartbeat_payload = {"Audio": 0} # Write CSV for health status: pi_name = filename.split("@")[0] date = filename.split("@")[1] daily_audio_sensor_csv_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) # Create the temporary file for the day's recorded file sizes (should it not already exist): filename, date, sensor_name = importutils.parse_information_from_file_path( file_path=daily_audio_sensor_csv_file_path ) record_time = filename.split('@')[-1] record_time = record_time.split('.')[0] if self._ecasound.returncode != 0: # If the audio recording fails, log it. logger.critical( f"ecasound exited with failure code {self._ecasound.returncode}" ) heartbeat_payload["Audio"] = float("NAN") else: # Ensure the halt recording flag is not set: if self._halt_recording_request_received.is_set(): logger.info("Audio recording halted by stop request.") heartbeat_payload["Audio"] = float("NAN") return heartbeat_payload # Convert a stereo microphone to a mono sound channel: if int(self.channel_count) > 1: self.make_stereo_file_mono(file) # Move and re-name the file from /tmp to the upload directory /home/bee/appmais/bee_tmp/ via subprocess: mv_cmd = ["mv", file, output_file_path] mv = subprocess.Popen(mv_cmd) mv.wait() if mv.returncode != 0: logger.critical(f"mv exited with failure code: {mv.returncode}!") logger.info("Finished audio recording!") # Write the daily CSV file for the audio sensor: filehandlingutils.write_sensor_csv_file( start_time_queue=self._recording_start_time_queue, file_path=daily_audio_sensor_csv_file_path, data=[record_time, heartbeat_payload['Audio']] ) return heartbeat_payload
[docs] @staticmethod def make_stereo_file_mono(out_path: str): """ If the mic must record a stereo file with redundant channels, this method will create a new file from just the left channel, and then overwrite the original file with the mono file before uploading it to the server. """ logger.info("Making the stereo file mono.") mono_tmp_path = out_path[:-4] + "-mono-temp.wav" output_path = os.path.expanduser(out_path) # create a mono temp file mono_cmd = [ "ecasound", "-a:1", "-i", '"%s"' % output_path, "-a:1", "-f:24,1,48000", "-o", '"%s"' % mono_tmp_path, "-chmute:2", "-D", ] ecasound2 = subprocess.Popen( mono_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE ) # self.logger.debug("ecasound2 command: %s" % mono_cmd_str) out, err = ecasound2.communicate() # Code to print error and output # self.logger.debug(err) # self.logger.debug(out) if ecasound2.returncode != 0: logger.critical( f"ecasound2 exited with failure code {ecasound2.returncode}" ) # rename mono file with the correct name (overwrite the stereo file) mv_cmd = ["mv", mono_tmp_path, out_path] mv = subprocess.Popen(mv_cmd) mv.wait() if mv.returncode != 0: logger.critical(f"mv exited with failure code {mv.returncode}")
[docs] def start(self) -> None: """ Prints the optionally overridden configuration values for the audio device to the console for debugging purposes. """ super().start() logger.info("Initializing audio capture device:") # .. todo:: Here we can optionally reload the configuration file for the concrete subclass if we so wish. logger.info(f"Audio sample frequency: {self._audio_sample_frequency_hz}") logger.info(f"Sample format: {self._sample_format}") logger.info(f"Channel count: {self._channel_count}") logger.info(f"Gain: {self._gain}")
# def stop(self, socket_connection: Optional[socket.socket] = None) -> Optional[str]: # error_message: Optional[str] = None # super().stop(socket_connection=socket_connection) # logger.info('Stopping audio capture device.') # self._halt_recording_request_received = True # if self._ecasound is not None: # # sends SIGTERM to ecasound: # self._ecasound.terminate() # self._ecasound.wait() # logger.info('Halted audio capture device') # return error_message
[docs] @staticmethod def set_mic_gain(gain: int): """ Set the mic gain to something other than it's default level. """ # .. todo:: change these to config settings # self.side (left) doesn't work with amixer, so we would need to define the device name of the mic in the config mic_gain = f"{gain},{gain}" # set the gain on the mic to 0dB gain_cmd = ["amixer", "cset", "numid=3", "%s" % mic_gain, "> /dev/null 2>&1"] amixer = subprocess.Popen(gain_cmd) # self.logger.debug("amixer command: %s" % gain_cmd_str) amixer.wait() if amixer.returncode != 0: logger.critical(f"amixer exited with failure code {amixer.returncode}.")
# @staticmethod # def load_and_parse_sensor_settings_from_configuration_file() -> Tuple[str]: # local_directory: str # sample_frequency: int # # root_local_directory, _, _, _ = read_and_parse_global_configuration_settings() # local_directory: str = os.path.join(root_local_directory, 'audio') # sample_frequency: int = # def command_method_hooks(self) -> Dict[str, Callable]: # method_hooks: Dict[str, Callable] = { # 'set_gain': self.set_mic_gain # # 'stop_audio': self.stop # } # return method_hooks @property def audio_sample_frequency_hz(self) -> int: return self._audio_sample_frequency_hz @property def sample_format(self) -> int: return self._sample_format @property def channel_count(self) -> int: return self._channel_count @property def gain(self) -> int: return self._gain
[docs] class Video(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None, fps: Optional[int] = 30, resolution_x: Optional[int] = 640, resolution_y: Optional[int] = 480, flip_video: Optional[bool] = False, still_frame: Optional[bool] = False ): """ Args: recording_start_time_queue (~multiprocessing.Queue): A reference/pointer to the :class:`~multiprocessing.Queue` instance that houses the :class:`~datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`~configuration.Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`~configuration.Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. fps (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuraiton.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. resolution_x (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. resolution_y (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. flip_video (Optional[int]): .. todo:: Docstring. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. still_frame (Optional[bool]): Whether or not to take images instead of a video. If ``True``, then a single frame is captured instead of a video. The ``frames_per_second`` setting is ignored when capturing a still frame and the ``flip_video`` setting will flip both videos and still frame recordings. If ``False`` a video will be captured instead. If ``None`` is provided, this value will be determined by the :class:`~configuration.Config` object provided during instantiation. If no :class:`~configuration.Config` object was provided during instantiation, this value will default to the value specified in the ``beemon-config.ini`` found in the repository's root directory. """ super().__init__( recording_start_time_queue=recording_start_time_queue, config=config ) self._fps: int = int(self._beemon_config.video(key="fps", default=fps)) self._resolution_x: int = int( self._beemon_config.video(key="resolution_x", default=resolution_x) ) self._resolution_y: int = int( self._beemon_config.video(key="resolution_y", default=resolution_y) ) self._flip_video: bool = bool( strtobool(self._beemon_config.video(key="flip_video", default=flip_video)) ) self._still_frame: bool = bool( strtobool(self._beemon_config.video(key="still_frame", default=still_frame)) )
# self._halt_recording_request_received: bool = False @property def fps(self) -> int: return self._fps @property def resolution_x(self) -> int: return self._resolution_x @property def resolution_y(self) -> int: return self._resolution_y @property def flip_video(self) -> bool: return self._flip_video @property def still_frame(self) -> bool: return self._still_frame
[docs] def start(self) -> None: """ Prints the optionally overridden configuration values for the video camera device to the console for debugging purposes. """ super().start() logger.info("Initializing video capture device.") logger.info(f"FPS: {self._fps}") logger.info(f"Resolution (x): {self._resolution_x}") logger.info(f"Resolution (y): {self._resolution_y}") logger.info(f"Flip video: {self._flip_video}") logger.info(f"Still frame: {self._still_frame}") return
# def stop(self, socket_connection: Optional[socket.socket] = None) -> Optional[str]: # error_message: Optional[str] = None # super().stop(socket_connection=socket_connection) # logger.info('Stopping video capture device.') # self._halt_recording_request_received = True # return error_message
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, Any]: """ Presumably opens a filestream-like buffer (via the :class:`picamera2.Picamera2` context manager) and records a video performing an IO-blocking operation for the duration set with ``self.capture_duration_seconds``. The video initially records to a tmp file in the ``/tmp`` directory. The file is then moved to uploader_output_dir when the recoding is complete. If ``still_frame`` in the beemon-config.ini is set to ``True``, then a single frame is captured instead of a video. The ``frames_per_second`` setting is ignored when capturing a still frame and the ``flip_video`` setting will flip both videos and still frame recordings. Args: filename (str): What the file name of the recorded measurement should be. For instance: ``"rpi4-11@2021-11-27@19-30-00"``. uploader_output_dir (str): The local output directory which the :class:`uploader.Uploader` :class:`multiprocessing.Process` is watching for file changes. The final (and renamed) recorded file should be output to this directory. For example: ``"home/bee/bee_tmp/video/2021-11-27/"``. """ # When still_frame is set, we capture a single frame instead of a video. if self._still_frame: logger.info("Started still frame capture!") file_extension = "png" else: logger.info("Started video recording!") file_extension = "h264" # Parse some useful attributes from the filename: output_filename = f"{filename}.{file_extension}" output_filepath = os.path.join(uploader_output_dir, output_filename) pi_name = filename.split("@")[0] # For the video sensor, we append the file size of the recording to the same CSV file all day: date = filename.split("@")[1] daily_video_file_size_csv_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) # Perform the video camera recording: try: with picamera2.Picamera2() as camera: temp_file_suffix = f".h264" if not self._still_frame else f".png" _temp_file = tempfile.NamedTemporaryFile( suffix=f"{temp_file_suffix}", delete=False ) if self._still_frame: camera_config = camera.create_preview_configuration( main={"size": (self._resolution_x, self._resolution_y)}, raw={"size": camera.sensor_resolution}, transform=Transform(vflip=1 if self._flip_video else 0), controls={"Brightness": 0.2} ) camera.configure(camera_config) camera.start_preview(picamera2.Preview.DRM) camera.start() time.sleep(2) camera.capture_file(_temp_file.name) else: logger.info( f"Recording video for {self.capture_duration_seconds} seconds" ) # Configuration docs: https://datasheets.raspberrypi.com/camera/picamera2-manual.pdf video_config = camera.create_video_configuration( main={ "size": (self._resolution_x, self._resolution_y), }, raw={ "size": camera.sensor_resolution }, transform=Transform(vflip=1 if self._flip_video else 0), controls={ # FrameDurationLimits is in microseconds per frame, thus a conversion is needed "FrameDurationLimits": ( int(1 / self._fps * 100000), int(1 / self._fps * 100000), ), # Brightness on a scale of -1.0 to 1.0 "Brightness": 0.2, } ) camera.configure(video_config) encoder = H264Encoder() camera.start_recording(encoder, _temp_file.name) try: seconds = self._capture_duration_seconds while seconds > 0: if self._halt_recording_request_received.is_set(): logger.info( "Video recording in progress halted by stop request." ) return {"file_path": None, "file_size": None} else: time.sleep(1) seconds -= 1 camera.stop_recording() except Exception as err: logger.error(f"Error recording video: {err}") camera.close() return { "Video_FileSize": float("NAN") } # video filesize is sent to thingsboard but is not directly picked up by the database # sentinel value, can be detected with == "nan" except Exception as err: logger.error(f"Error recording: {err}.") # Returning an error to thingsboard. return {"Video_FileSize": float("NAN")} # Note: At this point the camera is "closed". if not self._still_frame: # Convert the video container format without losing the lossless compression ffmpeg_cmd = [ "ffmpeg", "-framerate", "30", "-i", _temp_file.name, "-c", "copy", # "-vf", # This is a video filter which allows us to change the speed to 2/3 to correct the output # "setpts=1.5*PTS", # Specifically, this changes each frame presentation timestamp to 1.5x the current value "-loglevel", "warning", output_filepath, ] ffmpeg = subprocess.Popen(ffmpeg_cmd) ffmpeg.wait() if ffmpeg.returncode != 0: logger.critical(f"ffmpeg exited with failure code: {ffmpeg.returncode}!") else: # Remove the temp file if successfully transcoded rm_cmd = ["rm", _temp_file.name] rm = subprocess.Popen(rm_cmd) rm.wait() if rm.returncode != 0: logger.critical(f"rm exited with failure code {rm.returncode}.") # Create the temporary file for the day's recorded file sizes (should it not already exist): filename, date, sensor_name = importutils.parse_information_from_file_path( file_path=output_filepath ) record_time = filename.split("@")[-1] record_time = record_time.split(".")[0] # Get the file size and append it to the daily CSV file: file_size_bytes = os.path.getsize(output_filepath) if file_size_bytes is None: file_size_bytes = float( "NAN" ) # sentinel value, can be detected with == "nan" # If this is not the last recording of the day, write as normal. filehandlingutils.write_sensor_csv_file( start_time_queue=self._recording_start_time_queue, file_path=daily_video_file_size_csv_file_path, data=[record_time, file_size_bytes], ) logger.info("Finished video recording!") return {"Video_FileSize": file_size_bytes} else: mv_command = ["mv", _temp_file.name, output_filepath] mv = subprocess.Popen(mv_command) mv.wait() if mv.returncode != 0: logger.critical(f"mv exited with failure code {mv.returncode}.") return {"Video_FileSize": float("NAN")}
[docs] class AirQuality(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None ): """ Args: recording_start_time_queue (Queue): A reference/pointer to the :class:`multiprocessing.Queue` instance that houses the :class:`datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. """ super().__init__(recording_start_time_queue, config=config) self._device_path = self._get_device_path() self._sensor = SDS011(self._device_path)
[docs] def _get_device_path(self) -> str: """ Dynamically obtains the device path of the air quality sensor and returns it. Ex. "/dev/tty/USB0" on success and None on failure. Returns: The device path on success, and the empty string on failure """ cmd = "ls -al /dev/ | grep ttyUSB | awk {' print $10 '}" dev_path = subprocess.getoutput(cmd) return "/dev/" + dev_path
[docs] def start(self) -> None: super().start() logger.info("Initializing Air Quality Sensor.") logger.info(f"Device Path: {self._device_path}")
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, float]: """ Utilizes the :mod:`sds011` library to take a air quality reading from the sensor, appending it to a new CSV file daily. This method then copies the daily record of air quality readings into the upload directory, and copies an additional version to the sensor aggregation directory. This sensor writes directly to the ``uploader_output_dir``; it does NOT write to the ``/tmp`` directory. It writes to the csv file via the :func:`src.utils.filehandlingutils.write_sensor_csv_file` method. Logic for determining the final sensor recording of the day (via a .final file extension) is found there. See Also: The SDS011 sensor's `datasheet <https://cdn-reichelt.de/documents/datenblatt/X200/SDS011-DATASHEET.pdf>`_. The definitions of `pm25 and pm10 <https://www.insee.fr/en/metadonnees/definition/c2196>`_. Returns: Dict[str, float]: A dictionary that contains each type of reading (str) and their measurements (float) A reading would be returned as {"pm25": 1.6, "pm10": 2.0} """ pi_name = filename.split("@")[0] record_time = filename.split("@")[-1] record_time = record_time.split(".")[0] # For the air quality sensor, we append to the same csv all day: date = filename.split("@")[1] daily_air_quality_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) # Read the current humidity and temperature (in Celsius) from the sensor: pm25, pm10 = self._sensor.query() if pm25 is None: pm25 = float("NAN") # sentinel value if pm10 is None: pm10 = float("NAN") # sentinel value logger.debug(f"pm25: {pm25} and pm10: {pm10}") # Create the temporary file for the day (should it not already exist) and append: filehandlingutils.write_sensor_csv_file( start_time_queue=self._recording_start_time_queue, file_path=daily_air_quality_file_path, data=[record_time, pm25, pm10], ) return {"pm25": pm10, "pm10": pm10}
[docs] class Temp(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None ): """ Args: recording_start_time_queue (Queue): A reference/pointer to the :class:`multiprocessing.Queue` instance that houses the :class:`datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. """ super().__init__(recording_start_time_queue, config=config) self._dht_pin = board.D18 self._dht_sensor = adafruit_dht.DHT22(self._dht_pin, use_pulseio=False)
[docs] def start(self) -> None: super().start() logger.info("Initializing temp/humidity capture device.") logger.info(f"DHTSensor: {self._dht_sensor}") logger.info(f"DHT PIN: {self._dht_pin}")
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, float]: """ Utilizes the :mod:`Adafruit_DHT` library to take a temperature and humidity reading from the sensor, appending it to a new CSV file daily. This method then copies the daily record of temperature and humidity readings into the upload directory, and copies an additional version to the sensor aggregation directory. This sensor writes directly to the ``uploader_output_dir``; it does NOT write to the ``/tmp`` directory. It writes to the csv file via the :func:`src.utils.filehandlingutils.write_sensor_csv_file` method. Logic for determining the final sensor recording of the day (via a .final file extension) is found there. Returns: Dict[str, float]: A dictionary that contains each type of reading (str) and their measurements (float) A humidity reading of 2 would be returned as {"Humidity": 2} """ pi_name = filename.split("@")[0] record_time = filename.split("@")[-1] record_time = record_time.split(".")[0] # For the temperature/humidity sensor, we append to the same csv all day: date = filename.split("@")[1] daily_temp_humidity_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) # Read the current humidity and temperature (in Celsius) from the sensor: temperature = None humidity = None tries_left = 10 while (temperature is None and humidity is None) and tries_left > 0: try: humidity = self._dht_sensor.humidity temperature = self._dht_sensor.temperature except RuntimeError as error: logger.debug( f"Temp sensor failed (probably normal): {error.args[0]}. Tries left: {tries_left}" ) temperature = None humidity = None time.sleep(2) except Exception as error: logger.debug(f"TEMP SENSOR FATAL ERROR: {error.args[0]}") temperature = None humidity = None break tries_left -= 1 if humidity is None or humidity > 100: humidity = float("NAN") # sentinel value if temperature is None: # Sentinel value is lower since negative values are common with Celsius. temperature = float("NAN") # sentinel value logger.debug(f"Initial temperature: {temperature} and humidity: {humidity}") # Convert the temperature from Celsius to Fahrenheit: temperature_fahrenheit = (float(temperature) * 1.8) + 32 logger.debug(f"Temperature (Fahrenheit): {temperature_fahrenheit} degrees") # Create the temporary file for the day (should it not already exist) and append: filehandlingutils.write_sensor_csv_file( start_time_queue=self._recording_start_time_queue, file_path=daily_temp_humidity_file_path, data=[record_time, round(temperature_fahrenheit, 2), round(humidity, 2)], ) return {"Temperature": temperature_fahrenheit, "Humidity": humidity}
[docs] class Scale(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None ): """ Args: recording_start_time_queue (Queue): A reference/pointer to the :class:`multiprocessing.Queue` instance that houses the :class:`datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. """ super().__init__(recording_start_time_queue, config=config) GPIO.setmode(GPIO.BCM) self._weight_multiple = float(self._beemon_config.scale("weight_multiple")) self._weight_offset = float(self._beemon_config.scale("weight_offset")) self._hx711 = hx711.HX711(dout_pins=[int(17)], sck_pin=int(27))
[docs] def start(self) -> None: super().start() logger.info("Starting scale.") GPIO.setmode(GPIO.BCM) self._hx711.reset() self._hx711.set_weight_multiples(self._weight_multiple) logger.info(f"Scale weight_multiple: {self._weight_multiple}") logger.debug(f"Scale offset: {self._hx711.get_offset()}") pass
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, Any]: """ Utilizes the :mod:`hx711_multi` library to take a weight reading from the scale, appending it to a new CSV file daily. This method then copies the daily record of weight readings into the upload directory, and copies an additional version to the sensor aggregation directory. This sensor writes directly to the ``uploader_output_dir``; it does NOT write to the ``/tmp`` directory. It writes to the csv file via the :func:`src.utils.filehandlingutils.write_sensor_csv_file` method. Logic for determining the final sensor recording of the day (via a .final file extension) is found there. Returns: Dict[str, Any]: A dictionary that contains a "Scale" reading (str) and the weight measured (float) in kilograms (kg). A weight reading of 50kg would be returned as {"Scale": 50} """ # Get record time from file name record_time = filename.split("@")[-1] record_time = record_time.split(".")[0] # Prepare sensor for read self._hx711.reset() # Set the weight multiple again because the weight multiple does not persist from start() to record(). self._hx711.set_weight_multiples(self._weight_multiple) # For the scale sensor, we append to the same csv all day: pi_name = filename.split("@")[0] date = filename.split("@")[1] daily_scale_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) logger.info("Recording weight") result = [None] try: # result returns a list, first element is the weight we need. if None not in self._hx711.read_raw(): result = self._hx711.read_weight( readings_to_average=30, use_prev_read=True ) if result[0] is None: logger.critical("Scale failed to read a valid result") result = [float("NAN")] # sentinel value else: result[0] -= self._weight_offset except Exception as ex: result = [float("NAN")] # sentinel value logger.critical(f"Scale failed to read weight: {ex}") # Clear GPIO pin configurations after reading. # Correct the weight with the zero offset logger.debug(f"Weight is: {result}") # If it is not the final recording of the day, append to the existing file. filehandlingutils.write_sensor_csv_file( start_time_queue=self.recording_start_time_queue, file_path=daily_scale_file_path, data=[record_time, result[0]], ) return {"Scale": result[0]}
[docs] class Cpu(Sensor):
[docs] def __init__( self, recording_start_time_queue: Queue, config: Optional[Config] = None ): """ Args: recording_start_time_queue (Queue): A reference/pointer to the :class:`multiprocessing.Queue` instance that houses the :class:`datetime.datetime`'s at which recording is to begin for the sensor. config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the ``beemon-config.ini`` in the repository's root directory. """ super().__init__(recording_start_time_queue, config=config)
[docs] def start(self) -> None: super().start() logger.info("Initializing cpu temperature reading.") logger.info("Initializing cpu voltage reading.")
[docs] def record(self, filename: str, uploader_output_dir: str) -> Dict[str, float]: """ This sensor writes directly to the ``uploader_output_dir``; it does NOT write to the ``/tmp`` directory. It writes to the csv file via the :func:`src.utils.filehandlingutils.write_sensor_csv_file` method. Logic for determining the final sensor recording of the day (via a .final file extension) is found there. Returns: Dict[str, float]: A dictionary that contains each type of reading (str) and their measurements (float) A voltage reading of 2 volts and temp of 80 Fahrenheit would be returned as ``{"Voltage": 2, "Cpu": 80}``. """ pi_name = filename.split("@")[0] record_time = filename.split("@")[-1] record_time = record_time.split(".")[0] vcgm = Vcgencmd() # For the cpu sensor, we append to the same csv all day: date = filename.split("@")[1] daily_cpu_temp_file_path = os.path.join( uploader_output_dir, f"{pi_name}@{date}.csv" ) # Read the cpu temperature (in Celsius): with open("/sys/class/thermal/thermal_zone0/temp") as file: cpu_temp = int(file.readline()) / 1000 if cpu_temp is None: cpu_temp = float("NAN") # sentinel value # Read the cpu voltage (in volts) cpu_voltage = vcgm.measure_volts("core") if cpu_voltage is None: cpu_voltage = float("NAN") # sentinel value # logger.debug(f"cpu temperature: {cpu_temp}") # Convert the temperature from Celsius to Fahrenheit: temperature_fahrenheit = (float(cpu_temp) * 1.8) + 32 logger.debug(f"cpu temperature (Fahrenheit): {temperature_fahrenheit}") logger.debug(f"cpu voltage (volts): {cpu_voltage}") # Create the temporary file for the day (should it not already exist) and append: filehandlingutils.write_sensor_csv_file( start_time_queue=self.recording_start_time_queue, file_path=daily_cpu_temp_file_path, data=[record_time, round(temperature_fahrenheit, 2), round(cpu_voltage, 2)], ) return {"Cpu": temperature_fahrenheit, "Voltage": cpu_voltage}
if __name__ == "__main__": logger.debug("Inside '__main__'")