import abc
import csv
import os
import time
from collections import OrderedDict
from datetime import datetime, timedelta
from distutils.util import strtobool
from enum import Enum
import platform
from logging import Logger
from typing import Union, Dict, List, Optional, Any, Tuple
import pytz
from src.beemon.configuration import Config, ConfigurationSections
from src.utils import importutils
from loguru._logger import Logger as LoguruLogger
from src.utils.datetimeutils import military_time_to_edt_datetime_aware, military_time_to_cest_datetime_aware
class HealthStatuses(Enum):
"""
Enumerated types representing the health status of the sensor in accordance with the OpenAPI specification detailed
in `AppMAIS/api/openapi.yaml`
"""
UNKNOWN = -2
NA = -1
OK = 0
WARNING = 1
CRITICAL = 2
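# A minimal illustrative sketch (not taken from the OpenAPI spec itself) of how these members map
# to the integer codes exchanged with the API; the payload key name "health_status" below is an
# assumption used purely for demonstration:
#
# >>> status = HealthStatuses.WARNING
# >>> payload = {"health_status": status.value}
# >>> payload
# {'health_status': 1}
# >>> HealthStatuses(payload["health_status"])
# <HealthStatuses.WARNING: 1>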
class SettingSectionProxy(metaclass=abc.ABCMeta):
def __init__(self, setting_section_name: str, section_type: ConfigurationSections, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
self._logger = logger
self._setting_section_name = setting_section_name
self._section_type = section_type
self._beemon_config = beemon_config
# See if the section has any overrides in the ``beemon-config.ini`` file:
(
self._pytz_time_zone,
self._root_upload_directory,
self._local_output_directory,
self._capture_window_start_time,
self._capture_window_end_time,
self._capture_duration_seconds,
self._capture_interval_seconds,
self._auto_start,
self._overridden_global_values
) = self._set_global_settings_with_section_overrides()
def _set_global_settings_with_section_overrides(self) -> (
Tuple[pytz.timezone, str, str, datetime, datetime, int, int, bool, List[str]]
):
"""
Retrieves the global settings from the [global] section of the beemon-config.ini file, then applies any
overrides that are present in the section this class represents.
Returns:
Tuple[:class:`datetime.tzinfo`, str, str, :class:`datetime.datetime`, :class:`datetime.datetime`, int, int, bool, List[str]]:
- **pytz_time_zone** (:class:`datetime.tzinfo`): The timezone to which ``capture_window_start_time`` and
``capture_window_end_time`` should be converted when populating the recording start time queues.
- **root_upload_dir** (*str*): The absolute path to the upload directory on the specified ``'host'`` machine
in the ``['ftp']`` setting of the ``beemon-config.ini``. This parameter **is not** expected to be
overridden on a per-sensor basis.
- **local_output_dir** (*str*): The absolute path to the temporary data storage location on the Raspberry Pi
to which data will be written prior to being uploaded to the specified ``'host'``. This parameter
**is not** expected to be overridden on a per-sensor basis.
- **capture_window_start_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
specifies what time to start recording data from all sensors. Note that this parameter may be overridden
on a per-sensor basis.
- **capture_window_end_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
specifies what time to stop recording data from all sensors. Note that this parameter may be overridden
on a per-sensor basis.
- **capture_duration_seconds** (*int*): How long each sensor should be instructed to record for (if
applicable) in seconds. Note that this parameter may be overridden on a per-sensor basis.
- **capture_interval_seconds** (*int*): How long to wait (in seconds) before instructing all sensors to
record again for the duration specified by ``capture_duration_seconds``. Note that this parameter may be
overridden on a per-sensor basis.
- **auto_start** (*bool*): Whether all sensors should be instructed to automatically record at the
specified ``capture_window_start_time``. If this flag is disabled, then recording commands are expected
to be issued manually. Note that this parameter may be overridden on a per-sensor basis.
- **overridden_settings** (*List[str]*): A list of all global settings that are overridden in the specific
settings section.
Raises:
NotImplementedError: This method will raise a ``NotImplementedError`` in the event that the
specified ``pytz_time_zone`` in the ``beemon-config.ini`` does not yet have a corresponding datetime
conversion method in :mod:`~src.utils.datetimeutils`.
"""
# Retrieve the global settings with overrides from the section:
(
pytz_time_zone,
root_upload_directory,
local_output_directory,
capture_window_start_time,
capture_window_end_time,
capture_duration_seconds,
capture_interval_seconds,
auto_start
) = importutils.parse_global_sensor_settings_from_config_file(
beemon_config=self._beemon_config,
sensor_name=self._section_type.value.lower(),
sensor_override=True
)
global_setting_names = [
"pytz_time_zone",
"root_upload_directory",
"local_output_directory",
"capture_window_start_time",
"capture_window_end_time",
"capture_duration_seconds",
"capture_interval_seconds",
"auto_start"
]
overridden_settings = []
# Retrieve the section specific settings:
section_specific_settings = self.get_settings_section()
# Find all the settings that are in both the global settings and the section specific settings:
for global_setting_name in global_setting_names:
if global_setting_name in section_specific_settings.keys():
overridden_settings.append(global_setting_name)
return (
pytz_time_zone,
root_upload_directory,
local_output_directory,
capture_window_start_time,
capture_window_end_time,
capture_duration_seconds,
capture_interval_seconds,
auto_start,
overridden_settings
)
def update_setting(self, setting: str, value: Union[str, int, float, bool]):
"""
Updates a setting in the beemon-config.ini under a particular section.
Args:
setting (str): The setting to update.
value (Union[str, int, float, bool]): The value for the setting to update.
"""
# configuration.set requires string values for section, setting, and value, so cast value to a string:
self._beemon_config.instance().configuration.set(self._section_type.value.lower(), setting, str(value))
self._beemon_config.instance().write()
def update_settings_section(self, settings: Dict[str, Union[str, int, float, bool]]):
"""
Updates multiple settings in the ``beemon-config.ini`` under a particular section.
Args:
settings (Dict[str, Union[str, int, float, bool]]): A dictionary of setting names and setting values to
update.
"""
self.logger.debug(f"update_setting_section({settings})")
self.update_settings_section_in_memory(settings=settings)
self._beemon_config.instance().write()
def update_settings_section_in_memory(self, settings: Dict[str, Union[str, int, float, bool]]):
"""
Updates the settings in memory for a particular section. This method is useful for updating the settings in
memory without writing to the configuration file. **Be Careful** when using this method, as any crash could
cause the settings changes to be lost. This should only be used when attempting a bulk write, with the
:class:`Config` write method called as soon as all changes are staged.
Args:
settings (Dict[str, Union[str, int, float, bool]]): A dictionary of setting names and setting values to
update.
"""
for setting, value in settings.items():
# configuration.set method needs strings for section, setting, and value, so we cast value to a string
self._beemon_config.instance().configuration.set(self._section_type.value.lower(), setting, str(value))
self._update_setting_on_proxy(setting_name=setting, value=value)
def _update_setting_on_proxy(self, setting_name: str, value: Union[str, int, float, bool]):
"""
Updates a setting on the proxy object, to keep the settings on the object and in the configuration file in sync.
If updating the ``pytz_time_zone``, then the ``capture_window_start_time`` and ``capture_window_end_time`` will
be updated to reflect the new timezone as well.
Args:
setting_name (str): The name of the setting to update.
value (Union[str, int, float, bool]): The value of the setting to update. Some settings may require
conversion to the appropriate type before updating the setting.
"""
if setting_name == "capture_window_start_time" or setting_name == "capture_window_end_time":
# Convert the value to a datetime object:
if self._pytz_time_zone.zone == "US/Eastern":
value = military_time_to_edt_datetime_aware(military_time=value)
elif self._pytz_time_zone.zone == "Europe/Brussels":
value = military_time_to_cest_datetime_aware(military_time=value)
if setting_name == "pytz_time_zone":
# Convert the value to a pytz.timezone object:
value = pytz.timezone(value)
# Update the capture window start and end times to reflect the new timezone:
capture_window_start_military_time = self._capture_window_start_time.strftime("%H%M")
capture_window_end_military_time = self._capture_window_end_time.strftime("%H%M")
self.update_settings_section_in_memory({
"capture_window_start_time": capture_window_start_military_time,
"capture_window_end_time": capture_window_end_military_time
})
self.__setattr__(f"_{setting_name}", value)
self.logger.debug(f"Updated attribute {setting_name}: {self.__getattribute__(f'_{setting_name}')}")
def get_settings_section(self) -> Dict[str, Union[str, int, float, bool]]:
"""
Retrieves a particular setting section from the ``beemon-config.ini`` file.
Returns:
Dict[str, Union[str, int, float, bool]]: A dictionary of the settings in the section.
"""
return dict(self._beemon_config.instance().configuration[self._section_type.value.lower()])
@abc.abstractmethod
def settings(self) -> Dict[str, Any]:
"""
Retrieves the settings for the section. This method retrieves all the settings for the section, including any
default values or values that are retrieved from the [global] section of the beemon-config.ini file.
Returns:
Dict[str, Any]: The key-value pairs of 'setting_name': 'setting_value'.
"""
raise NotImplementedError(
f"Concrete classes of type {self.__class__.__name__} must implement this method and return the key-value "
f"pairs of 'setting_name': 'setting_value'."
)
def configuration_values(self) -> Dict[str, str]:
"""
Retrieves the configuration values for the section. This method retrieves only the values that are present in
the configuration file, not including any default values or values that are retrieved from the [global] section
of the beemon-config.ini file. Includes any values that are present in the configuration file, as well as
all entries of ``self._overridden_global_values`` for the settings that override the global values.
Returns:
Dict[str, str]: The key-value pairs of 'setting_name': 'setting_value'. All values should be strings to
match the configuration file as closely as possible. The objects associated with the values should be
returned by the :meth:`settings` method.
"""
section_specific_config = self._section_specific_configuration_values()
overridden_values_dict = {
overridden_setting_name: str(self.__getattribute__(f"_{overridden_setting_name}"))
for overridden_setting_name in self._overridden_global_values
}
# Convert to military time
if "capture_window_start_time" in self._overridden_global_values:
overridden_values_dict["capture_window_start_time"] = self._capture_window_start_time.strftime("%H%M")
if "capture_window_end_time" in self._overridden_global_values:
overridden_values_dict["capture_window_end_time"] = self._capture_window_end_time.strftime("%H%M")
return {
**section_specific_config,
**overridden_values_dict
}
@abc.abstractmethod
def _section_specific_configuration_values(self) -> Dict[str, str]:
"""
Retrieves the configuration values specific to this section (excluding global settings), with all values as strings.
"""
raise NotImplementedError(
f"Concrete classes of type {self.__class__.__name__} must implement this method and return the key-value "
f"pairs of 'config_value_name': 'config_value'."
)
@property
def setting_section_name(self) -> str:
return self._setting_section_name
@property
def logger(self) -> Union[Logger, LoguruLogger]:
return self._logger
@property
def pytz_time_zone(self) -> pytz.timezone:
return self._pytz_time_zone
@property
def root_upload_directory(self) -> str:
return self._root_upload_directory
@property
def local_output_directory(self) -> str:
return self._local_output_directory
@property
def capture_window_start_time(self) -> datetime:
return self._capture_window_start_time
@property
def capture_window_end_time(self) -> datetime:
return self._capture_window_end_time
@property
def capture_duration_seconds(self) -> int:
return self._capture_duration_seconds
@property
def capture_interval_seconds(self) -> int:
return self._capture_interval_seconds
@property
def auto_start(self) -> bool:
return self._auto_start
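# Illustrative example (the key names below mirror ``global_setting_names`` above; the file layout
# itself is an assumption) of how a per-section entry in ``beemon-config.ini`` overrides a [global]
# value for a single sensor section:
#
#   [global]
#   capture_duration_seconds = 60
#   capture_interval_seconds = 300
#
#   [audio]
#   capture_duration_seconds = 30
#
# For a proxy built against the [audio] section, ``capture_duration_seconds`` resolves to 30,
# ``capture_interval_seconds`` falls back to the global 300, and "capture_duration_seconds" is
# recorded in ``self._overridden_global_values`` so that ``configuration_values()`` reports it.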
class GlobalProxy(SettingSectionProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(
setting_section_name="global", section_type=ConfigurationSections.GLOBAL, beemon_config=beemon_config,
logger=logger
)
def settings(self) -> Dict[str, Union[str, int, float, bool]]:
# pytz_time_zone needs to be cast because pytz.timezone objects are not JSON serializable.
return {
'pytz_time_zone': str(self._pytz_time_zone),
'root_upload_directory': self._root_upload_directory,
'local_output_directory': self._local_output_directory,
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_duration_seconds': self._capture_duration_seconds,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {}
class SFTPProxy(SettingSectionProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(
setting_section_name="sftp", section_type=ConfigurationSections.SFTP, beemon_config=beemon_config,
logger=logger
)
self._host: str = self._beemon_config.sftp(key="host")
self._port: int = int(self._beemon_config.sftp(key="port", default=21))
self._upload_delay: int = int(self._beemon_config.sftp(key="upload_delay"))
self._username: str = self._beemon_config.sftp(key="username")
self._private_key_file_path: str = self._beemon_config.sftp(key="private_key_file_path")
def settings(self) -> Dict[str, Any]:
return {
'username': self._username,
'host': self._host,
'port': self._port,
'private_key_file_path': self._private_key_file_path,
'upload_delay': self._upload_delay,
'auto_start': self._auto_start,
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'username': self._username,
'host': self._host,
'port': str(self._port),
'private_key_file_path': self._private_key_file_path,
'upload_delay': str(self._upload_delay),
}
@property
def host(self) -> str:
return self._host
@property
def port(self) -> int:
return self._port
@property
def username(self) -> str:
return self._username
@property
def upload_delay(self) -> int:
return self._upload_delay
@property
def private_key_file_path(self) -> str:
return self._private_key_file_path
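# An illustrative ``beemon-config.ini`` [sftp] section (values are placeholders, not defaults taken
# from the project) showing the keys this proxy reads; only ``port`` has a coded default above:
#
#   [sftp]
#   host = sftp.example.org
#   port = 22
#   upload_delay = 30
#   username = beemon
#   private_key_file_path = /home/pi/.ssh/id_rsa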
class DashboardProxy(SettingSectionProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(
setting_section_name="dashboard", section_type=ConfigurationSections.DASHBOARD, beemon_config=beemon_config,
logger=logger
)
self._host: str = self._beemon_config.dashboard(key="host")
self._access_token: str = self._beemon_config.dashboard(key="access_token")
def settings(self) -> Dict[str, Any]:
return {
'host': self._host,
'access_token': self._access_token,
'auto_start': self._auto_start,
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'host': self._host,
'access_token': self._access_token
}
@property
def host(self) -> str:
return self._host
@property
def access_token(self) -> str:
return self._access_token
class LogProxy(SettingSectionProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(
setting_section_name="log", section_type=ConfigurationSections.LOG, beemon_config=beemon_config,
logger=logger
)
self._logging_level: str = self._beemon_config.log(key="logging_level")
self._log_cycle_days: int = int(self._beemon_config.log(key="log_cycle_days", default=7))
self._max_file_size_megabytes: int = int(self._beemon_config.log(key="max_file_size_megabytes", default=10))
self._max_archived_files: int = int(self._beemon_config.log(key="max_archived_files", default=5))
def settings(self) -> Dict[str, Any]:
return {
'logging_level': self._logging_level,
'log_cycle_days': self._log_cycle_days,
'max_file_size_megabytes': self._max_file_size_megabytes,
'max_archived_files': self._max_archived_files,
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'logging_level': self._logging_level,
'log_cycle_days': str(self._log_cycle_days),
'max_file_size_megabytes': str(self._max_file_size_megabytes),
'max_archived_files': str(self._max_archived_files)
}
@property
def logging_level(self) -> str:
return self._logging_level
@property
def log_cycle_days(self) -> int:
return self._log_cycle_days
@property
def max_file_size_megabytes(self) -> int:
return self._max_file_size_megabytes
@property
def max_archived_files(self) -> int:
return self._max_archived_files
class SensorProxy(SettingSectionProxy, metaclass=abc.ABCMeta):
def __init__(self, setting_section_name: str, section_type: ConfigurationSections, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(
setting_section_name, section_type, beemon_config, logger
)
# Parse the ThingsBoard/Dashboard configurations from the beemon-config.ini:
self._dashboard_host: str = self._beemon_config.dashboard(key="host")
self._dashboard_access_token: str = self._beemon_config.dashboard(
key="access_token"
)
appmais_local_directory = "/".join(self._local_output_directory.split("/")[:-1])
self._things_board_local_root_directory: str = os.path.join(
appmais_local_directory, "thingsboard"
)
# Construct an artificial recording time queue based on the sensor's settings:
self._sensor_recording_start_times = self.get_all_recording_start_times_within_window(
capture_window_start_time=self._capture_window_start_time,
capture_window_end_time=self._capture_window_end_time,
capture_duration_seconds=self._capture_duration_seconds,
capture_interval_seconds=self._capture_interval_seconds
)
# Variables relating to health status and the sensor's operational state:
if self._auto_start:
# If the sensor is set to auto-start, then the initial health status is UNKNOWN:
self.__health_status = HealthStatuses.UNKNOWN
else:
# If the sensor is not set to auto-start, then the health status will always be NOT APPLICABLE (until
# manually started):
self.__health_status = HealthStatuses.NA
def _update_setting_on_proxy(self, setting_name: str, value: Union[str, int, float, bool]):
super()._update_setting_on_proxy(setting_name=setting_name, value=value)
if setting_name == "capture_window_start_time" or setting_name == "capture_window_end_time" or \
setting_name == "capture_duration_seconds" or setting_name == "capture_interval_seconds":
self._sensor_recording_start_times = self.get_all_recording_start_times_within_window(
capture_window_start_time=self._capture_window_start_time,
capture_window_end_time=self._capture_window_end_time,
capture_duration_seconds=self._capture_duration_seconds,
capture_interval_seconds=self._capture_interval_seconds
)
def get_all_recording_start_times_within_window(
self, capture_window_start_time: datetime, capture_window_end_time: datetime,
capture_duration_seconds: float, capture_interval_seconds: float) -> List[datetime]:
"""
Computes the maximum number of recordings which can occur within the given range of datetime objects (inclusive),
given that consecutive recordings are spaced ``capture_interval_seconds`` apart. This method then returns
a list of timezone-aware datetime objects which constitute the times at which recording
should begin in order to record the maximum number of times within the supplied range.
Args:
capture_window_start_time (~datetime.datetime): The starting datetime (timezone-aware and inclusive) at which the
first recording session (of ``capture_duration_seconds`` seconds) is to begin.
capture_window_end_time (~datetime.datetime): The ending datetime (timezone-aware and inclusive) at which the
last recording session (of ``capture_duration_seconds`` seconds) is to begin.
capture_duration_seconds (float): The duration (in seconds) of each recording session between
``capture_window_start_time`` and ``capture_window_end_time``. This method produces the times at which
recording commands should be issued to all registered :class:`Sensor` concrete subclass objects.
capture_interval_seconds (float): How frequently to initialize recording sessions. For instance, a value of
``120.0`` seconds indicates that recording should be performed every two minutes from
``capture_window_start_time`` to ``capture_window_end_time`` (inclusive) for a duration of
``capture_duration_seconds``.
Returns:
list[~datetime.datetime]: The timezone-aware datetimes at which recording should begin, spaced
``capture_interval_seconds`` apart from ``capture_window_start_time`` through ``capture_window_end_time``
(inclusive).
"""
recording_start_times: List[datetime] = []
# We have a problem if the capture_duration meets or exceeds the capture_interval.
if capture_duration_seconds == capture_interval_seconds:
self.logger.warning(f"The specified \"capture_duration_seconds\" of: {capture_duration_seconds} seconds is "
f"equal to the specified \"capture_interval_seconds\" of: "
f"{capture_interval_seconds} seconds. The Sensor may not have enough time to perform "
f"post-processing operations between recording sessions. Some recording sessions may be "
f"skipped as a result (depending on the speed of the sensor\'s post-recording operations).")
elif capture_duration_seconds > capture_interval_seconds:
error_message = f"Invalid configuration. The specified \"capture_duration_seconds\" of: " \
f"{capture_duration_seconds} must not exceed the specified \"capture_interval_seconds\" " \
f"of: {capture_interval_seconds}. It does not make sense for a sensor to record for " \
f"{capture_duration_seconds} seconds for a period of every {capture_interval_seconds} " \
f"seconds (as the previous recording would still be in progress)."
self.logger.critical(error_message)
raise NotImplementedError(error_message)
elif capture_duration_seconds < capture_interval_seconds:
# This is the desired state.
# The sensor will hopefully have time to handle post-processing operations while waiting for the next
# scheduled recording time.
pass
next_recording_start_time = capture_window_start_time
while next_recording_start_time <= capture_window_end_time:
recording_start_times.append(next_recording_start_time)
next_recording_start_time = recording_start_times[-1] + timedelta(seconds=capture_interval_seconds)
# logger.debug(f"Recording start times: {[stime.isoformat() for stime in recording_start_times]}")
return recording_start_times
def get_closest_recording_start_time(self, a_time: datetime) -> datetime:
"""
Returns the recording start time closest to the specified time (inclusive).
Args:
a_time (~datetime.datetime): The time to compare against the recording start times.
Returns:
~datetime.datetime: The recording start time closest to the specified time.
"""
deltas = [abs(recording_time - a_time) for recording_time in self._sensor_recording_start_times]
return self._sensor_recording_start_times[deltas.index(min(deltas))]
def get_closest_previous_recording_start_time(self, a_time: datetime) -> Optional[datetime]:
"""
Returns the closest previous recording start time (inclusive) to the specified time. This can be useful for
determining if a sensor should be currently recording.
Args:
a_time (~datetime.datetime): The time to compare against the previous recording start times (inclusive).
Returns:
Optional[~datetime.datetime]: The closest previous recording time to the specified time, or `None` if there
were no previous recording times to the time provided.
"""
elapsed_recording_times = self.get_elapsed_recording_start_times(a_time=a_time, inclusive=False)
if len(elapsed_recording_times) > 0:
return elapsed_recording_times[-1]
else:
return None
def get_closest_recording_time_with_tolerance(self, time: datetime, tolerance_seconds: int):
"""
Returns the closest recording time to the specified time within a tolerance of ``tolerance_seconds``. This can be
useful for determining the closest recording time if the sensor is already recording.
.. todo:: Docstrings.
"""
raise NotImplementedError()
# tolerance = timedelta(self._capture_duration_seconds)
# closest_last_recording_time = None
# for recording_time in self._sensor_recording_start_times:
# if abs(recording_time - time) == tolerance:
# raise NotImplementedError()
# if recording_time < time:
# closest_last_recording_time = recording_time
# else:
# break
# return closest_last_recording_time
def get_elapsed_recording_start_times(self, a_time: datetime, inclusive: bool) -> List[Optional[datetime]]:
"""
Returns all recording start times for the sensor that were scheduled to occur before the specified time. When
``inclusive`` is `False`, a recording start time that falls exactly at the specified time is excluded from the
returned list.
Args:
a_time (datetime): The time to compare against the recording start times.
inclusive (bool): If `True`, the recording start time that occurred at the specified time will be included in
the returned list. If `False`, the recording start time that occurred at the specified time will not be
included in the returned list.
Returns:
List[Optional[~datetime.datetime]]: A list of recording start times that have occurred before the provided
time. The returned list will be empty if there are no recording start times that have occurred before the
provided time.
"""
if a_time.tzinfo is None:
a_time = self._pytz_time_zone.localize(a_time)
elapsed_recording_start_times = []
for recording_time in self._sensor_recording_start_times:
if inclusive:
if recording_time <= a_time:
elapsed_recording_start_times.append(recording_time)
else:
if recording_time < a_time:
elapsed_recording_start_times.append(recording_time)
return elapsed_recording_start_times
def should_have_recorded(self) -> bool:
"""
Determines if the sensor should have recorded by the current time.
Returns:
bool: True if the sensor should have recorded by the current time, False otherwise.
"""
# .. todo:: This needs some wiggle room for if the recording time is right now.
elapsed_recording_times = self.get_elapsed_recording_start_times(
a_time=datetime.now(tz=self._pytz_time_zone),
inclusive=False
)
return len(elapsed_recording_times) > 0
def should_be_recording(self) -> bool:
"""
Determines if the sensor should be currently recording.
Returns:
bool: True if the sensor should be currently recording, False otherwise.
"""
closest_recording_time = self.get_closest_recording_start_time(
a_time=datetime.now(tz=self._pytz_time_zone)
)
recording_window = timedelta(seconds=self._capture_duration_seconds)
return closest_recording_time + recording_window > datetime.now(tz=self._pytz_time_zone)
def get_health_status(self) -> HealthStatuses:
"""
Returns the health status of the sensor by reading the CSV in the output directory of the sensor. The health
status of the sensor is defined in the OpenAPI specification detailed in `AppMAIS/api/openapi.yaml`.
Returns:
HealthStatuses: The health status of the sensor.
"""
if not self._auto_start:
# If the sensor is not set to auto-start, then the health status will always be NOT APPLICABLE (until
# manually started):
# .. todo:: Logic for checking if the sensor has been manually started.
return HealthStatuses.NA
# The sensor was intended to auto-start.
# Construct the file path that the sensor would be writing to:
_platform_name = platform.node()
# closest recording time:
closest_recording_time = self.get_closest_recording_start_time(
a_time=datetime.now(tz=self._pytz_time_zone)
)
self.logger.debug(f"Closest recording time: {closest_recording_time.isoformat()}")
_date_repr = closest_recording_time.strftime("%Y-%m-%d")
_time_repr = closest_recording_time.strftime("%H-%M-%S")
sensor_output_dir = os.path.join(self._local_output_directory, f"{self._setting_section_name.lower()}/{_date_repr}")
csv_filename = f"{_platform_name}@{_date_repr}.csv"
alternate_csv_filename = f"{csv_filename}.final"
csv_file = os.path.join(sensor_output_dir, csv_filename)
alternate_csv_file = os.path.join(sensor_output_dir, alternate_csv_filename)
if not os.path.exists(csv_file) and not os.path.exists(alternate_csv_file):
# The sensor's CSV file doesn't exist but the auto-start flag was set to true.
if self.should_have_recorded():
self.logger.debug(f"Sensor should have recorded by now, but no CSV file found.")
# The sensor should have recorded at least once by now
# We know there were no successful recordings (no CSV) so the severity of the failure is based on how
# many there were:
elapsed_recording_start_times = self.get_elapsed_recording_start_times(
a_time=datetime.now(tz=self._pytz_time_zone),
inclusive=False
)
self.logger.debug(f"Elapsed recording start times: {elapsed_recording_start_times}")
if len(elapsed_recording_start_times) > 1:
self.logger.debug(f"Sensor should have recorded at least once by now, but multiple missed recordings.")
# The sensor should have recorded at least once by now, but there are multiple missed recordings.
return HealthStatuses.CRITICAL
else:
# The sensor should have recorded only once by now, so there is only a single missed recording.
self.logger.debug(f"Sensor should have recorded only once by now, but missed its recording.")
return HealthStatuses.WARNING
else:
# The sensor is not yet expected to have recorded, so its health status is still unknown.
self.logger.debug("Sensor is not yet expected to have recorded; health status is still unknown.")
return HealthStatuses.UNKNOWN
else:
self.logger.debug(f"Sensor's CSV file exists.")
# The sensor's output CSV exists and the auto-start flag was set to true.
sensor_data_dict = self.read_csv(date=datetime.now(tz=self._pytz_time_zone).date()) # Read the sensor's recordings only once.
last_1_recording_time_values = self.get_last_n_successful_recording_values(
num_recording_values=1, sensor_data_dict=sensor_data_dict
)
if len(last_1_recording_time_values) == 1:
# The sensor did not miss the most recent recording time.
self.logger.debug(f"Sensor did not miss the most recent recording time. HealthStatus OK")
return HealthStatuses.OK
else:
# The sensor missed the last record time.
last_few_recording_time_values = self.get_last_n_successful_recording_values(
num_recording_values=2, sensor_data_dict=sensor_data_dict
)
if len(last_few_recording_time_values) == 1:
# The sensor missed only a single record time.
self.logger.debug(f"Sensor missed only a single record time. HealthStatus WARNING")
return HealthStatuses.WARNING
else:
# The sensor missed multiple record times.
self.logger.debug(f"Sensor missed multiple record times. HealthStatus CRITICAL")
return HealthStatuses.CRITICAL
def get_last_n_successful_recording_values(
self, num_recording_values: int, sensor_data_dict: Optional[Dict[str, Union[float, Dict[str, float]]]] = None
) -> List[Union[float, Dict[str, float]]]:
"""
Retrieves the last ``num_recording_values`` successful recording values from the sensor's CSV file.
Args:
num_recording_values (int): The number of recording values to retrieve from the sensor's CSV file.
sensor_data_dict (Optional[Dict[str, Union[float, Dict[str, float]]]]): The dictionary of the sensor's CSV
file data. If ``None``, then the CSV file will be read. Provided as an argument such that this method
can be called multiple times without having to re-read the CSV file.
Returns:
List[Union[float, Dict[str, float]]]: Ordered list of the last ``num_recording_values`` recordings. This
is based on the number of anticipated recordings, so any missed or non-existent recordings will mean
fewer than ``num_recording_values`` values are returned. Ordered chronologically, with the most recent last.
"""
if sensor_data_dict is None:
sensor_data_dict = self.read_csv(date=datetime.now(tz=self._pytz_time_zone).date())
recording_values = []
anticipated_recording_start_times_since_now = self.get_elapsed_recording_start_times(
a_time=datetime.now(tz=self._pytz_time_zone),
inclusive=False
)
last_n_anticipated_recording_start_times = anticipated_recording_start_times_since_now[-num_recording_values:]
for recording_time in last_n_anticipated_recording_start_times:
recording_time_str = recording_time.strftime("%H-%M-%S")
if recording_time_str in sensor_data_dict.keys():
recording_data = sensor_data_dict[recording_time_str]
if self._section_type is ConfigurationSections.TEMP:
recording = recording_data["temperature"]
if recording is not None:
recording_values.append(recording)
elif self._section_type is ConfigurationSections.AIRQUALITY:
pm10, pm25 = recording_data["pm10"], recording_data["pm25"]
if pm10 is not None and pm25 is not None:
recording_values.append({"pm10": pm10, "pm25": pm25})
elif self._section_type is ConfigurationSections.CPU:
cpu_temp, voltage = recording_data["cpu"], recording_data["voltage"]
if cpu_temp is not None and voltage is not None:
recording_values.append({"cpu_temp": cpu_temp, "voltage": voltage})
elif recording_data is not None:
recording_values.append(recording_data)
return recording_values
def get_number_of_successful_recordings_on_date(self, date: datetime.date) -> int:
"""
Retrieves the number of successful (non `None`) recordings present in the sensor's CSV file for the specified
date.
Args:
date (~datetime.date): The date of recording for the sensor to read the CSV from.
Returns:
int: The number of successful (non `None`) recordings present in the sensor's CSV file for the specified
date.
"""
num_successful_recordings = 0
sensor_data_dict = self.read_csv(date=date)
for i, (recording_time_str, recording_data) in enumerate(sensor_data_dict.items()):
if self._section_type is ConfigurationSections.TEMP:
recording = recording_data["temperature"]
if recording is not None:
num_successful_recordings += 1
elif self._section_type is ConfigurationSections.AIRQUALITY:
pm10, pm25 = recording_data["pm10"], recording_data["pm25"]
if pm10 is not None and pm25 is not None:
num_successful_recordings += 1
elif self._section_type is ConfigurationSections.CPU:
cpu_temp, voltage = recording_data["cpu"], recording_data["voltage"]
if cpu_temp is not None and voltage is not None:
num_successful_recordings += 1
else:
if recording_data is not None:
num_successful_recordings += 1
return num_successful_recordings
@abc.abstractmethod
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Any]]:
"""
Reads the CSV file for the sensor on the specified date and returns the rows of the CSV in a dictionary,
indexed by the recording time.
Args:
date (~datetime.date): The date of recording for the sensor to read the CSV from.
Returns:
Optional[Dict[str, Any]]: A dictionary of the rows in the CSV file, indexed by the recording time. If the
CSV file does not exist, `None` will be returned.
"""
raise NotImplementedError(f"Concrete classes of type {self.__class__.__name__} must implement this method.")
@property
def local_output_directory(self) -> str:
return self._local_output_directory
@property
def capture_window_start_time(self) -> datetime:
return self._capture_window_start_time
@property
def capture_window_end_time(self) -> datetime:
return self._capture_window_end_time
@property
def capture_duration_seconds(self) -> int:
return self._capture_duration_seconds
@property
def capture_interval_seconds(self) -> int:
return self._capture_interval_seconds
@property
def auto_start(self) -> bool:
return self._auto_start
@property
def dashboard_host(self) -> str:
return self._dashboard_host
@property
def dashboard_access_token(self) -> str:
return self._dashboard_access_token
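# A small, self-contained sketch of the schedule computation performed by
# get_all_recording_start_times_within_window() above: start times are generated every
# ``capture_interval_seconds`` from the window start to the window end (inclusive). The
# timezone and window values below are illustrative assumptions.
#
# >>> import pytz
# >>> from datetime import datetime, timedelta
# >>> tz = pytz.timezone("US/Eastern")
# >>> start = tz.localize(datetime(2023, 6, 1, 8, 0, 0))
# >>> end = tz.localize(datetime(2023, 6, 1, 8, 10, 0))
# >>> times, t = [], start
# >>> while t <= end:
# ...     times.append(t)
# ...     t = times[-1] + timedelta(seconds=300)
# ...
# >>> [t.strftime("%H-%M-%S") for t in times]
# ['08-00-00', '08-05-00', '08-10-00']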
class AudioProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="audio", section_type=ConfigurationSections.AUDIO, beemon_config=beemon_config, logger=logger)
self._sampling_frequency: int = int(
self._beemon_config.audio(
key="sampling_frequency", default=48000
)
)
self._sample_format: int = int(
self._beemon_config.audio(key="sample_format", default=24)
)
self._channel_count: int = int(
self._beemon_config.audio(key="channel_count", default=1)
)
self._gain: int = int(self._beemon_config.audio(key="gain", default=49))
self._set_gain: bool = bool(
strtobool(self._beemon_config.audio(key="set_gain", default=True))
)
def settings(self) -> Dict[str, Any]:
return {
'sampling_frequency': self._sampling_frequency,
'sample_format': self._sample_format,
'channel_count': self._channel_count,
'gain': self._gain,
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_duration_seconds': self._capture_duration_seconds,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'sampling_frequency': str(self._sampling_frequency),
'sample_format': str(self._sample_format),
'channel_count': str(self._channel_count),
'gain': str(self._gain)
}
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Optional[bool]]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
audio_sentinel_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
audio_sentinel = row[1]
if audio_sentinel == 'nan':
audio_sentinel = None
else:
audio_sentinel = True
audio_sentinel_data[time_str] = audio_sentinel
return audio_sentinel_data
@property
def sampling_frequency(self) -> int:
return self._sampling_frequency
@property
def sample_format(self) -> int:
return self._sample_format
@property
def channel_count(self) -> int:
return self._channel_count
@property
def gain(self) -> int:
return self._gain
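# Example (inferred from the parsing logic in read_csv() above, not from a documented file format)
# of the per-day CSV that AudioProxy.read_csv() consumes and the dictionary it produces; the
# hostname and the contents of the second column are hypothetical:
#
#   08-00-00,rpi4-11@2023-06-01@08-00-00.wav
#   08-05-00,nan
#
# >>> audio_proxy.read_csv(date=datetime(2023, 6, 1).date())
# OrderedDict([('08-00-00', True), ('08-05-00', None)])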
class VideoProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="video", section_type=ConfigurationSections.VIDEO,
beemon_config=beemon_config, logger=logger)
self._frames_per_second: int = int(self._beemon_config.video(key="frames_per_second", default=30))
self._resolution_x: int = int(
self._beemon_config.video(key="resolution_x", default=640)
)
self._resolution_y: int = int(
self._beemon_config.video(key="resolution_y", default=480)
)
self._flip_video: bool = bool(
strtobool(self._beemon_config.video(key="flip_video", default=False))
)
def settings(self) -> Dict[str, Any]:
return {
'frames_per_second': self._frames_per_second,
'resolution_x': self._resolution_x,
'resolution_y': self._resolution_y,
'flip_video': self._flip_video,
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_duration_seconds': self._capture_duration_seconds,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'frames_per_second': str(self._frames_per_second),
'resolution_x': str(self._resolution_x),
'resolution_y': str(self._resolution_y),
'flip_video': str(self._flip_video)
}
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Optional[float]]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
video_file_size_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
video_file_size = row[1]
if video_file_size == "nan":
video_file_size = None
else:
video_file_size = float(video_file_size)
video_file_size_data[time_str] = video_file_size
return video_file_size_data
@property
def frames_per_second(self) -> int:
return self._frames_per_second
@property
def resolution_x(self) -> int:
return self._resolution_x
@property
def resolution_y(self) -> int:
return self._resolution_y
@property
def flip_video(self) -> bool:
return self._flip_video
class AirQualityProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="airquality", section_type=ConfigurationSections.AIRQUALITY, beemon_config=beemon_config, logger=logger)
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Any]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
air_quality_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
pm25 = row[1]
pm10 = row[2]
if pm25 == "nan":
pm25 = None
else:
pm25 = float(pm25)
if pm10 == "nan":
pm10 = None
else:
pm10 = float(pm10)
air_quality_data[time_str] = {"pm25": pm25, "pm10": pm10}
return air_quality_data
def settings(self) -> Dict[str, Any]:
return {
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {}
class TempProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="temp", section_type=ConfigurationSections.TEMP, beemon_config=beemon_config, logger=logger)
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Dict[str, Optional[float]]]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
temp_humid_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
temperature = row[1]
if temperature == "nan":
temperature = None
else:
temperature = float(temperature)
humidity = row[2]
if humidity == "nan":
humidity = None
else:
humidity = float(humidity)
temp_humid_data[time_str] = {"temperature": temperature, "humidity": humidity}
return temp_humid_data
def settings(self) -> Dict[str, Any]:
return {
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {}
class ScaleProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="scale", section_type=ConfigurationSections.SCALE, beemon_config=beemon_config, logger=logger)
self._weight_multiple = float(self._beemon_config.scale("weight_multiple"))
self._weight_offset = float(self._beemon_config.scale("weight_offset"))
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Optional[float]]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
weight_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
weight_kg = row[1]
if weight_kg == "nan":
weight_kg = None
else:
weight_kg = float(weight_kg)
weight_data[time_str] = weight_kg
return weight_data
def settings(self) -> Dict[str, Any]:
return {
'weight_multiple': self._weight_multiple,
'weight_offset': self._weight_offset,
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {
'weight_multiple': str(self._weight_multiple),
'weight_offset': str(self._weight_offset)
}
@property
def weight_multiple(self) -> float:
return self._weight_multiple
@property
def weight_offset(self) -> float:
return self._weight_offset
class CPUProxy(SensorProxy):
def __init__(self, beemon_config: Config, logger: Union[Logger, LoguruLogger]):
super().__init__(setting_section_name="cpu", section_type=ConfigurationSections.CPU, beemon_config=beemon_config, logger=logger)
def read_csv(self, date: datetime.date) -> Optional[Dict[str, Optional[float]]]:
csv_file = os.path.join(
self._local_output_directory,
f"{self._setting_section_name.lower()}/{date}/{platform.node()}@{date}.csv"
)
if not os.path.exists(csv_file):
return None
cpu_data = OrderedDict()
with open(csv_file, newline='\n') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',', quotechar='"')
for i, row in enumerate(csv_reader):
time_str = row[0]
cpu_temp = row[1]
voltage = row[2]
if cpu_temp == "nan":
cpu_temp = None
else:
cpu_temp = float(cpu_temp)
if voltage == "nan":
voltage = None
else:
voltage = float(voltage)
cpu_data[time_str] = {"cpu": cpu_temp, "voltage": voltage}
return cpu_data
def settings(self) -> Dict[str, Any]:
return {
'capture_window_start_time': self._capture_window_start_time,
'capture_window_end_time': self._capture_window_end_time,
'capture_interval_seconds': self._capture_interval_seconds,
'auto_start': self._auto_start
}
def _section_specific_configuration_values(self) -> Dict[str, str]:
return {}
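# A hedged end-to-end usage sketch. The Config and logger construction below is an assumption
# (the real setup lives in src.beemon.configuration and the application entry point); only the
# proxy method calls mirror the API defined in this module.
#
# >>> from loguru import logger
# >>> beemon_config = Config()  # assumed default constructor
# >>> temp_proxy = TempProxy(beemon_config=beemon_config, logger=logger)
# >>> temp_proxy.should_be_recording()
# False
# >>> temp_proxy.get_health_status()
# <HealthStatuses.UNKNOWN: -2>
# >>> temp_proxy.update_settings_section({"capture_interval_seconds": 600})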