import configparser
import os
import pathlib
import sys
from datetime import datetime
from distutils.util import strtobool
from threading import Lock
from loguru import logger
import pytz
from pytz.exceptions import UnknownTimeZoneError
__author__ = 'Nathan Hernandez, Alek Ratzloff, Chris Campell <[email protected]>, Aedan Simons-Rudolph'
from typing import Tuple, Any, Dict, Optional, Union
from src.utils import datetimeutils
# Module-level slot for the shared configuration instance. Inside ``class Config`` the identifier ``__instance`` is
# name-mangled by Python to ``_Config__instance``, so this is the global that ``Config.__init__`` reads and assigns
# through its ``global __instance`` statement.
_Config__instance = None
# Unmangled spelling of the same name. Code written inside the ``Config`` class body cannot reach this variable by
# writing ``__instance`` (the name is mangled there) -- presumably kept for readability or for code outside the
# class; confirm before removing.
__instance = None
def setup_config(configuration_file: str):
    """
    Load the ``beemon-config.ini`` configuration, creating the file with default values first if it is missing.

    Args:
        configuration_file (str): The absolute path to the ``beemon-config.ini`` file on the device.

    Returns:
        A :class:`Config` object backed by ``configuration_file``, or ``None`` when creating or reading the file
        failed for any reason (the failure is logged, not raised).
    """
    try:
        config_is_missing = not os.path.exists(configuration_file)
        if config_is_missing:
            # Nothing on disk yet: write a default configuration to that location before loading it.
            create_config(configuration_file)
        return Config(configuration_file)
    except Exception as ex:
        logger.error(f"Error reading config file. Reason: {ex}")
    return None
[docs]
class Config:
    """
    Accessor for the values stored in ``beemon-config.ini``.

    All ``Config`` objects share a single parser-backed instance that is kept in a module-level global: the first
    ``Config`` constructed (which must be given a path) loads the file, and every later ``Config`` reuses it. One
    accessor method exists per configuration section (:meth:`audio`, :meth:`video`, :meth:`sftp`, ...).
    """

    class __ConfigInstance:
        """
        Holder for one :class:`configparser.ConfigParser` loaded from a single file.

        Every public method takes ``self.lock`` around its access to the parser.
        """

        def __init__(self, path):
            """
            Read the configuration file at ``path``.

            Args:
                path: Location of the ``.ini`` file; it must already exist.

            Raises:
                AssertionError: If ``path`` does not exist.
            """
            assert os.path.exists(path), f"Configuration file does not exist: {path}"
            self.configuration = configparser.ConfigParser()
            self.configuration.read(path)
            self.path = path
            self.lock = Lock()

        def _lookup(self, section, key, default):
            """
            Return ``configuration[section][key]``, or ``default`` when the section or key is missing.

            The caller must already hold ``self.lock``; the lock is not re-entrant, so this helper never takes it.
            """
            if section not in self.configuration:
                return default
            options = self.configuration[section]
            if key not in options:
                return default
            return options[key]

        def get(self, section: str, key: str, default: Any):
            """
            Gets a configuration value, deferring to a custom configuration section, if specified.

            When ``section`` contains a ``settings`` option whose value is ``name`` and a section called
            ``"<section>/<name>"`` exists, the value is looked up in that custom section. Otherwise it is looked up
            in ``section`` itself. Note that a custom section which exists but lacks ``key`` yields ``default``; the
            lookup does not fall back to the base section in that case.

            Args:
                section: the section to get the value from
                key: the key of the configuration value
                default: what to return if it doesn't exist

            Returns:
                The configuration value (a string) if it is found, otherwise the default specified. ``default`` is
                also returned when ``section`` is not a string.
            """
            # Reject non-string sections before touching the parser: an unhashable section would otherwise raise
            # TypeError from the membership test.
            if not isinstance(section, str):
                return default
            # One critical section for the whole resolution so a concurrent reload() cannot interleave with it.
            with self.lock:
                if section not in self.configuration:
                    return default
                target_section = section
                options = self.configuration[section]
                if 'settings' in options:
                    custom_section = '%s/%s' % (section, options['settings'])
                    if custom_section in self.configuration:
                        target_section = custom_section
                return self._lookup(target_section, key, default)

        def get_raw(self, section, key, default):
            """
            Gets a configuration value, without checking for a custom configuration section.

            Args:
                section: the section to get the value from
                key: the key of the configuration value
                default: what to return if it doesn't exist

            Returns:
                The configuration value if it is found, otherwise the default specified.
            """
            # ``with`` guarantees the lock is released even if the parser raises (e.g. for a non-string key).
            with self.lock:
                return self._lookup(section, key, default)

        def reload(self):
            """
            Re-read the configuration file from ``self.path`` into the existing parser.

            Values read from the file are layered on top of what the parser already holds.
            """
            with self.lock:
                self.configuration.read(self.path)

    def __init__(self, path=None):
        """
        Attach to the shared configuration instance, creating it on first use.

        Args:
            path: Location of the ``.ini`` file. Required for the first ``Config`` created in the process; once
                the shared instance exists it is reused and ``path`` is only recorded on ``self.path``.

        Raises:
            AssertionError: If no shared instance exists yet and ``path`` is ``None`` or does not exist.
        """
        self._type_table = create_type_table()
        # Inside this class ``__instance`` is name-mangled, so this refers to the module global
        # ``_Config__instance``.
        global __instance
        self.path = path
        if __instance is None:
            assert path is not None, "A configuration path is required to create the shared Config instance"
            __instance = self.__ConfigInstance(path)
        self.__instance = __instance

    def instance(self):
        """Return the shared parser-backed instance this object delegates to."""
        return self.__instance

    def _get_checked(self, section, key, default):
        """Run :meth:`type_check` for ``section``/``key`` and then return the (possibly defaulted) value."""
        self.type_check(section, key)
        return self.__instance.get(section, key, default)

    def scale(self, key, default=None):
        """Return ``key`` from the ``[scale]`` section after type-checking it."""
        return self._get_checked('scale', key, default)

    def dashboard(self, key, default=None):
        """Return ``key`` from the ``[dashboard]`` section after type-checking it."""
        return self._get_checked('dashboard', key, default)

    def audio(self, key, default=None):
        """Return ``key`` from the ``[audio]`` section after type-checking it."""
        return self._get_checked('audio', key, default)

    def video(self, key, default=None):
        """Return ``key`` from the ``[video]`` section after type-checking it."""
        return self._get_checked('video', key, default)

    def photo(self, key, default=None):
        """Return ``key`` from the ``[photo]`` section (no type check is run for this section)."""
        return self.__instance.get('photo', key, default)

    def sftp(self, key, default=None):
        """Return ``key`` from the ``[sftp]`` section after type-checking it."""
        return self._get_checked('sftp', key, default)

    def temp(self, key, default=None):
        """Return ``key`` from the ``[temp]`` section after type-checking it."""
        return self._get_checked('temp', key, default)

    def log(self, key, default=None):
        """Return ``key`` from the ``[log]`` section after type-checking it."""
        return self._get_checked('log', key, default)

    def auto_start(self, key, default=None):
        """Return ``key`` from the ``[auto_start]`` section (no type check is run for this section)."""
        return self.__instance.get('auto_start', key, default)

    def glob(self, key, default=None):
        """Return ``key`` from the ``[global]`` section after type-checking it."""
        return self._get_checked('global', key, default)

    def reload(self):
        """Re-read the configuration file into the shared instance."""
        self.__instance.reload()

    def parse_global_config(self) -> "Tuple[pytz.tzinfo.BaseTzInfo, str, str, datetime, datetime, int, int, bool]":
        """
        Convenience/utility method for parsing information from the ``[global]`` section of the ``beemon-config.ini``
        configuration file.

        Returns:
            Tuple[:class:`datetime.tzinfo`, str, str, :class:`datetime.datetime`, :class:`datetime.datetime`, int, int, bool]:

            - **recording_timezone** (:class:`datetime.tzinfo`): The timezone to which ``capture_window_start_time``
              and ``capture_window_end_time`` should be converted to when populating the recording start time
              queues.
            - **root_upload_dir** (*str*): The absolute path to the upload directory on the specified ``'host'``
              machine in the ``['sftp']`` setting of the ``beemon-config.ini``. This parameter **is not** expected
              to be overridden on a per-sensor basis.
            - **local_output_dir** (*str*): The absolute path to the temporary data storage location on the
              Raspberry Pi to which data will be written prior to being uploaded to the specified ``'host'``. This
              parameter **is not** expected to be overridden on a per-sensor basis.
            - **capture_window_start_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
              specifies what time to start recording data from all sensors. Note that this parameter may be
              overridden on a per-sensor basis.
            - **capture_window_end_datetime** (:class:`datetime.datetime`): A :mod:`datetime` aware object which
              specifies what time to stop recording data from all sensors. Note that this parameter may be
              overridden on a per-sensor basis.
            - **capture_duration_seconds** (*int*): How long each sensor should be instructed to record for (if
              applicable) in seconds. Note that this parameter may be overridden on a per-sensor basis.
            - **capture_interval_seconds** (*int*): How long to wait (in seconds) before instructing all sensors to
              record again for the duration specified by ``capture_duration_seconds``. Note that this parameter may
              be overridden on a per-sensor basis.
            - **auto_start** (*bool*): Whether or not all sensors should be instructed to automatically record at
              the specified ``capture_window_start_time``. If this flag is disabled, then recording commands are
              expected to be issued manually. Note that this parameter may be overridden on a per-sensor basis.

        Raises:
            NotImplementedError: This method will raise a ``NotImplementedError`` in the event that the
              specified ``pytz_time_zone`` in the ``beemon-config.ini`` does not yet have a corresponding datetime
              conversion method in :mod:`~src.utils.datetimeutils`.
            SystemExit: The process exits with status 1 when the configuration file cannot be set up, or when
              ``pytz_time_zone`` is not a timezone known to :mod:`pytz`.
        """
        # TODO : get configuration path from parsing arguments
        repository_root_path = pathlib.Path(__file__).parent.resolve()
        relative_config_file_path = os.path.abspath(os.path.join(repository_root_path, '../../beemon-config.ini'))
        if setup_config(relative_config_file_path) is None:
            # error message is printed in the setup_config method; it's the only place that makes any sense
            sys.exit(1)
        # Parse config file:
        pytz_time_zone: str = self.glob('pytz_time_zone')
        root_upload_dir: str = self.glob('root_upload_directory')
        local_output_dir: str = self.glob('local_output_directory')
        capture_duration_seconds: int = int(self.glob('capture_duration_seconds'))
        capture_interval_seconds: int = int(self.glob('capture_interval_seconds'))
        auto_start: bool = bool(strtobool(self.glob('auto_start')))
        # Both capture window bounds are in 24 hour military time HHMM format:
        raw_capture_window_start_time_military_format: str = str(self.glob('capture_window_start_time'))
        raw_capture_window_end_time_military_format: str = str(self.glob('capture_window_end_time'))
        # Ensure valid timezone was entered into the beemon-config.ini:
        try:
            tz_info = pytz.timezone(pytz_time_zone)
        except UnknownTimeZoneError as err:
            logger.critical(f'The \'pytz_time_zone\': \'{pytz_time_zone}\' specified in the \'beemon-config.ini\' '
                            f'[global] section was not recognized. Error: {err}')
            sys.exit(1)
        # Pick the conversion helper for the configured zone once, then apply it to both window bounds.
        if tz_info.zone == 'US/Eastern':
            military_time_to_aware = datetimeutils.military_time_to_edt_datetime_aware
        elif tz_info.zone == 'Europe/Brussels':
            military_time_to_aware = datetimeutils.military_time_to_cest_datetime_aware
        else:
            not_implemented_error_message = f'The timezone: \'{pytz_time_zone}\' specified in the ' \
                                            f'\'beemon-config.ini\' [global] section was recognized, but we have not ' \
                                            f'implemented it yet.'
            raise NotImplementedError(not_implemented_error_message)
        capture_window_start_time_datetime: datetime = military_time_to_aware(
            military_time=raw_capture_window_start_time_military_format
        )
        capture_window_end_time_datetime: datetime = military_time_to_aware(
            military_time=raw_capture_window_end_time_military_format
        )
        return tz_info, root_upload_dir, local_output_dir, capture_window_start_time_datetime, \
            capture_window_end_time_datetime, capture_duration_seconds, capture_interval_seconds, auto_start

    def type_check(self, section: str, key: str):
        """
        Runs sanity checks based on the expected types and values of the associated value in the config file.

        The expected type comes from the type table built in ``__init__``; section/key pairs that are not listed
        there are treated as strings. A key that is absent from the configuration is not validated at all, because
        the accessor methods let the caller supply a default for it.

        A value that fails conversion or validation with a ``ValueError``, and a ``ConnectionError`` raised by the
        string checks, are logged as errors rather than raised. Any other exception raised by the check helpers
        propagates to the caller.

        Args:
            section(str): The section of the config file, such as 'logger'.
            key(str): The key to grab from the section.
        """
        try:
            expected_type = self._type_table[section][key]
        except KeyError:
            # If the section/key pair is not in the type table, it is assumed to be a string
            expected_type = str
        # A unique sentinel distinguishes "key not configured" from an empty configured value.
        absent = object()
        value = self.__instance.get(section, key, default=absent)
        if value is absent:
            return
        try:
            if expected_type == int:
                value = int(value)
                # If parsing an int from the value doesn't return an error, run the int checks.
                int_checks(section, key, value)
            elif expected_type == float:
                value = float(value)
            elif expected_type == bool:
                # Bools don't have separate checking methods since we expect them to be
                # either true or false.
                if value.lower() not in ('true', 'false'):
                    raise ValueError()
            else:
                # Assumed to be a string, see if there's a check for its value.
                str_checks(section, key, value)
        except ValueError:
            logger.error(f"Config file has wrong type for {section}: {key}. Expected {expected_type}, while the "
                         f"value was \'{value}\'.")
        except ConnectionError as err:
            # Connection errors are from str_checks with the sftp hostname.
            logger.error(err)
def str_checks(section: str, key: str, value: str):
    """
    Called by type_check to run sanity checks on string values.

    Only the ``sftp`` section currently has checks: ``host`` must answer a single ping, and
    ``private_key_file_path`` must name an existing path. Every other section/key pair is accepted as-is.

    Args:
        section(str): The section of the config file, such as 'logger'.
        key(str): The key to grab from the section.
        value(str): Current value of the section: key pair to run the check on.

    Raises:
        :exception:`ValueError`: If ``sftp``/``private_key_file_path`` names a path that does not exist.
        :exception:`ConnectionError`: If ``sftp``/``host`` cannot be pinged (including when the ``ping`` program
            cannot be started).
    """
    if section != 'sftp':
        return
    if key == 'host':
        # Imported here so the check is self-contained; only this branch needs it.
        import subprocess
        try:
            # Pass the hostname as its own argument (no shell) so configuration text can never be interpreted as
            # shell syntax. A non-zero exit status means the host did not answer.
            reachable = subprocess.run(["ping", "-c", "1", value]).returncode == 0
        except (OSError, ValueError):
            # ping could not be launched, or the value cannot be passed as a process argument.
            reachable = False
        if not reachable:
            raise ConnectionError(f"Host unreachable. Is \'{value}\' the correct hostname?")
    if key == 'private_key_file_path':
        # Check if the private key file exists.
        if not os.path.exists(value):
            missing_file = ValueError(f"Private key file \'{value}\' does not exist.")
            raise ValueError(
                f"{section}:{key} test raised exception, \'{value}\' not valid for key."
            ) from missing_file
def int_checks(section: str, key: str, value: int):
    """
    Called by type_check for sanity checks on int values.

    Only keys of the ``global`` section are range-checked:

    - ``capture_window_start_time`` / ``capture_window_end_time`` must be a valid 24 hour ``HHMM`` time, i.e.
      hours ``00``-``23`` and minutes ``00``-``59`` (``0`` through ``2359`` inclusive).
    - ``capture_duration_seconds`` / ``capture_interval_seconds`` must not be negative.

    Args:
        section(str): The section of the config file, such as 'logger'.
        key(str): The key to grab from the section.
        value(int): Current value of the section: key pair to run the check on.

    Raises:
        :exception:`ValueError`: This method raises a ``ValueError`` in the event that an invalid value (logically
            speaking) is specified for one of the checked keys.
    """
    if section != 'global':
        return
    if key in ('capture_window_start_time', 'capture_window_end_time'):
        # Split HHMM into its two fields so that impossible minutes (e.g. 1275) are rejected as well.
        hours, minutes = divmod(value, 100)
        is_valid = 0 <= hours <= 23 and 0 <= minutes <= 59
    elif key in ('capture_duration_seconds', 'capture_interval_seconds'):
        # Durations and intervals can't be negative.
        is_valid = value >= 0
    else:
        is_valid = True
    if not is_valid:
        raise ValueError(f"{section}:{key} test raised exception, \'{value}\' not in valid range.")
def create_config(config_path, group="bee"):
    """
    Write a ``beemon-config.ini`` populated with default values to ``config_path``.

    The file is opened for writing, so an existing file at that location is replaced.

    Args:
        config_path: Where to write the configuration file.
        group: Not used at present; see the note at the end of the function body.
    """
    default_sections = {
        "global": {
            "pytz_time_zone": "US/Eastern",
            "root_upload_directory": "/usr/local/bee/appmais",
            "local_output_directory": "/home/bee/appmais/bee_tmp",
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
            "capture_duration_seconds": "60",
            "capture_interval_seconds": "300",
            "auto_start": "True",
        },
        "audio": {
            "sampling_frequency": "48000",
            "sample_format": "24",
            "channel_count": "1",
            "gain": "49",
            "set_gain": "False",
        },
        "video": {
            "still_frame": "False",
            "frames_per_second": "30",
            "resolution_x": "640",
            "resolution_y": "480",
            "flip_video": "False",
        },
        "temp": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "airquality": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "scale": {
            "weight_multiple": "1000",
            "weight_offset": "0.0",
        },
        "cpu": {
            "capture_window_start_time": "0800",
            "capture_window_end_time": "2000",
        },
        "sftp": {
            "username": "Username",
            "host": "Host",
            "port": "22",
            "private_key_file_path": "/home/bee/.ssh/id_ed25519",
            "upload_delay": "20",
        },
        "dashboard": {
            "access_token": "Token",
            "host": "Host",
        },
        "log": {
            "logging_level": "debug",
            "log_cycle_days": "3",
            "max_file_size_megabytes": "16",
            "max_archived_files": "7",
        },
    }
    configuration = configparser.ConfigParser()
    # Sections are added in declaration order, which is also the order they are written to the file.
    for section_name, section_options in default_sections.items():
        configuration[section_name] = section_options
    resolved_path = os.path.normpath(os.path.realpath(config_path))
    logger.info("Writing configuration file to: " + resolved_path)
    with open(config_path, 'w') as configfile:
        configuration.write(configfile)
    # Group ownership / permission handling is currently disabled, which is why ``group`` is unused:
    # gid = grp.getgrnam(group).gr_pid
    # os.chown(config_path, -1, gid)
    # os.chmod(config_path, 0o665)
def create_type_table() -> Dict[str, Dict[str, type]]:
    """
    Creates the look-up table for type checking config values.

    The table is a dictionary of dictionaries, accessed as ``table[section][key]``, whose values are type objects
    (``int``, ``float``, ``bool`` or ``str``) naming the type the configuration value is expected to parse as.
    All section/key pairs that are not in the table are expected as type ``str``.

    Returns:
        A new look-up table on every call, mapping section name to a mapping of key name to expected type.
    """
    table = {
        "global": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "capture_duration_seconds": int,
            "capture_interval_seconds": int,
            "auto_start": bool
        },
        "audio": {
            "sampling_frequency": int,
            "sample_format": int,
            "channel_count": int,
            "gain": int,
            "set_gain": bool,
            "auto_start": bool
        },
        "video": {
            "still_frame": bool,
            "frames_per_second": int,
            "resolution_x": int,
            "resolution_y": int,
            "flip_video": bool,
            "auto_start": bool
        },
        "temp": {
            "capture_window_start_time": int,
            "capture_window_end_time": int,
            "auto_start": bool
        },
        "scale": {
            "weight_multiple": float,
            # The default configuration writes this key as "0.0", so it is validated as a float as well.
            "weight_offset": float,
            "auto_start": bool
        },
        "sftp": {
            "username": str,
            "host": str,
            "port": int,
            "private_key_file_path": str,
            "upload_delay": int
        },
        # No typed keys: every dashboard value is treated as a string.
        "dashboard": {},
        "log": {
            "log_cycle_days": int,
            "max_file_size_megabytes": float,
            "max_archived_files": int
        }
    }
    return table