Beemon Uploader

The Uploader Class:

class uploader.Uploader(command_queue: Queue, config: Config | None)[source]

The Uploader class manages several other processes and threads in order to watch a user-specified local_output_directory for filesystem changes, and then to transfer the modified files via FTP to a pre-specified location on a remote server. The Uploader process leverages the following multiprocessing.Process(es):

  • The UploadHelper process.

Additionally, the Uploader process leverages the following threading.Thread(s):

  • The polling.PollingObserver thread.

The watchdog.observers.Observer threading.Thread watches the user-specified local_output_directory for filesystem events (such as created and modified sensor.Sensor measurement files). The UploadHelper multiprocessing.Process performs the actual FTP transfers: it uploads the files observed by the watchdog.observers.Observer to the user-specified remote_destination_root_directory on the remote CS Machine (www.cs.appstate.edu).

Notes

The Uploader class maintains an open Interprocess Communication (IPC) channel with its sole controlling multiprocessing.Process (the orchestrator.Orchestrator class) through a reference to the command_queue which is provided on initialization.

__init__(command_queue: Queue, config: Config | None)[source]

Initializer for objects of type: Uploader.

Parameters:
  • command_queue (Queue) – The multiprocessing.Queue which the uploader routinely polls to determine if a "stop" command was issued by the orchestrator.Orchestrator multiprocessing.Process.

  • config (Optional[Config]) – A reference/pointer to an existing beemon Config object, should one exist. If None is provided, a new beemon configuration Config object will be created for the beemon-config.ini in the repository’s root directory.

_handle_command(command: str) str | None[source]

Called whenever a command is received from the orchestrator.Orchestrator object over the IPC multiprocessing.Queue: command_queue.

Parameters:

command (str) – The UTF-8 text of the command passed across the command_queue IPC channel from the orchestrator.Orchestrator.

Returns:

An optional error message (currently not used).

Return type:

Optional[str]

populate_secondary_queue()[source]

Populates the secondary queue of the UploadHelper with all the files still remaining in the specified _helper.local_output_directory. This method is called on startup to queue any remaining files from a previous day for upload. If a file with the extension .final exists, that file is added to the queue instead of the corresponding .csv file.
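The selection rule described above can be sketched as follows. This is a minimal illustration, not the Beemon implementation: the function name select_leftover_files is hypothetical, and the sketch assumes that a .csv file without a .final counterpart is still queued.

```python
import os
from typing import List


def select_leftover_files(directory: str) -> List[str]:
    """Return leftover files, preferring 'x.csv.final' over 'x.csv'."""
    selected = []
    for root, _dirs, files in os.walk(directory):
        names = set(files)
        for name in sorted(names):
            # Skip a .csv whose finalized counterpart is also present.
            if name.endswith(".csv") and (name + ".final") in names:
                continue
            selected.append(os.path.join(root, name))
    return selected
```

Each returned path would then be handed to the secondary upload queue.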

run()[source]

The main process/loop for the Uploader class. This method starts the UploadHelper multiprocessing.Process and UploadHandler threading.Thread. It then repeatedly polls its Uploader._command_queue multiprocessing.Queue for inbound commands dispatched from the orchestrator.Orchestrator process. It continues to do so until the Uploader.stop() method is invoked, which halts the loop.
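A minimal sketch of such a polling loop is shown below. The function name run_command_loop and the poll interval are assumptions made for illustration; the flag may be any object with the Event interface (threading.Event or multiprocessing.Event), and the queue may be a queue.Queue or a multiprocessing.Queue, since both raise queue.Empty on timeout.

```python
import queue
import threading


def run_command_loop(command_queue, running, poll_interval_seconds=0.1):
    """Poll command_queue until a "stop" command clears the running flag."""
    handled = []
    while running.is_set():
        try:
            command = command_queue.get(timeout=poll_interval_seconds)
        except queue.Empty:
            continue  # nothing pending; poll again
        handled.append(command)
        if command == "stop":
            running.clear()  # the loop condition fails on the next check
    return handled


# Usage: queue two commands, then run the loop to completion.
commands = queue.Queue()
commands.put("noop")
commands.put("stop")
flag = threading.Event()
flag.set()
processed = run_command_loop(commands, flag)
```

Returning normally from the loop is what lets the surrounding process exit without being killed.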

start() None[source]

Starts the Uploader process, scheduling its Uploader.run() method to be called.

stop()[source]

Sets the multiprocessing.Event __running flag to False, aborting the Uploader.run() main loop, and then gracefully terminates the UploadHelper multiprocessing.Process and the watchdog.observers.Observer threading.Thread.

Notes

Exiting (implicitly via a return of None) from Uploader.run() allows the multiprocessing.Process to be terminated gracefully by the garbage collector.

The UploadHelper Class:

class uploader.UploadHelper(upload_queue_polling_interval_seconds: float | None = 1.0)[source]

A helper multiprocessing.Process leveraged by the parent Uploader object to perform the actual upload of queued files to the remote CS Server (www.cs.appstate.edu) via FTP.

__init__(upload_queue_polling_interval_seconds: float | None = 1.0)[source]

Initializes the UploadHelper class.

Parameters:

upload_queue_polling_interval_seconds (Optional[float]) – How frequently the primary and secondary upload multiprocessing.Queues should be polled for pending file transfer requests.

static make_directory(session: FTP, directory_name: str) str | None[source]

Helper method which attempts to create the specified directory on the remote machine via the provided ftplib.FTP session.

Parameters:
  • session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.

  • directory_name (str) – The name of the directory to create on the remote machine.

Returns:

A descriptive error message explaining why the directory creation attempt failed, or None if no error is encountered.

Return type:

Optional[str]
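The error-message-or-None convention can be sketched with the standard ftplib API as follows. The function name make_directory_sketch and the wording of the message are assumptions; FTP.mkd() and ftplib.all_errors are part of the standard library.

```python
import ftplib
from typing import Optional


def make_directory_sketch(session, directory_name: str) -> Optional[str]:
    """Try to create directory_name remotely; return an error message or None."""
    try:
        session.mkd(directory_name)
    except ftplib.all_errors as err:
        # For example, a 550 reply when the directory already exists.
        return f"Could not create remote directory {directory_name!r}: {err}"
    return None
```

Returning a message rather than raising lets the caller decide whether a failure (such as an already existing directory) is fatal.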

make_remote_directories(session: FTP, date: DateRepresentation) str | None[source]

Helper method which sets up the Beemon data directory structure on the remote machine. This method will create a remote directory for the current Raspberry Pi under the specified self._upload_directory (which is configurable in the beemon-config.ini file). By default, this will create a top-level remote directory such as: /usr/local/bee/beemon/rpi4-11. Then, this method will create a subdirectory for the provided date, for instance: /usr/local/bee/beemon/rpi4-11/2021-11-10. Afterward, this method will dynamically import the sensor classes from AppMAIS.src.beemon.sensor and use the names of the concrete sensor.Sensor classes to create subdirectories such as:

  1. /usr/local/bee/beemon/rpi4-11/2021-11-10/audio

  2. /usr/local/bee/beemon/rpi4-11/2021-11-10/video

  3. /usr/local/bee/beemon/rpi4-11/2021-11-10/temp

Parameters:
  • session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.

  • date (DateRepresentation) – The current date (of the form: "YYYY-MM-DD").

Returns:

A descriptive error message in the event that the remote directory structure could not be created on the remote server. Otherwise, this value is None in the event that no errors are encountered.

Return type:

Optional[str]
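The directory layout described above can be sketched as a pure function that lists the remote paths in creation order. The function name remote_directory_plan is hypothetical, and the sensor names are passed in explicitly here rather than discovered by dynamic import.

```python
import posixpath
from typing import List


def remote_directory_plan(upload_root: str, hostname: str, date: str,
                          sensor_names: List[str]) -> List[str]:
    """List remote directories in the order they would need to be created."""
    host_dir = posixpath.join(upload_root, hostname)
    date_dir = posixpath.join(host_dir, date)
    sensor_dirs = [posixpath.join(date_dir, name) for name in sensor_names]
    return [host_dir, date_dir] + sensor_dirs


plan = remote_directory_plan(
    "/usr/local/bee/beemon", "rpi4-11", "2021-11-10", ["audio", "video", "temp"]
)
```

Parents appear before children, so each path can be created in sequence with one directory-creation call per entry.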

queue_upload(source_filepath: str, dest_filepath: str, priority: int | None = 0)[source]

Queues an FTP file transfer operation with a specified priority. The request to transfer the provided source_filepath to the provided dest_filepath is inserted into the primary upload queue by default (priority 0). If priority is set to any other value, the request is inserted into the secondary upload queue.

Parameters:
  • source_filepath (str) – The location of the source/local file.

  • dest_filepath (str) – The destination directory/file path on the remote host.

  • priority (Optional[int]) – An integer value indicating whether the FTP request should be inserted into the primary or secondary upload queue. A value of 0 indicates the request should be inserted into the primary upload queue. A value of any other integer indicates the request should be inserted into the secondary upload queue.

static remove_local_file(filepath: str)[source]

Removes the specified file from the local system. This method is intended to be used as a callback to delete a file from the local directory once it has been successfully uploaded to the remote server. Files ending in .csv are not removed directly. When a .final file (the tag indicating the last reading of the day) is removed, the file at the path preceding the .final suffix is removed as well (e.g. removing /home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv.final also removes /home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv).

Parameters:

filepath (str) – The filepath of the local file which should be removed.
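The removal rules can be sketched as follows. The function name remove_local_file_sketch is hypothetical, and the check that the base file still exists is an added safety assumption rather than documented behavior.

```python
import os


def remove_local_file_sketch(filepath: str) -> None:
    """Delete an uploaded file, applying the .csv/.final rules."""
    if filepath.endswith(".csv"):
        return  # .csv files are only removed together with their .final marker
    os.remove(filepath)
    if filepath.endswith(".final"):
        base = filepath[: -len(".final")]
        if os.path.exists(base):
            os.remove(base)  # e.g. the .csv that the marker finalized
```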

run()[source]

The main loop for the UploadHelper object. Enters into an infinite loop in which the primary and secondary upload queues are routinely queried for pending file transfer requests. If pending requests are present, FTP transfer operations will first be performed from the primary upload queue, and secondly from the secondary upload queue (once the primary queue has been entirely emptied of elements).
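The routing performed by queue_upload() and the primary-before-secondary ordering can be sketched together. The class TwoTierUploadQueue and its next_request() method are hypothetical, and queue.Queue stands in for multiprocessing.Queue to keep the example single-process.

```python
import queue
from typing import Optional, Tuple

Request = Tuple[str, str]  # (source_filepath, dest_filepath)


class TwoTierUploadQueue:
    def __init__(self) -> None:
        self.primary: "queue.Queue[Request]" = queue.Queue()
        self.secondary: "queue.Queue[Request]" = queue.Queue()

    def queue_upload(self, source_filepath: str, dest_filepath: str,
                     priority: int = 0) -> None:
        # Priority 0 goes to the primary queue; anything else to the secondary.
        target = self.primary if priority == 0 else self.secondary
        target.put((source_filepath, dest_filepath))

    def next_request(self) -> Optional[Request]:
        # The secondary queue is consulted only when the primary is empty.
        for pending in (self.primary, self.secondary):
            try:
                return pending.get_nowait()
            except queue.Empty:
                continue
        return None
```

With this ordering, leftover files from a previous day (secondary) never delay freshly recorded files (primary).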

stop()[source]

Sets the multiprocessing.Event __running flag to False thereby breaking out of the UploadHelper.run() method (via implicit return), allowing this multiprocessing.Process to terminate gracefully.

upload_file(source_filepath: str, dest_filepath: str) str | None[source]

Uploads the file at the provided local source_filepath to the remote path specified by the dest_filepath argument. Leverages an UploadSession contextlib.ContextManager instance in order to cleanly manage the opening and closing of required FTP authentication resources.

Parameters:
  • source_filepath (str) – The path to the local file which should be uploaded to the specified remote destination.

  • dest_filepath (str) – The destination remote file path to which the local source file should be uploaded.

Returns:

An optional error message if the local file could not be uploaded.

Return type:

Optional[str]
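A minimal sketch of the transfer step, assuming an already authenticated ftplib.FTP-like session is passed in (the documented method obtains its session from UploadSession instead). The function name upload_file_sketch is hypothetical; FTP.storbinary() is the standard library call for binary uploads.

```python
import ftplib
from typing import Optional


def upload_file_sketch(session, source_filepath: str,
                       dest_filepath: str) -> Optional[str]:
    """Upload one local file; return an error message or None on success."""
    try:
        with open(source_filepath, "rb") as handle:
            session.storbinary(f"STOR {dest_filepath}", handle)
    except ftplib.all_errors as err:
        # ftplib.all_errors includes OSError, so a missing file lands here too.
        return f"Upload of {source_filepath!r} failed: {err}"
    return None
```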

upload_from_primary_queue() str | None[source]

Attempts to retrieve an FTP request from the primary upload queue, and uploads the file to the remote FTP server via the UploadHelper.upload_file() method.

Returns:

An optional/descriptive error message.

Return type:

Optional[str]

upload_from_secondary_queue() str | None[source]

Attempts to retrieve an FTP request from the secondary upload queue, and uploads the file to the remote FTP server via the UploadHelper.upload_file() method.

Returns:

An optional/descriptive error message.

Return type:

Optional[str]

The UploadSession Class:

class uploader.UploadSession(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]

Serves as a Python Context Manager which encapsulates establishing an authenticated FTP session with a remote host at a particular destination directory.

__init__(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]
Parameters:
  • username (str) – The username to authenticate at the remote host_ip with.

  • password (str) – The password used in conjunction with the username to authenticate at the remote host_ip with.

  • host_ip (str) – The IP address of the remote server with which the FTP session should be established.

  • host_port (int) – The port to request a connection to on the remote host server (at the specified host_ip).

  • remote_directory (str) – The root remote directory to which this FTP session plans to upload. For example: usr/local/bee/rpi4-11/
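A context manager with this constructor signature could look roughly like the sketch below. It is not the Beemon implementation: the class name UploadSessionSketch and the ftp_factory parameter (added so the sketch can be exercised without a live server) are assumptions. The connect(), login(), cwd(), quit(), and close() calls are standard ftplib.FTP methods.

```python
import ftplib


class UploadSessionSketch:
    def __init__(self, username: str, password: str, host_ip: str,
                 host_port: int, remote_directory: str,
                 ftp_factory=ftplib.FTP):  # ftp_factory is a hypothetical hook
        self.username = username
        self.password = password
        self.host_ip = host_ip
        self.host_port = host_port
        self.remote_directory = remote_directory
        self._ftp_factory = ftp_factory
        self._ftp = None

    def __enter__(self):
        self._ftp = self._ftp_factory()
        try:
            self._ftp.connect(host=self.host_ip, port=self.host_port)
            self._ftp.login(user=self.username, passwd=self.password)
            self._ftp.cwd(self.remote_directory)
        except Exception:
            self._ftp.close()  # do not leak the socket on a failed setup
            raise
        return self._ftp

    def __exit__(self, exc_type, exc, tb):
        try:
            self._ftp.quit()  # polite QUIT, then close
        except ftplib.all_errors:
            self._ftp.close()
        return False  # never swallow exceptions from the with-body
```

A caller would write `with UploadSessionSketch(...) as session:` and issue transfers on `session`; the connection is closed whether or not the body raises.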

The UploadHandler Class:

class uploader.UploadHandler(upload_helper: UploadHelper)[source]

Allows quick customization of the watchdog.observers.Observer thread/object performing the monitoring of the local filesystem for events.

__init__(upload_helper: UploadHelper)[source]
Parameters:

upload_helper (UploadHelper) – A pointer/reference to the UploadHelper multiprocessing.Process. Utilized by the UploadHandler in order to queue FTP transfer requests.

on_closed(event: FileClosedEvent)[source]

Event handler which is fired whenever this threading.Thread detects a closed file on the local filesystem (via inotify) in the watched directory (the Uploader’s Uploader.local_output_directory). This event handler will enqueue the file which was just closed for upload to the remote server via the UploadHelper’s primary multiprocessing.Queue.

Parameters:

event (events.FileClosedEvent) – A pointer/handler to the newly closed file.

on_created(event: DirCreatedEvent | FileCreatedEvent)[source]

Event handler which is fired whenever this threading.Thread detects a newly created file on the local filesystem, in the specified directory (in the Uploader’s Uploader.local_output_directory). This event handler will enqueue the file which was just created in the watched directory for upload via the UploadHelper’s primary multiprocessing.Queue to the remote server.

Parameters:

event (Union[events.DirCreatedEvent, events.FileCreatedEvent]) – A pointer/handler to the newly created directory, or file.
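The handler logic can be sketched without the watchdog dependency as a plain class. In real use the class would subclass watchdog.events.FileSystemEventHandler; the class name UploadHandlerSketch and the enqueue callback are assumptions, while src_path and is_directory are attributes that watchdog events carry.

```python
from types import SimpleNamespace


class UploadHandlerSketch:
    """Duck-typed stand-in for a watchdog event handler."""

    def __init__(self, enqueue):
        self._enqueue = enqueue  # callable taking the local file path

    def on_closed(self, event) -> None:
        if event.is_directory:
            return  # only files are uploaded
        self._enqueue(event.src_path)


# Usage with a stand-in event object instead of a real watchdog event.
queued = []
handler = UploadHandlerSketch(queued.append)
handler.on_closed(SimpleNamespace(src_path="/tmp/a.wav", is_directory=False))
handler.on_closed(SimpleNamespace(src_path="/tmp/sub", is_directory=True))
```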