Beemon Orchestrator
- class orchestrator.Orchestrator(*args, **kwargs)[source]
- __init__(command_queue: Queue, command_queue_polling_frequency_seconds: float | None = 0.5)[source]
The Orchestrator is a singleton class which handles the actual execution of commands received by the Server class. Most commonly, this involves interacting with one or more child Sensor objects/processes to carry out the inbound command. As a result, this class orchestrates the various independent Sensor processes (and the Uploader process), controlling their actions.
Notes
This class runs in a separate process so that the Server’s main thread is not blocked on I/O and stays free to accept and handle new inbound client requests.
- Parameters:
command_queue (Queue) – A multiprocessing.Queue object which the Server class utilizes to push all recognized commands to this orchestrator process.
command_queue_polling_frequency_seconds (Optional[float]) – How long the orchestrator process should sleep between querying/polling its associated command_queue.
- _auto_start_flagged_processes()[source]
Automatically starts concrete Process subclasses whose auto_start property (in beemon-config.ini) is set to True.
Returns:
- _auto_start_flagged_sensors()[source]
Automatically starts concrete Sensor subclasses whose auto_start property (in beemon-config.ini) is set to True.
Returns:
- _dynamically_initialize_sensors()[source]
Dynamically detects concrete Sensor subclasses in src.beemon.sensor and initializes corresponding instances. Parses each Sensor’s fields from beemon-config.ini, overriding the global defaults as necessary, then populates the Sensor’s multiprocessing.Queue of recording start times.
- _handle_command(command: str) str | None [source]
This helper method is invoked every time an inbound command is received from the Server (over the command Queue IPC channel). This method determines which logic should execute as a result of the received command from the Server.
- _logger_compression(path: str) None [source]
A custom compression function for the logger. This function is called during rotation. The rotated file has a name in the format of ‘beemon.log.<integer>’ where a larger integer corresponds to older logs.
Loguru passes the path of the latest log file to be rotated.
- Parameters:
path (str) – The path of the file to be rotated and compressed.
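As a rough illustration of a compression hook of this kind, the sketch below gzips the rotated file and deletes the uncompressed original. It assumes the function is registered through the `compression` argument of loguru's `logger.add()`, which accepts a callable that receives the file path. The name `compress_rotated_log` and the `.gz` suffix are hypothetical and are not taken from the Beemon source.

```python
import gzip
import os
import shutil


def compress_rotated_log(path: str) -> str:
    """Gzip the rotated log at `path` and remove the uncompressed original.

    Hypothetical helper: the '.gz' suffix is an assumption, not Beemon's
    documented naming scheme.
    """
    compressed_path = f"{path}.gz"
    # Stream the file through gzip so large logs are never fully in memory.
    with open(path, "rb") as source, gzip.open(compressed_path, "wb") as target:
        shutil.copyfileobj(source, target)
    # Only the compressed copy is kept once the write has succeeded.
    os.remove(path)
    return compressed_path
```

Returning the new path is a convenience for callers and tests; loguru itself ignores the return value of a compression callable.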
- _logger_rotation_check(logged_message: Message, file_object: TextIOWrapper) bool [source]
Checks whether the log file should rotate, based on both the maximum file size and the number of days since creation, as set in the config file. Both arguments are passed by loguru.
The logger rotates to a new file when either the file reaches the maximum file size or the designated number of days has passed since its creation, whichever comes first.
- Parameters:
logged_message (loguru._handler.Message) – Contains the current log message (unused).
file_object (io.TextIOWrapper) – A TextIOWrapper object, i.e. a basic Python file object.
- Returns:
True if the logger should rotate to a new file, False otherwise.
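The "whichever comes first" rule can be sketched as a pure function. This is a minimal sketch under stated assumptions, not the Beemon implementation: the parameters `created_at`, `now`, `max_size_bytes` and `max_age_days` are hypothetical stand-ins for values the real method presumably reads from the config file and the file system.

```python
import io
from datetime import datetime, timedelta


def should_rotate(
    file_object: io.TextIOBase,
    created_at: datetime,
    now: datetime,
    max_size_bytes: int,
    max_age_days: int,
) -> bool:
    """Return True when either the size limit or the age limit is reached."""
    # Seek to the end so tell() reports the current length of the log file.
    file_object.seek(0, io.SEEK_END)
    too_large = file_object.tell() >= max_size_bytes
    # The age limit is measured from the (assumed known) creation time.
    too_old = now - created_at >= timedelta(days=max_age_days)
    return too_large or too_old
```

A callable with loguru's `(message, file)` signature could wrap this function and supply the extra values itself.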
- _populate_recording_start_time_queue_for_sensor(sensor_name: str, sensor_capture_window_start_time: datetime, sensor_capture_window_end_time: datetime, sensor_capture_duration_seconds: int, sensor_capture_interval_seconds: int, include_elapsed_times: bool | None = False)[source]
Helper method which breaks the specified datetime.datetime range into even intervals of sensor_capture_duration_seconds, and builds a list of all recording start times for the Sensor. This method then (optionally) discards the recording start times which have already elapsed.
- Parameters:
sensor_name (str) – The name of the Sensor (e.g. audio).
sensor_capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of capture_duration_seconds length in seconds) is to begin.
sensor_capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of capture_duration_seconds length in seconds) is to begin.
sensor_capture_duration_seconds (float) – The length/duration that each recording session between capture_window_start_time and capture_window_end_time should be.
sensor_capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of 120.0 seconds indicates that recording should be performed every two minutes from capture_window_start_time to capture_window_end_time (inclusive) for a duration of capture_duration_seconds.
include_elapsed_times (Optional[bool]) – Indicates whether or not recording start times which have already passed/elapsed should be appended to the Sensor’s multiprocessing.Queue.
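The optional discarding of elapsed start times might look roughly like the sketch below. The function name `enqueue_start_times` and the explicit `now` argument are hypothetical, and a plain `queue.Queue` stands in for the Sensor's `multiprocessing.Queue` (both expose `put()`), so this is an illustration of the filtering step only.

```python
import queue
from datetime import datetime, timedelta, timezone
from typing import Iterable


def enqueue_start_times(
    start_times: Iterable[datetime],
    target_queue: queue.Queue,
    now: datetime,
    include_elapsed_times: bool = False,
) -> int:
    """Put start times on the queue and return how many were enqueued.

    Start times earlier than `now` are skipped unless
    `include_elapsed_times` is True.
    """
    enqueued = 0
    for start_time in start_times:
        if not include_elapsed_times and start_time < now:
            continue  # This recording slot has already passed.
        target_queue.put(start_time)
        enqueued += 1
    return enqueued
```

Passing `now` explicitly, rather than calling `datetime.now()` inside the function, keeps the behaviour deterministic and easy to test.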
- static get_all_recording_start_times_within_window(capture_window_start_time: datetime, capture_window_end_time: datetime, capture_duration_seconds: float, capture_interval_seconds: float) List[datetime] [source]
Computes the maximum number of recordings which can occur between the range of datetime objects (inclusive), given that each recording has the specified interval of capture_interval_seconds. This method then returns a list of datetime objects (non-naive and EDT timezone aware) which constitute the times at which recording should begin in order to successfully record a maximum number of times between the supplied range of times.
- Parameters:
capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of capture_duration_seconds length in seconds) is to begin.
capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of capture_duration_seconds length in seconds) is to begin.
capture_duration_seconds (float) – The length/duration that each recording session between capture_window_start_time and capture_window_end_time should be. This method will produce the times at which recording commands should be issued to all registered Sensor concrete subclass objects.
capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of 120.0 seconds indicates that recording should be performed every two minutes from capture_window_start_time to capture_window_end_time (inclusive) for a duration of capture_duration_seconds.
- Returns:
Todo
Docstring.
- Return type:
List[datetime]
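One plausible reading of the description above is a simple stepping loop from the window start to the window end, inclusive. The sketch below assumes exactly that and ignores capture_duration_seconds. The real method may also account for the duration or normalise time zones, and the name `recording_start_times` is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def recording_start_times(
    window_start: datetime,
    window_end: datetime,
    capture_interval_seconds: float,
) -> List[datetime]:
    """List every start time from window_start to window_end (inclusive)."""
    if capture_interval_seconds <= 0:
        raise ValueError("capture_interval_seconds must be positive")
    step = timedelta(seconds=capture_interval_seconds)
    start_times = []
    current = window_start
    # The end of the window is inclusive, hence <= rather than <.
    while current <= window_end:
        start_times.append(current)
        current += step
    return start_times
```

With a fixed-offset zone such as `timezone(timedelta(hours=-4), "EDT")`, a window from 12:00 to 12:10 and an interval of 120.0 seconds yields six aware datetimes: 12:00, 12:02, 12:04, 12:06, 12:08 and 12:10.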
- halt_sensor(target: str)[source]
Called in order to halt a Sensor subclass which is currently sleeping (awaiting the next pre-scheduled/populated recording start time). This method purges the multiprocessing.Queue associated with the Sensor subclass instance/object. When the Sensor subclass awakens to begin recording, it will perform a parity check on the recording time and recognize that the multiprocessing.Queue is now empty. It will then abort the scheduled recording operation, returning from its Process.run() method, allowing the process to terminate.
- Parameters:
target (str) – The name of the sensor whose queue of recording start times should be purged. This will cause any future parity checks made by the (potentially sleeping) sensor to fail, aborting any recording operations which were scheduled to be performed before this command was issued.
Returns:
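The purge-then-check interaction described for halt_sensor can be sketched as two small helpers. Both names are hypothetical, and the "parity check" is reduced here to an emptiness test, which is an assumption about what the real check does. A `queue.Queue` is used for the demonstration; `multiprocessing.Queue` offers the same `get_nowait()` call and raises the same `queue.Empty` exception.

```python
import queue


def purge_queue(target_queue: queue.Queue) -> int:
    """Remove every pending item and return how many were discarded."""
    discarded = 0
    while True:
        try:
            target_queue.get_nowait()
        except queue.Empty:
            # Nothing left to drain, so the schedule is now empty.
            return discarded
        discarded += 1


def should_record(target_queue: queue.Queue) -> bool:
    """Check made on wake: an empty queue means the schedule was purged."""
    return not target_queue.empty()
```

Because the sensor only looks at the queue when it wakes, the purge takes effect at the next scheduled recording time rather than immediately.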
- run() None [source]
The main processing loop for the Orchestrator class. This method is called after instantiation, and will read indefinitely from the command Queue IPC object (which the Server populates), awaiting incoming commands to execute. When a command is received by this process, the _handle_command() method is invoked with the decoded (UTF-8) text of the command.
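A polling loop of this shape can be sketched as follows. It assumes commands arrive as UTF-8 encoded bytes and that the loop sleeps only when the queue is empty; neither detail is confirmed by the documentation above. The `max_polls` argument is a hypothetical addition so the sketch can terminate, and `queue.Queue` stands in for the `multiprocessing.Queue`.

```python
import queue
import time
from typing import Callable, Optional


def poll_commands(
    command_queue: queue.Queue,
    handle_command: Callable[[str], Optional[str]],
    polling_frequency_seconds: float = 0.5,
    max_polls: Optional[int] = None,
) -> None:
    """Poll the queue, decode each command, and hand it to the handler."""
    polls = 0
    # max_polls=None means "loop forever", as the real run() is described.
    while max_polls is None or polls < max_polls:
        polls += 1
        try:
            raw_command = command_queue.get_nowait()
        except queue.Empty:
            # No pending command: sleep before checking the queue again.
            time.sleep(polling_frequency_seconds)
            continue
        handle_command(raw_command.decode("utf-8"))
```

Sleeping between polls trades a little latency (at most one polling period) for a loop that does not spin the CPU while idle.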
- start_command(target: str) str | None [source]
Called whenever the orchestrator process receives a "start" command from the Server process. This method executes the received "start" command, instructing the targeted Sensor (or Process) to begin operations. If a Sensor subclass is targeted, this method will populate the associated queue of recording start times for the Sensor, and then start the Sensor Process (by calling Process.start), which will cause it to enter into its main recording loop (in Process.run()).
- stop_command(target: str | None = None) str | None [source]
Called whenever the Orchestrator Process receives a "stop" command from the Server Process. This method executes/performs the received "stop" command, instructing the targeted Sensor (or Process) to halt operations. If a Sensor subclass is targeted, this method will purge the queue of recording start times for the Sensor, and then call the Sensor’s Sensor.stop() method (allowing the concrete subclass a chance to gracefully terminate/halt open IPC channels with its native firmware/hardware). If no target is provided, all subprocesses will be halted.
Notes
There are three main behavior-impacting observable cases which occur (timing-wise) when this command is manually issued by a user:
a) The "stop" command is issued before the concrete Sensor subclass begins recording, and before the concrete subclass enters a routine sleep cycle (awaiting the next recording time).
b) The "stop" command is issued while the Sensor subclass is sleeping in-between pre-ordained recording times.
c) The "stop" command is issued while the Sensor subclass is already recording.
In the best case scenario a "stop" command will be issued to a prospective Sensor subclass before it enters a sleep cycle (awaiting the next scheduled recording time). When this case occurs, the Sensor’s Event flag (halt_recording_request_received) will be updated near instantaneously, and the process will terminate without much noticeable delay. However, during normal operation it is quite common for the "stop" command to be issued either during recording or while the Sensor is in a sleep cycle.
In an effort to avoid firmware/hardware lockups, we do not attempt to halt/interrupt a recording in progress. We leave it up to the implementer of the concrete Sensor subclass to poll/check the halt_recording_request_received Event flag periodically during recording (in their implementation of record()), and decide if they wish to handle forcefully terminating an open connection with the firmware/hardware. We do not presume it is safe to forcefully halt the process directly from the super-class. As a result of this design philosophy, it is quite common for a concrete Sensor subclass to continue recording (for its pre-specified recording duration) after having noticeably received a "stop" command from the Orchestrator. This behavior is non-ideal, but is a byproduct of believing it unwise to forcibly terminate a concrete child process (which may have an open subprocess of its own to the recording device in question).
Likewise, in the case where the "stop" command is received by the Sensor subclass while it is sleeping, we do not attempt to periodically poll an external command queue (due to observed recording time delays caused by IPC locking mechanisms). As a result, a sleeping sensor will not visibly respond to a "stop" command until it wakes at the next scheduled recording time and performs a parity check.
From a user’s perspective, the result of these design choices is that, in the worst case scenario, a Sensor will continue to record for an additional period after the "stop" command is received, and then it will discard the recorded file.
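For implementers of a concrete Sensor subclass, cooperative polling of the halt flag during recording might look like the sketch below. It is an illustration only: `threading.Event` stands in for the `multiprocessing.Event` (both provide `is_set()`), the chunked loop and the `chunk_seconds` parameter are hypothetical, and `time.sleep()` is a placeholder for reading from a real recording device.

```python
import threading
import time


def record(
    halt_recording_request_received: threading.Event,
    capture_duration_seconds: float,
    chunk_seconds: float = 0.1,
) -> bool:
    """Record in short chunks; return True only if the full duration ran."""
    deadline = time.monotonic() + capture_duration_seconds
    while time.monotonic() < deadline:
        if halt_recording_request_received.is_set():
            # A real subclass would close its device connection cleanly here.
            return False
        remaining = max(0.0, deadline - time.monotonic())
        # Placeholder for capturing one chunk from the recording device.
        time.sleep(min(chunk_seconds, remaining))
    return True
```

The chunk length bounds how long a "stop" command can go unnoticed during recording, so a subclass can choose it based on what its hardware tolerates.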