import os
import subprocess
import sys
import traceback
from typing import Union
from loguru import logger
import pandas as pd
class LowercaseHiveNameCleanup:
    """
    This class is designed to solve `Issue #47 <https://github.com/ASU-CS-Research/honeyDBee/issues/47>`_.

    Some of the hives were named with a lowercase 'b' as the population designation instead of an
    uppercase 'B'. This class takes a list of hive names and moves all the data from the incorrectly
    named hive directories to the correctly named hive directories. It also deletes the files from the
    incorrectly named hive directories. If there are two csv files in a directory, it concatenates
    them. The class additionally ensures that the correct directories are created if they do not
    already exist.

    This class is dedicated to issue #47 and should not be used again after the issue is resolved. It
    should also not be used in cases where the incorrect hive name does not end with a lowercase 'b'
    or when the correct hive name does not end with an uppercase 'B'.
    """
    # Column names used when reading each sensor's header-less csv files into pandas dataframes.
    # NOTE(review): the semantics of some columns (e.g. 'temperature'/'voltage' under 'audio',
    # pm10/pm25 ordering under 'airquality') are assumed from the names — confirm against the
    # producing sensor code before relying on them elsewhere.
    column_names = {
        "airquality": ["1-timestamp", "2-pm10", "3-pm25"],
        "temp": ["1-timestamp", "2-temperature", "3-humidity"],
        "scale": ["1-timestamp", "2-weight"],
        "cpu": ["1-timestamp", "2-cpu", "3-memory"],
        "audio": ["1-timestamp", "2-temperature", "3-voltage"],
        "video": ["1-timestamp", "2-file_size"]
    }

    def __init__(self, hive_names_lowercase: list[str], sensor_directories: list[str]):
        """
        Args:
            hive_names_lowercase (list[str]): List of lowercase hive names. All hive names are of the
                form 'AppMAISNLb', i.e. they end with the lowercase population designation 'b'.
            sensor_directories (list[str]): List of the sensor directory names that live under each
                date directory.
        """
        self._hive_names = hive_names_lowercase
        self._sensor_directories = sensor_directories
        # Bug fix: the original value "usr/local/bee/appmais/" was a *relative* path, so
        # os.path.abspath resolved it against the current working directory. The AppMAIS data
        # lives at the absolute path below.
        self._target_dir = os.path.abspath("/usr/local/bee/appmais/")
        # The correctly named counterpart of each hive: trailing 'b' replaced with 'B'.
        self._correct_hive_names = [name[:-1] + 'B' for name in self._hive_names]

    def validate_and_create_target_hive_directories(self):
        """
        Ensure target directory exists. If it doesn't, then create it.
        Also checks for the presence of the subdirectories for each hive name. If they do not exist,
        then create them (one directory per date, and one per sensor within each date).

        Exits the process with status 1 if a lowercase (source) hive directory is missing, since that
        indicates the instance was constructed with a hive name that does not exist on disk.
        """
        logger.debug("New hive names:")
        logger.debug(self._correct_hive_names)
        # Walk each (correct uppercase name, former lowercase name) pair in lockstep.
        for (correct_hive_name, former_hive_name) in zip(self._correct_hive_names, self._hive_names):
            # Path to the uppercase (target) hive directory.
            new_target_dir = os.path.join(self._target_dir, correct_hive_name)
            logger.debug("Current hive name: " + correct_hive_name)
            logger.debug("New path: " + new_target_dir)
            # Create the target hive directory only if it is missing.
            if os.path.exists(new_target_dir):
                logger.info("New directory exists. Not creating new one.")
            else:
                logger.info("New directory did not exist. Creating new one.")
                os.mkdir(new_target_dir)
            # Path to the old (lowercase) hive directory.
            path_to_old = os.path.join(self._target_dir, former_hive_name)
            logger.debug("Path to old directory: " + path_to_old)
            # The lowercase directory must exist — it is the source of the data being moved.
            if not os.path.exists(path_to_old):
                logger.error("Old path does not exist. Instance of LowercaseHiveName cleanup instantiated with "
                             "at least one hive name that don't exist within the target_directory."
                             f" Hive name {former_hive_name} does not exist within the target directory.")
                sys.exit(1)
            # Each entry under the old hive directory is a date directory.
            subdir_dates = os.listdir(path_to_old)
            logger.debug(f"Dates in old directory: {subdir_dates}")
            for date in subdir_dates:
                path_to_new_date = os.path.join(new_target_dir, date)
                logger.debug(f'Path to new date: {path_to_new_date}')
                # Mirror the date directory under the uppercase hive if it is missing.
                if not os.path.exists(path_to_new_date):
                    os.mkdir(path_to_new_date)
                # Mirror each sensor directory under the new date directory.
                for sensor in self._sensor_directories:
                    path_to_sensor_directory = os.path.join(path_to_new_date, sensor)
                    logger.info("Path to sensor directory: " + path_to_sensor_directory)
                    if not os.path.exists(path_to_sensor_directory):
                        os.mkdir(path_to_sensor_directory)

    def csv_cleanup(self, sensor_name: str) -> bool:
        """
        Takes a sensor name (temp/humid, scale, etc.). Checks how many CSV files are in the given
        directory, and concatenates them if there are 2. Moves and renames the correct file, clearing
        the old directory.

        Args:
            sensor_name (str): Will be one of the sensors in :attr:`sensor_dirs` to do csv cleanup on.

        Returns:
            bool: True if the method runs successfully.

        Raises:
            FileNotFoundError: If the uppercase sensor directory was not created beforehand by
                :meth:`validate_and_create_target_hive_directories`.
        """
        # Walk each (lowercase, uppercase) hive name pair.
        for hive, correct_hive in zip(self._hive_names, self._correct_hive_names):
            # Path to the lowercase hive directory.
            hive_dir = os.path.join(self._target_dir, hive)
            # Dates present under the lowercase hive.
            dates = os.listdir(hive_dir)
            for date in dates:
                # Sensor directory under the lowercase hive.
                sensor_dir = os.path.join(hive_dir, date, sensor_name)
                # Sensor directory under the uppercase hive.
                correct_sensor_dir = os.path.join(self._target_dir, correct_hive, date, sensor_name)
                # The uppercase sensor directory must already exist (created by
                # validate_and_create_target_hive_directories); otherwise stop.
                # Bug fix: the messages previously referenced a non-existent '_validate_hive_names'
                # method; they now name the method that actually creates the directories.
                if not os.path.exists(correct_sensor_dir):
                    logger.error(f'\nError: {sensor_name} directory does not exist in {correct_hive}, {date}' +
                                 f' directory, validate_and_create_target_hive_directories method did not create '
                                 f'the directory.')
                    raise FileNotFoundError(f'{sensor_name} directory does not exist in {correct_hive}, {date}' +
                                            ' directory, validate_and_create_target_hive_directories method did '
                                            'not create the directory.')
                logger.info(f'\nProcessing hive: {hive}, date: {date}, {sensor_name} directory.')
                # Concatenate the csv files found in both the lowercase and uppercase directories.
                # old_sensor_csv_file is the path to the resulting concatenated csv file.
                logger.info(f'Files are being concatenated in {sensor_name} directory.')
                old_sensor_csv_file = \
                    self._concatenate_csv_files(lowercase_path=sensor_dir, uppercase_path=correct_sensor_dir,
                                                sensor_name=sensor_name,
                                                new_csv_name=f'{correct_hive}@{date}.csv')
                # _concatenate_csv_files returns None if there are no csv files in the directory to
                # concatenate; nothing to move in that case.
                if old_sensor_csv_file is None:
                    continue
                # Final path for the concatenated csv file under the uppercase hive.
                new_sensor_csv_file_path = os.path.join(self._target_dir, correct_hive, date, sensor_name,
                                                        f'{correct_hive}@{date}.csv')
                self._move_csv_to_target_dir_and_rename(old_sensor_csv_file, new_sensor_csv_file_path)
                # Delete the remaining (now superseded) files in both directories.
                self._delete_moved_files(current_dir=sensor_dir,
                                         target_dir=correct_sensor_dir,
                                         new_file_name_path=new_sensor_csv_file_path)
        return True

    def _concatenate_csv_files(self, lowercase_path: str, uppercase_path: str, sensor_name: str,
                               new_csv_name: str) -> Union[str, None]:
        """
        Concatenate the csv files in the lowercase_path and uppercase_path directory and return the
        concatenated file path. These files may have overlap in the timestamps with different data in
        the row; if so that entry should not be duplicated and the file in the uppercase_path should
        be used. Each path has a maximum of 2 csv files, therefore a maximum of 4 files will be
        concatenated.

        Args:
            lowercase_path (str): The path to the lowercase sensor directory that contains the csv
                files that we are working on.
            uppercase_path (str): The path to the uppercase sensor directory that contains the csv
                files that we are working on.
            sensor_name (str): The name of the sensor directory that is being concatenated.
            new_csv_name (str): The name of the new csv file that will be created
                (format: ``<hive>@<date>.csv``).

        Returns:
            Union[str, None]: The path to the concatenated csv file. If there are no csv files in
            either directory, then return None.
        """
        concat_filepath = None
        # List the files in each directory. NOTE(review): os.listdir returns *all* entries, not only
        # csv files — this assumes the sensor directories contain nothing but csv files; confirm.
        up_pop_files = os.listdir(uppercase_path)
        low_pop_files = os.listdir(lowercase_path)
        # Nothing to do when both directories are empty.
        if len(low_pop_files) == 0 and len(up_pop_files) == 0:
            logger.info(f'No csv files found in {sensor_name} directory.')
            return concat_filepath
        # Concatenate the CSVs *within* each directory first (each may hold up to two files).
        up_cat_df = self._concatenate_within_directory(uppercase_path, up_pop_files, sensor_name)
        low_cat_df = self._concatenate_within_directory(lowercase_path, low_pop_files, sensor_name)
        if low_cat_df is None:
            # Only the uppercase directory had data.
            concat_df = up_cat_df
        elif up_cat_df is None:
            # Only the lowercase directory had data.
            concat_df = low_cat_df
        else:
            # Both directories had data: concatenate uppercase first so that on exact duplicate rows
            # the uppercase entry is the one kept by drop_duplicates below.
            both_df_concatenated = pd.concat([up_cat_df, low_cat_df])
            concat_df = both_df_concatenated
        # Order the columns by name, then sort the rows by all columns (timestamp first).
        concat_df = concat_df.sort_index(axis=1)
        concat_df = concat_df.sort_values(by=list(LowercaseHiveNameCleanup.column_names[sensor_name]), axis=0)
        # Drop any duplicate rows, then renumber the index.
        concat_df = concat_df.drop_duplicates()
        concat_df = concat_df.reset_index(drop=True)
        # Split the new csv name into the hive name and the date ('.csv' stays attached to the date).
        hive_name, date = new_csv_name.split('@')
        if hive_name[-1] == 'b':
            # Defensive: hive name ends with lowercase 'b' — replace with 'B' and write to the
            # uppercase directory. (Unreachable from csv_cleanup, which always passes a 'B' name.)
            concat_filepath = os.path.join(uppercase_path, hive_name[:-1] + 'B' + '@' + date)
        elif hive_name[-1] != 'B':
            # Defensive: hive name has no population designation — append 'B' and write to the
            # uppercase directory. (Also unreachable from csv_cleanup.)
            concat_filepath = os.path.join(uppercase_path, hive_name + 'B' + '@' + date)
        else:
            # Normal path: write the concatenated file into the lowercase directory; it will be
            # moved to its final location by _move_csv_to_target_dir_and_rename.
            concat_filepath = os.path.join(lowercase_path, new_csv_name)
        # Write the combined dataframe back out as a header-less csv.
        concat_df.to_csv(concat_filepath, na_rep='nan', header=False, index=False)
        return concat_filepath

    @staticmethod
    def _concatenate_within_directory(path: str, files: list[str], sensor: str) -> Union[pd.DataFrame, None]:
        """
        Concatenate the csv files within the provided directory and return the concatenated
        dataframe. Each directory has a maximum of 2 csv files.

        Args:
            path (str): The path to the sensor directory that contains the csv files that we are
                working on.
            files (list[str]): The list of csv files within the provided directory.
            sensor (str): The sensor name, used to look up the expected column names.

        Returns:
            Union[pd.DataFrame, None]: The dataframe representing the concatenated csv files. If
            there are no csv files in the directory, then return None.

        Raises:
            ValueError: If the directory contains more than 2 files.
        """
        df = None
        if len(files) == 0:
            logger.info(f'No csv files found in \'{path}\'.')
        else:
            # Read the first csv file (header-less; column names supplied explicitly).
            path_to_csv1 = os.path.join(path, files[0])
            df_file1 = pd.read_csv(path_to_csv1, names=LowercaseHiveNameCleanup.column_names[sensor])
            if len(files) == 2:
                # Two files: read the second and concatenate the pair.
                path_to_csv2 = os.path.join(path, files[1])
                df_file2 = pd.read_csv(path_to_csv2, names=LowercaseHiveNameCleanup.column_names[sensor])
                df = pd.concat([df_file1, df_file2],
                               ignore_index=True, sort=True)
            elif len(files) == 1:
                # One file: return it as-is.
                df = df_file1
            else:
                # More than 2 files is unexpected — abort.
                # Bug fix: removed the traceback.print_exc() call here; it was outside any except
                # block, so it printed the useless 'NoneType: None'. The raised ValueError carries
                # its own traceback.
                raise ValueError(f"There were an unexpected number of csv files found in \'{path}\'. "
                                 "There should only ever by 0, 1, or 2 csv files. "
                                 f"\nNumber of files found: {len(files)}"
                                 f" \nFiles found: {files}")
        return df

    def check_and_remove_empty_sensor_directories(self, sensor: str):
        """
        Checks and asserts that all lowercase directories of the provided sensor are empty. Deletes
        the empty directories. If the check fails, the program will exit with status 1.

        Args:
            sensor (str): The name of the sensor data that is being checked.
        """
        # Get the size of lowercase directories for data of the provided sensor; an empty directory
        # reports its base size of 4.0K under `du -sh`.
        # NOTE: shell=True is required for the '*b/*' glob expansion; `sensor` comes from the
        # internal sensor list, not untrusted input. With shell=True the command is passed as a
        # single string (the documented form), not a one-element list.
        process_status = subprocess.run(f'du -sh ./*b/*/{sensor}', shell=True, capture_output=True, text=True,
                                        cwd=self._target_dir)
        # `du -sh` emits "<size>\t<dir>" per line; split() interleaves sizes and directory names.
        process_output = process_status.stdout.split()
        sizes = process_output[::2]
        directories = process_output[1::2]
        unique_sizes = set(sizes)
        # All directories are empty iff the only size seen is the 4.0K base size.
        if unique_sizes == {'4.0K'}:
            logger.info(f'\nAll {sensor} directories are empty.')
        else:
            logger.error(f'\nNot all {sensor} directories are empty.'
                         '\nSomething may have went wrong.'
                         f'\nSizes found: {unique_sizes}')
            sys.exit(1)
        # Delete the (verified empty) sensor directories.
        for directory in directories:
            os.rmdir(os.path.join(self._target_dir, directory))
        logger.info(f'All {sensor} directories removed.')

    @staticmethod
    def _delete_moved_files(current_dir: str, target_dir: str, new_file_name_path: str):
        """
        Delete the files in the old directory after they have been moved.

        Args:
            current_dir (str): The path to the old (lowercase) directory.
            target_dir (str): The path to the new (uppercase) directory.
            new_file_name_path (str): The name of the newly created file with its full path in the
                target directory that we don't want to delete.

        Raises:
            FileNotFoundError: If the new file is missing after cleanup, indicating the move failed.
        """
        # Remove every remaining file from the lowercase directory (the concatenated file has
        # already been moved out of it).
        remaining_files = os.listdir(current_dir)
        for file in remaining_files:
            file_to_delete = os.path.join(current_dir, file)
            os.remove(file_to_delete)
            logger.info(f'Removed file: {file_to_delete}')
        # Remove superseded csv files from the uppercase directory, but never the file we just
        # moved there, and never non-csv files.
        new_remaining_files = os.listdir(target_dir)
        new_remaining_files = \
            [file for file in new_remaining_files if
             os.path.join(target_dir, file) != new_file_name_path and file.endswith('.csv')]
        for new_file in new_remaining_files:
            path_to_extra_file = os.path.join(target_dir, new_file)
            os.remove(path_to_extra_file)
            logger.info(f'Removed file: {path_to_extra_file}')
        # Sanity check: the moved file must still exist at its final location.
        if not os.path.exists(new_file_name_path):
            raise FileNotFoundError(f"File was not properly moved to new location"
                                    f"\nOld file: {current_dir}"
                                    f"\nTarget Directory: {target_dir}"
                                    f"\nExpected new file: {new_file_name_path}")

    def delete_old_directories(self):
        """
        Delete empty lowercase directories.

        Removes each sensor directory, then each date directory, then the hive directory itself.
        os.rmdir() raises OSError if any directory is not fully empty, so this doubles as a check
        that all data was actually moved out.
        """
        for name in self._hive_names:
            path = os.path.join(self._target_dir, name)
            dates = os.listdir(path)
            for date in dates:
                # Innermost first: sensor directories, then the date directory that held them.
                for sensor in self._sensor_directories:
                    sensor_path = os.path.join(path, date, sensor)
                    os.rmdir(sensor_path)
                os.rmdir(os.path.join(path, date))
            # Once the date directories are gone, the (now empty) hive directory can be removed.
            os.rmdir(path)
            logger.info(f'Deleted directory for hive {name}.')

    @staticmethod
    def _move_csv_to_target_dir_and_rename(old_csv_path: str, new_csv_path: str):
        """
        Move the csv file from the old directory to the new directory and rename it.

        Args:
            old_csv_path (str): The path to the old csv file.
            new_csv_path (str): The path to the new csv file.

        Raises:
            FileNotFoundError: If the file is not at the new path (or still at the old path) after
                the move.
        """
        # os.replace atomically moves the file, overwriting any existing destination.
        os.replace(old_csv_path, new_csv_path)
        if not os.path.exists(new_csv_path) or os.path.exists(old_csv_path):
            raise FileNotFoundError(f"File was not properly moved to new location"
                                    f"\nOld file: {old_csv_path}"
                                    f"\nNew file: {new_csv_path}")
        # Owner read/write, group/other read.
        os.chmod(new_csv_path, 0o644)
        logger.info(f'\nSuccessfully moved file: {old_csv_path} to {new_csv_path}')
def main(sensor_directories: list[str], hive_names: list[str]):
    """
    Main method to run the class: create the target directories, clean up the csv files for every
    sensor, then remove the emptied lowercase hive directories.
    """
    cleanup = LowercaseHiveNameCleanup(hive_names_lowercase=hive_names,
                                       sensor_directories=sensor_directories)
    cleanup.validate_and_create_target_hive_directories()
    for sensor in sensor_directories:
        if cleanup.csv_cleanup(sensor):
            logger.info(f'\n{sensor} cleanup finished.')
    cleanup.delete_old_directories()
if __name__ == '__main__':
    # Hives whose population designation was recorded with a lowercase 'b' (issue #47).
    lowercase_hive_names = [
        'AppMAIS2Rb', 'AppMAIS4Lb', 'AppMAIS4Rb', 'AppMAIS5Lb', 'AppMAIS8Lb', 'AppMAIS9Lb',
        'AppMAIS10Lb', 'AppMAIS10Rb', 'AppMAIS12Lb',
    ]
    # Sensor subdirectories expected under every date directory.
    sensor_dirs = ["airquality", "temp", "scale", "cpu", "audio", "video"]
    main(sensor_dirs, lowercase_hive_names)