# Source code for n3fit.hyper_optimization.mongofiletrials

"""
    Hyperopt trial object for parallel hyperoptimization with MongoDB.
    Data are fetched from MongoDB databases and stored in the form of json and tar.gz files within the nnfit folder.
"""

import json
import logging
import os
import subprocess
import tarfile

from bson import SON, ObjectId
from hyperopt.mongoexp import MongoTrials

from n3fit.backends import get_physical_gpus
from n3fit.hyper_optimization.filetrials import space_eval_trial

log = logging.getLogger(__name__)


def convert_bson_to_dict(obj):
    """
    Recursively convert a BSON object into plain Python containers.

    MongoDB query results may contain BSON types such as ``ObjectId`` and
    ``SON``; this helper turns them into ordinary dictionaries, lists and
    strings so they can be handled (and json-serialized) like regular data.

    Parameters
    ----------
    obj : dict or bson.SON or list or any
        The object to convert. Can be a BSON object (like SON), a dictionary
        containing BSON types, a list of such objects, or any other type.

    Returns
    -------
    dict or list or any
        A Python dictionary with all BSON types converted to standard Python
        types (e.g., ObjectId converted to string). If the input is a list,
        returns a list of converted elements. For other types, returns the
        object as is.

    Examples
    --------
    >>> from bson import ObjectId, SON
    >>> sample_son = SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')])
    >>> convert_bson_to_dict(sample_son)
    {'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}

    >>> sample_list = [SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]), {'age': 30}]
    >>> convert_bson_to_dict(sample_list)
    [{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}, {'age': 30}]
    """
    if isinstance(obj, ObjectId):
        # or just return None if you don't need the ObjectId
        return str(obj)
    if isinstance(obj, list):
        return [convert_bson_to_dict(element) for element in obj]
    if isinstance(obj, (SON, dict)):
        return {key: convert_bson_to_dict(value) for key, value in obj.items()}
    return obj
class MongodRunner:
    """Class to manage a MongoDB instance.

    This class is responsible for automatically creating and managing a MongoDB
    database using the `mongod` command. It allows for starting and stopping a
    MongoDB instance programmatically.

    Parameters
    ----------
    db_name: str
        MongoDB database name (also used as the on-disk ``--dbpath``).
        Defaults to "hyperopt-db".
    db_port: int
        MongoDB database connection port. Defaults to 27017.
    """

    def __init__(self, db_name="hyperopt-db", db_port=27017):
        self.db_name = db_name
        self.db_port = db_port

    def ensure_database_dir_exists(self):
        """Create the MongoDB database directory if it does not exist yet."""
        if not os.path.exists(self.db_name):
            log.info(f"Creating MongoDB database dir {self.db_name}")
        # exist_ok guards against the directory appearing between check and creation
        os.makedirs(self.db_name, exist_ok=True)

    def start(self):
        """Starts the MongoDB instance via `mongod` command.

        Returns
        -------
        subprocess.Popen
            Handle to the running ``mongod`` process (pass it to :meth:`stop`).

        Raises
        ------
        EnvironmentError
            If the ``mongod`` executable could not be launched.
        """
        args = [
            "mongod",
            # fixed: mongod's documented flag is double-dashed ("--quiet", not "-quiet")
            "--quiet",
            "--dbpath",
            self.db_name,
            "--port",
            str(self.db_port),
            "--directoryperdb",
        ]
        try:
            mongod = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
            log.info(f"Started MongoDB database {self.db_name}")
            return mongod
        except OSError as err:
            msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
            raise EnvironmentError(msg) from err

    def stop(self, mongod):
        """Stops `mongod` command.

        Terminates the given process and waits for it to exit; failures are
        logged (best-effort shutdown, never raises).
        """
        try:
            mongod.terminate()
            mongod.wait()
            # fixed: was an f-string with no placeholders
            log.info("Stopped mongod")
        except Exception as err:
            log.error(f"Failed to stop mongod: {err}")
class MongoFileTrials(MongoTrials):
    """
    MongoDB implementation of :class:`n3fit.hyper_optimization.filetrials.FileTrials`.

    Parameters
    ----------
    replica_path: path
        Replica folder as generated by n3fit.
    db_host: str
        MongoDB database connection host. Defaults to "localhost".
    db_port: int
        MongoDB database connection port. Defaults to 27017.
    db_name: str
        MongoDB database name. Defaults to "hyperopt-db".
    num_workers: int
        Number of MongoDB workers to be initiated concurrently. Defaults to 1.
    parameters: dict
        Dictionary of parameters on which we are doing hyperoptimization. Default to None.
    store_trial: bool
        If True, store data into json file. Default to True.
    """

    def __init__(
        self,
        replica_path,
        db_host="localhost",
        db_port=27017,
        db_name="hyperopt-db",
        num_workers=1,
        parameters=None,
        *args,
        **kwargs,
    ):
        self.db_host = db_host
        # the port is embedded in a connection string below, hence stored as str
        self.db_port = str(db_port)
        self.db_name = db_name
        self.num_workers = num_workers
        # connection string in the format understood by hyperopt's MongoTrials
        self.mongotrials_arg = (
            f"mongo://{self.db_host}:{self.db_port}/{self._process_db_name(self.db_name)}/jobs"
        )
        # Popen handles of the hyperopt-mongo-worker subprocesses (see start_mongo_workers)
        self.workers = []
        # NOTE(review): assumes replica_path looks like <output>/nnfit/replica_N,
        # so parts[-3] is the run output folder — confirm against the caller
        self.output_folder_name = replica_path.parts[-3]
        # toggled by new_trial_ids/_insert_trial_docs so refresh() dumps the
        # json history only once per inserted trial
        self._store_trial = False
        self._json_file = replica_path / "tries.json"
        self.database_tar_file = replica_path / f"{self.db_name}.tar.gz"
        self._parameters = parameters
        self._rstate = None
        # plain-dict mirror of the BSON trial documents, refreshed by _set_dynamic_trials
        self._dynamic_trials = []
        super().__init__(self.mongotrials_arg, *args, **kwargs)

    def _process_db_name(self, db_name):
        """Checks if db_name contains a slash, indicating a "directory/db" format."""
        if '/' in db_name:
            # Split the string by '/' and take the last part as the db name
            db_name_parts = db_name.split('/')
            db_name = db_name_parts[-1]
        return db_name

    @property
    def rstate(self):
        """Returns the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
        return self._rstate

    @rstate.setter
    def rstate(self, random_generator):
        """Sets the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
        self._rstate = random_generator

    def _set_dynamic_trials(self):
        """Converts self._trials to a dictionary and stores it in self._dynamic_trials."""
        self._dynamic_trials = [convert_bson_to_dict(item) for item in self._trials]
[docs] def refresh(self): """Fetches data from mongo database and save to a json file.""" super().refresh() # convert BSON object to a dictionary self._set_dynamic_trials() # write json to disk if self._store_trial: local_trials = [] for idx, t in enumerate(self._dynamic_trials): local_trials.append(t) local_trials[idx]["misc"]["space_vals"] = space_eval_trial(self._parameters, t) all_to_str = json.dumps(local_trials, default=str) with open(self._json_file, "w") as f: f.write(all_to_str)
# like in `FileTrials` the two methods below are implemented to avoid writing to the database twice
[docs] def new_trial_ids(self, n): self._store_trial = False return super().new_trial_ids(n)
def _insert_trial_docs(self, docs): self._store_trial = True return super()._insert_trial_docs(docs)
    def start_mongo_workers(
        self,
        workdir=None,
        exp_key=None,
        poll_interval=0.1,
        no_subprocesses=False,
        max_consecutive_failures=10,
        reserve_timeout=600,
    ):
        """Initiates all mongo workers simultaneously.

        Launches ``self.num_workers`` `hyperopt-mongo-worker` subprocesses, one
        per iteration, appending each Popen handle to ``self.workers``. When
        GPUs are available, workers are spread over them round-robin via
        CUDA_VISIBLE_DEVICES. Each worker's output is redirected to its own
        log file in the current working directory.

        Raises
        ------
        EnvironmentError
            If a worker subprocess could not be launched.
        """
        # get the number of gpu cards, if any
        gpus_all_physical_list = get_physical_gpus()
        num_gpus_available = len(gpus_all_physical_list)
        if not num_gpus_available:
            log.warning("No GPUs found in the system.")
        # launch mongo workers
        for i in range(self.num_workers):
            # construct the command to start a hyperopt-mongo-worker
            args = [
                "hyperopt-mongo-worker",
                "--mongo",
                f"{self.db_host}:{self.db_port}/{self.db_name}",
            ]
            # NOTE: truthiness checks mean a value of 0 drops the flag entirely,
            # letting hyperopt-mongo-worker fall back to its own default
            if workdir:
                args.extend(["--workdir", workdir])
            if exp_key:
                args.extend(["--exp-key", exp_key])
            if poll_interval:
                args.extend(["--poll-interval", str(poll_interval)])
            if max_consecutive_failures:
                args.extend(["--max-consecutive-failures", str(max_consecutive_failures)])
            if reserve_timeout:
                args.extend(["--reserve-timeout", str(reserve_timeout)])
            if no_subprocesses:
                args.append("--no-subprocesses")
            # start the worker as a subprocess
            try:
                my_env = os.environ.copy()
                if num_gpus_available:
                    # set CUDA_VISIBLE_DEVICES environment variable
                    # the GPU index assigned to each worker i is given by mod(i, num_gpus_available)
                    my_env["CUDA_VISIBLE_DEVICES"] = str(i % num_gpus_available)
                    # set tensorflow memory growth
                    my_env["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"
                    # avoid memory fragmentation issues?
                    # my_env["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
                # create log files to redirect the mongo-workers output
                mongo_workers_logfile = f"mongo-worker_{i+1}_{self.output_folder_name}.log"
                with open(mongo_workers_logfile, mode='w', encoding="utf-8") as log_file:
                    # run mongo workers
                    worker = subprocess.Popen(
                        args, env=my_env, stdout=log_file, stderr=subprocess.STDOUT
                    )
                    self.workers.append(worker)
                    log.info(f"Started mongo worker {i+1}/{self.num_workers}")
            except OSError as err:
                # NOTE(review): a failure here leaves previously started workers running;
                # the caller is expected to invoke stop_mongo_workers on error
                msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
                raise EnvironmentError(msg) from err
[docs] def stop_mongo_workers(self): """Terminates all active mongo workers.""" for worker in self.workers: try: worker.terminate() worker.wait() log.info(f"Stopped mongo worker {self.workers.index(worker)+1}/{self.num_workers}") except Exception as err: log.error( f"Failed to stop mongo worker {self.workers.index(worker)+1}/{self.num_workers}: {err}" )
[docs] def compress_mongodb_database(self): """Saves MongoDB database as tar file""" # check if the database exist if not os.path.exists(f"{self.db_name}"): raise FileNotFoundError( f"The MongoDB database directory '{self.db_name}' does not exist. " "Ensure it has been initiated correctly and it is in your path." ) # create the tar.gz file try: log.info(f"Compressing MongoDB database into {self.database_tar_file}") with tarfile.open(self.database_tar_file, "w:gz") as tar: tar.add(self.db_name) except tarfile.TarError as err: raise RuntimeError(f"Error compressing the database: {err}")
[docs] @staticmethod def extract_mongodb_database(database_tar_file, path=os.getcwd()): """Untar MongoDB database for use in restarts.""" # check if the database tar file exist if not os.path.exists(f"{database_tar_file}"): raise FileNotFoundError( f"The MongoDB database tar file '{database_tar_file}' does not exist." ) # check of the provided file is a tar type if not tarfile.is_tarfile(database_tar_file): raise tarfile.ReadError( f"The file '{database_tar_file}' provided is not a tar file type." ) # extract tar file try: log.info(f"Extracting MongoDB database {database_tar_file} to {path}") with tarfile.open(f"{database_tar_file}") as tar: tar.extractall(path) except tarfile.TarError as err: raise RuntimeError(f"Error extracting the database: {err}")