import argparse
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent
import os
import setproctitle
from typing import Union
from loguru import logger
[docs]
class FileUploadCompletionHandler(FileSystemEventHandler):
    """
    Watchdog event handler that watches for the completion of the file upload process. When a file is done
    uploading, the extension ``.done`` is added to the filename such that any process that is polling for files to
    pull can know that the file is done uploading and ready to be pulled.
    """

    def on_closed(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """
        Append ``.done`` to the path of a file that has just been closed.

        Directory events and paths that already end in ``.done`` are ignored; the latter keeps an already-marked
        file from being renamed a second time.

        :param event: The watchdog event describing the closed path. Only ``is_directory`` and ``src_path`` are
            read from it.
        """
        if event.is_directory:
            return

        src_path = event.src_path
        if src_path.endswith(".done"):
            return

        dst_path = src_path + ".done"
        logger.debug(f"File {src_path} has been closed. Renaming to {dst_path}.")
        try:
            os.rename(src_path, dst_path)
        except OSError as err:
            # The file may have been removed or moved between the close event and the rename. An uncaught
            # exception here would escape into the watchdog thread that dispatches events, so log and carry on.
            logger.error(f"Could not rename {src_path} to {dst_path}: {err}")
def run_watchdog(path_to_watch: str) -> None:
    """
    Watch ``path_to_watch`` recursively and mark finished uploads until interrupted.

    A :class:`FileUploadCompletionHandler` is scheduled on a new observer, which is then started. The call blocks
    until the observer thread stops or a ``KeyboardInterrupt`` is received; in every case the observer is stopped
    and joined before returning.

    :param path_to_watch: Directory to watch; sub-directories are included.
    """
    observer = Observer()
    observer.schedule(FileUploadCompletionHandler(), path=path_to_watch, recursive=True)
    observer.start()
    try:
        # Poll the observer thread rather than sleeping unconditionally, so that a dead observer ends the loop
        # instead of leaving the process idle forever.
        while observer.is_alive():
            observer.join(timeout=1)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the watchdog; shut down quietly.
        pass
    finally:
        # Always release the observer, whichever way the loop ended.
        observer.stop()
        observer.join()
if __name__ == "__main__":
    # Command-line entry point: name the process, read the directory to watch, and start the watchdog.
    program_name = "file_upload_completion_watchdog"
    setproctitle.setproctitle(program_name)

    description_text = (
        "Watchdog process that watches for the completion of the file upload process from the rpis. When a "
        "file is done uploading, the extension ``.done`` is added to the filename such that any process "
        "that is polling for files to pull can know that the file is done uploading and ready to be pulled."
    )
    path_help_text = (
        "The path to watch for file upload completions. This should be the path root_upload_directory on the rpis'"
        " configuration files."
    )

    cli = argparse.ArgumentParser(prog=program_name, description=description_text)
    cli.add_argument(
        "-p",
        "--path_to_watch",
        default="/usr/local/bee/appmais",
        type=str,
        help=path_help_text,
    )

    options = cli.parse_args()
    run_watchdog(options.path_to_watch)