# Source code for file_upload_completion_watchdog

import argparse
import time

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent
import os
import setproctitle
from typing import Union
from loguru import logger


class FileUploadCompletionHandler(FileSystemEventHandler):
    """
    Watchdog process that watches for the completion of the file upload process. When a file is done uploading, the
    extension ``.done`` is added to the filename such that any process that is polling for files to pull can know
    that the file is done uploading and ready to be pulled.
    """

    # NOTE(review): the annotation below lists *creation* event types although this hook is the
    # "closed" callback; it is kept unchanged here to avoid touching the file's imports -- confirm
    # the intended event type.
    def on_closed(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """
        Mark a closed file as fully uploaded by appending ``.done`` to its path.

        Directory events are ignored, as are paths that already end in ``.done``, so that a file
        which has already been marked is never renamed a second time.

        A rename that fails with :class:`OSError` (for example because the file disappeared between
        being closed and being renamed) is logged as a warning and skipped, so that one problematic
        file does not raise out of this handler into the watchdog dispatch thread.

        :param event: The file system event describing the path that was closed.
        """
        if event.is_directory:
            return

        src_path = event.src_path
        if src_path.endswith(".done"):
            return

        dst_path = src_path + ".done"
        logger.debug(f"File {event.src_path} has been closed. Renaming to {event.src_path}.done.")
        try:
            os.rename(src_path, dst_path)
        except OSError as err:
            # Best effort: the file may have been removed or moved by its writer after closing it.
            logger.warning(f"Could not rename {src_path} to {dst_path}: {err}")
def run_watchdog(path_to_watch: str):
    """
    Watch ``path_to_watch`` recursively and block until the watchdog is interrupted.

    A :class:`FileUploadCompletionHandler` is scheduled on an observer for the given directory tree.
    The call then blocks for as long as the observer thread is alive. A ``KeyboardInterrupt``
    ends the wait without being re-raised. On every exit path the observer is stopped and joined.

    :param path_to_watch: Root directory whose files should be monitored.
    """
    observer = Observer()
    observer.schedule(FileUploadCompletionHandler(), path=path_to_watch, recursive=True)
    observer.start()
    try:
        # Waiting on the observer thread itself (instead of sleeping unconditionally) lets this
        # function return if that thread ever stops, rather than idling forever with nothing
        # watching the directory.
        while observer.is_alive():
            observer.join(1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to end the watchdog; fall through to the shutdown below.
        pass
    finally:
        observer.stop()
        observer.join()


if __name__ == '__main__':
    # Give the process a recognisable name in process listings.
    setproctitle.setproctitle("file_upload_completion_watchdog")

    parser = argparse.ArgumentParser(
        prog='file_upload_completion_watchdog',
        description="Watchdog process that watches for the completion of the file upload process from the rpis. When a "
                    "file is done uploading, the extension ``.done`` is added to the filename such that any process "
                    "that is polling for files to pull can know that the file is done uploading and ready to be pulled.",
    )
    parser.add_argument(
        '-p', '--path_to_watch',
        type=str,
        help="The path to watch for file upload completions. This should be the path root_upload_directory on the rpis'"
             " configuration files.",
        default="/usr/local/bee/appmais"
    )
    args = parser.parse_args()

    run_watchdog(args.path_to_watch)