Source code for database_sync

import glob
import os
import csv
import time
import itertools
from typing import Union, Optional, List
import setproctitle
from watchdog import events
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import pymongo
from datetime import datetime
from loguru import logger
from pymongo import monitoring
from pymongo.collection import Collection


class CommandLogger(monitoring.CommandListener):
    """
    CommandLogger is a class that implements the CommandListener abstract methods and uses them to
    log commands that are processed by pymongo.
    """
    def started(self, event: monitoring.CommandStartedEvent):
        """
        This function is called when the CommandListener detects a `CommandStartedEvent`_. It
        parses the command and logs it in an easy-to-read format.

        Args:
            event (CommandStartedEvent):

        .. _CommandStartedEvent:
            https://pymongo.readthedocs.io/en/stable/api/pymongo/monitoring.html#pymongo.monitoring.CommandStartedEvent
        """
        started_event_details = f"\nOperation: {event.command_name}\n" \
                                f"Request ID: {event.request_id}\n" \
                                f"Collection: {event.command[event.command_name]}\n" \
                                "Documents: \n\t{\n"
        # Not every command carries a 'documents' field (e.g. 'find' or 'aggregate'), so guard
        # the lookup rather than indexing directly.
        for document in event.command.get('documents', []):
            for key in document:
                started_event_details += f"\t\t{key}: {document[key]}\n"
        started_event_details += "\t}"
        logger.info(f"Command \033[1;32m{event.command_name}\033[0m with request id "
                    f"{event.request_id} started on connection "
                    f"{event.connection_id}")
        logger.debug(started_event_details)
    def succeeded(self, event: monitoring.CommandSucceededEvent):
        """
        This function is called when the CommandListener detects a `CommandSucceededEvent`_. It
        logs information related to the event.

        Args:
            event (CommandSucceededEvent):

        .. _CommandSucceededEvent:
            https://pymongo.readthedocs.io/en/stable/api/pymongo/monitoring.html#pymongo.monitoring.CommandSucceededEvent
        """
        logger.info(f"Command \033[1;32m{event.command_name}\033[0m with request id "
                    f"{event.request_id} succeeded on connection "
                    f"{event.connection_id}")
    def failed(self, event: monitoring.CommandFailedEvent):
        """
        This function is called when the CommandListener detects a `CommandFailedEvent`_. It logs
        information related to the event.

        Args:
            event (CommandFailedEvent):

        .. _CommandFailedEvent:
            https://pymongo.readthedocs.io/en/stable/api/pymongo/monitoring.html#pymongo.monitoring.CommandFailedEvent
        """
        logger.info(f"Command \033[1;32m{event.command_name}\033[0m with request id "
                    f"{event.request_id} failed on connection "
                    f"{event.connection_id} "
                    f"with failure {event.failure}")
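
# A minimal sketch (not part of the original module) of how the listener above hooks into
# pymongo: monitoring.register() is called before the MongoClient is constructed, after which
# every command issued by that client fires started()/succeeded()/failed(). The address,
# database, and collection names below are hypothetical.
def _command_logger_demo():
    monitoring.register(CommandLogger())
    client = pymongo.MongoClient('localhost', 27017)
    # This single insert produces one started() and one succeeded() (or failed()) log line.
    client.demoDB.DemoCollection.insert_one({'HiveName': 'AppMAIS1L'})
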
class DatabaseSync:
    """
    DatabaseSync provides many methods that interact with the database. If run, this program
    creates a process called `database_sync` that continuously watches a directory (currently
    `/usr/local/bee/appmais`) for new incoming hives, audio files, or video files to insert into
    the database. See :class:`.UpdateDatabase` if you wish to insert many older files into the
    database rather than only the incoming files.
    """
    def __init__(self):
        monitoring.register(CommandLogger())
        self.__bee_db_client = pymongo.MongoClient('mongo-mais.cs.appstate.edu', 27017)
        self.__honey_beats_client = pymongo.MongoClient('localhost', 27018)
        self.__bee_db = self.__bee_db_client.beeDB
        self.__honey_beats = self.__honey_beats_client.HoneyBeats
        # The following are collections that we insert data into.
        self.__bee_db_audio_collection = self.__bee_db.AudioFiles
        self.__honey_beats_audio_collection = self.__honey_beats.AudioFiles
        self.__video_collection = self.__bee_db.VideoFiles
        self.__image_collection = self.__bee_db.ImageFiles
        self.__hives_collection = self.__bee_db.Hives
        self.__scale_collection = self.__bee_db.Scale
        self.__temp_collection = self.__bee_db.TemperatureHumidity
        self.__cpu_collection = self.__bee_db.Cpu
        self.__airquality_collection = self.__bee_db.AirQuality
        self.__rpi_path = os.path.abspath('/usr/local/bee/appmais/')
        if not os.path.exists(self.__rpi_path):
            logger.error(f"Path {self.__rpi_path} does not exist!")
            raise AssertionError(f"Path {self.__rpi_path} does not exist!")
        self.__hive_pattern = os.path.relpath('AppMAIS*/')
        self.__level = "DEBUG"
        self.__rotation = "3 days"
        self.__retention = "10 days"
        logger.add("/home/bee/beestream_files/logs/database_sync.log", level=self.__level,
                   colorize=True, rotation=self.__rotation, retention=self.__retention)
    def return_hives(self) -> List[str]:
        """
        Use this method to get a list of all the hives in the database.

        Returns:
            List[str]: The list of all the hive names currently in the database.
        """
        hive_list = []
        for obj in self.__hives_collection.find({}, {"HiveName": 1, "_id": 0}):
            hive_list.append(obj['HiveName'])
        return hive_list
    def find_new_hives(self):
        """
        Checks the directories at the given location for any new hives. If it finds one, passes it
        to :meth:`.insert_hive`. Used by :meth:`.UpdateDatabase.update_hives`.
        """
        # Creating a wildcard path to iterate over.
        hive_name_path = os.path.join(self.__rpi_path, self.__hive_pattern)
        for entry in glob.iglob(hive_name_path):
            # Check each hive directory against the database and insert any that are new.
            hive_name = os.path.basename(entry)
            if not already_exists(self.__hives_collection, "HiveName", hive_name):
                self.insert_hive(hive_name)
    def insert_hive(self, hive_name: str):
        """
        Inserts one hive into the database, using the hive name and finding the earliest
        recordings sent from that hive.

        Args:
            hive_name (str): Name of hive to be inserted.
        """
        created_time = datetime.now()
        # Checking for an earlier time based on the files for that hive...
        hive_rel_path = os.path.relpath(hive_name + '/**/video/*')
        hive_dates = os.path.join(self.__rpi_path, hive_rel_path)
        # If there are any recordings, use the created time of the earliest video. glob order is
        # not guaranteed, so sort the date-stamped paths to put the earliest first, then break.
        for file in sorted(glob.iglob(hive_dates)):
            created_time = datetime.fromtimestamp(os.path.getctime(file))
            break
        self.__hives_collection.insert_one({'HiveName': hive_name, "StartDate": str(created_time)})
    def find_new_videos(self):
        """
        Iterates through each hive and checks if they have any new videos or images to insert into
        the database. Can take a *long* time depending on the number of videos/images. Uses
        :meth:`check_hive_videos` or :meth:`check_hive_images` to build the filepaths for each
        date and insert any video/image it finds.
        """
        # Creating a wildcard path to iterate over.
        hive_name_path = os.path.join(self.__rpi_path, self.__hive_pattern)
        for entry in glob.iglob(hive_name_path):
            self.check_hive_videos(os.path.basename(entry))
            self.check_hive_images(os.path.basename(entry))
    def check_hive_videos(self, hive_name: str, date: Optional[str] = "**"):
        """
        Checks one hive's subtree in the filesystem for any videos, and creates an entry for any
        that aren't already in the database. Traverses through all the possible video filepaths,
        checks if the filepaths are in the database, and if not, passes the filepaths to
        :meth:`.insert_video` to insert into the database.

        Args:
            hive_name (str): Name of the hive to check.
            date (Optional[str]): Date to check for videos. Either in format "YYYY-MM-DD" or "**"
                for all dates.
        """
        hive_path = os.path.join(self.__rpi_path, hive_name)
        h264_glob = glob.iglob(hive_path + '/' + date + '/video/*.h264')
        mp4_glob = glob.iglob(hive_path + '/' + date + '/video/*.mp4')
        for video_path in itertools.chain(h264_glob, mp4_glob):
            if not already_exists(self.__video_collection, "FilePath", video_path):
                self.insert_video(video_path)
    def check_hive_images(self, hive_name: str, date: Optional[str] = "**"):
        """
        Checks one hive's subtree in the filesystem for any images, and creates an entry for any
        that aren't already in the database. Traverses through all the possible image filepaths,
        checks if the filepaths are in the database, and if not, passes the filepaths to
        :meth:`.insert_image` to insert into the database.

        Args:
            hive_name (str): Name of the hive to check.
            date (Optional[str]): Date to check for images. Either in format "YYYY-MM-DD" or "**"
                for all dates.
        """
        hive_path = os.path.join(self.__rpi_path, hive_name)
        for image_path in glob.iglob(hive_path + '/' + date + '/video/*.png'):
            if not already_exists(self.__image_collection, "FilePath", image_path):
                self.insert_image(image_path)
    def insert_video(self, video_path: str):
        """
        Creates an entry of a video file to insert into the database. Video file entries have the
        HiveName, TimeStamp, FilePath, and FileSize as column headers. This indiscriminately
        inserts into the database, so be certain before you use it that the file is not already in
        the database. Used internally by :meth:`.check_hive_videos` whenever it finds an
        outstanding file, and externally by the watchdog method :meth:`.DatabaseHandler.on_closed`
        when the watchdog finds a new video file.

        Args:
            video_path (str): The path to the video that was uploaded.
        """
        # Getting the time the video was created and creating a datetime object for standardized
        # strings. Filenames follow the 'HiveName@YYYY-MM-DD@HH-MM-SS.ext' convention; splitext
        # strips the extension regardless of its length ('.h264' and '.mp4' differ).
        basename = os.path.splitext(os.path.basename(video_path))[0]
        hive_name, date_part, time_part = basename.split('@')
        timestamp = datetime.strptime(f"{date_part} {time_part}", '%Y-%m-%d %H-%M-%S')
        file_size = os.path.getsize(video_path)
        insertion_payload = {"HiveName": hive_name, "TimeStamp": timestamp,
                             "FilePath": video_path, "FileSize": file_size}
        logger.info(f"Inserting {os.path.basename(video_path)} as a video file.")
        self.__video_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} video inserted.")
    def insert_image(self, image_path: str):
        """
        Creates an entry of an image file to insert into the database. Image file entries have the
        HiveName, TimeStamp, FilePath, and FileSize as column headers. This indiscriminately
        inserts into the database, so be certain before you use it that the file is not already in
        the database. Used internally by :meth:`.check_hive_images` whenever it finds an
        outstanding file, and externally by the watchdog method :meth:`.DatabaseHandler.on_closed`
        when the watchdog finds a new image file.

        Args:
            image_path (str): The path to the image that was uploaded.
        """
        # Getting the time the image was created and creating a datetime object for standardized
        # strings. Filenames follow the 'HiveName@YYYY-MM-DD@HH-MM-SS.png' convention; splitext
        # strips the extension regardless of its length.
        basename = os.path.splitext(os.path.basename(image_path))[0]
        hive_name, date_part, time_part = basename.split('@')
        timestamp = datetime.strptime(f"{date_part} {time_part}", '%Y-%m-%d %H-%M-%S')
        file_size = os.path.getsize(image_path)
        insertion_payload = {"HiveName": hive_name, "TimeStamp": timestamp,
                             "FilePath": image_path, "FileSize": file_size}
        logger.info(f"Inserting {os.path.basename(image_path)} as an image file.")
        self.__image_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} image inserted.")
    def find_new_audio(self):
        """
        Iterates through each hive and checks if they have any new audio files to insert into the
        database. Can take a *long* time depending on the number of audio files. Uses
        :meth:`check_hive_audio` to build the filepaths for each date and insert any audio file it
        finds.
        """
        # Creating a wildcard path to iterate over.
        hive_name_path = os.path.join(self.__rpi_path, self.__hive_pattern)
        for entry in glob.iglob(hive_name_path):
            self.check_hive_audio(os.path.basename(entry))
    def check_hive_audio(self, hive_name: str, date: Optional[str] = "**"):
        """
        Checks one hive's subtree in the filesystem for any audio files, and creates an entry for
        any that aren't already in the database. Traverses through all the possible audio
        filepaths, checks if the filepaths are in the database, and if not, passes the filepaths
        to :meth:`.insert_audio` to insert into the database.

        Args:
            hive_name (str): Name of the hive to check.
            date (Optional[str]): Date to check for audio. Either in format "YYYY-MM-DD" or "**"
                for all dates.
        """
        hive_path = os.path.join(self.__rpi_path, hive_name)
        for audio_path in glob.iglob(hive_path + '/' + date + '/audio/*.wav'):
            if not already_exists(self.__bee_db_audio_collection, "FilePath", audio_path):
                self.insert_audio(collection=self.__bee_db_audio_collection,
                                  audio_path=audio_path)
            if not already_exists(self.__honey_beats_audio_collection, "FilePath", audio_path):
                self.insert_audio(collection=self.__honey_beats_audio_collection,
                                  audio_path=audio_path)
    def insert_audio(self, collection: Collection, audio_path: str):
        """
        Creates an entry of an audio file to insert into the database. Audio file entries have the
        HiveName, TimeStamp, and FilePath as column headers. This indiscriminately inserts into
        the database, so be certain before you use it that the file is not already in the
        database. Used internally by :meth:`.check_hive_audio` whenever it finds an outstanding
        file, and externally by the watchdog method :meth:`.DatabaseHandler.on_closed` when the
        watchdog finds a new audio file.

        Args:
            collection (Collection): The collection to insert the audio file item into.
            audio_path (str): The path to the audio file that was uploaded.
        """
        # Getting the time the audio file was created and creating a datetime object for
        # standardized strings. Filenames follow the 'HiveName@YYYY-MM-DD@HH-MM-SS.wav'
        # convention; splitext strips the extension regardless of its length.
        basename = os.path.splitext(os.path.basename(audio_path))[0]
        hive_name, date_part, time_part = basename.split('@')
        timestamp = datetime.strptime(f"{date_part} {time_part}", '%Y-%m-%d %H-%M-%S')
        insertion_payload = {"HiveName": hive_name, "TimeStamp": timestamp,
                             "FilePath": audio_path}
        logger.info(f"Inserting {os.path.basename(audio_path)} as an audio file.")
        collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} audio inserted.")
    def insert_airquality(self, airquality_path: str):
        """
        Creates an entry for the air quality recording to put into the database. This
        indiscriminately inserts into the database, so be certain before you use it that the
        values are not already in the database. This does not insert the whole file, just the last
        line of the .csv.

        Args:
            airquality_path (str): Full path to the airquality csv file.
        """
        filename = os.path.basename(airquality_path)
        hive_name, date = filename.split('@')
        # Removing the filetype.
        date = date[:-4]
        # Need to grab the last entry from the csv file...
        last_entry = None
        with open(airquality_path, 'r') as open_file:
            reader = csv.reader(open_file, delimiter=',', quotechar='"')
            for entry in reader:
                last_entry = entry
        last_pm25 = last_entry[1]
        last_pm10 = last_entry[2]
        # "nan" readings are stored as nulls.
        last_pm25 = None if last_pm25 == "nan" else float(last_pm25)
        last_pm10 = None if last_pm10 == "nan" else float(last_pm10)
        date = datetime.strptime(f"{date} {last_entry[0]}", "%Y-%m-%d %H-%M-%S")
        logger.debug(f"Found an air quality reading for hive {hive_name}! {date}: "
                     f"pm25: {last_pm25} pm10: {last_pm10}")
        insertion_payload = {"HiveName": hive_name, "PM25": last_pm25, "PM10": last_pm10,
                             "TimeStamp": date}
        self.__airquality_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} air quality reading inserted.")
    def insert_temp(self, temp_path: str):
        """
        Creates an entry for both the temperature and humidity recording to put into the database.
        This indiscriminately inserts into the database, so be certain before you use it that the
        values are not already in the database. This does not insert the whole file, just the last
        line of the .csv.

        Args:
            temp_path (str): Full path to the temperature and humidity csv file.
        """
        filename = os.path.basename(temp_path)
        hive_name, date = filename.split('@')
        # Removing the filetype.
        date = date[:-4]
        # Need to grab the last entry from the csv file...
        last_entry = None
        with open(temp_path, 'r') as open_file:
            reader = csv.reader(open_file, delimiter=',', quotechar='"')
            for entry in reader:
                last_entry = entry
        last_temperature = last_entry[1]
        last_humidity = last_entry[2]
        if last_temperature == "nan":
            last_temperature = None
        else:
            last_temperature = float(last_temperature)
        if last_temperature == -58 or last_temperature == -40:
            # -58 is a sentinel value since it is the conversion of -50 degrees Celsius to
            # Fahrenheit. -40 was later chosen as a sentinel value since -40 is the same in
            # Fahrenheit and Celsius.
            last_temperature = None
        if last_humidity == "nan":
            last_humidity = None
        else:
            last_humidity = float(last_humidity)
        if last_humidity == -1:
            last_humidity = None
        date = datetime.strptime(f"{date} {last_entry[0]}", "%Y-%m-%d %H-%M-%S")
        logger.debug(f"Found a temperature/humidity reading for hive {hive_name}! {date}: "
                     f"T: {last_temperature} H: {last_humidity}")
        insertion_payload = {"HiveName": hive_name, "Temperature": last_temperature,
                             "Humidity": last_humidity, "TimeStamp": date}
        self.__temp_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} temperature/humidity reading inserted.")
    def insert_scale(self, scale_path: str):
        """
        Creates an entry for the scale recording to put into the database. This indiscriminately
        inserts into the database, so be certain before you use it that the values are not already
        in the database. This does not insert the whole file, just the last line of the .csv.

        Args:
            scale_path (str): Full path to the scale csv file.
        """
        filename = os.path.basename(scale_path)
        hive_name, date = filename.split('@')
        # Removing the filetype.
        date = date[:-4]
        # Need to grab the last entry from the csv file...
        last_entry = None
        with open(scale_path, 'r') as open_file:
            reader = csv.reader(open_file, delimiter=',', quotechar='"')
            for entry in reader:
                last_entry = entry
        last_reading = last_entry[1]
        logger.debug(f"Found raw scale reading {date}: {last_reading} kg")
        if last_reading == "nan" or last_reading == "-1":
            # "nan" and "-1" are sentinel values and are stored as nulls.
            last_reading = None
            logger.debug(f"Scale reading is a sentinel. {date}: null kg")
        else:
            last_reading = float(last_reading)
            logger.debug(f"Scale reading converted to float. {date}: {last_reading} kg")
        date = datetime.strptime(f"{date} {last_entry[0]}", "%Y-%m-%d %H-%M-%S")
        logger.debug(f"Found a scale reading for hive {hive_name}! {date}: {last_reading} kg")
        insertion_payload = {"HiveName": hive_name, "Scale": last_reading, "TimeStamp": date}
        self.__scale_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} scale reading inserted.")
    def insert_cpu(self, cpu_path: str):
        """
        Creates an entry for CPU stats to put into the database. This indiscriminately inserts
        into the database, so be certain before you use it that the values are not already in the
        database. This does not insert the whole file, just the last line of the .csv.

        Args:
            cpu_path (str): Full path to the cpu csv file.
        """
        filename = os.path.basename(cpu_path)
        hive_name, date = filename.split('@')
        # Removing the filetype.
        date = date[:-4]
        # Need to grab the last entry from the csv file...
        last_entry = None
        with open(cpu_path, 'r') as open_file:
            reader = csv.reader(open_file, delimiter=',', quotechar='"')
            for entry in reader:
                last_entry = entry
        last_cpu_temp = last_entry[1]
        last_voltage = last_entry[2]
        # "nan" readings are stored as nulls.
        last_cpu_temp = None if last_cpu_temp == "nan" else float(last_cpu_temp)
        last_voltage = None if last_voltage == "nan" else float(last_voltage)
        date = datetime.strptime(f"{date} {last_entry[0]}", "%Y-%m-%d %H-%M-%S")
        logger.debug(f"Found a cpu reading for hive {hive_name}! {date}: "
                     f"T: {last_cpu_temp} V: {last_voltage}")
        insertion_payload = {"HiveName": hive_name, "Cpu Temperature": last_cpu_temp,
                             "Voltage": last_voltage, "TimeStamp": date}
        self.__cpu_collection.insert_one(insertion_payload)
        logger.debug(f"Hive {hive_name} cpu reading inserted.")
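
    # Hedged sketch (not part of the original class): the insert_airquality/insert_temp/
    # insert_scale/insert_cpu methods above all reduce a sensor CSV to its final row by scanning
    # the whole file. The same scan in isolation; the path and return values are hypothetical.
    @staticmethod
    def _csv_last_row_demo(csv_path: str = '/tmp/AppMAIS1L@2022-06-01.csv'):
        last_entry = None
        with open(csv_path, 'r') as open_file:
            reader = csv.reader(open_file, delimiter=',', quotechar='"')
            for entry in reader:
                last_entry = entry
        # For a temp/humidity file this would look like ['14-30-00', '72.5', '48.1'].
        return last_entry
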
    @property
    def bee_db_audio_collection(self) -> Collection:
        return self.__bee_db_audio_collection

    @property
    def honey_beats_audio_collection(self) -> Collection:
        return self.__honey_beats_audio_collection
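
# Hedged sketch (not part of the original module): every insert_* media method above relies on
# the AppMAIS filename convention 'HiveName@YYYY-MM-DD@HH-MM-SS.ext'. This helper shows the parse
# in isolation; the example filename is hypothetical.
def _parse_appmais_filename_demo():
    filename = 'AppMAIS1L@2022-06-01@14-30-00.h264'
    # splitext strips the extension regardless of its length ('.h264', '.mp4', '.png', '.wav').
    hive_name, date_part, time_part = os.path.splitext(filename)[0].split('@')
    timestamp = datetime.strptime(f'{date_part} {time_part}', '%Y-%m-%d %H-%M-%S')
    return hive_name, timestamp  # -> ('AppMAIS1L', datetime(2022, 6, 1, 14, 30))
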
class DatabaseHandler(FileSystemEventHandler):
    """
    A subclass of :class:`.FileSystemEventHandler` to pass watchdog information to a DatabaseSync
    object.
    """
    def __init__(self):
        super().__init__()
        self.__database_sync = DatabaseSync()
    def on_closed(self, event: Union[events.DirCreatedEvent, events.FileCreatedEvent]):
        """
        Watchdog handler that, given an event, sends the relevant data to its DatabaseSync object.

        Args:
            event (Union[:class:`events.DirCreatedEvent`, :class:`events.FileCreatedEvent`]): The
                file or directory created to use. Has a ``src_path`` field and an ``is_directory``
                field.
        """
        try:
            if event.is_directory:
                # Checking if the directory is a name of a new hive. If so, insert it.
                if os.path.basename(event.src_path)[:7] == 'AppMAIS':
                    logger.debug(f"Inserting {os.path.basename(event.src_path)} into the database.")
                    self.__database_sync.insert_hive(os.path.basename(event.src_path))
            else:
                src_path_information = event.src_path.split('/')
                file_type = src_path_information[-2]
                extension = event.src_path.split(".")[-1]
                if file_type == 'video':
                    if extension not in ('h264', 'mp4', 'png'):
                        # .csv files are stored in the video folder, so this is just a quick catch
                        # to make sure they aren't inserted as video files.
                        return
                    if extension in ('h264', 'mp4'):
                        self.__database_sync.insert_video(event.src_path)
                    elif extension == 'png':
                        self.__database_sync.insert_image(event.src_path)
                    # The following lines used to run beetrunner via watchdog as a non-blocking
                    # subprocess to update the arrivals and departures. Beetrunner is now run with
                    # cron.
                    # logger.debug(f"Running beetrunner on {os.path.basename(event.src_path)}")
                    # command = ['/home/bee/.conda/envs/BeetRunner/bin/python',
                    #            '/home/bee/BeetRunner/beetrunner.py', f'{event.src_path}']
                    # subprocess.Popen(command)
                elif file_type == 'audio':
                    self.__database_sync.insert_audio(
                        collection=self.__database_sync.bee_db_audio_collection,
                        audio_path=event.src_path)
                    self.__database_sync.insert_audio(
                        collection=self.__database_sync.honey_beats_audio_collection,
                        audio_path=event.src_path)
                elif file_type == 'temp':
                    self.__database_sync.insert_temp(event.src_path)
                elif file_type == 'scale':
                    self.__database_sync.insert_scale(event.src_path)
                elif file_type == 'cpu':
                    self.__database_sync.insert_cpu(event.src_path)
                elif file_type == 'airquality':
                    self.__database_sync.insert_airquality(event.src_path)
        except Exception as e:
            logger.error(f"Exception in on_closed(): \n{e}")
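
# Hedged sketch (not part of the original module): the handler can be exercised without a live
# observer by constructing a watchdog event directly. The path below is hypothetical, and the
# call will attempt a real database insert if the file exists.
def _handler_demo():
    handler = DatabaseHandler()
    event = events.FileCreatedEvent(
        '/usr/local/bee/appmais/AppMAIS1L/2022-06-01/audio/AppMAIS1L@2022-06-01@14-30-00.wav')
    handler.on_closed(event)
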
def already_exists(collection: pymongo.collection.Collection, key: str, value: str) -> bool:
    """
    Module-level helper that checks whether an entry already exists in a database collection.

    Args:
        collection (:class:`pymongo.collection.Collection`): A database collection to query, such
            as ``__hives_collection``.
        key (str): Key column that you're looking for, like "HiveName".
        value (str): New value to check, like "AppMAIS1L" for "HiveName".

    Returns:
        bool: True if the key and value are found in the given collection, False if not.

    .. todo:: Port this method to utils.mongo
    """
    return collection.count_documents({key: value}, limit=1) > 0
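
# Hedged note (not part of the original module): already_exists() issues one count query per
# file, so the existence checks above benefit from an index on the queried key. A one-time setup
# sketch; the client address is hypothetical and create_index() is idempotent.
def _ensure_filepath_index_demo():
    client = pymongo.MongoClient('localhost', 27017)
    client.beeDB.VideoFiles.create_index("FilePath")
    client.beeDB.AudioFiles.create_index("FilePath")
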
if __name__ == "__main__":
    # Construct the observer before the try block so the shutdown code below can always reference
    # it, even if setup fails partway through.
    observer = Observer()
    try:
        # Setting process title so it can be pgrep'd, pkilled, etc.
        setproctitle.setproctitle('database_sync')
        # Path for the watchdog to watch for files.
        watchdog_path = '/usr/local/bee/appmais'
        event_handler = DatabaseHandler()
        logger.info("Process title is now 'database_sync'")
        logger.info(f"Starting to observe {watchdog_path} for new files.")
        observer.schedule(event_handler, watchdog_path, recursive=True)
        observer.start()
    except Exception as e:
        logger.error(f"Exception in main: \n{e}")
    try:
        while True:
            time.sleep(1)
    finally:
        observer.stop()
        observer.join()