# Source code for lowercase_hivename_cleanup

import os
import subprocess
import sys
import traceback
from typing import Union
from loguru import logger
import pandas as pd


class LowercaseHiveNameCleanup:
    """
    Cleanup helper dedicated to `Issue #47 <https://github.com/ASU-CS-Research/honeyDBee/issues/47>`_:
    some hives were named with a lowercase 'b' as the population designation instead of an
    uppercase 'B'. Given a list of such hive names, this class moves all data from the incorrectly
    named hive directories into the correctly named ones, deleting the files left behind in the
    incorrectly named directories. If a directory holds two csv files, they are concatenated.
    Any missing correct directories are created along the way.

    This class should not be used again after issue #47 is resolved. It must not be used in cases
    where the incorrect hive name is not a lowercase 'b' or the correct hive name is not an
    uppercase 'B'.
    """

    # Column layouts used when reading each sensor's headerless csv files into pandas dataframes.
    column_names = {
        "airquality": ["1-timestamp", "2-pm10", "3-pm25"],  # ? (original author was unsure of these columns)
        "temp": ["1-timestamp", "2-temperature", "3-humidity"],
        "scale": ["1-timestamp", "2-weight"],
        "cpu": ["1-timestamp", "2-cpu", "3-memory"],
        "audio": ["1-timestamp", "2-temperature", "3-voltage"],
        "video": ["1-timestamp", "2-file_size"]
    }
[docs] def __init__(self, hive_names_lowercase: list[str], sensor_directories: list[str]): """ Args: hive_names_lowercase (list[str]): List of lowercase hive names. sensor_directories (list[str]): List of the sensor directories that contain dates. """ #All hive names are of the form 'AppMAISNLb' self._hive_names = hive_names_lowercase self._sensor_directories = sensor_directories self._target_dir = os.path.abspath("usr/local/bee/appmais/") self._correct_hive_names = [i[:-1] + 'B' for i in self._hive_names]
[docs] def validate_and_create_target_hive_directories(self): """ Ensure target directory exists. If it doesn't, then create it. Also checks for the presence of the subdirectories for each hive name. If they do not exist, then create them. """ logger.debug("New hive names:") logger.debug(self._correct_hive_names) #Creates an initial loop for each combination of uppercase and lowercase hive names for (correct_hive_name, former_hive_name) in zip(self._correct_hive_names, self._hive_names): #Creates a path to an uppercase hive new_target_dir = os.path.join(self._target_dir, correct_hive_name) logger.debug("Current hive name: " + correct_hive_name) logger.debug("New path: " + new_target_dir) #Creates a new directory if needed if os.path.exists(new_target_dir): logger.info("New directory exists. Not creating new one.") else: logger.info("New directory did not exist. Creating new one.") os.mkdir(new_target_dir) #Creates a path to the old hive name path_to_old = os.path.join(self._target_dir, former_hive_name) logger.debug("Path to old directory: " + path_to_old) # If for some reason the path to the old hive name doesn't exist, stop execution if not os.path.exists(path_to_old): logger.error("Old path does not exist. Instance of LowercaseHiveName cleanup instantiated with " "at least one hive name that don't exist within the target_directory." f" Hive name {former_hive_name} does not exist within the target directory.") sys.exit(1) #Creates a list of the dates in the selected hive subdir_dates = os.listdir(path_to_old) logger.debug(f"Dates in old directory: {subdir_dates}") #Creates an inner loop for each date in the old hive for date in subdir_dates: path_to_new_date = os.path.join(new_target_dir, date) logger.debug(f'Path to new date: {path_to_new_date}') # If there is no directory for the new date, make one if not os.path.exists(path_to_new_date): os.mkdir(path_to_new_date) #Creates another nested loop for each sensor in each date. 
for sensor in self._sensor_directories: path_to_sensor_directory = os.path.join(path_to_new_date, sensor) logger.info("Path to sensor directory: " + path_to_sensor_directory) # If the sensor directory does not exist, create it if not os.path.exists(path_to_sensor_directory): os.mkdir(path_to_sensor_directory)
    def csv_cleanup(self, sensor_name: str) -> bool:
        """
        Takes a sensor name (temp/humid, scale, etc.). For every (lowercase, uppercase) hive pair
        and every date under the lowercase hive, concatenates the csv files in the lowercase and
        uppercase sensor directories, moves the resulting file into the uppercase directory with
        the canonical ``<Hive>@<date>.csv`` name, and clears the leftover files.

        Args:
            sensor_name (str): Will be one of the sensors in :attr:`sensor_dirs` to do csv cleanup on.

        Returns:
            bool: True if the method runs successfully.

        Raises:
            FileNotFoundError: If an expected uppercase sensor directory was not created beforehand
                by :meth:`validate_and_create_target_hive_directories`.
        """
        # Outer loop: each pair of (incorrect lowercase, correct uppercase) hive names.
        for hive, correct_hive in zip(self._hive_names, self._correct_hive_names):
            # Path to the lowercase hive directory.
            hive_dir = os.path.join(self._target_dir, hive)
            # All dates recorded under the lowercase hive.
            dates = os.listdir(hive_dir)
            for date in dates:
                # Sensor directory inside the lowercase hive for this date.
                sensor_dir = os.path.join(hive_dir, date, sensor_name)
                # Matching sensor directory inside the uppercase hive for this date.
                correct_sensor_dir = os.path.join(self._target_dir, correct_hive, date, sensor_name)
                # The uppercase sensor directory must already exist; abort otherwise.
                # NOTE(review): the message refers to "_validate_hive_names", but the method that
                # creates these directories is validate_and_create_target_hive_directories — the
                # message looks stale; confirm before relying on it.
                if not os.path.exists(correct_sensor_dir):
                    logger.error(f'\nError: {sensor_name} directory does not exist in {correct_hive}, {date}' +
                                 f' directory, _validate_hive_names method did not create the directory.')
                    raise FileNotFoundError(f'{sensor_name} directory does not exist in {correct_hive}, {date}' +
                                            ' directory, _validate_hive_names method did not create the directory.')
                logger.info(f'\nProcessing hive: {hive}, date: {date}, {sensor_name} directory.')
                # Concatenate the csv files from both directories; old_sensor_csv_file is the path
                # to the resulting concatenated csv file.
                logger.info(f'Files are being concatenated in {sensor_name} directory.')
                old_sensor_csv_file = \
                    self._concatenate_csv_files(lowercase_path=sensor_dir,
                                                uppercase_path=correct_sensor_dir,
                                                sensor_name=sensor_name,
                                                new_csv_name=f'{correct_hive}@{date}.csv')
                # _concatenate_csv_files returns None when there were no csv files to concatenate.
                if old_sensor_csv_file is None:
                    continue
                # Final destination for the concatenated csv file.
                new_sensor_csv_file_path = os.path.join(self._target_dir, correct_hive, date, sensor_name,
                                                        f'{correct_hive}@{date}.csv')
                self._move_csv_to_target_dir_and_rename(old_sensor_csv_file, new_sensor_csv_file_path)
                # Delete every remaining file in the old directory (and stale csvs in the new one).
                self._delete_moved_files(current_dir=sensor_dir, target_dir=correct_sensor_dir,
                                         new_file_name_path=new_sensor_csv_file_path)
        return True
    def _concatenate_csv_files(self, lowercase_path: str, uppercase_path: str, sensor_name: str,
                               new_csv_name: str) -> Union[str, None]:
        """
        Concatenate the csv files found in ``lowercase_path`` and ``uppercase_path`` and write the
        result to a single csv file, returning its path. Each directory holds at most 2 csv files,
        so at most 4 files are combined.

        NOTE(review): the original intent was that rows sharing a timestamp but differing in data
        should keep the uppercase directory's version; the code only calls ``drop_duplicates()``,
        which removes *fully identical* rows — partially-differing duplicates are all kept.
        Confirm whether that is acceptable before reuse.

        Args:
            lowercase_path (str): Path to the lowercase sensor directory being processed.
            uppercase_path (str): Path to the uppercase sensor directory being processed.
            sensor_name (str): Name of the sensor directory being concatenated (keys
                :attr:`column_names`).
            new_csv_name (str): Name of the new csv file, of the form ``<Hive>@<date>.csv``.

        Returns:
            Union[str, None]: Path to the concatenated csv file, or None when neither directory
            contains any csv files.
        """
        concat_filepath = None
        # List all files in the respective directories, if any.
        up_pop_files = os.listdir(uppercase_path)
        low_pop_files = os.listdir(lowercase_path)
        # Nothing to do when both directories are empty.
        if len(low_pop_files) == 0 and len(up_pop_files) == 0:
            logger.info(f'No csv files found in {sensor_name} directory.')
            return concat_filepath
        # Concatenate the csvs *within* each directory first (each yields None when empty).
        up_cat_df = self._concatenate_within_directory(uppercase_path, up_pop_files, sensor_name)
        low_cat_df = self._concatenate_within_directory(lowercase_path, low_pop_files, sensor_name)
        if low_cat_df is None:
            # Only the uppercase directory had data.
            concat_df = up_cat_df
        elif up_cat_df is None:
            # Only the lowercase directory had data.
            concat_df = low_cat_df
        else:
            # Both directories had data: combine the two dataframes.
            both_df_concatenated = pd.concat([up_cat_df, low_cat_df])
            concat_df = both_df_concatenated
        # Order the columns by name, ...
        concat_df = concat_df.sort_index(axis=1)
        # ...sort the rows by every column (timestamp first), ...
        concat_df = concat_df.sort_values(by=list(LowercaseHiveNameCleanup.column_names[sensor_name]), axis=0)
        # ...drop fully identical rows (see NOTE above), ...
        concat_df = concat_df.drop_duplicates()
        # ...and renumber the index.
        concat_df = concat_df.reset_index(drop=True)
        # Split the requested csv name into the hive name and the date part.
        hive_name, date = new_csv_name.split('@')
        # NOTE(review): csv_cleanup always passes a corrected name ending in 'B', so the first two
        # branches below appear unreachable from the current caller — verify before removing.
        if hive_name[-1] == 'b':
            # Hive name ends with lowercase b: replace it with uppercase B and rebuild the name.
            concat_filepath = os.path.join(uppercase_path, hive_name[:-1] + 'B' + '@' + date)
        elif hive_name[-1] != 'B':
            # Hive name is missing the population designator entirely: append the uppercase B.
            concat_filepath = os.path.join(uppercase_path, hive_name + 'B' + '@' + date)
        else:
            # Hive name already ends with uppercase B: write into the lowercase directory using the
            # given name (csv_cleanup subsequently moves this file into the uppercase directory).
            concat_filepath = os.path.join(lowercase_path, new_csv_name)
        # Write the combined dataframe back out as a headerless csv (matching how it is read).
        concat_df.to_csv(concat_filepath, na_rep='nan', header=False, index=False)
        return concat_filepath
[docs] @staticmethod def _concatenate_within_directory(path: str, files: list[str], sensor: str) -> Union[pd.DataFrame, None]: """ Concatenate the csv files within the provided directory and return the concatenated dataframe. Each directory has a maximum of 2 csv files. Args: path (str): The path to the sensor directory that contains the csv files that we are working on. files (list[str]): The list of csv files within the provided directory. Returns: Union[pd.DataFrame, None]: The dataframe representing the concatenated csv files. If there are no csv files in the directory, then return None. """ df = None if len(files) == 0: logger.info(f'No csv files found in \'{path}\'.') else: # Creates a path to the CSV file path_to_csv1 = os.path.join(path, files[0]) # Takes the path and turns it into a dataframe df_file1 = pd.read_csv(path_to_csv1, names=LowercaseHiveNameCleanup.column_names[sensor]) if len(files) == 2: # If there are 2 CSVs in the directory, get the path to the second, turn it into a df, and concat path_to_csv2 = os.path.join(path, files[1]) df_file2 = pd.read_csv(path_to_csv2, names=LowercaseHiveNameCleanup.column_names[sensor]) df = pd.concat([df_file1, df_file2], ignore_index=True, sort=True) elif len(files) == 1: # If there is only 1 CSV, return first df df = df_file1 else: # Otherwise, print stack track and stop execution traceback.print_exc() raise ValueError(f"There were an unexpected number of csv files found in \'{path}\'. " "There should only ever by 0, 1, or 2 csv files. " f"\nNumber of files found: {len(files)}" f" \nFiles found: {files}") return df
[docs] def check_and_remove_empty_sensor_directories(self, sensor: str): """ Checks and asserts that all lowercase directories of the provided sensor are empty. Deletes the empty directories. If the checks fails, the program will exit. Args: sensor (str): The name of the sensor data that is being checked. """ # get the size of lowercase directories for data of the provided sensor (directory base size is 4.0K) process_status = subprocess.run([f'du -sh ./*b/*/{sensor}'], shell=True, capture_output=True, text=True, cwd=self._target_dir) # split the process output and extract the directory sizes process_output = process_status.stdout.split() sizes = process_output[::2] directories = process_output[1::2] unique_sizes = set(sizes) # if all directories are the same size and that size is 4.0K, all directories are empty if len(unique_sizes) == 1 and list(unique_sizes)[0] == '4.0K': logger.info(f'\nAll {sensor} directories are empty.') else: logger.error(f'\nNot all {sensor} directories are empty.' '\nSomething may have went wrong.' f'\nSizes found: {unique_sizes}') sys.exit(1) # delete the empty sensor directories for directory in directories: os.rmdir(os.path.join(self._target_dir, directory)) logger.info(f'All {sensor} directories removed.')
[docs] @staticmethod def _delete_moved_files(current_dir: str, target_dir: str, new_file_name_path: str): """ Delete the files in the old directory after they have been moved. Args: current_dir (str): The path to the old directory. target_dir (str): The path to the new directory. new_file_name_path (str): The name of the newly created file with its full path in the target directory that we don't want to delete. """ # Remove the remaining csv files from the directory. # Remove the remaining files from the lowercase directory remaining_files = os.listdir(current_dir) for file in remaining_files: file_to_delete = os.path.join(current_dir, file) os.remove(file_to_delete) logger.info(f'Removed file: {file_to_delete}') # Remove the remaining files from the uppercase directory new_remaining_files = os.listdir(target_dir) # make sure we don't delete the new file we just moved or any other files that are not csv files new_remaining_files = \ [file for file in new_remaining_files if os.path.join(target_dir, file) != new_file_name_path and file.endswith('.csv')] for new_file in new_remaining_files: # Get the path to the extra file and delete it path_to_extra_file = os.path.join(target_dir, new_file) os.remove(path_to_extra_file) logger.info(f'Removed file: {path_to_extra_file}') if not os.path.exists(new_file_name_path): raise FileNotFoundError(f"File was not properly moved to new location" f"\nOld file: {current_dir}" f"\nTarget Directory: {target_dir}" f"\nExpected new file: {new_file_name_path}")
[docs] def delete_old_directories(self): """ Delete empty lowercase directories. """ for name in self._hive_names: path = os.path.join(self._target_dir, name) # os.rmdir() won't delete directories unless they're fully empty dates = os.listdir(path) for date in dates: for sensor in self._sensor_directories: sensor_path = os.path.join(path, date, sensor) os.rmdir(sensor_path) os.rmdir(os.path.join(path, date)) # once empty date directories are removed, the old (now empty) hive can be removed os.rmdir(path) logger.info(f'Deleted directory for hive {name}.')
[docs] @staticmethod def _move_csv_to_target_dir_and_rename(old_csv_path: str, new_csv_path: str): """ Move the csv file from the old directory to the new directory and rename it. Args: old_csv_path (str): The path to the old csv file. new_csv_path (str): The path to the new csv file. """ os.replace(old_csv_path, new_csv_path) if not os.path.exists(new_csv_path) or os.path.exists(old_csv_path): raise FileNotFoundError(f"File was not properly moved to new location" f"\nOld file: {old_csv_path}" f"\nNew file: {new_csv_path}") os.chmod(new_csv_path, 0o644) logger.info(f'\nSuccessfully moved file: {old_csv_path} to {new_csv_path}')
def main(sensor_directories: list[str], hive_names: list[str]):
    """
    Run the full issue #47 cleanup: create the corrected hive directories, clean up each sensor's
    csv files, and finally remove the old lowercase directory trees.
    """
    clean_up = LowercaseHiveNameCleanup(hive_names_lowercase=hive_names,
                                        sensor_directories=sensor_directories)
    clean_up.validate_and_create_target_hive_directories()
    for sensor in sensor_directories:
        if clean_up.csv_cleanup(sensor):
            logger.info(f'\n{sensor} cleanup finished.')
    clean_up.delete_old_directories()


if __name__ == '__main__':
    # Hives known to carry the incorrect lowercase 'b' population designator (issue #47).
    lowercase_hive_names = ['AppMAIS2Rb', 'AppMAIS4Lb', 'AppMAIS4Rb', 'AppMAIS5Lb', 'AppMAIS8Lb',
                            'AppMAIS9Lb', 'AppMAIS10Lb', 'AppMAIS10Rb', 'AppMAIS12Lb']
    sensor_dirs = ["airquality", "temp", "scale", "cpu", "audio", "video"]
    main(sensor_dirs, lowercase_hive_names)