"""
Hyperopt trial object for parallel hyperoptimization with MongoDB.
Data are fetched from the MongoDB database and stored as json and tar.gz files within the nnfit folder.
"""
import json
import logging
import os
import subprocess
import tarfile
from bson import SON, ObjectId
from hyperopt.mongoexp import MongoTrials
from n3fit.backends import get_physical_gpus
from n3fit.hyper_optimization.filetrials import space_eval_trial
log = logging.getLogger(__name__)
def convert_bson_to_dict(obj):
"""
Recursively convert a BSON object to a standard Python dictionary.
This function is particularly useful for converting MongoDB query results,
which may contain BSON types like ObjectId and SON, into a more manageable
dictionary format.
Parameters
----------
obj : dict or bson.SON or list or any
The object to convert. Can be a BSON object (like SON), a dictionary
containing BSON types, a list of such objects, or any other type.
Returns
-------
dict or list or any
A Python dictionary with all BSON types converted to standard Python
types (e.g., ObjectId converted to string). If the input is a list,
returns a list of converted elements. For other types, returns the
object as is.
Examples
--------
>>> from bson import ObjectId, SON
>>> sample_son = SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')])
>>> convert_bson_to_dict(sample_son)
{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}
>>> sample_list = [SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]), {'age': 30}]
>>> convert_bson_to_dict(sample_list)
[{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}, {'age': 30}]
"""
if isinstance(obj, (SON, dict)):
return {k: convert_bson_to_dict(v) for k, v in obj.items()}
if isinstance(obj, ObjectId):
return str(obj) # or just return None if you don't need the ObjectId
if isinstance(obj, list):
return [convert_bson_to_dict(v) for v in obj]
return obj
class MongodRunner:
"""Class to manage a MongoDB instance.
This class is responsible for automatically creating and managing a MongoDB database
using the `mongod` command. It allows for starting and stopping a MongoDB instance
programmatically.
Parameters
----------
    db_name: str
        MongoDB database name. Defaults to "hyperopt-db".
    db_port: int
        MongoDB database connection port. Defaults to 27017.
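    Examples
    --------
    A minimal usage sketch (illustrative only; the database name and port below are placeholders):
    >>> runner = MongodRunner(db_name="hyperopt-db", db_port=27017)
    >>> runner.ensure_database_dir_exists()
    >>> mongod_process = runner.start()
    >>> # ... run the hyperoptimization against this database ...
    >>> runner.stop(mongod_process)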
"""
def __init__(self, db_name="hyperopt-db", db_port=27017):
self.db_name = db_name
self.db_port = db_port
def ensure_database_dir_exists(self):
"""Check if MongoDB database directory exists."""
if not os.path.exists(f"{self.db_name}"):
log.info(f"Creating MongoDB database dir {self.db_name}")
os.makedirs(self.db_name, exist_ok=True)
def start(self):
"""Starts the MongoDB instance via `mongod` command."""
args = [
"mongod",
"-quiet",
"--dbpath",
self.db_name,
"--port",
str(self.db_port),
"--directoryperdb",
]
try:
mongod = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
log.info(f"Started MongoDB database {self.db_name}")
return mongod
except OSError as err:
msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
raise EnvironmentError(msg) from err
def stop(self, mongod):
"""Stops `mongod` command."""
try:
mongod.terminate()
mongod.wait()
log.info(f"Stopped mongod")
except Exception as err:
log.error(f"Failed to stop mongod: {err}")
class MongoFileTrials(MongoTrials):
"""
MongoDB implementation of :class:`n3fit.hyper_optimization.filetrials.FileTrials`.
Parameters
----------
replica_path: path
Replica folder as generated by n3fit.
db_host: str
MongoDB database connection host. Defaults to "localhost".
db_port: int
MongoDB database connection port. Defaults to 27017.
db_name: str
MongoDB database name. Defaults to "hyperopt-db".
    num_workers: int
        Number of hyperopt mongo workers to be launched concurrently. Defaults to 1.
    parameters: dict
        Dictionary of parameters on which we are doing hyperoptimization. Defaults to None.
    store_trial: bool
        If True, store data into a json file. Defaults to True.
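    Examples
    --------
    Illustrative sketch only; the replica path below is a placeholder assuming the usual
    ``<output folder>/nnfit/replica_<n>`` layout produced by n3fit, and a MongoDB instance
    (e.g. started with :class:`MongodRunner`) must already be listening on the given port:
    >>> from pathlib import Path
    >>> replica_path = Path("hyperopt_run/nnfit/replica_1")
    >>> trials = MongoFileTrials(replica_path, db_name="hyperopt-db", num_workers=2)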
"""
def __init__(
self,
replica_path,
db_host="localhost",
db_port=27017,
db_name="hyperopt-db",
num_workers=1,
parameters=None,
*args,
**kwargs,
):
self.db_host = db_host
self.db_port = str(db_port)
self.db_name = db_name
self.num_workers = num_workers
self.mongotrials_arg = (
f"mongo://{self.db_host}:{self.db_port}/{self._process_db_name(self.db_name)}/jobs"
)
self.workers = []
self.output_folder_name = replica_path.parts[-3]
self._store_trial = False
self._json_file = replica_path / "tries.json"
self.database_tar_file = replica_path / f"{self.db_name}.tar.gz"
self._parameters = parameters
self._rstate = None
self._dynamic_trials = []
super().__init__(self.mongotrials_arg, *args, **kwargs)
def _process_db_name(self, db_name):
"""Checks if db_name contains a slash, indicating a "directory/db" format."""
if '/' in db_name:
# Split the string by '/' and take the last part as the db name
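            # e.g. "my_hyperopt_dir/hyperopt-db" -> "hyperopt-db"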
db_name_parts = db_name.split('/')
db_name = db_name_parts[-1]
return db_name
@property
def rstate(self):
"""Returns the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
return self._rstate
@rstate.setter
def rstate(self, random_generator):
"""Sets the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
self._rstate = random_generator
def _set_dynamic_trials(self):
"""Converts self._trials to a dictionary and stores it in self._dynamic_trials."""
self._dynamic_trials = [convert_bson_to_dict(item) for item in self._trials]
def refresh(self):
"""Fetches data from mongo database and save to a json file."""
super().refresh()
# convert BSON object to a dictionary
self._set_dynamic_trials()
# write json to disk
if self._store_trial:
local_trials = []
for idx, t in enumerate(self._dynamic_trials):
local_trials.append(t)
local_trials[idx]["misc"]["space_vals"] = space_eval_trial(self._parameters, t)
all_to_str = json.dumps(local_trials, default=str)
with open(self._json_file, "w") as f:
f.write(all_to_str)
    # as in `FileTrials`, the two methods below are implemented to avoid writing to the database twice
def new_trial_ids(self, n):
self._store_trial = False
return super().new_trial_ids(n)
def _insert_trial_docs(self, docs):
self._store_trial = True
return super()._insert_trial_docs(docs)
def start_mongo_workers(
self,
workdir=None,
exp_key=None,
poll_interval=0.1,
no_subprocesses=False,
max_consecutive_failures=10,
reserve_timeout=600,
):
"""Initiates all mongo workers simultaneously."""
# get the number of gpu cards, if any
gpus_all_physical_list = get_physical_gpus()
num_gpus_available = len(gpus_all_physical_list)
if not num_gpus_available:
log.warning("No GPUs found in the system.")
# launch mongo workers
for i in range(self.num_workers):
# construct the command to start a hyperopt-mongo-worker
args = [
"hyperopt-mongo-worker",
"--mongo",
f"{self.db_host}:{self.db_port}/{self.db_name}",
]
if workdir:
args.extend(["--workdir", workdir])
if exp_key:
args.extend(["--exp-key", exp_key])
if poll_interval:
args.extend(["--poll-interval", str(poll_interval)])
if max_consecutive_failures:
args.extend(["--max-consecutive-failures", str(max_consecutive_failures)])
if reserve_timeout:
args.extend(["--reserve-timeout", str(reserve_timeout)])
if no_subprocesses:
args.append("--no-subprocesses")
# start the worker as a subprocess
try:
my_env = os.environ.copy()
if num_gpus_available:
# set CUDA_VISIBLE_DEVICES environment variable
# the GPU index assigned to each worker i is given by mod(i, num_gpus_available)
my_env["CUDA_VISIBLE_DEVICES"] = str(i % num_gpus_available)
# set tensorflow memory growth
my_env["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"
                    # optionally, the asynchronous GPU allocator can be enabled to mitigate memory fragmentation
                    # my_env["TF_GPU_ALLOCATOR"] = "cuda_malloc_async"
# create log files to redirect the mongo-workers output
mongo_workers_logfile = f"mongo-worker_{i+1}_{self.output_folder_name}.log"
with open(mongo_workers_logfile, mode='w', encoding="utf-8") as log_file:
# run mongo workers
worker = subprocess.Popen(
args, env=my_env, stdout=log_file, stderr=subprocess.STDOUT
)
self.workers.append(worker)
log.info(f"Started mongo worker {i+1}/{self.num_workers}")
except OSError as err:
msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
raise EnvironmentError(msg) from err
def stop_mongo_workers(self):
"""Terminates all active mongo workers."""
        for worker_num, worker in enumerate(self.workers, start=1):
            try:
                worker.terminate()
                worker.wait()
                log.info(f"Stopped mongo worker {worker_num}/{self.num_workers}")
            except Exception as err:
                log.error(f"Failed to stop mongo worker {worker_num}/{self.num_workers}: {err}")
def compress_mongodb_database(self):
"""Saves MongoDB database as tar file"""
# check if the database exist
if not os.path.exists(f"{self.db_name}"):
raise FileNotFoundError(
f"The MongoDB database directory '{self.db_name}' does not exist. "
"Ensure it has been initiated correctly and it is in your path."
)
# create the tar.gz file
try:
log.info(f"Compressing MongoDB database into {self.database_tar_file}")
with tarfile.open(self.database_tar_file, "w:gz") as tar:
tar.add(self.db_name)
except tarfile.TarError as err:
            raise RuntimeError(f"Error compressing the database: {err}") from err
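

# Note: the compressed database produced by `compress_mongodb_database` can later be
# restored by hand; illustrative sketch only, paths are placeholders:
#
#     import tarfile
#     with tarfile.open("hyperopt_run/nnfit/replica_1/hyperopt-db.tar.gz", "r:gz") as tar:
#         tar.extractall()
#
# after which `mongod --dbpath hyperopt-db` can serve the recovered trials again.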