Beemon Orchestrator

class orchestrator.Orchestrator(*args, **kwargs)[source]
__init__(command_queue: Queue, command_queue_polling_frequency_seconds: float | None = 0.5)[source]

The Orchestrator is a singleton class which handles the actual execution of commands received by the Server class. Most commonly, this involves interacting with one or more child Sensor objects/processes to carry out the inbound command. As a result, this class orchestrates the various independent Sensor processes (and the Uploader process), controlling their actions.

Notes

Recall that the reason this class runs in a separate process is to leave the Server’s main thread non-IO blocked in order to accept and handle new inbound client requests.

Parameters:
  • command_queue (Queue) – A multiprocessing.Queue object which the Server class utilizes to push all recognized commands to this orchestrator process.

  • command_queue_polling_frequency_seconds (Optional[float]) – How long the orchestrator process should sleep between polls of its associated command_queue.

_auto_start_flagged_processes()[source]

Automatically starts concrete Process subclasses whose auto_start property (in beemon-config.ini) is set to True.

Returns:

_auto_start_flagged_sensors()[source]

Automatically starts concrete Sensor subclasses whose auto_start property (in beemon-config.ini) is set to True.

Returns:
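A minimal sketch of how auto-start flags might be read with the standard-library configparser. The section names ([global], [audio], [video]), the idea of a global fallback value, and the helper name names_flagged_for_auto_start are assumptions made for illustration; the real beemon-config.ini layout may differ.

```python
import configparser

# Hypothetical config text; the real beemon-config.ini layout may differ.
EXAMPLE_CONFIG = """
[global]
auto_start = False

[audio]
auto_start = True

[video]
capture_duration_seconds = 60
"""


def names_flagged_for_auto_start(config_text, candidate_names):
    """Return the candidate names whose auto_start option resolves to True."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # Assumed convention: a [global] section supplies the default value.
    global_default = parser.getboolean("global", "auto_start", fallback=False)
    flagged = []
    for name in candidate_names:
        # fallback= also covers a missing section, not only a missing option.
        if parser.getboolean(name, "auto_start", fallback=global_default):
            flagged.append(name)
    return flagged
```

In this sketch, a sensor without its own auto_start option inherits the assumed global default, which mirrors the "overriding the global defaults as necessary" behavior described for sensor fields.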

_dynamically_initialize_sensors()[source]

Dynamically detects concrete Sensor subclasses in src.beemon.sensor and initializes corresponding instances. Parses each Sensor’s fields from beemon-config.ini, overriding the global defaults as necessary, then populates the Sensor’s multiprocessing.Queue of recording start times.
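One common way to detect concrete subclasses is to walk the subclass tree and skip anything still abstract. The sketch below illustrates that technique with stand-in classes; it is not the project's actual discovery code (which scans src.beemon.sensor), and the names Sensor, Audio, PartialSensor, and find_concrete_subclasses are hypothetical.

```python
import inspect
from abc import ABC, abstractmethod


class Sensor(ABC):
    """Stand-in abstract base class (hypothetical, for illustration only)."""

    @abstractmethod
    def record(self):
        ...


class Audio(Sensor):
    """Concrete: implements every abstract method."""

    def record(self):
        return "audio"


class PartialSensor(Sensor):
    """Still abstract: record() is not implemented."""


def find_concrete_subclasses(base):
    """Return all direct and indirect subclasses of base that are not abstract."""
    found = []
    stack = list(base.__subclasses__())
    while stack:
        cls = stack.pop()
        stack.extend(cls.__subclasses__())
        if not inspect.isabstract(cls):
            found.append(cls)
    return found
```

Note that __subclasses__() only sees classes whose modules have already been imported, so a real implementation would need to import the sensor modules first.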

_handle_command(command: str) str | None[source]

This helper method is invoked every time an inbound command is received from the Server (over the command Queue IPC channel). This method determines which logic should execute as a result of the received command from the Server.

Parameters:

command (str) – The decoded UTF-8 string representation of the command message received by the Server, then relayed to this process over the IPC channel (Queue).

Returns:

Todo

This functionality is not used anywhere. This method currently returns None.

Return type:

Optional[str]
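The dispatch step can be sketched as a lookup from a command verb to a handler. The wire format assumed here (a verb optionally followed by a target, separated by whitespace) and the names dispatch_command and handlers are assumptions for illustration, not the documented protocol.

```python
def dispatch_command(command, handlers):
    """Split a command into verb and optional target, then call the handler.

    Returns whatever the handler returns, or an error string when the
    command is empty or the verb is not recognized.
    """
    parts = command.strip().split(maxsplit=1)
    if not parts:
        return "empty command"
    verb = parts[0].lower()
    target = parts[1] if len(parts) > 1 else None
    handler = handlers.get(verb)
    if handler is None:
        return f"unrecognized command: {verb}"
    return handler(target)
```

A caller might pass handlers={"start": orchestrator.start_command, "stop": orchestrator.stop_command}, which lines up with the Optional[str] error message those two methods return.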

_logger_compression(path: str) None[source]

A custom compression function for the logger. This function is called during rotation. The rotated file has a name in the format of ‘beemon.log.<integer>’ where a larger integer corresponds to older logs.

Loguru passes the path of the latest log file to be rotated.

Parameters:

path (str) – The path of the file to be rotated and compressed.
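A compression callback of this kind could look roughly like the following. The choice of gzip, the ".gz" suffix, and the name compress_rotated_log are assumptions; the source does not state which format the project uses. The sketch returns the new path purely so that it is easy to check, whereas the documented method returns None.

```python
import gzip
import os
import shutil


def compress_rotated_log(path):
    """Gzip the rotated log at path and remove the uncompressed original."""
    compressed_path = path + ".gz"
    with open(path, "rb") as source, gzip.open(compressed_path, "wb") as target:
        # Stream the file in chunks instead of loading it all into memory.
        shutil.copyfileobj(source, target)
    os.remove(path)
    return compressed_path
```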

_logger_rotation_check(logged_message: Message, file_object: TextIOWrapper) bool[source]

Checks whether the log file should rotate, based on both the maximum file size and the number of days since creation, as set in the config file. Both arguments are passed by Loguru.

Logger rotates to new file when either the file reaches the max file size or it has been the designated number of days since creation, whichever comes first.

Parameters:
  • logged_message (loguru._handler.Message) – contains the current log message (unused).

  • file_object (io.TextIOWrapper) – A TextIOWrapper object, a basic Python file object.

Returns:

True if the logger should rotate to a new file, False otherwise.
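The "whichever comes first" rule reduces to a logical OR of two threshold tests. The sketch below isolates that decision as a pure function; the name rotation_due and its parameters are hypothetical, and how the real method obtains the file size and creation time is not shown in the source.

```python
SECONDS_PER_DAY = 86_400


def rotation_due(file_size_bytes, file_age_seconds, max_size_bytes, max_age_days):
    """Return True when either the size limit or the age limit is reached."""
    too_large = file_size_bytes >= max_size_bytes
    too_old = file_age_seconds >= max_age_days * SECONDS_PER_DAY
    return too_large or too_old
```

A Loguru rotation callable receives the message and the open file object, so a wrapper could plausibly derive the size from file_object.tell() and pass it to a helper like this one.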

_populate_recording_start_time_queue_for_sensor(sensor_name: str, sensor_capture_window_start_time: datetime, sensor_capture_window_end_time: datetime, sensor_capture_duration_seconds: int, sensor_capture_interval_seconds: int, include_elapsed_times: bool | None = False)[source]

Helper method which breaks the specified datetime.datetime range into even intervals of sensor_capture_duration_seconds, and builds a list of all recording start times for the Sensor. This method then (optionally) discards the recording start times which have already elapsed.

Parameters:
  • sensor_name (str) – The name of the Sensor (e.g. audio).

  • sensor_capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of capture_duration_seconds length in seconds) is to begin.

  • sensor_capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of capture_duration_seconds length in seconds) is to begin.

  • sensor_capture_duration_seconds (float) – The length/duration that each recording session between capture_window_start_time and capture_window_end_time should be.

  • sensor_capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of 120.0 seconds indicates that recording should be performed every two minutes from capture_window_start_time to capture_window_end_time (inclusive) for a duration of capture_duration_seconds.

  • include_elapsed_times (Optional[bool]) – Indicates whether or not recording start times which have already passed/elapsed should be appended to the Sensor’s multiprocessing.Queue.
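The optional discarding of elapsed start times can be sketched as a filter applied while filling the queue. This is an illustration under assumptions: the helper name populate_start_time_queue is hypothetical, a plain queue.Queue stands in for the Sensor's multiprocessing.Queue, and treating a start time equal to "now" as not yet elapsed is a guess.

```python
import queue
from datetime import datetime, timedelta, timezone


def populate_start_time_queue(start_time_queue, start_times, now,
                              include_elapsed_times=False):
    """Put start times on the queue, skipping past ones unless told otherwise.

    Returns the number of start times that were enqueued.
    """
    enqueued = 0
    for start_time in start_times:
        if not include_elapsed_times and start_time < now:
            continue  # This recording slot has already passed.
        start_time_queue.put(start_time)
        enqueued += 1
    return enqueued


# Example data: four slots two minutes apart, with "now" between the 2nd and 3rd.
WINDOW_START = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
EXAMPLE_TIMES = [WINDOW_START + timedelta(minutes=2 * i) for i in range(4)]
EXAMPLE_NOW = WINDOW_START + timedelta(minutes=3)
EXAMPLE_QUEUE = queue.Queue()
EXAMPLE_COUNT = populate_start_time_queue(EXAMPLE_QUEUE, EXAMPLE_TIMES, EXAMPLE_NOW)
```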

static get_all_recording_start_times_within_window(capture_window_start_time: datetime, capture_window_end_time: datetime, capture_duration_seconds: float, capture_interval_seconds: float) List[datetime][source]

Computes the maximum number of recordings which can occur within the range of datetime objects (inclusive), given that each recording has the specified interval: capture_interval_seconds. This method then returns a list of datetime objects (non-naive and EDT timezone aware) which constitute the times at which recording should begin in order to record the maximum number of times within the supplied range.

Parameters:
  • capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of capture_duration_seconds length in seconds) is to begin.

  • capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of capture_duration_seconds length in seconds) is to begin.

  • capture_duration_seconds (float) – The length/duration that each recording session between capture_window_start_time and capture_window_end_time should be. This method will produce the times which recording commands should be issued to all registered Sensor concrete subclass objects.

  • capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of 120.0 seconds indicates that recording should be performed every two minutes from capture_window_start_time to capture_window_end_time (inclusive) for a duration of capture_duration_seconds.

Returns:

Todo

Docstring.

Return type:

List[datetime]
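A minimal sketch of the stepping logic described above, assuming start times are simply spaced capture_interval_seconds apart from the window start up to and including the window end. The function name recording_start_times is hypothetical, UTC is used in place of the EDT-aware datetimes the method returns, and capture_duration_seconds is left out because the source does not say how it constrains the result.

```python
from datetime import datetime, timedelta, timezone


def recording_start_times(window_start, window_end, capture_interval_seconds):
    """List every start time from window_start to window_end (inclusive)."""
    if capture_interval_seconds <= 0:
        raise ValueError("capture_interval_seconds must be positive")
    step = timedelta(seconds=capture_interval_seconds)
    times = []
    current = window_start
    while current <= window_end:
        times.append(current)
        current += step
    return times


# Example: a six-minute window with a 120-second interval.
EXAMPLE_START = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
EXAMPLE_END = datetime(2024, 1, 1, 8, 6, tzinfo=timezone.utc)
EXAMPLE_SCHEDULE = recording_start_times(EXAMPLE_START, EXAMPLE_END, 120.0)
```

With these example values the schedule holds four start times (08:00, 08:02, 08:04, 08:06), because both ends of the window are inclusive.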

halt_sensor(target: str)[source]

Called in order to halt a Sensor subclass which is currently sleeping (awaiting the next pre-scheduled/populated recording start time). This method purges the multiprocessing.Queue associated with the Sensor subclass instance/object. When the Sensor subclass awakens to begin recording, it will perform a parity check on the recording time and recognize that the multiprocessing.Queue is now empty. It will then abort the scheduled recording operation, returning from its Process.run() method, allowing the process to terminate.

Parameters:

target (str) – The name of the sensor whose queue of recording start times should be purged. This will cause any future parity checks made by the (potentially sleeping) sensor to fail, aborting any recording operations which were scheduled to be performed before this command was issued.

Returns:
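The purge-then-parity-check interaction can be sketched as two small helpers. Both names (purge_queue, parity_check) are hypothetical, and the exact form of the real parity check is an assumption: here it passes only if the queue still holds the start time the sensor expected. A queue.Queue stands in for multiprocessing.Queue, which raises the same queue.Empty exception.

```python
import queue


def purge_queue(start_time_queue):
    """Remove every pending item and return how many were discarded."""
    purged = 0
    while True:
        try:
            start_time_queue.get_nowait()
        except queue.Empty:
            return purged
        purged += 1


def parity_check(start_time_queue, expected_start_time):
    """Return True only if the next queued start time matches the expected one."""
    try:
        head = start_time_queue.get_nowait()
    except queue.Empty:
        # The queue was purged while the sensor slept: abort the recording.
        return False
    return head == expected_start_time
```

After a purge, any sensor that wakes and runs the check sees an empty queue and skips its scheduled recording, which is the behavior the method description relies on.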

run() None[source]

The main processing loop for the Orchestrator class. This method is called after instantiation, and will infinitely read from the command Queue IPC object (which the Server populates), awaiting incoming commands to execute. When a command is received by this process, the _handle_command() method is invoked with the decoded (UTF-8) text of the command.
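The polling loop described here can be sketched as follows. This is an illustration under assumptions: poll_commands and should_continue are hypothetical names (the real loop runs indefinitely), and a non-blocking get followed by a sleep is only one way to honor the polling frequency.

```python
import queue
import time


def poll_commands(command_queue, handle_command,
                  polling_frequency_seconds=0.5,
                  should_continue=lambda: True):
    """Read commands from the queue, sleeping between polls when it is empty."""
    while should_continue():
        try:
            command = command_queue.get_nowait()
        except queue.Empty:
            # Nothing to do yet: wait before polling again.
            time.sleep(polling_frequency_seconds)
            continue
        handle_command(command)
```

The should_continue hook exists only so the sketch can be stopped in a test; passing the default reproduces an endless loop like the one described above.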

start_command(target: str) str | None[source]

Called whenever the orchestrator process receives a "start" command from the Server process. This method executes the received "start" command, instructing the targeted Sensor (or Process) to begin operations. If a Sensor subclass is targeted, this method will populate the associated queue of recording start times for the Sensor, and then start the Sensor Process (by calling Process.start), which will cause it to enter its main recording loop (in Process.run()).

Parameters:

target (str) – The name of the concrete Sensor subclass to start (e.g. “audio”, “video”, “temp”), or alternatively, the name of the Process to start (e.g. “uploader”).

Returns:

An optional error message.

Return type:

Optional[str]

stop_command(target: str | None = None) str | None[source]

Called whenever the Orchestrator Process receives a "stop" command from the Server Process. This method executes the received "stop" command, instructing the targeted Sensor (or Process) to halt operations. If a Sensor subclass is targeted, this method will purge the queue of recording start times for the Sensor, and then call the Sensor’s Sensor.stop() method (allowing the concrete subclass a chance to gracefully terminate/halt open IPC channels with its native firmware/hardware). If no target is provided, all subprocesses will be halted.

Notes

There are three main behavior-impacting observable cases which occur (timing-wise) when this command is manually issued by a user:

a) The "stop" command is issued before the concrete Sensor subclass begins recording, and before the concrete subclass enters a routine sleep cycle (awaiting the next recording time).
b) The "stop" command is issued while the Sensor subclass is sleeping in between pre-ordained recording times.
c) The "stop" command is issued while the Sensor subclass is already recording.

In the best-case scenario, a "stop" command is issued to a Sensor subclass before it enters a sleep cycle (awaiting the next scheduled recording time). In this case, the Sensor’s Event flag (halt_recording_request_received) is updated near-instantaneously, and the process terminates without much noticeable delay. During normal operation, however, it is quite common for the "stop" command to be issued either during recording or while the Sensor is in a sleep cycle.

In an effort to avoid firmware/hardware lockups, we do not attempt to halt/interrupt a recording in progress. We leave it up to the implementer of the concrete Sensor subclass to poll/check the halt_recording_request_received Event flag periodically during recording (in their implementation of record()), and to decide whether they wish to handle forcefully terminating an open connection with the firmware/hardware. We do not presume it is safe to forcefully halt the process directly from the superclass. As a result of this design philosophy, it is quite common for a concrete Sensor subclass to continue recording (for its pre-specified recording duration) after having visibly received a "stop" command from the Orchestrator. This behavior is non-ideal, but it is a byproduct of believing it unwise to forcibly terminate a concrete child process (which may have an open subprocess of its own to the recording device in question).

Likewise, when the "stop" command is received by the Sensor subclass while it is sleeping, we do not attempt to periodically poll an external command queue (due to observed recording time delays caused by IPC locking mechanisms). As a result, a sleeping sensor will not visibly respond to a "stop" command until it wakes at the next scheduled recording time and performs a parity check.

From a user’s perspective, the result of these design choices is that, in the worst-case scenario, a Sensor will continue to record for an additional period after the "stop" command is received, and will then discard the recorded file.
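The cooperative-halt pattern left to Sensor implementers can be sketched as a recording loop that checks the flag between short capture chunks. Everything here other than the flag name halt_recording_request_received is hypothetical: the record signature, the chunking scheme, and the capture_chunk callback are assumptions, and threading.Event stands in for the Sensor's Event object because it exposes the same set()/is_set() interface.

```python
import threading


def record(halt_recording_request_received, capture_duration_seconds,
           chunk_seconds, capture_chunk):
    """Capture in short chunks, checking the halt flag before each one.

    Returns True if the full duration was captured and False if the
    recording was abandoned because the halt flag was set.
    """
    captured = 0
    while captured < capture_duration_seconds:
        if halt_recording_request_received.is_set():
            # Stop between chunks, so the device is never interrupted mid-read.
            return False
        capture_chunk(chunk_seconds)
        captured += chunk_seconds
    return True
```

The chunk length bounds how long a "stop" can go unnoticed during recording, so an implementer would trade responsiveness against the overhead of more frequent checks.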

Parameters:

target (Optional[str]) – The name of the concrete Sensor subclass (or applicable Process) to issue the "stop" command to. If no target is specified, all subprocesses will be halted.

Returns:

An optional error message (currently not utilized).

Return type:

Optional[str]