Beemon Uploader
The Uploader Class:
- class uploader.Uploader(command_queue: Queue, config: Config | None)[source]
The Uploader class manages several other processes and threads in order to watch a user-specified local_output_directory for changes to the local filesystem, and then to perform FTP transfers of the modified files to a pre-specified location on a remote server.
The Uploader process leverages the following multiprocessing.Process(es):
- The UploadHelper process.
Additionally, the Uploader process leverages the following threading.Thread(s):
- The polling.PollingObserver thread.
The watchdog.observers.Observer threading.Thread watches the user-specified local_output_directory for filesystem events (such as created and modified sensor.Sensor measurement files). The UploadHelper multiprocessing.Process performs the actual FTP transfers of the files observed by the watchdog.observers.Observer, uploading them to the user-specified remote_destination_root_directory on the remote CS Machine (www.cs.appstate.edu).
Notes
The Uploader class maintains an open Interprocess Communication (IPC) channel with its sole controlling multiprocessing.Process (the orchestrator.Orchestrator class) through a reference to the command_queue, which is provided on initialization.
- __init__(command_queue: Queue, config: Config | None)[source]
Initializer for objects of type: Uploader.
- Parameters:
command_queue (Queue) – The multiprocessing.Queue which the uploader routinely polls to determine if a "stop" command was issued by the orchestrator.Orchestrator multiprocessing.Process.
config (Optional[Config]) – A reference to an existing beemon Config object, should one exist. If None is provided, a new beemon Config object will be created for the beemon-config.ini in the repository’s root directory.
- _handle_command(command: str) → str | None[source]
Called whenever a command is received from the orchestrator.Orchestrator object over the IPC multiprocessing.Queue: command_queue.
- Parameters:
command (str) – The UTF-8 text of the command passed across the command_queue IPC channel from the orchestrator.Orchestrator.
- Returns:
An optional error message (currently not used).
- Return type:
Optional[str]
- populate_secondary_queue()[source]
Populates the secondary queue of the UploadHelper with all the files still remaining in the specified _helper.local_output_directory. This method is called on startup to queue any remaining files from a previous day for upload. If a file with the extension .final exists, that file is added to the queue instead of the corresponding .csv file.
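The selection rule described above can be illustrated with a minimal sketch. It is not the beemon implementation: the function name select_pending_files is hypothetical, and the sketch assumes that a finalized file is named by appending .final to the .csv name (as in rpi4-11@2022-07-19.csv.final).

```python
import tempfile
from pathlib import Path


def select_pending_files(directory: str) -> list[str]:
    """Return the files to queue, preferring 'x.csv.final' over 'x.csv'.

    Hypothetical helper; assumes the '.final' marker is appended to the
    full '.csv' filename.
    """
    paths = sorted(p for p in Path(directory).rglob("*") if p.is_file())
    names = {str(p) for p in paths}
    pending = []
    for path in paths:
        # Skip a .csv whose finalized counterpart is also present.
        if path.suffix == ".csv" and f"{path}.final" in names:
            continue
        pending.append(str(path))
    return pending


# Small demonstration in a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    for name in ("a.csv", "a.csv.final", "b.csv"):
        (Path(tmp) / name).touch()
    print([Path(p).name for p in select_pending_files(tmp)])
```

Each returned path would then be handed to the helper's secondary queue, so that leftovers from a previous day do not delay fresh measurements.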
- run()[source]
The main process/loop for the Uploader class. This method starts the UploadHelper multiprocessing.Process and the UploadHandler threading.Thread. It then repeatedly polls its Uploader._command_queue multiprocessing.Queue for inbound commands dispatched from the orchestrator.Orchestrator process. It does this indefinitely until the Uploader.stop() method is invoked, thereby halting the loop.
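The polling behaviour described above might look roughly like the following sketch. The function poll_commands and its parameters are hypothetical. To keep the example runnable in a single process it uses queue.Queue and threading.Event as stand-ins for multiprocessing.Queue and multiprocessing.Event, which expose the same get(timeout=...) and is_set() calls. The flag here is "set while running", which is an assumption about how the running flag is represented.

```python
import queue
import threading


def poll_commands(command_queue, running, handle_command, timeout=0.1):
    """Poll command_queue until the running flag is cleared.

    Returns the list of commands that were handled (for illustration only).
    """
    handled = []
    while running.is_set():
        try:
            command = command_queue.get(timeout=timeout)
        except queue.Empty:
            continue  # nothing pending; re-check the running flag
        handled.append(command)
        handle_command(command)
    return handled


# In-process stand-ins for the multiprocessing primitives.
commands = queue.Queue()
running = threading.Event()
running.set()
commands.put("stop")


def handle(command):
    # Assumed behaviour: a "stop" command clears the flag and ends the loop.
    if command == "stop":
        running.clear()


print(poll_commands(commands, running, handle))
```

Using a timeout on get() rather than blocking forever lets the loop notice a cleared flag even when no further commands arrive.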
- start() → None[source]
Starts the Uploader process, scheduling its Uploader.run() method to be called.
- stop()[source]
Sets the multiprocessing.Event __running flag to False, aborting the Uploader.run() main loop, and then gracefully terminates the UploadHelper multiprocessing.Process and the watchdog.observers.Observer threading.Thread.
Notes
Exiting (implicitly via a return of None) from Uploader.run() allows the multiprocessing.Process to be terminated gracefully by the garbage collector.
The UploadHelper Class:
- class uploader.UploadHelper(upload_queue_polling_interval_seconds: float | None = 1.0)[source]
A helper multiprocessing.Process leveraged by the parent Uploader object to perform the actual upload of queued files to the remote CS Server (www.cs.appstate.edu) via FTP.
- __init__(upload_queue_polling_interval_seconds: float | None = 1.0)[source]
Initializes the UploadHelper class.
- Parameters:
upload_queue_polling_interval_seconds (Optional[float]) – How frequently the primary and secondary upload multiprocessing.Queues should be polled for pending file transfer requests.
- static make_directory(session: FTP, directory_name: str) → str | None[source]
Helper method which attempts to create the specified directory on the remote machine via the provided ftplib.FTP session.
- Parameters:
session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.
directory_name (str) – The name of the directory to create on the remote machine.
- Returns:
A descriptive error message explaining why the directory creation attempt failed, or None if no error is encountered.
- Return type:
Optional[str]
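A minimal sketch of this "return an error message or None" pattern is shown below. It is not the actual beemon code: the name make_directory_sketch is hypothetical, and treating every ftplib.error_perm as a failure is an assumption (many servers also reply with a 5xx code when the directory already exists, which the real method may handle differently).

```python
import ftplib
from typing import Optional


def make_directory_sketch(session: ftplib.FTP, directory_name: str) -> Optional[str]:
    """Try to create directory_name remotely; return an error message or None."""
    try:
        session.mkd(directory_name)  # ftplib.FTP.mkd issues the MKD command
    except ftplib.error_perm as err:
        # Permanent (5xx) server reply, e.g. permission denied.
        return f"Could not create remote directory '{directory_name}': {err}"
    return None
```

Returning the message instead of raising lets the caller decide whether a failed directory creation should abort the upload or merely be logged.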
- make_remote_directories(session: FTP, date: DateRepresentation) → str | None[source]
Helper method which sets up the Beemon data directory structure on the remote machine. This method will create a remote directory for the current Raspberry Pi under the specified self._upload_directory (which is configurable in the beemon-config.ini file). By default, this will create a top-level remote directory such as: /usr/local/bee/beemon/rpi4-11. Then, this method will create a subdirectory for the provided date, for instance: /usr/local/bee/beemon/rpi4-11/2021-11-10. Afterward, this method will dynamically import the sensor classes from AppMAIS.src.beemon.sensor and use the names of the concrete sensor.Sensor classes to create subdirectories such as:
/usr/local/bee/beemon/rpi4-11/2021-11-10/audio
/usr/local/bee/beemon/rpi4-11/2021-11-10/video
/usr/local/bee/beemon/rpi4-11/2021-11-10/temp
- Parameters:
session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.
date (DateRepresentation) – The current date (of the form: "YYYY-MM-DD").
- Returns:
A descriptive error message if the remote directory structure could not be created on the remote server, or None if no errors are encountered.
- Return type:
Optional[str]
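The directory layout described above can be sketched as a pure function that lists the remote paths in creation order. The function name plan_remote_directories and its parameters are hypothetical, and the sensor names are passed in explicitly here rather than discovered by dynamic import.

```python
from typing import List, Sequence


def plan_remote_directories(
    upload_directory: str, hostname: str, date: str, sensor_names: Sequence[str]
) -> List[str]:
    """List the remote directories to create, parents before children."""
    root = f"{upload_directory.rstrip('/')}/{hostname}"
    dated = f"{root}/{date}"
    return [root, dated] + [f"{dated}/{name}" for name in sensor_names]


for path in plan_remote_directories(
    "/usr/local/bee/beemon", "rpi4-11", "2021-11-10", ["audio", "video", "temp"]
):
    print(path)
```

Ordering parents before children matters because an FTP MKD request generally fails if the parent directory does not yet exist.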
- queue_upload(source_filepath: str, dest_filepath: str, priority: int | None = 0)[source]
Queues an FTP file transfer operation to take place with a specified priority. The file transfer operation request from the provided source_filepath to the provided dest_filepath will be inserted into the primary upload queue by default (priority 0). If priority is set higher than 0, the FTP request will be inserted into the secondary upload queue.
- Parameters:
source_filepath (str) – The location of the source/local file.
dest_filepath (str) – The destination directory/file path on the remote host.
priority (Optional[int]) – An integer value indicating whether the FTP request should be inserted into the primary or secondary upload queue. A value of 0 indicates the request should be inserted into the primary upload queue. A value of any other integer indicates the request should be inserted into the secondary upload queue.
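The routing rule can be sketched as follows. The class name UploadQueuesSketch is hypothetical, and queue.Queue stands in for the multiprocessing.Queue objects so the example runs in one process. The sketch follows the parameter description (0 goes to the primary queue, any other integer to the secondary queue).

```python
import queue


class UploadQueuesSketch:
    """Holds a primary and a secondary queue of (source, destination) pairs."""

    def __init__(self):
        self.primary = queue.Queue()
        self.secondary = queue.Queue()

    def queue_upload(self, source_filepath, dest_filepath, priority=0):
        # Priority 0 selects the primary queue; anything else the secondary.
        target = self.primary if priority == 0 else self.secondary
        target.put((source_filepath, dest_filepath))


queues = UploadQueuesSketch()
queues.queue_upload("/tmp/new.csv", "remote/new.csv")
queues.queue_upload("/tmp/old.csv", "remote/old.csv", priority=1)
print(queues.primary.qsize(), queues.secondary.qsize())
```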
- static remove_local_file(filepath: str)[source]
Removes the specified file from the local system. This method is intended to be used as a callback to delete the file from the local directory once it has been successfully uploaded to the remote server. Files ending in .csv are not removed. When a .final file (the tag indicating the last reading of the day) is removed, the file at the path preceding the .final suffix is removed as well (e.g. removing /home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv.final also causes the removal of /home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv).
- Parameters:
filepath (str) – The filepath of the local file which should be removed.
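The removal rules above can be sketched as shown below. The function name remove_local_file_sketch is hypothetical, and silently skipping a missing companion .csv file is an assumption rather than documented behaviour.

```python
import os
import tempfile


def remove_local_file_sketch(filepath: str) -> None:
    """Delete an uploaded file, keeping .csv files until their .final arrives."""
    if filepath.endswith(".csv"):
        return  # .csv files are never removed directly
    os.remove(filepath)
    if filepath.endswith(".final"):
        # Also remove the file named by the path before the ".final" suffix.
        base = filepath[: -len(".final")]
        if os.path.exists(base):
            os.remove(base)


with tempfile.TemporaryDirectory() as tmp:
    csv_path = os.path.join(tmp, "rpi4-11@2022-07-19.csv")
    final_path = csv_path + ".final"
    for path in (csv_path, final_path):
        open(path, "w").close()
    remove_local_file_sketch(csv_path)    # no effect: the .csv is kept
    kept = os.path.exists(csv_path)
    remove_local_file_sketch(final_path)  # removes the marker and the .csv
    print(kept, os.listdir(tmp))
```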
- run()[source]
The main loop for the UploadHelper object. Enters into an infinite loop in which the primary and secondary upload queues are routinely queried for pending file transfer requests. If pending requests are present, FTP transfer operations will first be performed from the primary upload queue, and secondly from the secondary upload queue (once the primary queue has been entirely emptied of elements).
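The "primary first, then secondary" ordering can be sketched with a small selection function. The name next_request is hypothetical, and queue.Queue again stands in for the multiprocessing.Queue objects so that the example is deterministic in a single process.

```python
import queue


def next_request(primary, secondary):
    """Return the next pending request, or None if both queues are empty.

    The secondary queue is consulted only when the primary queue is empty.
    """
    try:
        return primary.get_nowait()
    except queue.Empty:
        pass
    try:
        return secondary.get_nowait()
    except queue.Empty:
        return None


primary, secondary = queue.Queue(), queue.Queue()
secondary.put("yesterday.csv.final")
primary.put("today-1.wav")
primary.put("today-2.wav")

order = []
while (request := next_request(primary, secondary)) is not None:
    order.append(request)
print(order)
```

Because the primary queue is re-checked on every call, a request that arrives there while the secondary queue is being drained is served before the remaining secondary items. In the real loop, a None result would presumably be followed by a sleep of upload_queue_polling_interval_seconds before polling again.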
- stop()[source]
Sets the multiprocessing.Event __running flag to False, thereby breaking out of the UploadHelper.run() method (via implicit return) and allowing this multiprocessing.Process to terminate gracefully.
- upload_file(source_filepath: str, dest_filepath: str) → str | None[source]
Uploads the file at the provided local source_filepath to the remote path specified by the dest_filepath argument. Leverages an UploadSession contextlib.ContextManager instance in order to cleanly manage the opening and closing of required FTP authentication resources.
- Parameters:
source_filepath (str) – The path of the local file to upload.
dest_filepath (str) – The remote path to which the file should be uploaded.
- Returns:
An optional error message if the local file could not be uploaded.
- Return type:
Optional[str]
The UploadSession Class:
- class uploader.UploadSession(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]
Serves as a Python Context Manager which encapsulates establishing an authenticated FTP session with a remote host at a particular destination directory.
- __init__(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]
- Parameters:
username (str) – The username with which to authenticate at the remote host_ip.
password (str) – The password used in conjunction with the username to authenticate at the remote host_ip.
host_ip (str) – The IP address of the remote server with which the FTP session should be established.
host_port (int) – The port to request a connection to on the remote host server (at the specified host_ip).
remote_directory (str) – The root remote directory to which this FTP session plans to upload. For example: usr/local/bee/rpi4-11/
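A context manager of this kind might be sketched as follows. This is an illustration under stated assumptions, not the beemon implementation: the class name UploadSessionSketch and the ftp_factory parameter (added so the sketch can be exercised without a network connection) are hypothetical. Only standard ftplib.FTP calls (connect, login, cwd, quit, close) are used.

```python
import ftplib


class UploadSessionSketch:
    """Open an authenticated FTP session in remote_directory; close on exit."""

    def __init__(self, username, password, host_ip, host_port,
                 remote_directory, ftp_factory=ftplib.FTP):
        self.username = username
        self.password = password
        self.host_ip = host_ip
        self.host_port = host_port
        self.remote_directory = remote_directory
        self._ftp_factory = ftp_factory  # hypothetical hook for testing
        self._ftp = None

    def __enter__(self):
        ftp = self._ftp_factory()
        try:
            ftp.connect(host=self.host_ip, port=self.host_port)
            ftp.login(user=self.username, passwd=self.password)
            ftp.cwd(self.remote_directory)
        except ftplib.all_errors:
            ftp.close()  # do not leak a half-open connection
            raise
        self._ftp = ftp
        return ftp

    def __exit__(self, exc_type, exc, tb):
        if self._ftp is not None:
            try:
                self._ftp.quit()  # polite QUIT, then closes the socket
            except ftplib.all_errors:
                self._ftp.close()  # fall back to a unilateral close
            self._ftp = None
        return False  # never swallow exceptions raised in the with-body
```

Inside a with block, the returned ftplib.FTP object could then be used for a transfer, for example with its storbinary method and a file opened in binary mode.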
The UploadHandler Class:
- class uploader.UploadHandler(upload_helper: UploadHelper)[source]
Allows quick customization of the watchdog.observers.Observer thread/object performing the monitoring of the local filesystem for events.
- __init__(upload_helper: UploadHelper)[source]
- Parameters:
upload_helper (UploadHelper) – A pointer/reference to the UploadHelper multiprocessing.Process. Utilized by the UploadHandler in order to queue FTP transfer requests.
- on_closed(event: FileClosedEvent)[source]
Event handler which is fired whenever this threading.Thread detects a closed file on the local filesystem (via inotify) in the watched directory (the Uploader’s Uploader.local_output_directory). This event handler will enqueue the file which was just closed for upload to the remote server via the UploadHelper’s primary multiprocessing.Queue.
- Parameters:
event (events.FileClosedEvent) – A pointer/handler to the newly closed file.
- on_created(event: DirCreatedEvent | FileCreatedEvent)[source]
Event handler which is fired whenever this threading.Thread detects a newly created file on the local filesystem in the watched directory (the Uploader’s Uploader.local_output_directory). This event handler will enqueue the file which was just created for upload to the remote server via the UploadHelper’s primary multiprocessing.Queue.
- Parameters:
event (Union[events.DirCreatedEvent, events.FileCreatedEvent]) – A pointer/handler to the newly created directory, or file.
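How such a handler could forward events to the helper is sketched below. To stay runnable without the watchdog package installed, the sketch is a plain class rather than a subclass of watchdog's event handler, and FakeEvent mimics only the src_path and is_directory attributes that watchdog events carry. UploadHandlerSketch, RecordingHelper, remote_root, and the rule for deriving the destination path are all hypothetical.

```python
import os
import posixpath
from dataclasses import dataclass


@dataclass
class FakeEvent:
    """Stand-in exposing the two watchdog event attributes used here."""
    src_path: str
    is_directory: bool = False


class RecordingHelper:
    """Stand-in for UploadHelper that records queued requests."""

    def __init__(self):
        self.requests = []

    def queue_upload(self, source_filepath, dest_filepath, priority=0):
        self.requests.append((source_filepath, dest_filepath, priority))


class UploadHandlerSketch:
    def __init__(self, upload_helper, remote_root):
        self.upload_helper = upload_helper
        self.remote_root = remote_root  # assumed destination prefix

    def _enqueue(self, event):
        if event.is_directory:
            return  # only files are uploaded
        dest = posixpath.join(self.remote_root, os.path.basename(event.src_path))
        self.upload_helper.queue_upload(event.src_path, dest)  # priority 0

    def on_created(self, event):
        self._enqueue(event)

    def on_closed(self, event):
        self._enqueue(event)


helper = RecordingHelper()
handler = UploadHandlerSketch(helper, "remote/rpi4-11")
handler.on_created(FakeEvent("/tmp/out/audio", is_directory=True))
handler.on_closed(FakeEvent("/tmp/out/reading.csv"))
print(helper.requests)
```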