import argparse
import os
import posixpath
import stat
import threading
import time
from datetime import datetime, timedelta
from threading import Thread
from typing import Optional, Tuple

import paramiko.util
from loguru import logger
from paramiko import SSHClient
from paramiko.sftp_client import SFTPClient
from setproctitle import setproctitle
class RemoteFilePollster:
    """
    Class that polls a directory on a remote machine for new files that have completed uploading.

    AppMAIS software on the RPIs:
        RPI ---(SFTP)---> appmais-wg

    This class:
        AppMAIS server <---(SFTP)--- appmais-wg (That is, AppMAIS server pulls files from appmais-wg)

    With the idea being that appmais-wg and the RPIs never initiate a connection with the AppMAIS server. This script
    will run on the AppMAIS server and poll appmais-wg for new files that have completed uploading from the RPIs. These
    are denoted by the extension ``.done`` or other user-supplied completed uploading extension being added to the
    filename. These files should be pulled to AppMAIS for storage.
    """

    def __init__(
            self, remote_polling_path: str, local_storage_path: str, hostname: str, username: str,
            port: Optional[int] = 22, log_filepath: Optional[str] = None,
            completed_uploading_extension: Optional[str] = '.done', include_paramiko_logs: Optional[bool] = True
    ):
        """
        Initializes the file pollster. Sets up the logging and the paths for the remote polling and local storage.

        Args:
            remote_polling_path (str): The path on the remote machine to poll for new files that have completed
                uploading.
            local_storage_path (str): The path on the local machine to store the files that are pulled from the remote
                path.
            hostname (str): The hostname of the remote machine to connect to.
            username (str): The username to use when connecting to the remote machine.
            port (Optional[int]): The port to connect to on the remote machine for SFTP (over SSH). Defaults to 22.
            log_filepath (Optional[str]): The path to the log file to write. Defaults to file_pollster.log in the
                directory of this file.
            completed_uploading_extension (Optional[str]): The extension to denote that a file has completed uploading.
                Defaults to '.done'.
            include_paramiko_logs (Optional[bool]): Whether to include Paramiko logs in the log file. Defaults to True.
        """
        # Set up logging
        if log_filepath is None:
            log_filepath = os.path.abspath(os.path.join(os.path.dirname(__file__), "file_pollster.log"))
        logger.add(log_filepath)
        if include_paramiko_logs:
            paramiko.util.log_to_file(log_filepath)
        logger.debug(f'Logging in file pollster initialized at {log_filepath}.'
                     f'{" Paramiko logs included in the same file." if include_paramiko_logs else ""}')
        # Set up the paths
        self._remote_polling_path = remote_polling_path
        self._local_storage_path = local_storage_path
        self._completed_uploading_extension = completed_uploading_extension  # Uploading from the pis
        # Set up the connection
        self._hostname = hostname
        self._port = port
        self._username = username
        logger.info(f"File pollster initialized, looking for new files on the remote path {remote_polling_path}"
                    f" and storing them locally at {local_storage_path}.")

    def _poll_for_files(self, recursive_search: Optional[bool] = True):
        """
        Polls the remote directory for new files that have completed uploading. If a file is found, it is pulled to the
        local storage path.

        Args:
            recursive_search (Optional[bool]): Whether to recursively search the remote directory for files.
                Defaults to True.
        """
        logger.debug(f'Opening SSH and SFTP clients for file pollster...')
        with SFTPClientContextManager(hostname=self._hostname, username=self._username, port=self._port) as sftp_client:
            logger.info(f"Polling for new files on the remote machine at path {self._remote_polling_path}.")
            self._poll_for_files_recursive(
                path=self._remote_polling_path, sftp_client=sftp_client, expand_dirs=recursive_search
            )
            logger.debug(f'Finished polling. Closing SSH and SFTP clients for file pollster.')

    def _poll_for_files_recursive(self, sftp_client: "SFTPClient", path: str, expand_dirs: bool):
        """
        Recursive utility function to poll for files in a directory and its subdirectories. Every file whose name ends
        with the completed uploading extension is handed to its own thread running ``_pull_and_remove_file``.

        NOTE(review): the pull threads are started but never joined or tracked, so a file that is still being
        downloaded when the next poll runs will be listed (and pulled) again.

        Args:
            sftp_client (SFTPClient): An open SFTP client used to list and stat the remote entries.
            path (str): The path to the directory to poll for files.
            expand_dirs (bool): Whether to expand directories and search their contents for files.
        """
        for dir_entry in sftp_client.listdir(path=path):
            # Remote paths are SFTP paths, which always use '/' regardless of the local operating system.
            full_path = posixpath.join(path, dir_entry)
            try:
                file_stat = sftp_client.stat(full_path)
            except FileNotFoundError:
                # A pull thread from an earlier poll may have removed the entry between listdir() and stat().
                logger.debug(f'Entry {full_path} no longer exists on the remote host. Skipping...')
                continue
            # check if the path leads to a directory:
            if stat.S_ISDIR(file_stat.st_mode):
                if expand_dirs:
                    self._poll_for_files_recursive(path=full_path, sftp_client=sftp_client, expand_dirs=expand_dirs)
                continue
            logger.debug(f'Found file: {full_path}')
            if not full_path.endswith(self._completed_uploading_extension):
                logger.debug(f'File {full_path} has not completed uploading. Skipping...')
                continue
            logger.debug(f'File {full_path} has completed uploading, opening a thread to pull the file from '
                         f'remote host.')
            thread = Thread(
                target=self._pull_and_remove_file, args=(full_path,),
                name=f'PullFileThread-{self._strip_upload_extension(dir_entry)}'
            )
            thread.start()

    def _strip_upload_extension(self, path: str) -> str:
        """
        Returns ``path`` without the trailing completed uploading extension. Only a suffix is removed; occurrences of
        the extension elsewhere in the path are left untouched.

        Args:
            path (str): A file name or path, with or without the completed uploading extension.

        Returns:
            str: The path with the extension removed from its end, or the path unchanged if it does not end with it.
        """
        extension = self._completed_uploading_extension
        if extension and path.endswith(extension):
            return path[:-len(extension)]
        return path

    def _local_path_for_remote_file(self, remote_path_to_file: str) -> str:
        """
        Maps a remote file path to the local path it should be stored at. The location of the file relative to the
        remote polling path is preserved underneath the local storage path, and the completed uploading extension is
        dropped from the file name.

        Args:
            remote_path_to_file (str): The path to the file on the remote machine.

        Returns:
            str: The local path at which the file should be stored.

        Raises:
            ValueError: If the file is located outside the remote polling path.
        """
        relative_path = posixpath.relpath(remote_path_to_file, self._remote_polling_path)
        # A relative path that climbs out of the polling root means the file is not inside it.
        if relative_path == posixpath.pardir or relative_path.startswith(posixpath.pardir + posixpath.sep):
            raise ValueError(
                f'{remote_path_to_file} is not located within the remote polling path {self._remote_polling_path}.'
            )
        relative_path = self._strip_upload_extension(relative_path)
        return os.path.join(self._local_storage_path, *relative_path.split(posixpath.sep))

    def _pull_and_remove_file(self, remote_path_to_file: str):
        """
        Uses the SFTP client to pull a file from the remote machine to the local machine. The file is placed in the
        local storage path, with any necessary directories created. The file is then deleted from the remote machine.
        Expects the file to be located within the remote polling path. Expects to be run as a separate thread for the
        logging messages to be accurate.

        Args:
            remote_path_to_file (str): The full path to the file on the remote machine that should be pulled to the
                local machine. This path should be absolute.

        Raises:
            ValueError: If the file is located outside the remote polling path.
        """
        thread_name = threading.current_thread().name
        filepath_without_upload_extension = self._strip_upload_extension(remote_path_to_file)
        logger.debug(
            f'{thread_name}: Thread started to pull file {posixpath.basename(filepath_without_upload_extension)} '
            f'from remote host.'
        )
        # Validate and map the path before opening a connection, so a bad path never costs an SSH session.
        try:
            local_path_for_file = self._local_path_for_remote_file(remote_path_to_file)
        except ValueError as err:
            error_str = (f'{thread_name}: Attempting to pull a file from outside the remote polling path '
                         f'{self._remote_polling_path} is not allowed.')
            logger.error(error_str)
            raise ValueError(error_str) from err
        dir_path_to_file = os.path.dirname(local_path_for_file)
        # Make sure the local path exists. exist_ok guards against another pull thread creating it concurrently:
        if dir_path_to_file and not os.path.isdir(dir_path_to_file):
            logger.debug(f'{thread_name}: Have to make some number of directories in the path {dir_path_to_file}')
            os.makedirs(dir_path_to_file, exist_ok=True)
        with SFTPClientContextManager(hostname=self._hostname, username=self._username, port=self._port) as sftp_client:
            # Pull the file. the sftp_client.get() function is synchronous, so this will block until the file is pulled:
            logger.debug(f'{thread_name}: Getting file {os.path.basename(local_path_for_file)} from remote and placing '
                         f'it at the local location {dir_path_to_file}')
            sftp_client.get(remote_path_to_file, local_path_for_file)
            # Remove the file from the remote host:
            logger.debug(f'{thread_name}: Removing file {posixpath.basename(remote_path_to_file)} from remote host.')
            sftp_client.remove(remote_path_to_file)
        logger.info(f'{thread_name}: Thread for file {posixpath.basename(remote_path_to_file)} has completed.')

    def start_polling_at_interval(self, polling_interval_minutes: int, recursive_search: bool):
        """
        Starts the file pollster. This function will run indefinitely, polling for files at the given interval.
        Begins polling at the next whole minute after the function is called, and then polls again after
        ``polling_interval_minutes`` minutes have passed. If a poll takes longer than the interval, the next poll
        starts immediately.

        Args:
            polling_interval_minutes (int): The number of minutes to wait between polling for files.
            recursive_search (bool): Whether to recursively search the remote directory for files.
        """
        logger.info(f'Starting file pollster with polling interval of {polling_interval_minutes * 60} seconds.')
        next_whole_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=1)
        time_to_sleep = (next_whole_minute - datetime.now()).total_seconds()
        logger.info(f'Waiting until the next whole minute ({next_whole_minute}) before starting polling...')
        # time.sleep() raises ValueError on negative durations, so clamp at zero.
        time.sleep(max(0.0, time_to_sleep))
        while True:
            next_polling_time = datetime.now().timestamp() + (polling_interval_minutes * 60)
            next_polling_time = datetime.fromtimestamp(next_polling_time).replace(second=0, microsecond=0)
            self._poll_for_files(recursive_search=recursive_search)
            logger.info(f'Waiting until {next_polling_time} before polling again...')
            # The poll may have overrun the interval, in which case the remaining time is negative.
            time.sleep(max(0.0, (next_polling_time - datetime.now()).total_seconds()))

    @property
    def remote_polling_path(self):
        """str: The path on the remote machine that is polled for files."""
        return self._remote_polling_path

    @property
    def local_storage_path(self):
        """str: The path on the local machine under which pulled files are stored."""
        return self._local_storage_path

    @staticmethod
    def parse_command_line_arguments(program_name: str) -> Tuple[str, str, str, str, int, bool, int, str, bool]:
        """
        Parses the command line arguments for the file pollster script.

        Boolean options (``--recursive_search`` and ``--include_paramiko_logs``) take an explicit value such as
        ``True``/``False``, ``yes``/``no`` or ``1``/``0`` (case-insensitive).

        Args:
            program_name (str): The name of the program that is being run. This is used to set the process title for
                the program as seen by the command line args (such as when doing ``--help``) but is an argument such
                that it could be changed along with the process title for consistency.

        Returns:
            A tuple containing the parsed command line arguments.

            * **remote_polling_path** (str): The path on the remote machine to poll for new files that have completed
              uploading.
            * **local_storage_path** (str): The path on the local machine to store the files that are pulled from the
              remote path.
            * **hostname** (str): The hostname of the remote machine to connect to. Defaults to
              appmais-wg's IP address.
            * **username** (str): The username to use when connecting to the remote machine. Defaults to bee.
            * **port** (int): The port to connect to on the remote machine for SFTP (over SSH). Defaults to 22.
            * **recursive_search** (bool): Whether to recursively search the remote directory for files. Defaults
              to True.
            * **polling_interval_minutes** (int): The number of minutes to wait between polling for files. Defaults
              to 1.
            * **log_filepath** (str): The path to the log file to write. Defaults to file_pollster.log in the
              directory of this file.
            * **include_paramiko_logs** (bool): Whether to include Paramiko logs in the log file. Defaults to True.
        """
        def parse_bool(value: str) -> bool:
            # argparse's type=bool treats every non-empty string (including "False") as True, so parse explicitly.
            normalized = value.strip().lower()
            if normalized in ('true', 't', 'yes', 'y', 'on', '1'):
                return True
            if normalized in ('false', 'f', 'no', 'n', 'off', '0', ''):
                return False
            raise argparse.ArgumentTypeError(f"expected a boolean value such as True or False, got {value!r}")

        parser = argparse.ArgumentParser(
            prog=program_name,
            description="Polls a remote directory for new files that have completed uploading and pulls them to a "
                        "local storage path. Used for pulling files from the appmais-wg server that have been uploaded "
                        "from the RPIs.",
            formatter_class=argparse.ArgumentDefaultsHelpFormatter
        )
        parser.add_argument(
            '-r', '--remote_path',
            type=str,
            help="The remote path to poll for new files that have completed uploading. "
                 "This program preserves directory structure when pulling files recursively from the remote path, "
                 "replacing the given remote path with the local storage path. This argument is required.",
            required=True,
            default=argparse.SUPPRESS  # keeps the defaults formatter from printing "(default: None)"
        )
        parser.add_argument(
            '-l', '--local_path',
            type=str,
            help="The local path to store the files that are pulled from the remote path. This argument is required.",
            required=True,
            default=argparse.SUPPRESS
        )
        parser.add_argument(
            '-a', '--host_address',
            type=str,
            help="The hostname of the remote machine to connect to. Defaults to appmais-wg's IP address.",
            default="152.10.10.74"
        )
        parser.add_argument(
            '-u', '--username',
            type=str,
            help="The username to use when connecting to the remote machine.",
            default="bee"
        )
        parser.add_argument(
            '-p', '--port',
            type=int,
            help="The port to connect to on the remote machine for SFTP (over SSH).",
            default=22
        )
        parser.add_argument(
            '--polling_interval_minutes',
            type=int,
            help="The number of minutes to wait between polling for files.",
            default=1
        )
        parser.add_argument(
            '--recursive_search',
            type=parse_bool,
            help="Whether to recursively search the remote directory for files.",
            default=True
        )
        parser.add_argument(
            '--log_filepath',
            type=str,
            help="The path to the log file to write. Defaults to file_pollster.log in the directory of this file.",
            default=os.path.abspath(os.path.join(os.path.dirname(__file__), "file_pollster.log"))
        )
        parser.add_argument(
            '--include_paramiko_logs',
            type=parse_bool,
            help="Whether to include Paramiko logs in the log file.",
            default=True
        )
        args = parser.parse_args()
        return (
            args.remote_path,
            args.local_path,
            args.host_address,
            args.username,
            args.port,
            args.recursive_search,
            args.polling_interval_minutes,
            args.log_filepath,
            args.include_paramiko_logs
        )
class SFTPClientContextManager:
    """
    Context manager for the SSH connection to the AppMAIS-WG server. Entering the context opens an SSH connection and
    an SFTP session on top of it, and yields the SFTP client. Leaving the context closes both, so that the SSH
    connection is properly closed when it is no longer needed.
    """

    def __init__(self, hostname: str, username: str, port: Optional[int] = 22):
        """
        Stores the connection parameters. No connection is made until the context is entered.

        Args:
            hostname (str): The hostname of the remote machine to connect to.
            username (str): The username to use when connecting to the remote machine.
            port (Optional[int]): The port to connect to on the remote machine. Defaults to 22.
        """
        self._hostname = hostname
        self._username = username
        self._port = port
        self._ssh_client = None
        self._sftp_client = None

    def __enter__(self):
        """
        Opens the SSH connection and the SFTP session.

        Returns:
            The SFTP client opened on the SSH connection.
        """
        self._ssh_client = SSHClient()
        # NOTE(security): AutoAddPolicy accepts host keys that are not already known, so the identity of the remote
        # host is not verified for unknown hosts. Kept as-is to preserve existing behavior.
        self._ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self._ssh_client.connect(
                hostname=self._hostname, port=self._port, username=self._username, allow_agent=True, look_for_keys=True
            )
            self._sftp_client = self._ssh_client.open_sftp()
        except Exception:
            # __exit__ is not called when __enter__ raises, so release whatever was opened before re-raising.
            self._close_clients()
            raise
        return self._sftp_client

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Closes the SFTP session and the SSH connection. Exceptions raised inside the context are not suppressed.
        """
        self._close_clients()
        return False

    def _close_clients(self):
        """
        Closes the SFTP client first and then the SSH client it runs on, skipping any that were never opened.
        """
        sftp_client, ssh_client = self._sftp_client, self._ssh_client
        self._sftp_client = None
        self._ssh_client = None
        try:
            if sftp_client is not None:
                sftp_client.close()
        finally:
            # Always close the underlying connection, even if closing the SFTP session failed.
            if ssh_client is not None:
                ssh_client.close()
if __name__ == '__main__':
    # Script entry point: name the process, read the command line options, then poll forever.
    prog_name = "remote_file_pollster"  # https://lparchive.org/Mega-Man-Battle-Network-3-Blue/Update%2007/21-MMBN3Update5127.png
    setproctitle(prog_name)
    parsed_arguments = RemoteFilePollster.parse_command_line_arguments(prog_name)
    # The tuple order is fixed by parse_command_line_arguments.
    remote_path, local_path, host_addr, user, sftp_port = parsed_arguments[:5]
    recursive, polling_interval, log_path, log_paramiko = parsed_arguments[5:]
    file_pollster = RemoteFilePollster(
        remote_polling_path=remote_path,
        local_storage_path=local_path,
        hostname=host_addr,
        username=user,
        port=sftp_port,
        log_filepath=log_path,
        include_paramiko_logs=log_paramiko,
    )
    file_pollster.start_polling_at_interval(
        polling_interval_minutes=polling_interval,
        recursive_search=recursive,
    )