Source code for orchestrator

import glob
import io
import os.path
import queue
from multiprocessing import Process, Queue, Manager, Event
from typing import Dict, Tuple, Union, Optional, Type, List
from loguru import logger
from loguru._handler import Message
from src.beemon.sensor import Sensor
from src.beemon.singleton import Singleton
from src.beemon.configuration import setup_config, Config
from src.utils import datetimeutils
from src.utils import importutils
from datetime import datetime, timedelta
import time
from src.beemon.uploader import Uploader


class Orchestrator(Process, metaclass=Singleton):
    def __init__(self, command_queue: Queue, command_queue_polling_frequency_seconds: Optional[float] = 0.5):
        """
        The Orchestrator is a singleton class which handles the actual execution of commands received by the
        :class:`Server` class. Most commonly, this involves interacting with one or more child :class:`Sensor`
        objects/processes to carry out the inbound command. As a result, this class orchestrates the various
        independent :class:`Sensor` processes (and the :class:`Uploader` process), controlling their actions.

        Notes:
            Recall that the reason this class runs in a separate process is to leave the :class:`Server`'s main
            thread non-IO-blocked, so that it can accept and handle new inbound client requests.

        Args:
            command_queue (Queue): A :class:`multiprocessing.Queue` object which the :class:`Server` class utilizes
                to push all recognized commands to this orchestrator process.
            command_queue_polling_frequency_seconds (Optional[float]): How long the orchestrator process should
                sleep between querying/polling its associated ``command_queue``.

        """
        self._command_queue: Queue = command_queue
        self._command_queue_polling_frequency_seconds: float = command_queue_polling_frequency_seconds
        # Command queue for the Uploader process:
        self._uploader_command_queue: Queue = Queue()
        repository_root_path = os.path.realpath(os.path.join(__file__, '../../../'))
        beemon_config_path = os.path.realpath(os.path.join(repository_root_path, 'beemon-config.ini'))
        logger.debug(f'Importing beemon-config.ini from: {beemon_config_path}')
        self._beemon_config: Config = setup_config(configuration_file=beemon_config_path)
        self._local_output_directory = self._beemon_config.glob(
            'local_output_directory', default=None
        )
        logger.debug(f"Beemon local root output directory: {self._local_output_directory}")
        self._upload_directory: str = self._beemon_config.glob(
            'root_upload_directory', default=None
        )
        logger.debug(f"Beemon upload root directory: {self._upload_directory}")
        self._sensors: Dict[str, Sensor] = {}
        self._log_cycle_days = int(self._beemon_config.log('log_cycle_days', default=3))
        self._max_file_size_megabytes = float(self._beemon_config.log('max_file_size_megabytes', default=1000))
        self._log_level = self._beemon_config.log('logging_level', default="DEBUG").upper()
        self._max_archived_files = int(self._beemon_config.log('max_archived_files', default="7"))
        logger.debug(f"Logger cycling log files every: {self._log_cycle_days} days")
        logger.debug(f"Logger max file size: {self._max_file_size_megabytes} MB")
        # noinspection PyTypeChecker
        logger.add("/var/log/beemon.log",
                   level=self._log_level,
                   colorize=True,
                   rotation=self._logger_rotation_check,
                   retention=self._max_archived_files,
                   compression=self._logger_compression)
        """
        Notes:
            We don't initialize a pointer to the uploader process here because __init__ is executed from the main
            process. If we try to instantiate the Uploader process here, we will get an error when we try to
            start it later, because the context will have changed to the Orchestrator's process. This will raise
            an::

                AssertionError: can only start a process object created by current process.

            The way around this is to lazily instantiate the process object after leaving __init__, when it is
            needed.
        """
        self._uploader_process: Optional[Uploader] = None
        # Sentinel value is originally False:
        self._orchestrator_halt_request_received: Event = Event()
        self._recording_start_time_queues: Dict[str, Queue] = {}
        # Keep track of whether the recording queues have been refreshed:
        self._daily_refresh_lock: bool = True
        super().__init__()
        self.start()

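    # A minimal, self-contained sketch (with hypothetical names) of the pitfall described in the Notes block
    # above: a multiprocessing.Process instance created in the parent process cannot be started from a
    # different process.
    #
    #     from multiprocessing import Process
    #
    #     def _work() -> None:
    #         pass
    #
    #     class Parent(Process):
    #         def __init__(self):
    #             super().__init__()
    #             # Created in the main process...
    #             self._child = Process(target=_work)
    #
    #         def run(self):
    #             # ...but started from the Parent's own process, raising:
    #             #     AssertionError: can only start a process object created by current process
    #             self._child.start()
    #
    # Deferring construction to run() (as is done with self._uploader_process below) avoids the error.
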
    def _logger_compression(self, path: str) -> None:
        """
        A custom compression function for the logger. This function is called during rotation. The rotated file
        has a name in the format of ``beemon.log.<integer>``, where a larger integer corresponds to older logs.
        Loguru passes the path of the latest log file to be rotated.

        Args:
            path (str): The path of the file to be rotated and compressed.

        """
        root_path = f'/{os.path.join(*path.split("/")[0:-1])}'
        file_base_name = os.path.basename(path)
        # Rename the freshly rotated file to 'beemon.log.0':
        new_base_name = 'beemon.log.0'
        os.renames(old=os.path.join(root_path, file_base_name), new=os.path.join(root_path, f'{new_base_name}'))
        # Define the pattern of files to look for:
        file_name_pattern = 'beemon.log.*'
        # Get the files and sort them by name:
        glob_result = sorted(glob.glob1(root_path, file_name_pattern), reverse=True)
        # Loop through the files and increment the rotation number of each file:
        for file in glob_result:
            file_number = file.split(".")[-1]
            new_number = int(file_number) + 1
            # Rename the file:
            new_name = f'beemon.log.{new_number}'
            os.renames(old=os.path.join(root_path, file), new=os.path.join(root_path, new_name))

    def _logger_rotation_check(self, logged_message: Message, file_object: io.TextIOWrapper) -> bool:
        """
        Checks whether the log file should rotate, based on both the max file size and the number of days since
        creation, as denoted in the config file. Arguments are passed in by loguru. The logger rotates to a new
        file when either the file reaches the max file size or it has been the designated number of days since
        creation, whichever comes first.

        Args:
            logged_message (loguru._handler.Message): Contains the current log message (unused).
            file_object (io.TextIOWrapper): A TextIOWrapper object, a basic Python file object.

        Returns:
            bool: True if the logger should rotate to a new file, False otherwise.

        """
        flag = False
        # Quick check if the file size is greater than the max denoted in the config file:
        NUM_BYTES_IN_MB = 1000000
        if file_object.tell() > self._max_file_size_megabytes * NUM_BYTES_IN_MB:
            flag = True
        # Parse the date from the first line of the file:
        with open(file_object.name, 'r') as log_file:
            try:
                line = log_file.readline().split('|')[0]
                date = line.split('-')
                # Split the year on 'm' to remove color codes from the string:
                year_str = date[0].split('m')[-1]
                month_str = date[1]
                # Split on the default 'space char' to separate the day from the time:
                day_str = date[2].split()[0]
            except IndexError:
                return False
        try:
            timestamp = datetime(int(year_str), int(month_str), int(day_str)).timestamp()
        except ValueError:
            # This just means the log file is empty: no need to worry, just return False and it'll start
            # populating the log file.
            return False
        # Lastly, compare the current date to the parsed timestamp, and check if the difference is greater than
        # the number of days listed in the config file's log:log_cycle_days field:
        current_time = datetime.now().timestamp()
        if current_time - timestamp >= timedelta(days=int(self._log_cycle_days)).total_seconds():
            flag = True
        return flag

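    # A minimal sketch of how loguru consumes the two callables above (relying only on loguru's documented
    # sink options; the helper names here are illustrative):
    #
    #     from loguru import logger
    #
    #     def should_rotate(message, file) -> bool:
    #         # loguru invokes this before each write; rotate once the sink exceeds ~1 MB:
    #         return file.tell() > 1_000_000
    #
    #     def compress(path: str) -> None:
    #         # loguru invokes this with the rotated file's path; archive/rename it here.
    #         ...
    #
    #     logger.add("beemon.log", rotation=should_rotate, compression=compress, retention=7)
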
    def run(self) -> None:
        """
        The main processing loop for the :class:`Orchestrator` class. This method is called after instantiation,
        and will infinitely read from the command :class:`Queue` IPC object (which the :class:`Server`
        populates), awaiting incoming commands to execute. When a command is received by this process, the
        :func:`_handle_command` method is invoked with the decoded (UTF-8) text of the command.

        """
        super().run()
        """
        Initialize all the detected :class:`Sensor` concrete subclasses, parse their ``beemon-config.ini``
        settings, and populate their respective multiprocessing Queues:
        """
        # Create the :class:`Sensor` instances and set up their associated :class:`multiprocessing.Queue`'s:
        self._dynamically_initialize_sensors()
        # Create the :class:`Uploader` instance and set up the associated :class:`multiprocessing.Queue`'s:
        self._uploader_process = Uploader(
            command_queue=self._uploader_command_queue,
            config=self._beemon_config
        )
        # Start any sensors that have the auto_start flag enabled:
        self._auto_start_flagged_sensors()
        # Start any processes that have the auto_start flag enabled:
        self._auto_start_flagged_processes()
        while not self._orchestrator_halt_request_received.is_set():
            # Restart the sensors and repopulate the recording time queues for the new day:
            now = datetime.now().time()
            # Get 2 AM for the current day:
            time_to_check = now.replace(hour=2, minute=0, second=0, microsecond=0)
            if now < time_to_check and not self._daily_refresh_lock:
                # The clock has wrapped past midnight (the lock was released after 2 AM yesterday), so we purge
                # the sensor queues and restart:
                self._daily_refresh_lock = True
                logger.debug("Purging queues and restarting sensors.")
                self.stop_command("all")
                logger.debug("Sleeping to make sure recording queues are empty.")
                time.sleep(3)
                logger.debug("Re-initializing sensors.")
                self._dynamically_initialize_sensors()
                self._auto_start_flagged_sensors()
            elif now > time_to_check:
                self._daily_refresh_lock = False
            try:
                logger.debug("orchestrator process polling for commands...")
                command: str = self._command_queue.get(block=True, timeout=10)
                logger.debug(f"orchestrator process received command: \"{command}\"")
                # Handle the command:
                error_message = self._handle_command(command=command)
            except queue.Empty:
                time.sleep(self._command_queue_polling_frequency_seconds)
        logger.debug("Orchestrator halt request received. Orchestrator no longer listening for server commands.")

    def _dynamically_initialize_sensors(self):
        """
        Dynamically detects concrete :class:`Sensor` subclasses in ``src.beemon.sensor`` and initializes
        corresponding instances. Parses each :class:`Sensor`'s fields from ``beemon-config.ini``, overriding the
        global defaults as necessary, then populates the Sensor's :class:`multiprocessing.Queue` of recording
        start times.

        """
        # Get a list of all classes (excluding base classes) in the specified module:
        concrete_classes: Dict[str, Type] = importutils.get_classes_in_module(
            module_name='src.beemon.sensor',
            package_name='src.beemon',
            concrete_only=True
        )
        logger.debug(f"Dynamically imported classes: {concrete_classes}")
        for cls_name, cls in concrete_classes.items():
            # Get the name of the Sensor:
            lower_case_sensor_name = cls_name.lower()
            # Create a :class:`multiprocessing.Queue` for each concrete :class:`Sensor` subclass:
            self._recording_start_time_queues[lower_case_sensor_name] = Queue()
            # Create the :class:`Sensor` instance itself:
            try:
                sensor = cls(
                    recording_start_time_queue=self._recording_start_time_queues[lower_case_sensor_name],
                    config=self._beemon_config
                )
            except Exception as e:
                logger.error(f'Failed to dynamically initialize {lower_case_sensor_name} sensor. Error: {e}')
                continue
            self._sensors[lower_case_sensor_name] = sensor
            # Populate the queue of recording start times for the Sensor:
            self._populate_recording_start_time_queue_for_sensor(
                sensor_name=type(sensor).__name__.lower(),
                sensor_capture_window_start_time=sensor.capture_window_start_time,
                sensor_capture_window_end_time=sensor.capture_window_end_time,
                sensor_capture_duration_seconds=sensor.capture_duration_seconds,
                sensor_capture_interval_seconds=sensor.capture_interval_seconds,
                include_elapsed_times=False
            )
            logger.info(f"Successfully dynamically initialized Sensor: {lower_case_sensor_name}")

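    # The discovery logic itself lives in src/utils/importutils.py. A plausible sketch of such a helper (an
    # assumption for illustration, not the project's actual implementation), using only the standard library:
    #
    #     import importlib
    #     import inspect
    #     from typing import Dict, Type
    #
    #     def get_classes_in_module(module_name: str, package_name: str, concrete_only: bool) -> Dict[str, Type]:
    #         module = importlib.import_module(module_name, package=package_name)
    #         classes: Dict[str, Type] = {}
    #         for name, cls in inspect.getmembers(module, inspect.isclass):
    #             # Skip classes that were merely imported into the module:
    #             if cls.__module__ != module.__name__:
    #                 continue
    #             # Optionally skip abstract base classes (e.g. the Sensor base itself):
    #             if concrete_only and inspect.isabstract(cls):
    #                 continue
    #             classes[name] = cls
    #         return classes
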
    def _populate_recording_start_time_queue_for_sensor(
            self,
            sensor_name: str,
            sensor_capture_window_start_time: datetime,
            sensor_capture_window_end_time: datetime,
            sensor_capture_duration_seconds: int,
            sensor_capture_interval_seconds: int,
            include_elapsed_times: Optional[bool] = False):
        """
        Helper method which breaks the specified :class:`datetime.datetime` range into even intervals of
        ``sensor_capture_duration_seconds``, and builds a list of all recording start times for the
        :class:`Sensor`. This method then (optionally) discards the recording start times which have already
        elapsed.

        Args:
            sensor_name (str): The name of the :class:`Sensor` (e.g. ``audio``).
            sensor_capture_window_start_time (datetime): The starting datetime (non-naive and inclusive) at which
                the first recording session (of ``capture_duration_seconds`` length in seconds) is to begin.
            sensor_capture_window_end_time (datetime): The ending datetime (non-naive and inclusive) at which the
                last recording session (of ``capture_duration_seconds`` length in seconds) is to begin.
            sensor_capture_duration_seconds (float): The length/duration of each recording session between
                ``capture_window_start_time`` and ``capture_window_end_time``.
            sensor_capture_interval_seconds (float): How frequently to initialize recording sessions. For
                instance, a value of ``120.0`` seconds indicates that recording should be performed every two
                minutes from ``capture_window_start_time`` to ``capture_window_end_time`` (inclusive) for a
                duration of ``capture_duration_seconds``.
            include_elapsed_times (Optional[bool]): Indicates whether or not recording start times which have
                already passed/elapsed should be appended to the :class:`Sensor`'s
                :class:`multiprocessing.Queue`.

        """
        sensor_recording_start_times = self.get_all_recording_start_times_within_window(
            capture_window_start_time=sensor_capture_window_start_time,
            capture_window_end_time=sensor_capture_window_end_time,
            capture_duration_seconds=sensor_capture_duration_seconds,
            capture_interval_seconds=sensor_capture_interval_seconds
        )
        logger.debug(
            f"Identified {len(sensor_recording_start_times)} discrete capture times within the specified "
            f"capture window: {sensor_capture_window_start_time} - {sensor_capture_window_end_time}.")
        if not include_elapsed_times:
            # Trim the recording timestamps which have already passed:
            non_elapsed_recording_start_times: List[datetime] = []
            for sensor_recording_start_time in sensor_recording_start_times:
                seconds_until_recording_time: float = datetimeutils.seconds_until_datetime(
                    date_time=sensor_recording_start_time
                )
                if seconds_until_recording_time > 0:
                    """
                    Notes:
                        We use two elements per recording time. The Sensor processes will read once from the
                        queue to determine how long they should sleep. After waking, the Sensor processes will
                        read from the queue again to confirm that they should still record (e.g. no "stop
                        sensor" command was received during the period the sensor was sleeping).
                    """
                    non_elapsed_recording_start_times.append(sensor_recording_start_time)
                    non_elapsed_recording_start_times.append(sensor_recording_start_time)
            logger.debug(
                f"Identified {int(len(non_elapsed_recording_start_times) / 2)} discrete capture times within "
                f"the specified capture window which have not yet passed.")
            # Append the recording start times to the sensor's queue:
            for recording_start_time in non_elapsed_recording_start_times:
                self._recording_start_time_queues[sensor_name.lower()].put_nowait(recording_start_time)
            return
        else:
            for recording_start_time in sensor_recording_start_times:
                self._recording_start_time_queues[sensor_name.lower()].put_nowait(recording_start_time)
            return

    def _auto_start_flagged_sensors(self):
        """
        Automatically starts concrete :class:`Sensor` subclasses whose ``auto_start`` property (in
        ``beemon-config.ini``) is set to ``True``.

        """
        # Iterate over the dynamically instantiated sensors:
        for sensor_name, sensor in self._sensors.items():
            # Check the autostart flag for the sensor:
            if sensor.auto_start:
                # Issue the start command to the sensor:
                logger.info(f"Sensor: {sensor_name} scheduled to autostart: {sensor.auto_start}.")
                sensor.start()

    def _auto_start_flagged_processes(self):
        """
        Automatically starts concrete :class:`Process` subclasses whose ``auto_start`` property (in
        ``beemon-config.ini``) is set to ``True``.

        """
        # The only process object to initialize on auto-start that is not a sensor (currently) is the uploader
        # process:
        if self._uploader_process.auto_start:
            logger.info(
                f"Process: {type(self._uploader_process).__name__} scheduled to autostart: "
                f"{self._uploader_process.auto_start}")
            self._uploader_process.start()

    def _handle_command(self, command: str) -> Optional[str]:
        """
        This helper method is invoked every time an inbound command is received from the :class:`Server` (over
        the command :class:`Queue` IPC channel). This method determines which logic should execute as a result
        of the command received from the :class:`Server`.

        Args:
            command (str): The decoded UTF-8 string representation of the command message received by the
                :class:`Server`, then relayed to this process over the IPC channel (:class:`Queue`).

        Returns:
            Optional[str]: An optional error message.

            .. todo:: This functionality is not used anywhere. This method currently returns None.

        """
        error_message: Optional[str] = None
        prefix = command.split()[0]
        if prefix == 'start':
            invocation_target = command.split()[-1]
            error_message = self.start_command(target=invocation_target)
        elif prefix == 'stop':
            split_command = command.split()
            if len(split_command) == 1:
                # Just "stop" was provided.
                sensor_name = None
            else:
                sensor_name = split_command[-1]
            error_message = self.stop_command(target=sensor_name)
        else:
            # The :class:`Server` process passed an unrecognized command to the orchestrator:
            logger.critical(f"Orchestrator process received unrecognized command: \"{command}\" from the Server "
                            f"process!")
        return error_message

    @staticmethod
    def get_all_recording_start_times_within_window(
            capture_window_start_time: datetime,
            capture_window_end_time: datetime,
            capture_duration_seconds: float,
            capture_interval_seconds: float) -> List[datetime]:
        """
        Computes the maximum number of recordings which can occur between the range of datetime objects
        (inclusive), given that each recording has the specified interval of ``capture_interval_seconds``. This
        method then returns a list of datetime objects (non-naive and EDT timezone aware) which constitute the
        times at which recording should begin in order to successfully record a maximum number of times between
        the supplied range of times.

        Args:
            capture_window_start_time (datetime): The starting datetime (non-naive and inclusive) at which the
                first recording session (of ``capture_duration_seconds`` length in seconds) is to begin.
            capture_window_end_time (datetime): The ending datetime (non-naive and inclusive) at which the last
                recording session (of ``capture_duration_seconds`` length in seconds) is to begin.
            capture_duration_seconds (float): The length/duration of each recording session between
                ``capture_window_start_time`` and ``capture_window_end_time``. This method will produce the times
                at which recording commands should be issued to all registered :class:`Sensor` concrete subclass
                objects.
            capture_interval_seconds (float): How frequently to initialize recording sessions. For instance, a
                value of ``120.0`` seconds indicates that recording should be performed every two minutes from
                ``capture_window_start_time`` to ``capture_window_end_time`` (inclusive) for a duration of
                ``capture_duration_seconds``.

        Returns:
            List[datetime]: The recording start times within the capture window.

        """
        recording_start_times: List[datetime] = []
        # We have a problem if the capture_duration meets or exceeds the capture_interval.
        if capture_duration_seconds == capture_interval_seconds:
            logger.warning(f"The specified \"capture_duration_seconds\" of: {capture_duration_seconds} seconds "
                           f"is equal to the specified \"capture_interval_seconds\" of: "
                           f"{capture_interval_seconds} seconds. The Sensor may not have enough time to perform "
                           f"post-processing operations between recording sessions. Some recording sessions may "
                           f"be skipped as a result (depending on the speed of the sensor's post-recording "
                           f"operations).")
        elif capture_duration_seconds > capture_interval_seconds:
            error_message = f"Invalid configuration. The specified \"capture_duration_seconds\" of: " \
                            f"{capture_duration_seconds} must not exceed the specified " \
                            f"\"capture_interval_seconds\" of: {capture_interval_seconds}. It does not make " \
                            f"sense for a sensor to record for {capture_duration_seconds} seconds once every " \
                            f"{capture_interval_seconds} seconds (as the previous recording would still be in " \
                            f"progress)."
            logger.critical(error_message)
            raise NotImplementedError(error_message)
        else:
            # This is the desired state. The sensor will hopefully have time to handle post-processing
            # operations while waiting for the next scheduled recording time.
            pass
        next_recording_start_time = capture_window_start_time
        while next_recording_start_time <= capture_window_end_time:
            recording_start_times.append(next_recording_start_time)
            next_recording_start_time = recording_start_times[-1] + timedelta(seconds=capture_interval_seconds)
        # logger.debug(f"Recording start times: {[stime.isoformat() for stime in recording_start_times]}")
        return recording_start_times

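    # For example (an illustrative usage sketch; the times are hypothetical), a 06:00-06:10 window sampled every
    # 120 seconds yields six start times, since the window is inclusive at both ends:
    # 06:00, 06:02, 06:04, 06:06, 06:08, and 06:10.
    #
    #     window_start = datetime(2022, 5, 1, 6, 0, 0)
    #     window_end = datetime(2022, 5, 1, 6, 10, 0)
    #     start_times = Orchestrator.get_all_recording_start_times_within_window(
    #         capture_window_start_time=window_start,
    #         capture_window_end_time=window_end,
    #         capture_duration_seconds=60.0,
    #         capture_interval_seconds=120.0
    #     )
    #     assert len(start_times) == 6
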
    def start_command(self, target: str) -> Optional[str]:
        """
        Called whenever the orchestrator process receives a ``"start"`` command from the :class:`Server` process.
        This method executes the received ``"start"`` command, instructing the targeted :class:`Sensor` (or
        :class:`Process`) to begin operations. If a :class:`Sensor` subclass is targeted, this method will
        populate the associated queue of recording start times for the :class:`Sensor`, and then start the
        :class:`Sensor` :class:`Process` (by calling :meth:`Process.start`), which will cause it to enter into
        its main recording loop (in :meth:`Process.run`).

        Args:
            target (str): The name of the concrete :class:`Sensor` subclass to start (e.g. "audio", "video",
                "temp"), or alternatively, the name of the :class:`Process` to start (e.g. "uploader").

        Returns:
            Optional[str]: An optional error message.

        """
        error_message: Optional[str] = None
        logger.debug(f"Orchestrator received \"start {target}\" command from Server process.")
        if target == 'uploader':
            # The :class:`uploader.Uploader` is not a :class:`sensor.Sensor` subclass, and is a special case:
            self._uploader_process.start()
            return error_message
        else:
            # This is a command to start one or more :class:`sensor.Sensor` subclasses.
            if target == 'all':
                for sensor_name, sensor in self._sensors.items():
                    if not sensor.is_running.is_set():
                        # Start the sensor:
                        sensor.start()
            else:
                try:
                    sensor = self._sensors[target]
                except KeyError:
                    logger.warning(f"Specified Sensor: {target} does not exist!")
                    return error_message
                if not sensor.is_running.is_set():
                    # Start the sensor instance:
                    self._sensors[target].start()
                    logger.info(f"Started Sensor: {target}.")
                else:
                    # Sensor was already running.
                    logger.info(f"Sensor: {target} is already running!")
        return error_message

    def halt_sensor(self, target: str):
        """
        Called in order to halt a :class:`Sensor` subclass which is currently sleeping (awaiting the next
        pre-scheduled/populated recording start time). This method purges the :class:`multiprocessing.Queue`
        associated with the :class:`Sensor` subclass instance/object. When the :class:`Sensor` subclass awakens
        to begin recording, it will perform a parity check on the recording time and recognize that the
        :class:`multiprocessing.Queue` is now empty. It will then abort the scheduled recording operation,
        returning from its :meth:`Process.run` method, allowing the process to terminate.

        Args:
            target (str): The name of the sensor whose queue of recording start times should be purged. This
                will cause any future parity checks made by the (potentially sleeping) sensor to fail, aborting
                any recording operations which were scheduled to be performed before this command was issued.

        """
        logger.debug(f"Purging multiprocessing queue of recording start times for Sensor: {target}")
        try:
            while True:
                _ = self._recording_start_time_queues[target].get_nowait()
        except queue.Empty:
            self._recording_start_time_queues[target].close()
            self._recording_start_time_queues[target].join_thread()
        except OSError as err:
            error_message = err.args[0]
            if 'handle is closed' in error_message:
                # A stop command was issued to an already stopped sensor.
                logger.debug(f"Sensor: {target} is already stopped.")
            else:
                raise err

    def stop_command(self, target: Optional[str] = None) -> Optional[str]:
        """
        Called whenever the :class:`Orchestrator` :class:`Process` receives a ``"stop"`` command from the
        :class:`Server` :class:`Process`. This method executes/performs the received ``"stop"`` command,
        instructing the targeted :class:`Sensor` (or :class:`Process`) to halt operations. If a :class:`Sensor`
        subclass is targeted, this method will purge the queue of recording start times for the :class:`Sensor`,
        and then call the :class:`Sensor`'s :meth:`Sensor.stop` method (allowing the concrete subclass a chance
        to gracefully terminate/halt open IPC channels with its native firmware/hardware). If no ``target`` is
        provided, all subprocesses will be halted.

        Notes:
            There are three main behavior-impacting observable cases which occur (timing-wise) when this command
            is manually issued by a user::

                a) The ``"stop"`` command is issued before the concrete :class:`Sensor` subclass begins
                   recording, and before the concrete subclass enters a routine sleep cycle (awaiting the next
                   recording time).
                b) The ``"stop"`` command is issued while the :class:`Sensor` subclass is sleeping in-between
                   pre-ordained recording times.
                c) The ``"stop"`` command is issued while the :class:`Sensor` subclass is already recording.

            In the best case scenario, a ``"stop"`` command will be issued to a prospective
            :class:`~sensor.Sensor` subclass before it enters a sleep cycle (awaiting the next scheduled
            recording time). When this case occurs, the :class:`~sensor.Sensor`'s :class:`Event` flag
            (:attr:`~sensor.Sensor.halt_recording_request_received`) will be updated near-instantaneously, and
            the process will terminate without much noticeable delay. However, during normal operation it is
            quite common for the ``"stop"`` command to be issued either during recording, or while the
            :class:`~sensor.Sensor` is in a sleep cycle. In an effort to avoid firmware/hardware lockups, we do
            not attempt to halt/interrupt a recording in progress. We leave it up to the implementer of the
            concrete :class:`~sensor.Sensor` subclass to poll/check the
            :attr:`~sensor.Sensor.halt_recording_request_received` :class:`Event` flag periodically during
            recording (in their implementation of :meth:`~sensor.Sensor.record`), and to decide if they wish to
            handle forcefully terminating an open connection with the firmware/hardware. We do not presume it is
            safe to forcefully halt the process directly from the super-class. As a result of this design
            philosophy, it is quite common for a concrete :class:`~sensor.Sensor` subclass to continue recording
            (for its pre-specified recording duration) after having noticeably received a ``"stop"`` command
            from the :class:`Orchestrator`. This behavior is non-ideal, but is a byproduct of believing it
            unwise to forcibly terminate a concrete child process (which may have an open subprocess of its own
            to the recording device in question). Likewise, in the case where the ``"stop"`` command is received
            by the :class:`~sensor.Sensor` subclass while it is sleeping, we do not attempt to periodically poll
            an external command queue (due to observed recording time delays caused by IPC locking mechanisms).
            The result of this is that a sleeping sensor will not visibly respond to a ``"stop"`` command (while
            sleeping) until it wakes at the next scheduled recording time and performs a parity check.

            The result of these design choices, from a user's perspective, is that in the worst case scenario a
            Sensor will continue to record for an additional period after the ``"stop"`` command is received,
            and then it will discard the recorded file.

        Args:
            target (Optional[str]): The name of the concrete :class:`~sensor.Sensor` subclass (or applicable
                :class:`~multiprocessing.Process`) to issue the ``"stop"`` command to. If no ``target`` is
                specified, all subprocesses will be halted.

        Returns:
            Optional[str]: An optional error message (currently not utilized).

        """
        error_message: Optional[str] = None
        logger.debug(f"orchestrator received \"stop {target}\" command from Server process.")
        if target is None:
            # Stop the entire server.
            # Immediately halt the orchestrator from polling for commands:
            self._orchestrator_halt_request_received.set()
            # Stop all sensors:
            logger.debug("Purging multiprocessing queue of recording start times for all Sensors.")
            for target, recording_start_time_queue in self._recording_start_time_queues.items():
                self.halt_sensor(target=target)
            logger.debug("Queuing \"stop\" command for the \"uploader\" process.")
            self._uploader_command_queue.put_nowait('stop')
            # We could optionally invoke the uploader's stop() method directly...
            # self._uploader_process.stop()
        elif target == 'uploader':
            # The Uploader process is a special case compared to the sensor classes.
            # Tell the Uploader process to stop:
            logger.debug("Queuing \"stop\" command for the \"uploader\" process.")
            self._uploader_command_queue.put_nowait('stop')
            # We could optionally invoke the uploader's stop() method directly...
            # self._uploader_process.stop()
        else:
            # The target is a :class:`sensor.Sensor` class:
            if target == 'all':
                logger.debug("Purging multiprocessing queue of recording start times for all Sensors.")
                for target, recording_start_time_queue in self._recording_start_time_queues.items():
                    self.halt_sensor(target=target)
                return error_message
            else:
                self.halt_sensor(target=target)
        return error_message

    # def is_capture_time(self, query_time: Optional[datetime] = None) -> bool:
    #     """
    #     Determines if the provided ``query_time`` is within the capture window (specified during
    #     initialization). If no ``query_time`` is provided, then this method determines whether or not the
    #     current system time is within the capture window.
    #
    #     Args:
    #         query_time (Optional[datetime]): ``None`` in the event that the current system time should be
    #             queried and determined to be within the capture window. Otherwise this is the
    #             :class:`datetime.datetime` object to query in order to see if it is within the capture window
    #             specified during initialization.
    #
    #     Returns:
    #         bool: True if the provided query_time is within the capture window (or if the current system time
    #             is within the window in the case where no query_time is provided). False if the provided
    #             ``query_time`` is not within the capture window (or if the current system time is not within
    #             the capture window in the case where no ``query_time`` is provided).
    #
    #     """
    #     if query_time is None:
    #         right_now = datetime.now(tz=pytz.timezone('US/Eastern'))
    #         if right_now >= self._capture_window_start_time:
    #             return right_now <= self._capture_window_end_time
    #         else:
    #             return False
    #     else:
    #         if query_time >= self._capture_window_start_time:
    #             return query_time <= self._capture_window_end_time
    #         else:
    #             return False

    # def is_x_seconds_from_capture_window_start_time(self, seconds: int) -> bool:
    #     """
    #     Utility/helper method which returns True in the event that the pre-specified capture/recording window
    #     is within the provided number of seconds. This method will return False in the event that the capture
    #     window is not within ``seconds`` number of seconds from the start of the capture window.
    #
    #     Args:
    #         seconds (int): The number of seconds.
    #
    #     Returns:
    #         bool: True if the current system time is within ``x`` seconds of the capture window start time.
    #             False otherwise.
    #
    #     """
    #     x_seconds_before_capture_window_start_time = self._capture_window_start_time - timedelta(seconds=seconds)
    #     delta = x_seconds_before_capture_window_start_time - datetime.now(tz=pytz.timezone('US/Eastern'))
    #     return delta.total_seconds() < 0

    # def _halt_sensors(self, total_grace_period_seconds: float) -> Optional[str]:
    #     """
    #     Halts all Sensor sub-processes, allowing for a ``grace_period_seconds`` timeout interval until the
    #     processes are more forcibly halted. First the processes are attempted to be halted via
    #     :meth:`threading.join`. In the event that a Sensor sub-process fails to halt by the provided grace
    #     period and encounters a timeout, a SIGTERM is issued to the zombie process. In the event that the
    #     process still continues to run, a SIGKILL is issued to the zombie process. If somehow the process
    #     still persists past the SIGKILL, then a critical error message is sent to the logger requesting manual
    #     intervention, and this method will return the associated error message.
    #
    #     Args:
    #         total_grace_period_seconds (float): The number of seconds to wait for all processes to gracefully
    #             terminate.
    #
    #     Returns:
    #         Optional[str]: A descriptive error message in the event that a Sensor/sub-process fails to
    #             terminate via SIGKILL signal.
    #
    #     """
    #     error_message: Optional[str] = None
    #     # Keep track of termination time per-sensor:
    #     last_sensor_halt_time_seconds = 0.0
    #     remaining_grace_period_seconds = total_grace_period_seconds
    #     logger.info(f"Issuing join() (graceful stop) to the Sensor sub-processes.")
    #     # .. todo:: Make the termination logic for all sensor processes asynchronous (see below).
    #     for sensor_name, sensor in self._sensors.items():
    #         starting_sensor_halt_time = datetime.now(tz=pytz.timezone('US/Eastern'))
    #         # .. todo:: This may need to be executed asynchronously, or the timeout interval needs to be
    #         #     updated dynamically. There is a potential problem here: if the first sensor takes
    #         #     self._capture_interval_seconds - 60 seconds to end gracefully, the next sensor should have 0
    #         #     seconds to terminate gracefully, not the full timeout duration again. Ideally all sensors
    #         #     should be instructed to exit gracefully at the same time (not sequentially as is currently
    #         #     being done). However, as a temporary hack, it may make sense to just dynamically calculate
    #         #     the remaining timeout interval every iteration and pray that the long-terminating sensors
    #         #     don't get called first.
    #         # Update the total amount of seconds left in the grace period:
    #         sensor.join(timeout=remaining_grace_period_seconds)
    #         # Process terminated or the method timed out. Check the exitcode to determine which:
    #         process_exit_code: Optional[int] = sensor.exitcode
    #         if process_exit_code is None:
    #             # The process failed to terminate gracefully.
    #             logger.warning(f"The sub-process associated with sensor: {sensor_name} failed to terminate "
    #                            f"gracefully.")
    #             # Attempt to issue a SIGTERM:
    #             logger.info(f"Issuing a SIGTERM to sub-process/sensor: {sensor_name}.")
    #             sensor.terminate()
    #             process_exit_code: Optional[int] = sensor.exitcode
    #             if process_exit_code is None:
    #                 # The process failed to terminate with a SIGTERM, fall back to a SIGKILL.
    #                 logger.error(f"The sub-process associated with sensor: {sensor_name} failed to terminate "
    #                              f"with a SIGTERM. Issuing a SIGKILL.")
    #                 sensor.kill()
    #                 # It looks like we need to give the process time to be killed by the OS:
    #                 time.sleep(1)
    #                 # Now pull the new exitcode:
    #                 process_exit_code: Optional[int] = sensor.exitcode
    #                 if process_exit_code is None:
    #                     # The process failed to terminate with a SIGKILL. Manual intervention required.
    #                     error_message = f"The sub-process {sensor.pid} associated with sensor: {sensor_name} " \
    #                                     f"failed to terminate via SIGKILL. Manual intervention required."
    #                     logger.critical(error_message)
    #                     return error_message
    #                 else:
    #                     # The process terminated via SIGKILL. A negative value -N indicates the child was
    #                     # terminated by signal N.
    #                     logger.warning(f"The sub-process {sensor.pid} associated with Sensor: {sensor_name} "
    #                                    f"was forcefully terminated via SIGKILL.")
    #             else:
    #                 # The process terminated via SIGTERM. A negative value -N indicates the child was
    #                 # terminated by signal N.
    #                 logger.warning(f"The sub-process {sensor.pid} associated with Sensor: {sensor_name} was "
    #                                f"ungracefully terminated via SIGTERM.")
    #         else:
    #             # The process terminated gracefully.
    #             logger.debug(f"The sub-process {sensor.pid} associated with Sensor: {sensor_name} "
    #                          f"joined/ended gracefully.")
    #         last_sensor_halt_time_seconds = (datetime.now(tz=pytz.timezone('US/Eastern'))
    #                                          - starting_sensor_halt_time).total_seconds()
    #         remaining_grace_period_seconds = remaining_grace_period_seconds - last_sensor_halt_time_seconds
    #     # Remove all closed() processes from memory. Sensor sub-processes must be re-initialized between
    #     # recording sessions:
    #     self._sensors: Dict[str, Sensor] = {key: value for key, value in self._sensors.items() if value.is_alive()}
    #     logger.debug(f"Pruned dead/halted Sensor sub-process instances.")
    #     return error_message

    @property
    def sensors(self) -> Optional[Dict[str, Sensor]]:
        return self._sensors

    @property
    def capture_window_start_time(self) -> datetime:
        return self._capture_window_start_time

    @property
    def recording_start_time_queues(self) -> Dict[str, Queue]:
        return self._recording_start_time_queues

    @property
    def command_queue(self) -> Queue:
        return self._command_queue

    @property
    def capture_window_end_time(self) -> datetime:
        return self._capture_window_end_time