import contextlib
import ftplib
import glob
import os
import platform
import queue
import time
from distutils.util import strtobool
from queue import Empty
from typing import Optional, NewType, Tuple, Union, Type
from types import TracebackType
from watchdog import events
from src.utils.importutils import get_classes_in_module, parse_information_from_file_path
from loguru import logger
from multiprocessing import Process, Queue, Event
from watchdog.observers import Observer
from watchdog.events import DirCreatedEvent, FileCreatedEvent, FileSystemEventHandler
from src.beemon.configuration import Config
__author__ = 'Gurney Buchanan <gurnben>; Chris Campell <[email protected]>; Joe Graber <gittinhubbed>'
# Date Representation in the form of: YYYY-MM-DD
DateRepresentation = NewType('DateRepresentation', str)
class UploadSession(contextlib.AbstractContextManager):
"""
Serves as a Python Context Manager which encapsulates establishing an authenticated FTP session with a remote host
at a particular destination directory.
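Example:
A minimal usage sketch (the credentials and directory below are placeholders, not real values)::

    with UploadSession(username='<username>', password='<password>', host_ip='www.cs.appstate.edu',
                       host_port=21, remote_directory='/usr/local/bee/rpi4-11/') as ftp_session:
        # ``ftp_session`` is an authenticated ftplib.FTP instance; its socket is closed
        # automatically when the ``with`` block exits:
        ftp_session.nlst()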
"""
def __init__(self, username: str, password: str, host_ip: str, host_port: int, remote_directory: str):
"""
Args:
username (str): The username to authenticate at the remote ``host_ip`` with.
password (str): The password used in conjunction with the ``username`` to authenticate at the remote
``host_ip`` with.
host_ip (str): The IP address (or hostname) of the remote server with which the FTP session should be established.
host_port (int): The port to request a connection to on the remote host server (at the specified
``host_ip``).
remote_directory (str): The root remote directory to which this FTP session plans to upload. For example:
``/usr/local/bee/rpi4-11/``
"""
self._username: str = username
self._password: str = password
self._host_ip: str = host_ip
self._host_port: int = host_port
self._remote_directory: str = remote_directory
super().__init__()
def __enter__(self):
"""
This method is called when the runtime context is entered via a ``with`` statement. It handles connecting
to the specified remote host with the specified login credentials. Then, this method will create the remote
directory structure (if it doesn't already exist) that was provided on initialization via the
``remote_directory`` argument.
References:
See: https://docs.python.org/3.8/reference/datamodel.html#context-managers
Returns:
ftplib.FTP: The remote FTP session object with an open (and authenticated) socket connection to the remote
server at the specified port and address.
"""
super().__enter__()
logger.debug(f"Initializing the {type(self).__name__} at remote directory: {self._remote_directory}.")
self._session = ftplib.FTP(timeout=100)
self._session.connect(self._host_ip, self._host_port)
self._session.login(self._username, self._password)
self._session.cwd("/")
# logger.info(f"listing: {self._session.nlst()}")
# Walk the remote path component by component, creating any missing directories along the way:
for d in self._remote_directory.strip("/").split("/"):
listing = self._session.nlst()
if d not in listing:
logger.info(f"Making directory {d}")
self._session.mkd(d)
self._session.cwd(d)
return self._session
def __exit__(self, exc_type: Union[Type[BaseException], None],
exc_val: Union[BaseException, None],
exc_tb: Union[TracebackType, None]) -> Union[bool, None]:
"""
Called when scope is lost at the termination of a ``with`` statement, this method closes all existing resources
and file handles, ensuring no socket connections are left over.
References:
See: https://docs.python.org/3.8/reference/datamodel.html#object.__exit__
Args:
exc_type (Union[Type[BaseException], None]): The class of the exception raised that caused the context to
exit. If the context was exited without an exception, this will be None
exc_val (Union[BaseException, None]): The instance of the exception raised that caused the context to exit. If
the context was exited without an exception, this will be None.
exc_tb (Union[TracebackType, None]): A traceback of the exception that caused the context to exit. If the
context was exited without an exception, this will be None.
Note: These arguments will either all be None or will all have values.
Returns:
None: Exceptions raised within the context are not suppressed; they are propagated to the caller.
"""
self._session.close()
logger.debug(f"Closed the {type(self).__name__} Session.")
super().__exit__(exc_type, exc_val, exc_tb)
return None
class UploadHelper(Process):
"""
A helper :class:`multiprocessing.Process` leveraged by the parent :class:`Uploader` object to perform the actual
upload of queued files to the remote CS Server (``www.cs.appstate.edu``) via FTP.
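Example:
A hedged sketch of the intended lifecycle (both file paths are hypothetical)::

    helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
    helper.start()
    helper.queue_upload(
        source_filepath='/home/bee/bee_tmp/audio/2021-11-10/<filename>.wav',
        dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-10/audio/<filename>.wav'
    )
    helper.stop()
    helper.join()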
"""
def __init__(self, upload_queue_polling_interval_seconds: Optional[float] = 1.0):
"""
Initializes the :class:`UploadHelper` class.
Args:
upload_queue_polling_interval_seconds (Optional[float]): How frequently the primary and secondary upload
:class:`multiprocessing.Queue`'s should be polled for pending file transfer requests.
"""
super().__init__()
self._upload_queue_polling_interval_seconds: float = upload_queue_polling_interval_seconds
self._start_time = time.time()
# Sentinel value is originally false:
self.__running = Event()
self._primary_queue = Queue()
self._secondary_queue = Queue()
root_path = os.path.join(os.path.abspath(__file__), '../../../')
beemon_config_path = os.path.abspath(os.path.join(root_path, 'beemon-config.ini'))
self._beemon_config = Config(path=beemon_config_path)
self._host_name: str = self._beemon_config.ftp(key='host')
self._host_port: int = int(self._beemon_config.ftp(key='port'))
self._username: str = self._beemon_config.ftp(key='username')
self._password: str = self._beemon_config.ftp(key='password')
self._upload_delay = int(self._beemon_config.ftp(key='upload_delay', default=60))
self._remote_destination_root_directory = self._beemon_config.glob('root_upload_directory')
self._remote_rpi_root_dir = os.path.join(self._remote_destination_root_directory, platform.node())
def run(self):
"""
The main loop for the :class:`UploadHelper` object. Enters into an infinite loop in which the primary and
secondary upload queues are routinely queried for pending file transfer requests. If pending requests are
present, FTP transfer operations will first be performed from the primary upload queue, and secondly from the
secondary upload queue (once the primary queue has been entirely emptied of elements).
"""
super().run()
logger.info(f"Starting the upload handler.")
self.__running.set()
while self.__running.is_set():
# Attempt to upload a file and then sleep for polling_interval
if not self._primary_queue.empty():
# Primary queue is not empty, upload from primary queue:
error_message = self.upload_from_primary_queue()
time.sleep(self._upload_delay)
else:
# Primary queue is empty, upload from secondary queue:
if not self._secondary_queue.empty():
# Secondary queue is not empty, upload from secondary queue:
error_message = self.upload_from_secondary_queue()
time.sleep(self._upload_delay)
else:
# Both the primary and secondary queues are empty:
# logger.debug(f"Both primary and secondary upload queues are empty.")
time.sleep(self._upload_queue_polling_interval_seconds)
def stop(self):
"""
Sets the :class:`multiprocessing.Event` ``__running`` flag to ``False`` thereby breaking out of the
:meth:`UploadHelper.run` method (via implicit ``return``), allowing this :class:`multiprocessing.Process` to
terminate gracefully.
"""
self.__running.clear()
def upload_from_primary_queue(self) -> Optional[str]:
"""
Attempts to retrieve an FTP request from the primary upload queue, and uploads the file to the remote FTP
server via the :meth:`UploadHelper.upload_file` method.
Returns:
Optional[str]: An optional/descriptive error message.
"""
logger.debug('Uploading file from primary queue.')
error_message: Optional[str] = None
try:
if not self._primary_queue.empty():
# The primary upload queue is non-empty:
(source_filepath, dest_filepath) = self._primary_queue.get(block=True, timeout=10)
error_message = self.upload_file(source_filepath=source_filepath, dest_filepath=dest_filepath)
if error_message is not None:
# Error encountered during upload, try again later:
logger.info(f"Placing {source_filepath} onto the secondary queue to retry later.")
self._secondary_queue.put_nowait([source_filepath, dest_filepath])
return error_message
except Empty as err:
error_message = f"The primary upload queue was Empty. Error: {err}."
return error_message
return error_message
def upload_from_secondary_queue(self) -> Optional[str]:
"""
Attempts to retrieve an FTP request from the secondary upload queue, and uploads the file to the remote FTP
server via the :meth:`UploadHelper.upload_file` method.
Returns:
Optional[str]: An optional/descriptive error message.
"""
logger.debug('Uploading file from secondary queue.')
error_message: Optional[str] = None
try:
if not self._secondary_queue.empty():
# The secondary upload queue is non-empty:
(source_filepath, dest_filepath) = self._secondary_queue.get(block=True, timeout=10)
error_message = self.upload_file(source_filepath=source_filepath, dest_filepath=dest_filepath)
if error_message is not None:
# Error encountered during upload, try again later:
self._secondary_queue.put_nowait([source_filepath, dest_filepath])
return error_message
except Empty as err:
error_message = f"The secondary upload queue was Empty. Error: {err}."
return error_message
return error_message
def upload_file(self, source_filepath: str, dest_filepath: str) -> Optional[str]:
"""
Uploads the file at the provided local ``source_filepath`` to the remote path specified by the
``dest_filepath`` argument. Leverages an :class:`UploadSession` context manager instance to cleanly manage
the opening and closing of the required FTP authentication resources.
Args:
source_filepath (str): The path to the local file which should be uploaded to the specified remote
destination.
dest_filepath (str): The destination remote file path to which the local source file should be uploaded.
Returns:
Optional[str]: An optional error message if the local file could not be uploaded.
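Example:
A minimal sketch (assuming ``helper`` is an :class:`UploadHelper` instance; both paths are hypothetical)::

    error = helper.upload_file(
        source_filepath='/home/bee/bee_tmp/temp/2021-11-10/<filename>.csv',
        dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-10/temp/<filename>.csv'
    )
    if error is not None:
        logger.warning(error)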
"""
error_message: Optional[str] = None
filename, date, sensor_name = parse_information_from_file_path(file_path=source_filepath)
logger.debug(f"Preparing to upload file: {filename} from date: {date} and sensor: {sensor_name}")
date_repr = DateRepresentation(date)
logger.debug(f"Uploading: {source_filepath} to remote host location: {dest_filepath}")
try:
if os.path.exists(source_filepath):
# The specified local source file exists.
upload_session = UploadSession(
username=self._username,
password=self._password,
host_ip=self._host_name,
host_port=self._host_port,
remote_directory=os.path.join(self._remote_destination_root_directory, platform.node())
)
with upload_session as sess:
error_message = self.make_remote_directories(session=sess, date=date_repr)
if error_message is not None:
error_message = f"could not create remote directories. Re-queuing. Error: {error_message}"
logger.error(error_message)
return error_message
dest_directory, dest_file = os.path.split(dest_filepath)
logger.debug(f"Changing to dest directory: {dest_directory}")
sess.cwd(dest_directory)
stor_command = f'STOR {dest_file}'
logger.debug(f"Issuing command: \"{stor_command}\"")
with open(source_filepath, 'rb') as fp:
sess.storbinary(stor_command, fp)
# Remove the local copy only once the transfer has completed successfully:
self.remove_local_file(source_filepath)
logger.info(f"Uploaded: {source_filepath} to: {dest_filepath}")
else:
# The specified local source file does not exist.
error_message = f"The specified local source file: {source_filepath} does not exist!"
logger.warning(error_message)
except ftplib.all_errors as err:
error_message = f"Upload failed for source file: {source_filepath} to destination: {dest_filepath}. " \
f"Error: {err}"
logger.error(error_message)
return error_message
@property
def remote_destination_root_directory(self) -> str:
return self._remote_destination_root_directory
@staticmethod
def remove_local_file(filepath: str):
"""
Removes the specified file from the local system. This method is intended to be used as a callback to
delete a file from the local directory once it has been successfully uploaded to the remote server. Files with
a ``.csv`` extension are never removed. When a ``.final`` file (the tag indicating the last reading of the day)
is removed, the file at the same path without the ``.final`` extension is removed as well (e.g. removing
``/home/bee/bee_tmp/ ... /<filename>.csv.final`` also causes the removal of
``/home/bee/bee_tmp/ ... /<filename>.csv``).
Args:
filepath (str): The filepath of the local file which should be removed.
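Example:
Illustrative behavior per extension (paths are hypothetical)::

    # A .csv file is never removed:
    UploadHelper.remove_local_file('/home/bee/bee_tmp/temp/2021-11-10/<filename>.csv')
    # A .final file is removed, along with its counterpart at the same path minus the .final extension:
    UploadHelper.remove_local_file('/home/bee/bee_tmp/temp/2021-11-10/<filename>.csv.final')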
"""
file_no_extension, extension = os.path.splitext(filepath)
if os.path.exists(filepath):
if extension != ".csv":
os.remove(filepath)
logger.debug(f"Deleted local file: {filepath} after upload.")
if extension == ".final" and os.path.exists(file_no_extension):
os.remove(file_no_extension)
@staticmethod
def make_directory(session: ftplib.FTP, directory_name: str) -> Optional[str]:
"""
Helper method which attempts to create the specified directory on the remote machine via the provided
:class:`ftplib.FTP` session.
Args:
session (ftplib.FTP): The current FTP session containing the authenticated FTP client with an open socket
to the remote server.
directory_name (str): The name of the directory to create on the remote machine.
Returns:
Optional[str]: A descriptive error message explaining why the directory creation attempt failed, or None
in the case that no error is encountered.
"""
error_message: Optional[str] = None
try:
session.mkd(directory_name)
except Exception as err:
error_message = f"Failed to make directory: {directory_name}. Error: {err}"
return error_message
def make_remote_directories(self, session: ftplib.FTP, date: DateRepresentation) -> Optional[str]:
"""
Helper method which sets up the Beemon data directory structure on the remote machine. This method will create a
remote directory for the current Raspberry Pi under the specified ``self._remote_destination_root_directory``
(which is configurable in the ``beemon-config.ini`` file). By default, this will create a top-level remote
directory such as: ``/usr/local/bee/beemon/rpi4-11``. Then, this method will create a subdirectory for the
provided date, for instance: ``/usr/local/bee/beemon/rpi4-11/2021-11-10``. Afterward, this method will
dynamically import the sensor classes from ``AppMAIS.src.beemon.sensor`` and use the names of the concrete
:class:`sensor.Sensor` classes to create subdirectories such as:
a) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/audio``
b) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/video``
c) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/temp``
Args:
session (ftplib.FTP): The current FTP session containing the authenticated FTP client with an open socket
to the remote server.
date (DateRepresentation): The current date (of the form: ``"YYYY-MM-DD"``).
Returns:
Optional[str]: A descriptive error message in the event that the remote directory structure could not be
created on the remote server. Otherwise, this value is None in the event that no errors are encountered.
"""
logger.debug(f"Setting up remote directories for date: {date}...")
error_message: Optional[str] = None
rpi_root_dir = os.path.join(self._remote_destination_root_directory, platform.node())
session.cwd(rpi_root_dir)
listing = session.nlst()
# logger.debug(f'Remote directory listing: {listing}')
if date not in listing:
try:
session.mkd(date)
logger.info(f"Had to create a new directory for today: {date}")
except Exception as err:
error_message = f"Failed to create directory: {date} on remote server. Error: {err}"
logger.warning(error_message)
session.cwd(date)
listing = session.nlst()
sensor_classes = get_classes_in_module(module_name='src.beemon.sensor', package_name='src.beemon', concrete_only=True)
sensor_names = [sensor_class_name.lower() for sensor_class_name, sensor_class in sensor_classes.items()]
logger.debug(f"Dynamically detected sensor names: {sensor_names}")
for sensor_name in sensor_names:
if sensor_name not in listing:
error_message = self.make_directory(session=session, directory_name=sensor_name)
if error_message is not None:
logger.warning(error_message)
return error_message
return error_message
def queue_upload(self, source_filepath: str, dest_filepath: str, priority: Optional[int] = 0):
"""
Queues an FTP file transfer operation to take place with a specified priority. The file transfer operation
request from the provided ``source_filepath`` to the provided ``dest_filepath`` will be inserted into the
primary upload queue by default (priority 0). If ``priority`` is set higher than 0, the FTP request will be
inserted into the secondary upload queue.
Args:
source_filepath (str): The location of the source/local file.
dest_filepath (str): The destination directory/file path on the remote host.
priority (Optional[int]): An integer value indicating whether the FTP request should be inserted into the
primary or secondary upload queue. A value of 0 indicates the request should be inserted into the primary
upload queue. A value of any other integer indicates the request should be inserted into the secondary
upload queue.
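Example:
A short sketch of both priorities (assuming ``helper`` is an :class:`UploadHelper`; paths are hypothetical)::

    # Priority 0 (default): enqueue onto the primary upload queue.
    helper.queue_upload(source_filepath='/home/bee/bee_tmp/audio/2021-11-10/<filename>.wav',
                        dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-10/audio/<filename>.wav')
    # Any other priority value: enqueue onto the secondary (retry) upload queue.
    helper.queue_upload(source_filepath='/home/bee/bee_tmp/audio/2021-11-10/<filename>.wav',
                        dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-10/audio/<filename>.wav',
                        priority=1)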
"""
if priority == 0:
self._primary_queue.put_nowait([source_filepath, dest_filepath])
logger.info(f"File transfer request from: {source_filepath} to: "
f"{dest_filepath} added to primary upload queue.")
else:
self._secondary_queue.put_nowait([source_filepath, dest_filepath])
logger.info(f"File transfer request from: {source_filepath} to: "
f"{dest_filepath} added to secondary upload queue.")
class UploadHandler(FileSystemEventHandler):
"""
Allows quick customization of the :class:`watchdog.observers.Observer` thread/object performing the monitoring of
the local filesystem for events.
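Example:
A minimal wiring sketch mirroring :meth:`Uploader.run` (the watched directory is hypothetical)::

    helper = UploadHelper()
    helper.start()
    handler = UploadHandler(upload_helper=helper)
    observer = Observer()
    observer.schedule(handler, '/home/bee/bee_tmp/', recursive=True)
    observer.start()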
"""
def __init__(self, upload_helper: UploadHelper):
"""
Args:
upload_helper (UploadHelper): A pointer/reference to the :class:`UploadHelper`
:class:`multiprocessing.Process`. Utilized by the :class:`UploadHandler` in order to queue FTP transfer
requests.
"""
self._upload_helper = upload_helper
super().__init__()
def on_created(self, event: Union[events.DirCreatedEvent, events.FileCreatedEvent]):
"""
Event handler which is fired whenever this :class:`threading.Thread` detects a newly created file on the local
filesystem, in the specified directory (in the :class:`Uploader`'s
:attr:`Uploader.local_output_directory`). This event handler will enqueue the file which
was just created in the watched directory for upload via the :class:`UploadHelper`'s primary
:class:`multiprocessing.Queue` to the remote server.
Args:
event (Union[:class:`events.DirCreatedEvent`, :class:`events.FileCreatedEvent`]): A pointer/handler
to the newly created directory, or file.
"""
# logger.debug(f'Event with type {event.event_type} triggered on_created(self, event).')
if not event.is_directory:
filename, date, sensor_name = parse_information_from_file_path(file_path=event.src_path)
filename_no_extension, file_ext = os.path.splitext(filename)
if file_ext == ".h264" or file_ext == ".mp4":
# Do not upload .h64 files, instead wait for on_closed to trigger in case of transcoding or FFMPEG copy
# commands.
pass
elif file_ext != ".final":
# If this is not the last reading of the day, add this file to the queue
rpi_name = platform.node()
dest_filepath = os.path.join(self._upload_helper.remote_destination_root_directory,
f"{rpi_name}/{date}/{sensor_name}/{filename}")
logger.debug(f"Detected created file: {event.src_path}. Queuing upload.")
self._upload_helper.queue_upload(source_filepath=event.src_path, dest_filepath=dest_filepath)
# Else it is a .final file; defer to on_closed to enqueue it once it has been fully written:
elif file_ext == ".final":
logger.debug(f'{filename} was created. Waiting for it to be written to before adding it to the queue.')
def on_closed(self, event: events.FileClosedEvent):
"""
Event handler which is fired whenever this :class:`threading.Thread` detects a closed file on the local
filesystem (via iNotify), in the specified directory (in the :class:`Uploader`'s
:attr:`Uploader.local_output_directory`). This event handler will enqueue the file which was just closed in the
watched directory for upload via the :class:`UploadHelper`'s primary :class:`multiprocessing.Queue` to the
remote server.
Args:
event (:class:`events.FileClosedEvent`): A pointer/handler to the newly closed file.
"""
if not event.is_directory:
filename, date, sensor_name = parse_information_from_file_path(file_path=event.src_path)
filename_no_extension, file_ext = os.path.splitext(filename)
# If it is the final reading of the day, add it to the queue:
if file_ext == ".final":
logger.debug(f"Filename {filename} ends with .final: Uploading to {filename_no_extension}.")
# Set the file name to not include the extension .final.
filename = filename_no_extension
rpi_name = platform.node()
dest_filepath = os.path.join(
self._upload_helper.remote_destination_root_directory,
f"{rpi_name}/{date}/{sensor_name}/{filename}"
)
logger.debug(f"Detected closed file: {event.src_path}. Queuing upload to {dest_filepath}.")
self._upload_helper.queue_upload(source_filepath=event.src_path, dest_filepath=dest_filepath)
class Uploader(Process):
"""
The :class:`Uploader` class manages several other processes (and threads) in order to watch a user-specified
``local_output_directory`` for changes to the local filesystem, and then to perform FTP transfers of the
modified files to a pre-specified location on a remote server. The :class:`Uploader` process leverages the following
:class:`multiprocessing.Process` (s):
* The :class:`UploadHelper` process.
Additionally, the :class:`Uploader` process leverages the following :class:`threading.Thread` (s):
* The :class:`watchdog.observers.Observer` thread.
The :class:`watchdog.observers.Observer` :class:`threading.Thread` watches the user-specified ``local_output_directory`` for
filesystem events (such as created and modified :class:`sensor.Sensor` measurement files). The :class:`UploadHelper`
:class:`multiprocessing.Process` performs the actual FTP file transfers that are observed by the
:class:`watchdog.observers.Observer` in the user-specified ``local_output_directory``. The :class:`UploadHelper`
process performs the task of uploading the observed files to the user-specified
``remote_destination_root_directory`` on the remote `CS Machine` (``www.cs.appstate.edu``).
Notes:
The :class:`Uploader` class maintains an open Interprocess Communication (IPC) channel with its sole controlling
:class:`multiprocessing.Process` (the :class:`orchestrator.Orchestrator` class) in a pointer to the
``command_queue`` which is provided on initialization.
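Example:
A hedged sketch of how a controlling process might drive the :class:`Uploader` over its IPC channel::

    command_queue = Queue()
    uploader = Uploader(command_queue=command_queue, config=None)
    uploader.start()
    # ... later, request a graceful shutdown:
    command_queue.put('stop')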
"""
def __init__(self, command_queue: Queue, config: Optional[Config]):
"""
Initializer for objects of type: Uploader.
Args:
command_queue (Queue): The :class:`multiprocessing.Queue` which the uploader routinely polls to determine
if a ``"stop"`` command was issued by the :class:`orchestrator.Orchestrator`
:class:`multiprocessing.Process`.
config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should one
exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be created for the
``beemon-config.ini`` in the repository's root directory.
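Example:
A hedged ``beemon-config.ini`` sketch covering the keys consumed here (the section names and values are
assumptions inferred from the ``Config.ftp``/``Config.glob`` accessors)::

    [global]
    local_output_directory = /home/bee/bee_tmp/
    root_upload_directory = /usr/local/bee/beemon/
    auto_start = True

    [ftp]
    host = www.cs.appstate.edu
    port = 21
    username = <username>
    password = <password>
    upload_delay = 60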
"""
# Sentinel value is originally False:
self.__running: Event = Event()
self._command_queue: Queue = command_queue
if config is None:
self._beemon_config: Config = Config()
else:
self._beemon_config: Config = config
# See if the FTP settings override the global settings:
self._local_output_directory: str = self._beemon_config.ftp(
key='local_output_directory',
default=None
)
self._root_upload_directory: str = self._beemon_config.ftp(
key='root_upload_directory',
default=None
)
self._auto_start: str = self._beemon_config.ftp(
key='auto_start',
default=None
)
if self._auto_start is not None:
self._auto_start = bool(strtobool(self._auto_start))
else:
# Fall back to the global auto_start setting (assumed to be present in beemon-config.ini):
self._auto_start: bool = bool(strtobool(
self._beemon_config.glob(
key='auto_start',
default=None
)
))
if self._local_output_directory is None:
# Parse from the global settings (FTP does not override):
self._local_output_directory: str = self._beemon_config.glob(
key='local_output_directory',
default=None
)
if self._root_upload_directory is None:
# Parse from the global settings (FTP does not override):
self._root_upload_directory: str = self._beemon_config.glob(
key='root_upload_directory',
default=None
)
lower_case_process_subclass_name = type(self).__name__.lower()
logger.debug(f"Process: {lower_case_process_subclass_name} "
f"\"local_output_directory\": {self._local_output_directory}")
logger.debug(f"Process: {lower_case_process_subclass_name} "
f"\"root_upload_directory\": {self._root_upload_directory}")
logger.debug(f"Process: {lower_case_process_subclass_name} "
f"\"auto_start\": {self._auto_start}")
self._observer: Optional[Observer] = None
self._helper: Optional[UploadHelper] = None
self._command_queue_polling_interval: float = 0.5
super().__init__(args=(command_queue,))
@property
def local_output_directory(self) -> str:
return self._local_output_directory
@property
def upload_directory(self) -> str:
return self._root_upload_directory
@property
def auto_start(self) -> bool:
return self._auto_start
@property
def observer(self) -> Optional[Observer]:
return self._observer
@property
def upload_helper(self) -> Optional[UploadHelper]:
return self._helper
@property
def command_queue(self) -> Queue:
return self._command_queue
@property
def command_queue_polling_interval(self) -> float:
return self._command_queue_polling_interval
def start(self) -> None:
"""
Starts the :class:`Uploader` process, scheduling its :meth:`Uploader.run` method to be called.
"""
logger.info(f"Starting {type(self).__name__.lower()} process.")
self.__running.set()
super().start()
return
def _handle_command(self, command: str) -> Optional[str]:
"""
Called whenever a command is received from the :class:`orchestrator.Orchestrator` object over the IPC
:class:`multiprocessing.Queue`: ``command_queue``.
Args:
command (str): The UTF-8 text of the command passed across the ``command_queue`` IPC channel from the
:class:`orchestrator.Orchestrator`.
Returns:
Optional[str]: An optional error message (currently not used).
"""
error_message: Optional[str] = None
prefix = command.split()[0]
if prefix == 'start':
self.start()
elif prefix == 'stop':
self.stop()
else:
error_message = f"Uploader process received unknown command: \"{command}\"."
return error_message
def run(self):
"""
The main process/loop for the :class:`Uploader` class. This method starts the :class:`UploadHelper`
:class:`multiprocessing.Process` and :class:`UploadHandler` :class:`threading.Thread`. It then repeatedly polls
its :attr:`Uploader._command_queue` :class:`multiprocessing.Queue` for inbound commands dispatched from the
:class:`orchestrator.Orchestrator` process. It will do this infinitely until the :meth:`Uploader.stop` method
is invoked thereby halting the infinite loop.
"""
super().run()
self._helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
self._helper.start()
# .. todo:: Rename UploadHandler to UploadEventHandler for clarity.
__upload_event_handler = UploadHandler(upload_helper=self._helper)
self._observer: Observer = Observer()
self._observer.schedule(__upload_event_handler, self._local_output_directory, recursive=True)
self._observer.start()
# Populate the secondary upload queue with data left over from previous recordings:
self.populate_secondary_queue()
logger.info(f"Watching for new files in directory: {self._local_output_directory}")
while self.__running.is_set():
# Pull from the command queue to see if a stop command was sent:
try:
logger.debug(f"uploader process polling for commands...")
command: str = self._command_queue.get(block=True, timeout=10)
logger.debug(f"uploader process received command: \"{command}\"")
# Handle the command:
error_message = self._handle_command(command=command)
except queue.Empty:
time.sleep(self._command_queue_polling_interval)
def stop(self):
"""
Sets the :class:`multiprocessing.Event` ``__running`` flag to ``False``, aborting the :meth:`Uploader.run`
main loop, and then gracefully terminating the :class:`UploadHelper` :class:`multiprocessing.Process` and
:class:`watchdog.observers.Observer` :class:`threading.Thread`.
Notes:
Exiting (implicitly via a ``return`` of ``None``) from :meth:`Uploader.run` allows the
:class:`multiprocessing.Process` to be terminated gracefully by the garbage collector.
"""
logger.debug("Clearing the self.__running flag...")
self.__running.clear()
logger.info(f"Halting/joining the UploadHelper and Observer processes...")
self._helper.stop()
self._helper.join()
self._observer.stop()
self._observer.join()
logger.info(f"Halted/joined the UploadHelper and UploadHandler processes. Safely halted the Uploader process.")
# We are not allowed to call: self.join()
def populate_secondary_queue(self):
"""
Populates the secondary queue of the :class:`UploadHelper` with all the files still remaining in the specified
:attr:`Uploader.local_output_directory`. This method is called on startup to queue any remaining files
from a previous day for upload. If a file with the extension ``.final`` exists that file is added to the
queue instead of the corresponding ``.csv`` file.
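Example:
The local layout this method expects to scan (a hypothetical listing)::

    /home/bee/bee_tmp/audio/2021-11-10/<filename>.wav
    /home/bee/bee_tmp/temp/2021-11-10/<filename>.csv
    /home/bee/bee_tmp/temp/2021-11-10/<filename>.csv.final

Here the ``.wav`` file would be enqueued directly, the ``.csv.final`` file would be enqueued in place of
its ``.csv`` counterpart, and the bare ``.csv`` file would not be enqueued on its own.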
"""
# By default, examine /home/bee/bee_tmp/<any_sensor>/<any_date> for left over files to upload:
logger.debug(f'Adding files from {self._local_output_directory} to the secondary queue.')
for file in glob.iglob(os.path.join(self._local_output_directory, '**/**/*')):
# Iterate over the files and directories in /home/bee/bee_tmp/
if os.path.isdir(file):
# This is a directory (e.g. a sensor- or date-level directory such as 'audio' or '2021-11-10'); skip it:
pass
else:
# This is a file.
source_filepath = file
filename, date, sensor_name = parse_information_from_file_path(file_path=source_filepath)
# Parse out the extension.
source_filename_without_extension, source_file_extension = os.path.splitext(p=filename)
# Construct the remote output path:
rpi_name = platform.node()
# Define the path of the current file if it was the tagged final csv file.
final_source_filepath = f'{source_filepath}.final'
# If the current file is not a .final file, enqueue it. A .final file itself is skipped here; it is
# enqueued when its corresponding .csv file is encountered below.
if source_file_extension != '.final':
# If the current file is a .csv file, check whether a corresponding .final file exists:
dest_filepath = os.path.join(self._root_upload_directory,
f"{rpi_name}/{date}/{sensor_name}/"
f"{filename}")
if source_file_extension == '.csv':
# If the .final file exists, enqueue it in place of the .csv file:
if os.path.exists(final_source_filepath):
# Add the .final file to the queue with destination path same as if it was not tagged.
logger.debug(f'{final_source_filepath} exists, adding {final_source_filepath} '
f'with destination {dest_filepath}.')
self._helper.queue_upload(dest_filepath=dest_filepath,
source_filepath=final_source_filepath,
priority=1)
else:
logger.debug(f'{final_source_filepath} does not exist, adding {source_filepath} '
f'with destination {dest_filepath}.')
self._helper.queue_upload(dest_filepath=dest_filepath,
source_filepath=source_filepath,
priority=1)
else:
# Enqueue the file for upload to the remote destination in the secondary upload queue:
logger.debug(f'Extension was not .csv, adding {source_filepath} '
f'with destination {dest_filepath}.')
self._helper.queue_upload(dest_filepath=dest_filepath,
source_filepath=source_filepath,
priority=1)
# if __name__ == '__main__':
# _upload_helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
# local_root_dir = os.path.abspath(os.path.join(os.path.abspath(__file__), '../../../'))
# remote_root_dir = os.path.join('beemon', f"{platform.node()}/campellcl_test_dir")
# # source_filepath = os.path.join(local_root_dir, 'beemon-config.ini')
# _source_filepath = os.path.join('/home/bee/bee_tmp/audio/2021-11-15/rpi4-11@[email protected]')
# _dest_filepath = '/beemon/campellcl_test/rpi4-11/audio/2021-11-15/rpi4-11@[email protected]'
# _upload_helper.upload_file(source_filepath=_source_filepath, dest_filepath=_dest_filepath)
# upload_session = UploadSession(
# username=USERNAME, password=PASSWORD,
# host_ip='cs.appstate.edu', host_port=21,
# remote_directory=remote_root_dir
# )