import copy
import sys
from typing import Optional, List
from datetime import datetime
import pymongo
from pymongo.collation import Collation
from loguru import logger
class MongoUtils:
    """
    Houses common database operations (such as cloning a collection) that MongoDB does not support natively.
    """

    def __init__(self, host: str = 'localhost', port: int = 27017, db_name: str = 'beeDB'):
        """
        Open a client connection and select the database to operate on.

        Args:
            host (str): Hostname of the MongoDB server. Defaults to ``'localhost'``.
            port (int): Port of the MongoDB server. Defaults to ``27017``.
            db_name (str): Name of the database to operate on. Defaults to ``'beeDB'``.
        """
        self.__client = pymongo.MongoClient(host, port)
        self._db = self.__client[db_name]

    def clone_collection(self, source_collection_name: str, destination_collection_name: str,
                         fields_to_copy: Optional[List[str]], overwrite_existing: bool,
                         copy_order: Optional[str] = 'ascending', sort_field: str = 'TimeStamp'):
        """
        Clones an existing MongoDB collection item-by-item. This is inefficient, but appears to be the only supported
        way of doing this.

        Args:
            source_collection_name (str): The name of the source MongoDB collection to clone.
            destination_collection_name (str): The desired name of the cloned collection.
            fields_to_copy (Optional[List[str]]): A list of fields to copy from the source collection to the cloned
              destination collection. MongoDB includes ``_id`` unless it is explicitly excluded. ``None`` copies
              every field.
            overwrite_existing (bool): A boolean flag indicating if the destination collection should be overwritten
              during the clone (should it already exist). When ``True``, an existing destination collection is
              dropped (together with its documents, indexes and options) and recreated before copying.
            copy_order (Optional[str]): A string of the form: ``'ascending'`` or ``'descending'`` indicating the order
              in which records should be copied from the source collection to the destination collection. ``None``
              is treated as ``'ascending'``.
            sort_field (str): The field the copy order is applied to. Defaults to ``'TimeStamp'``.

        Raises:
            NotImplementedError: If ``copy_order`` is not recognized. This is checked before the database is touched.
            :exception:`~pymongo.errors.OperationFailure`: Raises an ``OperationFailure`` error if an insert operation
              fails during the collection cloning process.
            SystemExit: If the destination collection could not be created because it appeared concurrently.

        See Also:
            - `Creating a MongoDB Collection in pymongo <https://kb.objectrocket.com/mongo-db/how-to-create-and-setup-a-mongodb-collection-in-python-370>`_
            - `MongoDB Collation <https://www.mongodb.com/docs/manual/reference/collation/>`_
        """
        # Validate the copy order first, so that bad input never creates or drops a collection:
        if copy_order is None or copy_order == 'ascending':
            sort_order = pymongo.ASCENDING
        elif copy_order == 'descending':
            sort_order = pymongo.DESCENDING
        else:
            raise NotImplementedError(
                f"The provided copy order: \'{copy_order}\' was not recognized. Valid options are "
                f"\'ascending\' and \'descending\' only.")
        if source_collection_name == destination_collection_name:
            # Overwriting the destination would drop the very collection we are reading from.
            logger.error(
                f"The source and destination collection names are identical ({source_collection_name}); refusing to "
                f"clone a collection onto itself.")
            return
        existing_collections = set(self._db.list_collection_names())
        # Attribute/item access on a pymongo Database never fails for an unknown name (it lazily returns an empty
        # Collection), so existence has to be checked explicitly.
        if source_collection_name not in existing_collections:
            logger.error(
                f"The provided MongoDB collection name: {source_collection_name} does not exist in the "
                f"{self._db.name} database!")
            return
        source_collection = self._db[source_collection_name]
        destination_collection = self._prepare_destination_collection(
            destination_collection_name, overwrite_existing, existing_collections)
        if destination_collection is None:
            return
        self._copy_documents(source_collection, destination_collection, fields_to_copy, sort_field, sort_order)

    def _prepare_destination_collection(self, destination_collection_name: str, overwrite_existing: bool,
                                        existing_collections: set) -> 'Optional[pymongo.collection.Collection]':
        """Return a freshly created destination collection, or ``None`` if the clone should be aborted."""
        if destination_collection_name in existing_collections:
            if not overwrite_existing:
                logger.error(
                    f"The destination collection: {destination_collection_name} already exists in the database. The "
                    f"\'overwrite_existing\' flag is set to False, and there is nothing to do!")
                return None
            logger.warning(
                f"Destination collection: {destination_collection_name} already exists in the database. This will "
                f"overwrite the existing collection as the overwrite_existing flag is set to True.")
            # Drop the old collection so the clone really replaces it. Re-inserting documents whose _id values
            # already exist would otherwise fail with a DuplicateKeyError.
            self._db.drop_collection(destination_collection_name)
        collation = Collation(
            locale="en_US",
            caseLevel=True,
            caseFirst='off',
            strength=3,
            numericOrdering=False,
            alternate='non-ignorable',
            maxVariable='space',
            backwards=False,
            normalization=False
        )
        try:
            return self._db.create_collection(name=destination_collection_name, collation=collation)
        except pymongo.errors.CollectionInvalid:
            # Raised by pymongo when the collection already exists, e.g. when it was created concurrently after
            # the existence check above. Any other error propagates to the caller instead of being swallowed.
            logger.critical(f"Failed to detect existing collection, but failed to create a new collection "
                            f"because it supposedly already exists!")
            sys.exit(-1)

    def _copy_documents(self, source_collection: 'pymongo.collection.Collection',
                        destination_collection: 'pymongo.collection.Collection',
                        fields_to_copy: Optional[List[str]], sort_field: str, sort_order: int):
        """Stream the projected source documents into the destination collection, in sorted order."""
        projection = None if fields_to_copy is None else {field: 1 for field in fields_to_copy}
        refresh_interval_seconds = 5 * 60
        '''
        Keep a persistent session open, and attach the cursor to it, so that periodically refreshing the session
        keeps the no-timeout cursor alive (see:
        https://www.mongodb.com/docs/v4.4/reference/method/cursor.noCursorTimeout/#session-idle-timeout-overrides-nocursortimeout)
        See Also:
            https://jira.mongodb.org/browse/PYTHON-1879
        '''
        with self.__client.start_session() as sess:
            last_refresh_time = datetime.now()
            cursor = source_collection.find(
                {},
                projection=projection,
                sort=[(sort_field, sort_order)],
                no_cursor_timeout=True,
                allow_disk_use=True,
                session=sess
            )
            try:
                for entry in cursor:
                    # total_seconds() (unlike timedelta.seconds) does not wrap around after a day:
                    if (datetime.now() - last_refresh_time).total_seconds() >= refresh_interval_seconds:
                        self.__client.admin.command('refreshSessions', [sess.session_id], session=sess)
                        logger.debug("Refreshed session.")
                        last_refresh_time = datetime.now()
                    # Each decoded entry is a fresh dict that is not reused, so no defensive copy is needed:
                    destination_collection.insert_one(entry, session=sess)
            finally:
                # A no-timeout cursor is never reaped by the server; close it explicitly.
                cursor.close()
if __name__ == '__main__':
    # Script entry point: clone selected fields of 'AudioFiles' into 'SwarmDetection', in ascending order.
    clone_settings = dict(
        source_collection_name='AudioFiles',
        destination_collection_name='SwarmDetection',
        fields_to_copy=['_id', 'FilePath', 'HiveName', 'TimeStamp'],
        overwrite_existing=True,
        copy_order='ascending',
    )
    utils = MongoUtils()
    utils.clone_collection(**clone_settings)