Source code for mongo

import copy
import sys
from typing import Optional, List
from datetime import datetime
import pymongo
from pymongo.collation import Collation
from loguru import logger


class MongoUtils:
    """
    Houses common database operations (such as cloning a collection) that MongoDB does not support natively.

    On construction a client is opened against ``localhost:27017`` and the ``beeDB`` database is selected.
    """

    def __init__(self):
        self.__client = pymongo.MongoClient('localhost', 27017)
        self._db = self.__client.beeDB

    def clone_collection(self, source_collection_name: str, destination_collection_name: str,
                         fields_to_copy: List[str], overwrite_existing: bool,
                         copy_order: Optional[str] = 'ascending') -> None:
        """
        Clones an existing MongoDB collection item-by-item. This is inefficient, but appears to be the only supported
        way of doing this.

        Args:
            source_collection_name (str): The name of the source MongoDB collection to clone.
            destination_collection_name (str): The desired name of the cloned collection.
            fields_to_copy (List[str]): A list of fields to copy from the source collection to the cloned destination
              collection.
            overwrite_existing (bool): A boolean flag indicating if the destination collection should be overwritten
              during the clone (should it already exist). When ``True`` an existing destination collection is dropped
              and re-created before copying. When ``False`` and the destination exists, an error is logged and
              nothing is copied.
            copy_order (Optional[str]): A string of the form: ``'ascending'`` or ``'descending'`` indicating the order
              (by the ``TimeStamp`` field) in which records should be copied from the source collection to the
              destination collection.

        Raises:
            NotImplementedError: If ``copy_order`` is neither ``'ascending'`` nor ``'descending'``. This is checked
              before the database is touched.
            :exception:`~pymongo.errors.OperationFailure`: Raises an ``OperationFailure`` error if an insert operation
              fails during the collection cloning process.

        See Also:
            - `Creating a MongoDB Collection in pymongo <https://kb.objectrocket.com/mongo-db/how-to-create-and-setup-a-mongodb-collection-in-python-370>`_
            - `MongoDB Collation <https://www.mongodb.com/docs/manual/reference/collation/>`_

        """
        # Validate the arguments before any database side effect (creating or dropping a collection) can occur:
        if copy_order not in ('ascending', 'descending'):
            raise NotImplementedError(
                f"The provided copy order: '{copy_order}' was not recognized. Valid options are "
                f"'ascending' and 'descending' only.")
        if source_collection_name == destination_collection_name:
            # Overwriting would drop the very collection we are trying to read from.
            logger.error(
                f"The source and destination collection are both named: {source_collection_name}. A collection "
                f"cannot be cloned onto itself, and there is nothing to do!")
            return
        sort_order = pymongo.ASCENDING if copy_order == 'ascending' else pymongo.DESCENDING

        # Attribute/item access on a database creates a lazy collection handle and never fails for a missing
        # collection, so existence has to be checked against the server's list of collection names.
        existing_collection_names = self._db.list_collection_names()

        # Get the source collection:
        if source_collection_name not in existing_collection_names:
            logger.error(
                f"The provided MongoDB collection name: {source_collection_name} does not exist in the beeDB "
                f"database!")
            return
        source_collection = self._db[source_collection_name]

        # Check to see if the destination collection already exists:
        if destination_collection_name in existing_collection_names:
            if not overwrite_existing:
                logger.error(
                    f"The destination collection: {destination_collection_name} already exists in the database. The "
                    f"'overwrite_existing' flag is set to False, and there is nothing to do!")
                return
            logger.warning(
                f"Destination collection: {destination_collection_name} already exists in the database. This will "
                f"overwrite the existing collection as the overwrite_existing flag is set to True.")
            # Actually perform the overwrite; otherwise the inserts below collide with the existing documents.
            self._db.drop_collection(destination_collection_name)

        destination_collection = self._create_destination_collection(destination_collection_name)
        self._copy_documents(source_collection, destination_collection, list(fields_to_copy), sort_order)

    def _create_destination_collection(self, destination_collection_name: str) -> "pymongo.collection.Collection":
        """Create the (not yet existing) destination collection with the collation used for cloned collections."""
        collation = Collation(
            locale="en_US",
            caseLevel=True,
            caseFirst='off',
            strength=3,
            numericOrdering=False,
            alternate='non-ignorable',
            maxVariable='space',
            backwards=False,
            normalization=False
        )
        try:
            return self._db.create_collection(name=destination_collection_name, collation=collation)
        except pymongo.errors.PyMongoError as err:
            if "already exists" in str(err):
                logger.critical(f"Failed to detect existing collection, but failed to create a new collection "
                                f"because it supposedly already exists!")
                sys.exit(-1)
            # Any other failure must not be swallowed: there is no destination collection to copy into.
            raise

    def _copy_documents(self, source_collection, destination_collection, projection: List[str], sort_order) -> None:
        """Copy the projected fields of every source document into the destination, ordered by ``TimeStamp``."""
        '''
        Keep a persistent session open (see:
        https://www.mongodb.com/docs/v4.4/reference/method/cursor.noCursorTimeout/#session-idle-timeout-overrides-nocursortimeout)
        See Also: https://jira.mongodb.org/browse/PYTHON-1879
        '''
        refresh_rate_minutes = 5
        last_refresh_time = datetime.now()
        with self.__client.start_session() as sess:
            # The cursor must be bound to the explicit session, otherwise refreshing that session below does
            # nothing to keep the cursor alive.
            cursor = source_collection.find(
                {},
                sort=[('TimeStamp', sort_order)],
                projection=projection,
                allow_disk_use=True,
                session=sess
            )
            for entry in cursor:
                # Check to see if we need to refresh the session:
                elapsed_time = datetime.now() - last_refresh_time
                if elapsed_time.total_seconds() >= (refresh_rate_minutes * 60):
                    # Refresh the session:
                    self.__client.admin.command('refreshSessions', [sess.session_id], session=sess)
                    logger.debug("Refreshed session.")
                    last_refresh_time = datetime.now()
                # Copy the entry to the new collection:
                destination_collection.insert_one(entry, session=sess)
if __name__ == '__main__':
    # Script entry point: clone the 'AudioFiles' collection into 'SwarmDetection', copying only the listed fields
    # in ascending order and overwriting the destination if it already exists.
    clone_arguments = {
        'source_collection_name': 'AudioFiles',
        'destination_collection_name': 'SwarmDetection',
        'fields_to_copy': ['_id', 'FilePath', 'HiveName', 'TimeStamp'],
        'overwrite_existing': True,
        'copy_order': 'ascending',
    }
    mongo_utils = MongoUtils()
    mongo_utils.clone_collection(**clone_arguments)