import configparser
import glob
import os
import pathlib
import sys
from datetime import datetime
from distutils.util import strtobool
from threading import Lock
from loguru import logger
import pytz
from pytz.exceptions import UnknownTimeZoneError
__author__ = 'Nathan Hernandez, Alek Ratzloff, Chris Campell <[email protected]>, Aedan Simons-Rudolph'
from typing import Tuple, Any, Dict, Optional, Union
from src.utils import datetimeutils
# Module-level slot holding the shared configuration singleton. ``Config.__init__`` declares
# ``global __instance`` inside the ``Config`` class body, and Python's private-name mangling rewrites that
# identifier to ``_Config__instance``, so this is the name that is actually read and assigned there.
_Config__instance = None
# Unmangled spelling of the same name. NOTE(review): within this file every ``__instance`` reference lives inside
# ``Config`` (and is therefore mangled), so this variable appears to be unused here -- confirm before removing.
__instance = None
from enum import Enum
class ConfigurationSections(Enum):
    """
    Names of the sections that make up ``beemon-config.ini``.

    Each member's value is the literal section header string; these are the same section names written by
    ``create_config`` when it generates a default configuration file.
    """
    GLOBAL = 'global'
    AUDIO = 'audio'
    VIDEO = 'video'
    TEMP = 'temp'
    AIRQUALITY = 'airquality'
    SCALE = 'scale'
    CPU = 'cpu'
    SFTP = 'sftp'
    DASHBOARD = 'dashboard'
    LOG = 'log'
    # NOTE: ``__str__``/``__repr__`` overrides returning ``self.value`` were sketched here but are disabled, so
    # members keep the default ``Enum`` string forms; use ``.value`` to obtain the raw section name.
# Type alias for the Python types that a beemon-config.ini setting value may be interpreted as
# (integer, float, boolean, or plain string):
SettingType = Union[int, float, bool, str]
class SensorConfigurationSections(Enum):
    """
    Subset of the ``beemon-config.ini`` section names listed under the "sensor" grouping.

    Member values are the literal section header strings (``'audio'``, ``'video'``, ``'temp'``,
    ``'airquality'``, ``'scale'`` and ``'cpu'``).
    """
    AUDIO = 'audio'
    VIDEO = 'video'
    TEMP = 'temp'
    AIRQUALITY = 'airquality'
    SCALE = 'scale'
    CPU = 'cpu'
class SettingConfigurationSections(Enum):
    """
    Subset of the ``beemon-config.ini`` section names listed under the "setting" grouping.

    Member values are the literal section header strings (``'global'``, ``'sftp'``, ``'dashboard'`` and
    ``'log'``).
    """
    GLOBAL = 'global'
    SFTP = 'sftp'
    DASHBOARD = 'dashboard'
    LOG = 'log'
def setup_config(configuration_file: str):
    """
    Load ``beemon-config.ini`` from disk, generating a default file first when none exists at the given path.

    Args:
        configuration_file (str): The absolute path to the ``beemon-config.ini`` file on the device.

    Returns:
        A :class:`Config` wrapping the file, or ``None`` when creating or reading it raised an exception (the
        failure is logged instead of being propagated).
    """
    try:
        file_is_missing = not os.path.exists(configuration_file)
        if file_is_missing:
            # Nothing on disk yet: write out the defaults so there is something to load.
            create_config(configuration_file)
        return Config(configuration_file)
    except Exception as ex:
        logger.error(f"Error reading config file. Reason: {ex}")
    return None
class Config:
    """
    Shared accessor for the ``beemon-config.ini`` configuration file.

    Every ``Config`` object wraps the same module-wide ``__ConfigInstance`` (created the first time a ``Config``
    is constructed), so all callers observe a single parsed configuration. The per-section accessors
    (:meth:`audio`, :meth:`glob`, ...) run :meth:`type_check` on the requested key and then return the raw value
    stored in the file, or the supplied default.
    """

    class __ConfigInstance:
        """
        Holder of the parsed configuration, the path it was read from, and the lock guarding access to it.
        """

        def __init__(self, path):
            """
            Parse the configuration file found at ``path``.

            Args:
                path: Location of an existing ``beemon-config.ini`` file.

            Raises:
                AssertionError: If ``path`` does not exist.
            """
            assert os.path.exists(path), f"Configuration file does not exist: {path}"
            self.configuration = configparser.ConfigParser()
            self.configuration.read(path)
            self.path = path
            self.lock = Lock()

        def write(self):
            """
            Writes the in-memory configuration back to ``self.path``.

            The write is serialised through ``self.lock`` (a :class:`threading.Lock`), which protects against
            concurrent threads of this process only. The lock is released even when the write fails.
            """
            with self.lock:
                with open(self.path, 'w') as configfile:
                    self.configuration.write(configfile)

        def get(self, section: str, key: str, default: Any):
            """
            Gets a configuration value, deferring to a custom configuration section, if specified.

            When ``section`` contains a ``settings`` entry, the section named ``"<section>/<settings value>"`` is
            consulted first. If that custom section does not exist, or does not define ``key``, the lookup falls
            back to ``section`` itself.

            Args:
                section: the section to get the value from
                key: the key of the configuration value
                default: what to return if it doesn't exist

            Returns:
                The configuration value if it is found, otherwise the default specified.
            """
            # Reject non-string sections before touching the parser (an unhashable value would raise there).
            if not isinstance(section, str):
                return default
            # One critical section for the whole lookup so a concurrent reload() cannot interleave with it.
            with self.lock:
                parser = self.configuration
                if section not in parser:
                    return default
                lookup_order = [section]
                if 'settings' in parser[section]:
                    custom_section = '%s/%s' % (section, parser[section]['settings'])
                    if custom_section in parser:
                        # The custom section takes precedence over the base section.
                        lookup_order.insert(0, custom_section)
                for name in lookup_order:
                    if key in parser[name]:
                        return parser[name][key]
                return default

        def get_raw(self, section, key, default):
            """
            Gets a configuration value, without checking for a custom configuration section.

            Args:
                section: the section to get the value from
                key: the key of the configuration value
                default: what to return if it doesn't exist

            Returns:
                The configuration value if it is found, otherwise the default specified.
            """
            with self.lock:
                if section not in self.configuration:
                    return default
                section_proxy = self.configuration[section]
                if key not in section_proxy:
                    return default
                return section_proxy[key]

        def reload(self):
            """
            Re-reads ``self.path`` into the existing parser while holding the lock.
            """
            with self.lock:
                self.configuration.read(self.path)

    def __init__(self, path=None):
        """
        Attach to the shared configuration instance, creating it from ``path`` on first use.

        Args:
            path: Location of the configuration file. Required only for the first ``Config`` constructed; once
                the shared instance exists it is reused and this argument is merely stored on ``self.path``.

        Raises:
            AssertionError: If no shared instance exists yet and ``path`` is ``None`` or does not exist.
        """
        # Inside this class body ``__instance`` is name-mangled, so this refers to the module-level
        # ``_Config__instance`` variable.
        global __instance
        self._type_table = create_type_table()
        self.path = path
        if __instance is None:
            assert path is not None, "A path is required to create the shared configuration instance."
            __instance = self.__ConfigInstance(path)
        self.__instance = __instance

    def instance(self):
        """Return the shared ``__ConfigInstance`` backing this object."""
        return self.__instance

    def _checked_get(self, section, key, default):
        """Run :meth:`type_check` for ``section``/``key`` and then return the stored value (or ``default``)."""
        self.type_check(section, key)
        return self.__instance.get(section, key, default)

    def scale(self, key, default=None):
        """Return ``key`` from the ``[scale]`` section after type checking it."""
        return self._checked_get('scale', key, default)

    def dashboard(self, key, default=None):
        """Return ``key`` from the ``[dashboard]`` section after type checking it."""
        return self._checked_get('dashboard', key, default)

    def audio(self, key, default=None):
        """Return ``key`` from the ``[audio]`` section after type checking it."""
        return self._checked_get('audio', key, default)

    def video(self, key, default=None):
        """Return ``key`` from the ``[video]`` section after type checking it."""
        return self._checked_get('video', key, default)

    def photo(self, key, default=None):
        """Return ``key`` from the ``[photo]`` section (no type checking is performed)."""
        return self.__instance.get('photo', key, default)

    def sftp(self, key, default=None):
        """Return ``key`` from the ``[sftp]`` section after type checking it."""
        return self._checked_get('sftp', key, default)

    def temp(self, key, default=None):
        """Return ``key`` from the ``[temp]`` section after type checking it."""
        return self._checked_get('temp', key, default)

    def log(self, key, default=None):
        """Return ``key`` from the ``[log]`` section after type checking it."""
        return self._checked_get('log', key, default)

    def auto_start(self, key, default=None):
        """Return ``key`` from the ``[auto_start]`` section (no type checking is performed)."""
        return self.__instance.get('auto_start', key, default)

    def glob(self, key, default=None):
        """Return ``key`` from the ``[global]`` section after type checking it."""
        return self._checked_get('global', key, default)

    def reload(self):
        """Re-read the configuration file backing the shared instance."""
        self.__instance.reload()

    def parse_global_config(self) -> "Tuple[pytz.BaseTzInfo, str, str, datetime, datetime, int, int, bool]":
        """
        Convenience/utility method for parsing information from the ``[global]`` section of the ``beemon-config.ini``
        configuration file.

        Returns:
            Tuple[:class:`datetime.tzinfo`, str, str, :class:`datetime.datetime`, :class:`datetime.datetime`, int, int, bool]:

            - **pytz_time_zone** (:class:`datetime.tzinfo`): The timezone to which ``capture_window_start_time`` and
              ``capture_window_end_time`` should be converted to when populating the recording start time queues.
            - **root_upload_dir** (*str*): The absolute path to the upload directory on the specified ``'host'`` machine
              in the ``['sftp']`` setting of the ``beemon-config.ini``. This parameter **is not** expected to be
              overridden on a per-sensor basis.
            - **local_output_dir** (*str*): The absolute path to the temporary data storage location on the Raspberry Pi
              to which data will be written prior to being uploaded to the specified ``'host'``. This parameter
              **is not** expected to be overridden on a per-sensor basis.
            - **capture_window_start_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
              specifies what time to start recording data from all sensors. Note that this parameter may be overridden
              on a per-sensor basis.
            - **capture_window_end_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
              specifies what time to stop recording data from all sensors. Note that this parameter may be overridden
              on a per-sensor basis.
            - **capture_duration_seconds** (*int*): How long each sensor should be instructed to record for (if
              applicable) in seconds. Note that this parameter may be overridden on a per-sensor basis.
            - **capture_interval_seconds** (*int*): How long to wait (in seconds) before instructing all sensors to
              record again for the duration specified by ``capture_duration_seconds``. Note that this parameter may be
              overridden on a per-sensor basis.
            - **auto_start** (*bool*): Whether or not all sensors should be instructed to automatically record at the
              specified ``capture_window_start_time``. If this flag is disabled, then recording commands are expected
              to be issued manually. Note that this parameter may be overridden on a per-sensor basis.

        Raises:
            NotImplementedError: This method will raise a ``NotImplementedError`` in the event that the
              specified ``pytz_time_zone`` in the ``beemon-config.ini`` does not yet have a corresponding datetime
              conversion method in :mod:`~src.utils.datetimeutils`.
            SystemExit: If ``pytz_time_zone`` is not a time zone name known to :mod:`pytz` (the process exits with
              status ``1`` after logging a critical message).
        """
        # Parse config file:
        pytz_time_zone: str = self.glob('pytz_time_zone')
        root_upload_dir: str = self.glob('root_upload_directory')
        local_output_dir: str = self.glob('local_output_directory')
        capture_duration_seconds: int = int(self.glob('capture_duration_seconds'))
        capture_interval_seconds: int = int(self.glob('capture_interval_seconds'))
        auto_start: bool = bool(strtobool(self.glob('auto_start')))
        # Both window boundaries are stored in 24 hour military time HHMM format:
        raw_window_start: str = str(self.glob('capture_window_start_time'))
        raw_window_end: str = str(self.glob('capture_window_end_time'))
        # Ensure valid timezone was entered into the beemon-config.ini:
        try:
            tz_info = pytz.timezone(pytz_time_zone)
        except UnknownTimeZoneError as err:
            logger.critical(f'The \'pytz_time_zone\': \'{pytz_time_zone}\' specified in the \'beemon-config.ini\' '
                            f'[global] section was not recognized. Error: {err}')
            sys.exit(1)
        # Pick the converter matching the configured zone; both window boundaries use the same one.
        if tz_info.zone == 'US/Eastern':
            to_aware_datetime = datetimeutils.military_time_to_edt_datetime_aware
        elif tz_info.zone == 'Europe/Brussels':
            to_aware_datetime = datetimeutils.military_time_to_cest_datetime_aware
        else:
            not_implemented_error_message = f'The timezone: \'{pytz_time_zone}\' specified in the ' \
                                            f'\'beemon-config.ini\' [global] section was recognized, but we have not ' \
                                            f'implemented it yet.'
            raise NotImplementedError(not_implemented_error_message)
        capture_window_start_time_datetime: datetime = to_aware_datetime(military_time=raw_window_start)
        capture_window_end_time_datetime: datetime = to_aware_datetime(military_time=raw_window_end)
        return tz_info, root_upload_dir, local_output_dir, capture_window_start_time_datetime, \
            capture_window_end_time_datetime, capture_duration_seconds, capture_interval_seconds, auto_start

    def type_check(self, section: str, key: str):
        """
        Runs sanity checks based on the expected types and values of the associated value in the config file.

        Problems are reported through the logger; this method does not raise for a mistyped value. Keys that are
        absent from the configuration are skipped, since there is no value to validate.

        Args:
            section(str): The section of the config file, such as 'logger'.
            key(str): The key to grab from the section.
        """
        # If the section/key pair is not in the type table, it is assumed to be a string.
        expected_type = self._type_table.get(section, {}).get(key, str)
        # A unique sentinel distinguishes "key absent" from any value that could legitimately be stored.
        missing = object()
        value = self.__instance.get(section, key, default=missing)
        if value is missing:
            return
        try:
            if expected_type is int:
                value = int(value)
                # If parsing an int from the value doesn't return an error, run the int checks.
                int_checks(section, key, value)
            elif expected_type is float:
                float(value)
            elif expected_type is bool:
                # Bools don't have separate checking methods since we expect them to be
                # either true or false.
                if value.lower() not in ('true', 'false'):
                    raise ValueError()
            else:
                # Assumed to be a string, see if there's a check for its value.
                str_checks(section, key, value)
        except ValueError as err:
            reason = f" Reason: {err}" if str(err) else ""
            logger.error(f"Config file has wrong type for {section}: {key}. Expected {expected_type}, while the "
                         f"value was \'{value}\'.{reason}")
        except ConnectionError as err:
            # Connection errors are from str_checks with the sftp hostname.
            logger.error(err)
def str_checks(section: str, key: str, value: str):
    """
    Called by type_check to run sanity checks on string values.

    Only the ``sftp``/``host`` pair currently has a check: the host is pinged once and must answer. Every other
    section/key pair is accepted without inspection.

    Args:
        section(str): The section of the config file, such as 'logger'.
        key(str): The key to grab from the section.
        value(str): Current value of the section: key pair to run the check on.

    Raises:
        :exception:`ConnectionError`: If the ``sftp`` host is empty, looks like a command-line option, cannot be
            pinged, or the ``ping`` program itself cannot be started.
        :exception:`ValueError`: If the value cannot be handed to the ``ping`` program at all (for example because
            it contains an embedded null byte).
    """
    if section != 'sftp' or key != 'host':
        return
    # private_key_file_path is intentionally not validated here.

    # Imported locally so the check does not depend on a module-level import being present.
    import subprocess

    unreachable_message = f"Host unreachable. Is \'{value}\' the correct hostname?"
    # An empty value, or one that ping would parse as an option, can never be a usable hostname.
    if not value or value.startswith('-'):
        raise ConnectionError(unreachable_message)
    try:
        # Argument list (no shell): the configured value is passed to ping verbatim as a single argument.
        completed = subprocess.run(["ping", "-c", "1", "-W", "1", value], check=False)
    except OSError as err:
        # ping is missing or could not be executed, so reachability cannot be confirmed.
        raise ConnectionError(unreachable_message) from err
    except ValueError as err:
        raise ValueError(f"{section}:{key} test raised exception, \'{value}\' not valid for key.") from err
    # Ping hostname, if response isn't zero that means that there was an error.
    if completed.returncode != 0:
        raise ConnectionError(unreachable_message)
def int_checks(section: str, key: str, value: int):
    """
    Called by type_check for sanity checks on int values.

    Only keys of the ``global`` section are range-checked; values from any other section are accepted as-is.

    Args:
        section(str): The section of the config file, such as 'logger'.
        key(str): The key to grab from the section.
        value(int): Current value of the section: key pair to run the check on.

    Raises:
        :exception:`ValueError`: This method raises an ``ValueError`` in the event that an invalid value
            (logically speaking) is specified: a capture window time that is not a valid ``HHMM`` time between
            ``0000`` and ``2359``, or a negative capture duration/interval.
    """
    if section != 'global':
        return
    out_of_range_message = f"{section}:{key} test raised exception, \'{value}\' not in valid range."
    if key in ('capture_window_start_time', 'capture_window_end_time'):
        # The HHMM string was parsed as an integer (e.g. "0800" -> 800), so split it back into hours/minutes.
        hours, minutes = divmod(value, 100)
        if value < 0 or hours > 23 or minutes > 59:
            raise ValueError(out_of_range_message)
    elif key in ('capture_duration_seconds', 'capture_interval_seconds'):
        # Durations and intervals can't be negative.
        if value < 0:
            raise ValueError(out_of_range_message)
def create_config(config_path, group="bee"):
    """
    Write a ``beemon-config.ini`` populated with default values to ``config_path``.

    Args:
        config_path: Destination of the generated configuration file; an existing file is overwritten.
        group: Currently unused by this function (the group-permission handling is disabled).
    """
    default_sections = {
        "global": {
            "pytz_time_zone": "US/Eastern",
            "root_upload_directory": "/usr/local/bee/appmais",
            "local_output_directory": "/home/bee/appmais/bee_tmp",
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
            "capture_duration_seconds": "60",
            "capture_interval_seconds": "300",
            "auto_start": "True",
        },
        "audio": {
            "sampling_frequency": "48000",
            "sample_format": "24",
            "channel_count": "1",
            "gain": "49",
            "set_gain": "False",
        },
        "video": {
            "still_frame": "False",
            "frames_per_second": "30",
            "resolution_x": "640",
            "resolution_y": "480",
            "flip_video": "False",
        },
        "temp": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "airquality": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "scale": {
            "weight_multiple": "1000",
            "weight_offset": "0.0",
        },
        "cpu": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "sftp": {
            "username": "Username",
            "host": "Host",
            "port": "22",
            "private_key_file_path": "/home/bee/.ssh/id_ed25519",
            "upload_delay": "20",
        },
        "dashboard": {
            "access_token": "Token",
            "host": "Host",
        },
        "log": {
            "logging_level": "debug",
            "log_cycle_days": "3",
            "max_file_size_megabytes": "16",
            "max_archived_files": "7",
        },
    }
    # Load every default section into a fresh parser in one step (sections keep the order listed above).
    configuration = configparser.ConfigParser()
    configuration.read_dict(default_sections)
    resolved_path = os.path.normpath(os.path.realpath(config_path))
    logger.info("Writing configuration file to: " + resolved_path)
    with open(config_path, 'w') as configfile:
        configuration.write(configfile)
    # NOTE: adjusting the file's group ownership/permissions is not performed here.
def create_type_table() -> Dict[str, Dict[str, type]]:
    """
    Creates the look-up table for type checking config values. Set up as a dictionary of dictionaries so it can be
    accessed as ``table[section][key]`` and return a type object such as ``int``. All values that are not in the
    table are expected as type ``str``.

    Returns:
        A mapping of section name to a mapping of key name to the expected Python type (``int``, ``float``,
        ``bool`` or ``str``) of that key's value.
    """
    table = {
        "global": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "capture_duration_seconds": int,
            "capture_interval_seconds": int,
            "auto_start": bool
        },
        "audio": {
            "sampling_frequency": int,
            "sample_format": int,
            "channel_count": int,
            "gain": int,
            "set_gain": bool,
            "auto_start": bool
        },
        "video": {
            "still_frame": bool,
            "frames_per_second": int,
            "resolution_x": int,
            "resolution_y": int,
            "flip_video": bool,
            "auto_start": bool
        },
        "temp": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "auto_start": bool
        },
        # Same capture-window keys as "temp"; the default configuration file defines them for this section too.
        "airquality": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "auto_start": bool
        },
        "scale": {
            "weight_multiple": float,
            # The default configuration file stores this as "0.0".
            "weight_offset": float,
            "auto_start": bool
        },
        "cpu": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "auto_start": bool
        },
        "sftp": {
            "username": str,
            "host": str,
            "port": int,
            "private_key_file_path": str,
            "upload_delay": int
        },
        "dashboard": {},
        "log": {
            "log_cycle_days": int,
            "max_file_size_megabytes": float,
            "max_archived_files": int
        }
    }
    return table