"""Source code for uploader."""

import contextlib
import ftplib
import glob
import os
import platform
import queue
import time
from distutils.util import strtobool
from queue import Empty
from typing import Optional, NewType, Tuple, Union, Type
from types import TracebackType
from watchdog import events
from src.utils.importutils import get_classes_in_module, parse_information_from_file_path
from loguru import logger
from multiprocessing import Process, Queue, Event
from watchdog.observers import Observer
from watchdog.events import DirCreatedEvent, FileCreatedEvent, FileSystemEventHandler
from src.beemon.configuration import Config

__author__ = 'Gurney Buchanan <gurnben>; Chris Campell <[email protected]>; Joe Graber <gittinhubbed>'

# Date Representation in the form of: YYYY-MM-DD
DateRepresentation = NewType('DateRepresentation', str)
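
# A trivial illustrative sketch (not part of the module's own logic): constructing a
# ``DateRepresentation`` for the current day.
#
#   today = DateRepresentation(time.strftime('%Y-%m-%d'))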

class UploadSession(contextlib.AbstractContextManager):
    """
    Serves as a Python Context Manager which encapsulates establishing an authenticated FTP session with a remote
    host at a particular destination directory.
    """
    def __init__(self, username: str, password: str, host_ip: str, host_port: int, remote_directory: str):
        """
        Args:
            username (str): The username to authenticate with at the remote ``host_ip``.
            password (str): The password used in conjunction with the ``username`` to authenticate at the remote
                ``host_ip``.
            host_ip (str): The IP address of the remote server with which the FTP session should be established.
            host_port (int): The port to request a connection to on the remote host server (at the specified
                ``host_ip``).
            remote_directory (str): The root remote directory to which this FTP session plans to upload. For
                example: ``usr/local/bee/rpi4-11/``
        """
        self._username: str = username
        self._password: str = password
        self._host_ip: str = host_ip
        self._host_port: int = host_port
        self._remote_directory: str = remote_directory
        super().__init__()
    def __enter__(self):
        """
        This method is called whenever the context manager is leveraged via a ``with`` statement. It handles
        connecting to the specified remote host with the specified login credentials. Then, this method creates the
        remote directory structure (if it doesn't already exist) that was provided on initialization via the
        ``remote_directory`` argument.

        References:
            See: https://docs.python.org/3.8/reference/datamodel.html#context-managers

        Returns:
            ftplib.FTP: The remote FTP session object with an open (and authenticated) socket connection to the
            remote server at the specified port and address.
        """
        super().__enter__()
        logger.debug(f"Initializing the {type(self).__name__} at remote directory: {self._remote_directory}.")
        self._session = ftplib.FTP(timeout=100)
        self._session.connect(self._host_ip, self._host_port)
        self._session.login(self._username, self._password)
        self._session.cwd("/")
        # logger.info(f"listing: {self._session.nlst()}")
        for d in self._remote_directory.split("/")[1:]:
            listing = self._session.nlst()
            if d not in listing:
                logger.info(f"Making directory {d}")
                self._session.mkd(d)
            self._session.cwd(d)
        return self._session

    def __exit__(self, exc_type: Union[Type[BaseException], None], exc_val: Union[BaseException, None],
                 exc_tb: Union[TracebackType, None]) -> Union[bool, None]:
        """
        Called when scope is lost at the termination of a ``with`` statement, this method closes all existing
        resources and file handles, ensuring no socket connections are left over.

        References:
            See: https://docs.python.org/3.8/reference/datamodel.html#object.__exit__

        Args:
            exc_type (Union[Type[BaseException], None]): The class of the exception raised that caused the context
                to exit. If the context was exited without an exception, this will be None.
            exc_val (Union[BaseException, None]): The instance of the exception raised that caused the context to
                exit. If the context was exited without an exception, this will be None.
            exc_tb (Union[TracebackType, None]): A traceback of the exception that caused the context to exit. If
                the context was exited without an exception, this will be None.

            Note: These args will either all be None or all have values.

        Returns:
            Union[bool, None]: Always None, so that any exception raised within the ``with`` block is propagated.
        """
        self._session.close()
        logger.debug(f"Closed the {type(self).__name__} Session.")
        super().__exit__(exc_type, exc_val, exc_tb)
        return None
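
# A minimal usage sketch for ``UploadSession``. The credentials, host, filenames, and remote path
# below are hypothetical placeholders, not values from the project configuration:
#
#   with UploadSession(username='bee', password='secret', host_ip='cs.appstate.edu',
#                      host_port=21, remote_directory='/usr/local/bee/beemon/rpi4-11') as sess:
#       with open('reading.csv', 'rb') as fp:
#           sess.storbinary('STOR reading.csv', fp)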
class UploadHelper(Process):
    """
    A helper :class:`multiprocessing.Process` leveraged by the parent :class:`Uploader` object to perform the
    actual upload of queued files to the remote CS Server (``www.cs.appstate.edu``) via FTP.
    """
    def __init__(self, upload_queue_polling_interval_seconds: Optional[float] = 1.0):
        """
        Initializes the :class:`UploadHelper` class.

        Args:
            upload_queue_polling_interval_seconds (Optional[float]): How frequently the primary and secondary
                upload :class:`multiprocessing.Queue`'s should be polled for pending file transfer requests.
        """
        super().__init__()
        self._upload_queue_polling_interval_seconds: float = upload_queue_polling_interval_seconds
        self._start_time = time.time()
        # Sentinel value is originally False:
        self.__running = Event()
        self._primary_queue = Queue()
        self._secondary_queue = Queue()
        root_path = os.path.join(os.path.abspath(__file__), '../../../')
        beemon_config_path = os.path.abspath(os.path.join(root_path, 'beemon-config.ini'))
        self._beemon_config = Config(path=beemon_config_path)
        self._host_name: str = self._beemon_config.ftp(key='host')
        self._host_port: int = int(self._beemon_config.ftp(key='port'))
        self._username: str = self._beemon_config.ftp(key='username')
        self._password: str = self._beemon_config.ftp(key='password')
        self._upload_delay = int(self._beemon_config.ftp(key='upload_delay', default=60))
        self._remote_destination_root_directory = self._beemon_config.glob('root_upload_directory')
        self._remote_rpi_root_dir = os.path.join(self._remote_destination_root_directory, platform.node())
    def run(self):
        """
        The main loop for the :class:`UploadHelper` object. Enters into an infinite loop in which the primary and
        secondary upload queues are routinely queried for pending file transfer requests. If pending requests are
        present, FTP transfer operations will first be performed from the primary upload queue, and then from the
        secondary upload queue (once the primary queue has been entirely emptied of elements).
        """
        super().run()
        logger.info(f"Starting the upload handler.")
        self.__running.set()
        while self.__running.is_set():
            # Attempt to upload a file and then sleep for the polling interval:
            if not self._primary_queue.empty():
                # Primary queue is not empty, upload from primary queue:
                error_message = self.upload_from_primary_queue()
                time.sleep(self._upload_delay)
            elif not self._secondary_queue.empty():
                # Primary queue is empty but secondary queue is not, upload from secondary queue:
                error_message = self.upload_from_secondary_queue()
                time.sleep(self._upload_delay)
            else:
                # Both the primary and secondary queues are empty:
                # logger.debug(f"Both primary and secondary upload queues are empty.")
                time.sleep(self._upload_queue_polling_interval_seconds)
    def stop(self):
        """
        Sets the :class:`multiprocessing.Event` ``__running`` flag to ``False``, thereby breaking out of the
        :meth:`UploadHelper.run` method (via implicit ``return``) and allowing this
        :class:`multiprocessing.Process` to terminate gracefully.
        """
        self.__running.clear()
    def upload_from_primary_queue(self) -> Optional[str]:
        """
        Attempts to retrieve an FTP request from the primary upload queue, and uploads the file to the remote FTP
        server via the :meth:`UploadHelper.upload_file` method.

        Returns:
            Optional[str]: An optional/descriptive error message.
        """
        logger.debug('Uploading file from primary queue.')
        error_message: Optional[str] = None
        try:
            if not self._primary_queue.empty():
                # The primary upload queue is non-empty:
                (source_filepath, dest_filepath) = self._primary_queue.get(block=True, timeout=10)
                error_message = self.upload_file(source_filepath=source_filepath, dest_filepath=dest_filepath)
                if error_message is not None:
                    # Error encountered during upload, try again later:
                    logger.info(f"Placing {source_filepath} onto the secondary queue to retry later.")
                    self._secondary_queue.put_nowait([source_filepath, dest_filepath])
                    return error_message
        except Empty as err:
            error_message = f"The primary upload queue was Empty. Error: {err}."
            return error_message
        return error_message
    def upload_from_secondary_queue(self) -> Optional[str]:
        """
        Attempts to retrieve an FTP request from the secondary upload queue, and uploads the file to the remote FTP
        server via the :meth:`UploadHelper.upload_file` method.

        Returns:
            Optional[str]: An optional/descriptive error message.
        """
        logger.debug('Uploading file from secondary queue.')
        error_message: Optional[str] = None
        try:
            if not self._secondary_queue.empty():
                # The secondary upload queue is non-empty:
                (source_filepath, dest_filepath) = self._secondary_queue.get(block=True, timeout=10)
                error_message = self.upload_file(source_filepath=source_filepath, dest_filepath=dest_filepath)
                if error_message is not None:
                    # Error encountered during upload, try again later:
                    self._secondary_queue.put_nowait([source_filepath, dest_filepath])
                    return error_message
        except Empty as err:
            error_message = f"The secondary upload queue was Empty. Error: {err}."
            return error_message
        return error_message
    def upload_file(self, source_filepath: str, dest_filepath: str) -> Optional[str]:
        """
        Uploads the file at the provided local ``source_filepath`` to the remote path specified by the
        ``dest_filepath`` argument. Leverages an :class:`UploadSession` context manager instance in order to
        cleanly manage the opening and closing of the required FTP authentication resources.

        Args:
            source_filepath (str): The path to the local file which should be uploaded to the specified remote
                destination.
            dest_filepath (str): The remote destination file path to which the local source file should be
                uploaded.

        Returns:
            Optional[str]: An optional error message if the local file could not be uploaded.
        """
        error_message: Optional[str] = None
        filename, date, sensor_name = parse_information_from_file_path(file_path=source_filepath)
        logger.debug(f"Preparing to upload file: {filename} from date: {date} and sensor: {sensor_name}")
        date_repr = DateRepresentation(date)
        logger.debug(f"Uploading: {source_filepath} to remote host location: {dest_filepath}")
        try:
            if os.path.exists(source_filepath):
                # The specified local source file exists.
                upload_session = UploadSession(
                    username=self._username,
                    password=self._password,
                    host_ip=self._host_name,
                    host_port=self._host_port,
                    remote_directory=os.path.join(self._remote_destination_root_directory, platform.node())
                )
                with upload_session as sess:
                    error_message = self.make_remote_directories(session=sess, date=date_repr)
                    if error_message is not None:
                        error_message = f"Could not create remote directories. Re-queuing. Error: {error_message}"
                        logger.error(error_message)
                        return error_message
                    dest_directory, dest_file = os.path.split(dest_filepath)
                    logger.debug(f"Changing to dest directory: {dest_directory}")
                    sess.cwd(dest_directory)
                    stor_command = f"STOR {dest_file}"
                    logger.debug(f"Issuing command: \"{stor_command}\"")
                    with open(source_filepath, 'rb') as fp:
                        sess.storbinary(stor_command, fp)
                # Only remove the local file once the transfer has completed successfully. Note that the previous
                # code passed ``self.remove_local_file(source_filepath)`` as ``storbinary``'s ``callback`` argument,
                # which invoked the removal immediately (and passed its ``None`` return value as the callback);
                # ``storbinary``'s callback is also invoked once per transferred block, so the removal is performed
                # here instead:
                self.remove_local_file(source_filepath)
                logger.info(f"Uploaded: {source_filepath} to: {dest_filepath}")
            else:
                # The specified local source file does not exist.
                error_message = f"The specified local source file: {source_filepath} does not exist!"
                logger.warning(error_message)
        except ftplib.all_errors as err:
            error_message = f"Upload failed for source file: {source_filepath} to destination: {dest_filepath}. " \
                            f"Error: {err}"
            logger.error(error_message)
        return error_message
    @property
    def remote_destination_root_directory(self) -> str:
        return self._remote_destination_root_directory
    @staticmethod
    def remove_local_file(filepath: str):
        """
        Removes the specified file from the local system. This method is intended to delete the file from the local
        directory once it has been successfully uploaded to the remote server. ``.csv`` files are never removed.
        When a ``.final`` file (the tag indicating the last reading of the day) is removed, the companion file
        whose path precedes the ``.final`` extension is removed as well (e.g. removing
        ``/home/bee/bee_tmp/ ... /[email protected]`` also causes the removal of
        ``/home/bee/bee_tmp/ ... /[email protected]``).

        Args:
            filepath (str): The filepath of the local file which should be removed.
        """
        file_no_extension, extension = os.path.splitext(filepath)
        if os.path.exists(filepath):
            if not extension == ".csv":
                os.remove(filepath)
                logger.debug(f"Deleted local file: {filepath} after upload.")
            if extension == ".final" and os.path.exists(file_no_extension):
                os.remove(file_no_extension)
    @staticmethod
    def make_directory(session: ftplib.FTP, directory_name: str) -> Optional[str]:
        """
        Helper method which attempts to create the specified directory on the remote machine via the provided
        :class:`ftplib.FTP` session.

        Args:
            session (ftplib.FTP): The current FTP session containing the authenticated FTP client with an open
                socket to the remote server.
            directory_name (str): The name of the directory to create on the remote machine.

        Returns:
            Optional[str]: A descriptive error message explaining why the directory creation attempt failed, or
            None in the case that no error is encountered.
        """
        error_message: Optional[str] = None
        try:
            session.mkd(directory_name)
        except Exception as err:
            error_message = f"Failed to make directory: {directory_name}. Error: {err}"
        return error_message
    def make_remote_directories(self, session: ftplib.FTP, date: DateRepresentation) -> Optional[str]:
        """
        Helper method which sets up the Beemon data directory structure on the remote machine. This method will
        create a remote directory for the current Raspberry Pi under the specified ``self._upload_directory``
        (which is configurable in the ``beemon-config.ini`` file). By default, this will create a top-level remote
        directory such as: ``/usr/local/bee/beemon/rpi4-11``. Then, this method will create a subdirectory for the
        provided date, for instance: ``/usr/local/bee/beemon/rpi4-11/2021-11-10``. Afterward, this method will
        dynamically import the sensor classes from ``AppMAIS.src.beemon.sensor`` and use the names of the concrete
        :class:`sensor.Sensor` classes to create subdirectories such as:

        a) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/audio``
        b) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/video``
        c) ``/usr/local/bee/beemon/rpi4-11/2021-11-10/temp``

        Args:
            session (ftplib.FTP): The current FTP session containing the authenticated FTP client with an open
                socket to the remote server.
            date (DateRepresentation): The current date (of the form: ``"YYYY-MM-DD"``).

        Returns:
            Optional[str]: A descriptive error message in the event that the remote directory structure could not
            be created on the remote server. Otherwise, this value is None in the event that no errors are
            encountered.
        """
        logger.debug(f"Setting up remote directories for date: {date}...")
        error_message: Optional[str] = None
        rpi_root_dir = os.path.join(self._remote_destination_root_directory, platform.node())
        session.cwd(rpi_root_dir)
        listing = session.nlst()
        # logger.debug(f'Remote directory listing: {listing}')
        if date not in listing:
            try:
                session.mkd(date)
                logger.info(f"Had to create a new directory for today: {date}")
            except Exception as err:
                error_message = f"Failed to create directory: {date} on remote server. Error: {err}"
                logger.warning(error_message)
        session.cwd(date)
        listing = session.nlst()
        sensor_classes = get_classes_in_module(module_name='src.beemon.sensor', package_name='src.beemon',
                                               concrete_only=True)
        sensor_names = [sensor_class_name.lower() for sensor_class_name, sensor_class in sensor_classes.items()]
        logger.debug(f"Dynamically detected sensor names: {sensor_names}")
        for sensor_name in sensor_names:
            if sensor_name not in listing:
                error_message = self.make_directory(session=session, directory_name=sensor_name)
                if error_message is not None:
                    logger.warning(error_message)
                    return error_message
        return error_message
    def queue_upload(self, source_filepath: str, dest_filepath: str, priority: Optional[int] = 0):
        """
        Queues an FTP file transfer operation to take place with a specified priority. The file transfer operation
        request from the provided ``source_filepath`` to the provided ``dest_filepath`` will be inserted into the
        primary upload queue by default (priority 0). If ``priority`` is set higher than 0, the FTP request will be
        inserted into the secondary upload queue.

        Args:
            source_filepath (str): The location of the source/local file.
            dest_filepath (str): The destination directory/file path on the remote host.
            priority (Optional[int]): An integer value indicating whether the FTP request should be inserted into
                the primary or secondary upload queue. A value of 0 indicates the request should be inserted into
                the primary upload queue. Any other integer indicates the request should be inserted into the
                secondary upload queue.
        """
        if priority == 0:
            self._primary_queue.put_nowait([source_filepath, dest_filepath])
            logger.info(f"File transfer request from: {source_filepath} to: "
                        f"{dest_filepath} added to primary upload queue.")
        else:
            self._secondary_queue.put_nowait([source_filepath, dest_filepath])
            logger.info(f"File transfer request from: {source_filepath} to: "
                        f"{dest_filepath} added to secondary upload queue.")
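
# A minimal sketch of queuing transfers with the helper (the file paths below are hypothetical
# placeholders; the helper reads its FTP credentials from ``beemon-config.ini`` on construction):
#
#   helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
#   helper.start()
#   # Priority 0 -> primary queue (uploaded first); any other value -> secondary (retry) queue.
#   helper.queue_upload(source_filepath='/home/bee/bee_tmp/audio/2021-11-10/example.wav',
#                       dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-10/audio/example.wav')
#   helper.queue_upload(source_filepath='/home/bee/bee_tmp/temp/2021-11-09/leftover.csv',
#                       dest_filepath='/usr/local/bee/beemon/rpi4-11/2021-11-09/temp/leftover.csv',
#                       priority=1)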
class UploadHandler(FileSystemEventHandler):
    """
    Allows quick customization of the :class:`watchdog.observers.Observer` thread/object performing the monitoring
    of the local filesystem for events.
    """
    def __init__(self, upload_helper: UploadHelper):
        """
        Args:
            upload_helper (UploadHelper): A pointer/reference to the :class:`UploadHelper`
                :class:`multiprocessing.Process`. Utilized by the :class:`UploadHandler` in order to queue FTP
                transfer requests.
        """
        self._upload_helper = upload_helper
        super().__init__()
    def on_created(self, event: Union[events.DirCreatedEvent, events.FileCreatedEvent]):
        """
        Event handler which is fired whenever this :class:`threading.Thread` detects a newly created file on the
        local filesystem, in the specified directory (the :class:`Uploader`'s
        :attr:`Uploader.local_output_directory`). This event handler will enqueue the file which was just created
        in the watched directory for upload to the remote server via the :class:`UploadHelper`'s primary
        :class:`multiprocessing.Queue`.

        Args:
            event (Union[:class:`events.DirCreatedEvent`, :class:`events.FileCreatedEvent`]): A pointer/handler to
                the newly created directory or file.
        """
        # logger.debug(f'Event with type {event.event_type} triggered on_created(self, event).')
        if not event.is_directory:
            filename, date, sensor_name = parse_information_from_file_path(file_path=event.src_path)
            filename_no_extension, file_ext = os.path.splitext(filename)
            if file_ext == ".h264" or file_ext == ".mp4":
                # Do not upload .h264 or .mp4 files on creation; instead, wait for on_closed to trigger in case of
                # transcoding or FFMPEG copy commands.
                pass
            elif not file_ext == ".final":
                # If this is not the last reading of the day, add the file to the queue:
                rpi_name = platform.node()
                dest_filepath = os.path.join(self._upload_helper.remote_destination_root_directory,
                                             f"{rpi_name}/{date}/{sensor_name}/{filename}")
                logger.debug(f"Detected created file: {event.src_path}. Queuing upload.")
                self._upload_helper.queue_upload(source_filepath=event.src_path, dest_filepath=dest_filepath)
            else:
                # This is the final reading of the day (.final); wait for on_closed to queue it:
                logger.debug(f'{filename} was created. Waiting for it to be written to before adding it to the '
                             f'queue.')
    def on_closed(self, event: events.FileClosedEvent):
        """
        Event handler which is fired whenever this :class:`threading.Thread` detects a closed file on the local
        filesystem (via inotify), in the specified directory (the :class:`Uploader`'s
        :attr:`Uploader.local_output_directory`). This event handler will enqueue the file which was just closed in
        the watched directory for upload to the remote server via the :class:`UploadHelper`'s primary
        :class:`multiprocessing.Queue`.

        Args:
            event (:class:`events.FileClosedEvent`): A pointer/handler to the newly closed file.
        """
        if not event.is_directory:
            filename, date, sensor_name = parse_information_from_file_path(file_path=event.src_path)
            filename_no_extension, file_ext = os.path.splitext(filename)
            # If it is the final reading of the day, upload it under its un-tagged name:
            if file_ext == ".final":
                logger.debug(f"Filename {filename} ends with .final: Uploading to {filename_no_extension}.")
                # Set the file name to not include the .final extension.
                filename = filename_no_extension
            rpi_name = platform.node()
            dest_filepath = os.path.join(
                self._upload_helper.remote_destination_root_directory,
                f"{rpi_name}/{date}/{sensor_name}/{filename}"
            )
            logger.debug(f"Detected closed file: {event.src_path}. Queuing upload to {dest_filepath}.")
            self._upload_helper.queue_upload(source_filepath=event.src_path, dest_filepath=dest_filepath)
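
# A minimal sketch of wiring the event handler to a watchdog observer (the watched directory below
# is a hypothetical placeholder; the real path comes from ``beemon-config.ini``):
#
#   helper = UploadHelper()
#   helper.start()
#   handler = UploadHandler(upload_helper=helper)
#   observer = Observer()
#   observer.schedule(handler, '/home/bee/bee_tmp', recursive=True)
#   observer.start()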
class Uploader(Process):
    """
    The :class:`Uploader` class manages several other processes (and threads) in order to watch a user-specified
    ``local_output_directory`` for changes to the local filesystem, and then to perform FTP transfers of the
    modified files to a pre-specified location on a remote server.

    The :class:`Uploader` process leverages the following :class:`multiprocessing.Process` (s):

    * The :class:`UploadHelper` process.

    Additionally, the :class:`Uploader` process leverages the following :class:`threading.Thread` (s):

    * The :class:`watchdog.observers.Observer` thread.

    The :class:`watchdog.observers.Observer` :class:`threading.Thread` watches the user-specified
    ``local_output_directory`` for filesystem events (such as created and modified :class:`sensor.Sensor`
    measurement files). The :class:`UploadHelper` :class:`multiprocessing.Process` performs the actual FTP
    transfers of the files that are observed by the :class:`watchdog.observers.Observer` in the user-specified
    ``local_output_directory``. The :class:`UploadHelper` process performs the task of uploading the observed
    files to the user-specified ``remote_destination_root_directory`` on the remote `CS Machine`
    (``www.cs.appstate.edu``).

    Notes:
        The :class:`Uploader` class maintains an open Interprocess Communication (IPC) channel with its sole
        controlling :class:`multiprocessing.Process` (the :class:`orchestrator.Orchestrator` class) in a pointer
        to the ``command_queue`` which is provided on initialization.
    """
    def __init__(self, command_queue: Queue, config: Optional[Config]):
        """
        Initializer for objects of type: :class:`Uploader`.

        Args:
            command_queue (Queue): The :class:`multiprocessing.Queue` which the uploader routinely polls to
                determine if a ``"stop"`` command was issued by the :class:`orchestrator.Orchestrator`
                :class:`multiprocessing.Process`.
            config (Optional[Config]): A reference/pointer to an existing beemon :class:`Config` object, should
                one exist. If ``None`` is provided, a new beemon configuration :class:`Config` object will be
                created for the ``beemon-config.ini`` in the repository's root directory.
        """
        # Sentinel value is originally False:
        self.__running: Event = Event()
        self._command_queue: Queue = command_queue
        if config is None:
            self._beemon_config: Config = Config()
        else:
            self._beemon_config: Config = config
        # See if the FTP settings override the global settings:
        self._local_output_directory: str = self._beemon_config.ftp(
            key='local_output_directory',
            default=None
        )
        self._root_upload_directory: str = self._beemon_config.ftp(
            key='root_upload_directory',
            default=None
        )
        self._auto_start: str = self._beemon_config.ftp(
            key='auto_start',
            default=None
        )
        if self._auto_start is not None:
            self._auto_start = bool(strtobool(self._auto_start))
        else:
            self._auto_start: bool = bool(strtobool(
                self._beemon_config.glob(
                    key='auto_start',
                    default=None
                )
            ))
        if self._local_output_directory is None:
            # Parse from the global settings (FTP does not override):
            self._local_output_directory: str = self._beemon_config.glob(
                key='local_output_directory',
                default=None
            )
        if self._root_upload_directory is None:
            # Parse from the global settings (FTP does not override):
            self._root_upload_directory: str = self._beemon_config.glob(
                key='root_upload_directory',
                default=None
            )
        lower_case_process_subclass_name = type(self).__name__.lower()
        logger.debug(f"Process: {lower_case_process_subclass_name} "
                     f"\"local_output_directory\": {self._local_output_directory}")
        logger.debug(f"Process: {lower_case_process_subclass_name} "
                     f"\"root_upload_directory\": {self._root_upload_directory}")
        logger.debug(f"Process: {lower_case_process_subclass_name} "
                     f"\"auto_start\": {self._auto_start}")
        self._observer: Optional[Observer] = None
        self._helper: Optional[UploadHelper] = None
        self._command_queue_polling_interval: float = 0.5
        super().__init__(args=(command_queue,))
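
    # A hypothetical ``beemon-config.ini`` fragment illustrating the keys read above. The section
    # names [global] and [ftp] are assumed from the ``Config.glob()`` / ``Config.ftp()`` accessors,
    # and all values are placeholders:
    #
    #   [global]
    #   local_output_directory = /home/bee/bee_tmp
    #   root_upload_directory = /usr/local/bee/beemon
    #   auto_start = True
    #
    #   [ftp]
    #   host = cs.appstate.edu
    #   port = 21
    #   username = bee
    #   password = secret
    #   upload_delay = 60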
    @property
    def local_output_directory(self) -> str:
        return self._local_output_directory

    @property
    def upload_directory(self) -> str:
        return self._root_upload_directory

    @property
    def auto_start(self) -> bool:
        return self._auto_start

    @property
    def observer(self) -> Optional[Observer]:
        return self._observer

    @property
    def upload_helper(self) -> Optional[UploadHelper]:
        return self._helper

    @property
    def command_queue(self) -> Queue:
        return self._command_queue

    @property
    def command_queue_polling_interval(self) -> float:
        return self._command_queue_polling_interval
    def start(self) -> None:
        """
        Starts the :class:`Uploader` process, scheduling its :meth:`Uploader.run` method to be called.
        """
        logger.info(f"Starting {type(self).__name__.lower()} process.")
        self.__running.set()
        super().start()
        return
    def _handle_command(self, command: str) -> Optional[str]:
        """
        Called whenever a command is received from the :class:`orchestrator.Orchestrator` object over the IPC
        :class:`multiprocessing.Queue`: ``command_queue``.

        Args:
            command (str): The UTF-8 text of the command passed across the ``command_queue`` IPC channel from the
                :class:`orchestrator.Orchestrator`.

        Returns:
            Optional[str]: An optional error message (currently not used).
        """
        error_message: Optional[str] = None
        prefix = command.split()[0]
        if prefix == 'start':
            self.start()
        elif prefix == 'stop':
            self.stop()
        else:
            error_message = f"Uploader process received unknown command: \"{command}\"."
        return error_message
    def run(self):
        """
        The main process/loop for the :class:`Uploader` class. This method starts the :class:`UploadHelper`
        :class:`multiprocessing.Process` and the :class:`watchdog.observers.Observer` :class:`threading.Thread`
        (with an attached :class:`UploadHandler`). It then repeatedly polls its :attr:`Uploader._command_queue`
        :class:`multiprocessing.Queue` for inbound commands dispatched from the :class:`orchestrator.Orchestrator`
        process. It will do this indefinitely until the :meth:`Uploader.stop` method is invoked, thereby halting
        the infinite loop.
        """
        super().run()
        self._helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
        self._helper.start()
        # .. todo:: Rename UploadHandler to UploadEventHandler for clarity.
        __upload_event_handler = UploadHandler(upload_helper=self._helper)
        self._observer: Observer = Observer()
        self._observer.schedule(__upload_event_handler, self._local_output_directory, recursive=True)
        self._observer.start()
        # Populate the secondary upload queue with data left over from previous recordings:
        self.populate_secondary_queue()
        logger.info(f"Watching for new files in directory: {self._local_output_directory}")
        while self.__running.is_set():
            # Pull from the command queue to see if a stop command was sent:
            try:
                logger.debug(f"uploader process polling for commands...")
                command: str = self._command_queue.get(block=True, timeout=10)
                logger.debug(f"uploader process received command: \"{command}\"")
                # Handle the command:
                error_message = self._handle_command(command=command)
            except queue.Empty:
                time.sleep(self._command_queue_polling_interval)
    def stop(self):
        """
        Sets the :class:`multiprocessing.Event` ``__running`` flag to ``False``, aborting the :meth:`Uploader.run`
        main loop, and then gracefully terminating the :class:`UploadHelper` :class:`multiprocessing.Process` and
        the :class:`watchdog.observers.Observer` :class:`threading.Thread`.

        Notes:
            Exiting (implicitly via a ``return`` of ``None``) from :meth:`Uploader.run` allows the
            :class:`multiprocessing.Process` to be terminated gracefully by the garbage collector.
        """
        logger.debug("Clearing the self.__running flag...")
        self.__running.clear()
        logger.info(f"Halting/joining the UploadHelper and Observer processes...")
        self._helper.stop()
        self._helper.join()
        self._observer.stop()
        self._observer.join()
        logger.info(f"Halted/joined the UploadHelper process and the Observer thread. Safely halted the Uploader "
                    f"process.")
        # We are not allowed to call: self.join()
    def populate_secondary_queue(self):
        """
        Populates the secondary queue of the :class:`UploadHelper` with all the files still remaining in the
        specified :attr:`_helper.local_output_directory`. This method is called on startup to queue any remaining
        files from a previous day for upload. If a file with the extension ``.final`` exists, that file is added
        to the queue instead of the corresponding ``.csv`` file.
        """
        # By default, examine /home/bee/bee_tmp/<any_sensor>/<any_date> for leftover files to upload:
        logger.debug(f'Adding files from {self._local_output_directory} to the secondary queue.')
        for file in glob.iglob(os.path.join(self._local_output_directory, '**/**/*')):
            # Iterate over the entries matched under /home/bee/bee_tmp/<sensor>/<date>/:
            if os.path.isdir(file):
                # This is a directory, not a sensor measurement file; skip it:
                pass
            else:
                # This is a file.
                source_filepath = file
                filename, date, sensor_name = parse_information_from_file_path(file_path=source_filepath)
                # Parse out the extension.
                source_filename_without_extension, source_file_extension = os.path.splitext(p=filename)
                # Construct the remote output path:
                rpi_name = platform.node()
                # Define the path of the current file if it were the tagged final csv file.
                final_source_filepath = f'{source_filepath}.final'
                # If the current file is not a .final file we enqueue it. If it is a .final file we skip it here
                # (it is enqueued in place of its companion .csv file below).
                if source_file_extension != '.final':
                    dest_filepath = os.path.join(self._root_upload_directory,
                                                 f"{rpi_name}/{date}/{sensor_name}/{filename}")
                    # If the current file is a .csv we check for the .final's existence:
                    if source_file_extension == '.csv':
                        if os.path.exists(final_source_filepath):
                            # The .final exists; add it to the queue with the same destination path as if it were
                            # not tagged:
                            logger.debug(f'{final_source_filepath} exists, adding {final_source_filepath} '
                                         f'with destination {dest_filepath}.')
                            self._helper.queue_upload(dest_filepath=dest_filepath,
                                                      source_filepath=final_source_filepath,
                                                      priority=1)
                        else:
                            logger.debug(f'{final_source_filepath} does not exist, adding {source_filepath} '
                                         f'with destination {dest_filepath}.')
                            self._helper.queue_upload(dest_filepath=dest_filepath,
                                                      source_filepath=source_filepath,
                                                      priority=1)
                    else:
                        # Enqueue the file for upload to the remote destination in the secondary upload queue:
                        logger.debug(f'Extension was not .csv, adding {source_filepath} '
                                     f'with destination {dest_filepath}.')
                        self._helper.queue_upload(dest_filepath=dest_filepath,
                                                  source_filepath=source_filepath,
                                                  priority=1)
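
# A minimal sketch of the orchestrator-side lifecycle (the surrounding orchestration code is
# hypothetical; only ``Uploader``'s own API is used):
#
#   command_queue = Queue()
#   uploader = Uploader(command_queue=command_queue, config=None)
#   uploader.start()           # begins watching local_output_directory and uploading new files
#   ...
#   command_queue.put('stop')  # handled by Uploader._handle_command(), which invokes stop()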
# if __name__ == '__main__':
#     _upload_helper = UploadHelper(upload_queue_polling_interval_seconds=1.0)
#     local_root_dir = os.path.abspath(os.path.join(os.path.abspath(__file__), '../../../'))
#     remote_root_dir = os.path.join('beemon', f"{platform.node()}/campellcl_test_dir")
#     # source_filepath = os.path.join(local_root_dir, 'beemon-config.ini')
#     _source_filepath = os.path.join('/home/bee/bee_tmp/audio/2021-11-15/rpi4-11@[email protected]')
#     _dest_filepath = '/beemon/campellcl_test/rpi4-11/audio/2021-11-15/rpi4-11@[email protected]'
#     _upload_helper.upload_file(source_filepath=_source_filepath, dest_filepath=_dest_filepath)
#     upload_session = UploadSession(
#         username=USERNAME, password=PASSWORD,
#         host_ip='cs.appstate.edu', host_port=21,
#         remote_directory=remote_root_dir
#     )