Beemon Orchestrator
- class orchestrator.Orchestrator(*args, **kwargs)[source]
- __init__(command_queue: Queue, command_queue_polling_frequency_seconds: float | None = 0.5)[source]
The Orchestrator is a singleton class which handles the actual execution of commands received by the
Serverclass. Most commonly, this involves interacting with one or more childSensorobjects/processes to carry out the inbound command. As a result, this class orchestrates the various independentSensorprocesses (and theUploaderprocess), controlling their actions.Notes
Recall that the reason this class runs in a separate process is to leave the
Server’s main thread non-IO blocked in order to accept and handle new inbound client requests.- Parameters:
command_queue (Queue) – A
multiprocessing.Queueobject which theServerclass utilizes to push all recognized commands to this orchestrator process.command_queue_polling_frequency_seconds (Optional[float]) – How long the orchestrator process should sleep between querying/polling it’s associated
command_queue.
- _auto_start_flagged_processes()[source]
Automatically starts concrete
Processsubclasses whoseauto_startproperty (inbeemon-config.ini) is set toTrue.Returns:
- _auto_start_flagged_sensors()[source]
Automatically starts concrete
Sensorsubclasses whoseauto_startproperty (in beemon-config.ini`) is set toTrue.Returns:
- _dynamically_initialize_sensors()[source]
Dynamically detects concrete
Sensorsubclasses insrc.beemon.sensorand initializes corresponding instances. Parses eachSensor’s fields frombeemon-config.ini, overriding the global defaults as necessary, then populates the Sensor’smultiprocessing.Queueof recording start times.
- _handle_command(command: str) str | None[source]
This helper method is invoked every time an inbound command is received from the
Server(over the commandQueueIPC channel). This method determines which logic should execute as a result of the received command from theServer.
- _logger_compression(path: str) None[source]
A custom compression function for the logger. This function is called during rotation. The rotated file has a name in the format of ‘beemon.log.<integer>’ where a larger integer corresponds to older logs.
Loguru passes the path of the latest log file to be rotated.
- Parameters:
path (str) – The path of the file to be rotated and compressed.
- _logger_rotation_check(logged_message: Message, file_object: TextIOWrapper) bool[source]
Checks whether log file should rotate based on both max file size and number of days since creation as denoted in the config file. Arguments passed by loguru.
Logger rotates to new file when either the file reaches the max file size or it has been the designated number of days since creation, whichever comes first.
- Parameters:
logged_message (loguru._handler.Message) – contains the current log message (unused).
file_object (io.TextIOWrapper) – A TextIOWrapper object, a basic python file
object.
- Returns:
True if logger should rotate to new file, false otherwise.
- _populate_recording_start_time_queue_for_sensor(sensor_name: str, sensor_capture_window_start_time: datetime, sensor_capture_window_end_time: datetime, sensor_capture_duration_seconds: int, sensor_capture_interval_seconds: int, include_elapsed_times: bool | None = False)[source]
Helper method which breaks the specified
datetime.datetimerange into even intervals ofsensor_capture_duration_seconds, and builds a list of all recording start times for theSensor. This method then (optionally) discards the recording start times which have already elapsed.- Parameters:
sensor_name (str) – The name of the
Sensor(e.g.audio).sensor_capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of
capture_duration_secondslength in seconds) is to begin at.sensor_capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of
capture_duration_secondslength in seconds) is to begin at.sensor_capture_duration_seconds (float) – The length/duration that each recording session between
capture_window_start_timeandcapture_window_end_timeshould be.sensor_capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of
120.0seconds indicates that recording should be performed every two minutes fromcapture_window_start_timetocapture_window_end_time(inclusive) for a duration ofcapture_duration_seconds.include_elapsed_times (Optional[bool]) – Indicates whether or not recording start times which have already passed/elapsed should be appended to the
Sensor’smultiprocessing.Queue.
- static get_all_recording_start_times_within_window(capture_window_start_time: datetime, capture_window_end_time: datetime, capture_duration_seconds: float, capture_interval_seconds: float) List[datetime][source]
Computes the maximum number of recordings which can occur between the the range of datetime objects (inclusive), given that each recording has the specified interval of:
capture_interval_seconds. This method then returns a list of datetime objects (non-naive and EDT timezone aware) which constitute the times at which recording should begin in order to successfully record a maximum number of times between the supplied range of times.- Parameters:
capture_window_start_time (datetime) – The starting datetime (non-naive and inclusive) at which the first recording session (of
capture_duration_secondslength in seconds) is to begin at.capture_window_end_time (datetime) – The ending datetime (non-naive and inclusive) at which the last recording session (of
capture_duration_secondslength in seconds) is to begin at.capture_duration_seconds (float) – The length/duration that each recording session between
capture_window_start_timeandcapture_window_end_timeshould be. This method will produce the times which recording commands should be issued to all registeredSensorconcrete subclass objects.capture_interval_seconds (float) – How frequently to initialize recording sessions. For instance, a value of
120.0seconds indicates that recording should be performed every two minutes fromcapture_window_start_timetocapture_window_end_time(inclusive) for a duration ofcapture_duration_seconds.
- Returns:
Todo
Docstring.
- Return type:
List[datetime]
- halt_sensor(target: str)[source]
Called in order to halt a
Sensorsubclass which is currently sleeping (awaiting the next pre-scheduled/populated recording start time). This method purges themultiprocessing.Queueassociated with theSensorsubclass instance/object. When theSensorsubclass awakens to begin recording, it will perform a parity check on the recording time and recognize that themultiprocessing.Queueis now empty. It will then abort the scheduled recording operation, returning from it’sProcess.run()method, allowing the process to terminate.- Parameters:
target (str) – The name of the sensor whose queue of recording start times should be purged. This will cause any future parity checks made by the (potentially sleeping) sensor to fail, aborting any recording operations which were scheduled to be performed before this command was issued.
Returns:
- run() None[source]
The main processing loop for the
Orchestratorclass. This method is called after instantiation, and will infinitely read from the commandQueueIPC object (which theServerpopulates), awaiting incoming commands to execute. When a command is received by this process, the_handle_command()method is invoked with the decoded (UTF-8) text of the command.
- start_command(target: str) str | None[source]
Called whenever the orchestrator process receives a
"start"command from theServerprocess. This method executes the received"start"command, instructing the targetedSensor(orProcess) to begin operations. If aSensorsubclass is targeted, this method will populate the associated queue of recording start times for theSensor, and then start theSensorProcess(by callingProcess.start), which will cause it to enter into it’s main recording loop (inProcess.run()).
- stop_command(target: str | None = None) str | None[source]
Called whenever the
OrchestratorProcessreceives a"stop"command form theServerProcess. This method executes/performs the received"stop"command, instructing the targetedSensoror (Process) to halt operations. If aSensorsubclass is targeted, this method will purge the queue of recording start times for theSensor, and then call theSensor’sSensor.stop()method (allowing the concrete subclass a chance to gracefully terminate/halt open IPC channels with it’s native firmware/hardware). If notargetis provided, all subprocesses will be halted.Notes
There are three main behavior-impacting observable cases which occur (timing-wise) when this command is manually issued by a user:
a) The ``"stop"`` command is issued before the concrete :class:`Sensor` subclass begins recording, and before the concrete subclass enters a routine sleep cycle (awaiting the next recording time). b) The ``"stop"`` command is issued while the :class:`Sensor` subclass is sleeping in-between pre-ordained recording times. c) The ``"stop"`` command is issued while the :class:`Sensor` subclass is already recording.In the best case scenario a
"stop"command will be issued to a prospectiveSensorsubclass before it enters a sleep cycle (awaiting the next scheduled recording time). When this case occurs, theSensor’sEventflag (halt_recording_request_received) will be updated near instantaneously, and the process will terminate without much noticeable delay. However, during normal operation it is quite common for the"stop"command to be issued either: during recording, or while theSensoris in a sleep cycle. In an effort to avoid firmware/hardware lockups, we do not attempt to halt/interrupt a recording in progress. We leave it up to the implementer of the concreteSensorsubclass to poll/check thehalt_recording_request_receivedEventflag periodically during recording (in their implementation ofrecord()), and decide if they wish to handle forcefully terminating an open connection with the firmware/hardware. We do not presume it is safe to forcefully halt the process directly from the super-class. As a result of this design philosophy, it is quite common for a concreteSensorsubclass to continue to recording (for it’s pre-specified recording duration) after having noticeably received a"stop"command from theOrchestrator. This behavior is non-ideal, but is a byproduct of believing it unwise to forcibly terminate a concrete child process (which may have an open subprocess of it’s own to the recording device in question). Likewise, in the case where the"stop"command is received by theSensorsubclass while it is sleeping, we do not attempt to periodically poll an external command queue (due to observed recording time delays caused by IPC locking mechanisms). The result of this, is that a sleeping sensor will not visibly respond to a"stop"command (while sleeping) until it wakes at the next scheduled recording time, and performs a parity check. The result of these design choices from a user’s perspective is that, in the worst case scenario, a Sensor will continue to record for an additional period after the"stop"command is received, and then it will discard the recorded file.