Beemon Uploader
The Uploader
Class:
- class uploader.Uploader(command_queue: Queue, config: Config | None)[source]
The
Uploader
class manages several other processes (and threads) in order to watch a user-specifiedlocal_output_directory
for changes to the local filesystem, and then to perform FTP transfers of the modified files to a pre-specified location on a remote server. TheUploader
process leverages the followingmultiprocessing.Process
(s):The
UploadHelper
process.
Additionally, the
Uploader
process leverages the followingthreading.Thread
(s):The
polling.PollingObserver
thread.
The
watchdog.observers.Observer
threading.Thread
watches the user-specifiedlocal_output_directory
for filesystem events (such as created and modifiedsensor.Sensor
measurement files). TheUploadHelper
multiprocessing.Process
performs the actual FTP file transfers that are observed by thewatchdog.observers.Observer
in the user-specifiedlocal_output_directory
. TheUploadHelper
process performs the task of uploading the observed files to the user-specifiedremote_destination_root_directory
on the remote CS Machine (www.cs.appstate.edu
).Notes
The
Uploader
class maintains an open Interprocess Communication Channel (IPC) with its sole controllingmultiprocessing.Process
(theorchestrator.Orchestrator
class) in a pointer to thecommand_queue
which is provided on initialization.- __init__(command_queue: Queue, config: Config | None)[source]
Initializer for objects of type: Uploader.
- Parameters:
command_queue (Queue) – The
multiprocessing.Queue
which the uploader routinely polls to determine if a"stop"
command was issued by theorchestrator.Orchestrator
multiprocessing.Process
.config (Optional[Config]) – A reference/pointer to an existing beemon
Config
object, should one exist. IfNone
is provided, a new beemon configurationConfig
object will be created for thebeemon-config.ini
in the repository’s root directory.
- _handle_command(command: str) str | None [source]
Called whenever a command is received from the
orchestrator.Orchestrator
object over the IPCmultiprocessing.Queue
:command_queue
.- Parameters:
command (str) – The UTF-8 text of the command passed across the
command_queue
IPC channel from theorchestrator.Orchestrator
.- Returns:
An optional error message (currently not used).
- Return type:
Optional[str]
- populate_secondary_queue()[source]
Populates the secondary queue of the
UploadHelper
with all the files still remaining in the specified_helper.local_output_directory
. This method is called on startup to queue any remaining files from a previous day for upload. If a file with the extension.final
exists that file is added to the queue instead of the corresponding.csv
file.
- run()[source]
The main process/loop for the
Uploader
class. This method starts theUploadHelper
multiprocessing.Process
andUploadHandler
threading.Thread
. It then repeatedly polls it’sUploader._command_queue
multiprocessing.Queue
for inbound commands dispatched from theorchestrator.Orchestrator
process. It will do this infinitely until theUploader.stop()
method is invoked thereby halting the infinite loop.
- start() None [source]
Starts the
Uploader
process, scheduling it’sUploader.run()
method to be called.
- stop()[source]
Sets the
multiprocessing.Event
__running
flag toFalse
, aborting theUploader.run()
main loop, and then gracefully terminating theUploadHelper
multiprocessing.Process
andwatchdog.observers.Observer
threading.Thread
.Notes
Exiting (implicitly via a
return
ofNone
) fromUploader.run()
allows themultiprocessing.Process
to be terminated gracefully by the garbage collector.
The UploadHelper
Class:
- class uploader.UploadHelper(upload_queue_polling_interval_seconds: float | None = 1.0)[source]
A helper
multiprocessing.Process
leveraged by the parentUploader
object to perform the actual upload of queued files to the remote CS Server (www.cs.appstate.edu
) via FTP.- __init__(upload_queue_polling_interval_seconds: float | None = 1.0)[source]
Initializes the
UploadHelper
class. :param upload_queue_polling_interval_seconds: How frequently the primary and secondary upload :type upload_queue_polling_interval_seconds: Optional[float] :parammultiprocessing.Queue
’s should be polled for pending file transfer requests.:
- static make_directory(session: FTP, directory_name: str) str | None [source]
Helper method which attempts to create the specified directory on the remote machine via the provided
ftplib.FTP
session.- Parameters:
session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.
directory_name (str) – The name of the directory to create on the remote machine.
- Returns:
A descriptive error message explaining why the directory creation attempt failed, or this is None in the case that no error is encountered.
- Return type:
Optional[str]
- make_remote_directories(session: FTP, date: DateRepresentation) str | None [source]
Helper method which sets up the Beemon data directory structure on the remote machine. This method will create a remote directory for the current Raspberry Pi under the specified
self._upload_directory
(which is configurable in thebeemon-config.ini
file). By default, this will create a top-level remote directory such as:/usr/local/bee/beemon/rpi4-11
. Then, this method will create a subdirectory for the provided date, for instance:/usr/local/bee/beemon/rpi4-11/2021-11-10
. Afterward, this method will dynamically import the sensor classes fromAppMAIS.src.beemon.sensor
and use the names of the concretesensor.Sensor
classes to create subdirectories such as:/usr/local/bee/beemon/rpi4-11/2021-11-10/audio
/usr/local/bee/beemon/rpi4-11/2021-11-10/video
/usr/local/bee/beemon/rpi4-11/2021-11-10/temp
- Parameters:
session (ftplib.FTP) – The current FTP session containing the authenticated FTP client with an open socket to the remote server.
date (DateRepresentation) – The current date (of the form:
"YYYY-MM-DD"
).
- Returns:
A descriptive error message in the event that the remote directory structure could not be created on the remote server. Otherwise, this value is None in the event that no errors are encountered.
- Return type:
Optional[str]
- queue_upload(source_filepath: str, dest_filepath: str, priority: int | None = 0)[source]
Queues an FTP file transfer operation to take place with a specified priority. The file transfer operation request from the provided
source_filepath
to the provideddest_filepath
will be inserted into the primary upload queue by default (priority 0). Ifpriority
is set higher than 0, the FTP request will be inserted into the secondary upload queue.- Parameters:
source_filepath (str) – The location of the source/local file.
dest_filepath (str) – The destination directory/file path on the remote host.
priority (Optional[int]) – An integer value indicating whether the FTP request should be inserted into the primary or secondary upload queue. A value of 0 indicates the request should be inserted into the primary upload queue. A value of any other integer indicates the request should be inserted into the secondary upload queue.
- static remove_local_file(filepath: str)[source]
Removes the specified file from the local system. This method is intended to be used as a callback method to delete the file from the local directory once it has been successfully uploaded to the remote server. Any .csv file is not removed. When a .final (the tag indicating the last reading of the day) is removed, then any the path before the .final is removed. (e.g.
/home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv.final
causes the removal of/home/bee/bee_tmp/ ... /rpi4-11@2022-07-19.csv
)- Parameters:
filepath (str) – The filepath of the local file which should be removed.
- run()[source]
The main loop for the
UploadHelper
object. Enters into an infinite loop in which the primary and secondary upload queues are routinely queried for pending file transfer requests. If pending requests are present, FTP transfer operations will first be performed from the primary upload queue, and secondly from the secondary upload queue (once the primary queue has been entirely emptied of elements).
- stop()[source]
Sets the
multiprocessing.Event
__running
flag toFalse
thereby breaking out of theUploadHelper.run()
method (via implicitreturn
), allowing thismultiprocessing.Process
to terminate gracefully.
- upload_file(source_filepath: str, dest_filepath: str) str | None [source]
Uploads the file at the provided local
source_filepath
to the remote path specified by thedest_filepath
argument. Leverages aUploadSession
contextlib.ContextManager
instance in order to cleanly manage the opening and closing of required FTP authentication resources.- Parameters:
- Returns:
An optional error message if the local file could not be uploaded.
- Return type:
Optional[str]
The UploadSession
Class:
- class uploader.UploadSession(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]
Serves as a Python Context Manager which encapsulates establishing an authenticated FTP session with a remote host at a particular destination directory.
- __init__(username: str, password: str, host_ip: str, host_port: int, remote_directory: str)[source]
- Parameters:
username (str) – The username to authenticate at the remote
host_ip
with.password (str) – The password used in conjunction with the
username
to authenticate at the remotehost_ip
with.host_ip (str) – The IP address of the remote server to which the FTP session should be established with.
host_port (int) – The port to request a connection to on the remote host server (at the specified
host_ip
).remote_directory (str) – The root remote directory to which this FTP session plans to upload to. For example:
usr/local/bee/rpi4-11/
The UploaderHandler
Class:
- class uploader.UploadHandler(upload_helper: UploadHelper)[source]
Allows quick customization of the
watchdog.observers.Observer
thread/object performing the monitoring of the local filesystem for events.- __init__(upload_helper: UploadHelper)[source]
- Parameters:
upload_helper (UploadHelper) – A pointer/reference to the
UploadHelper
multiprocessing.Process
. Utilized by theUploadHandler
in order to queue FTP transfer requests.
- on_closed(event: FileClosedEvent)[source]
Event handler which is fired whenever this
threading.Thread
detects a closed file on the local filesystem (via iNotify), in the specified directory (in theUploader
’sUploader.local_output_directory
). This event handler will enqueue the file which was just created in the watched directory for upload via theUploadHelper
’s primarymultiprocessing.Queue
to the remote server.- Parameters:
event (
events.FileDeletedEvent
) – A pointer/handler to the newly closed file.
- on_created(event: DirCreatedEvent | FileCreatedEvent)[source]
Event handler which is fired whenever this
threading.Thread
detects a newly created file on the local filesystem, in the specified directory (in theUploader
’sUploader.local_output_directory
). This event handler will enqueue the file which was just created in the watched directory for upload via theUploadHelper
’s primarymultiprocessing.Queue
to the remote server.- Parameters:
event (Union[
events.DirCreatedEvent
,events.FileCreatedEvent
]) – A pointer/handler to the newly created directory, or file.