# Source code for n3fit.hyper_optimization.mongofiletrials

"""
Hyperopt trial object for parallel hyperoptimization with MongoDB.
Data are fetched from MongoDB databases and stored within the nnfit folder.

The workflow when running parallel hyperopt with mongodb is as follows:
    1. Submit the main "server" job. This can be submitted as part of n3fit or as a separate job,
    e.g., in the login node, running only the database.
    The main server job can also run a worker job itself.
    2. Submit the worker jobs. The workers each run a single job.

Each of the workers needs to run with the --parallel-hyperopt flag and --hyperopt <number of trials>,
where the number of trials is per worker (e.g., 1000 trials for each of 2 workers gives 2000 trials in total).
Each job will first try to connect to the database and, failing that, will try to start the database.

The database is stored in the folder of replica 1 (together with the trials) so that it gets
uploaded with vp-upload, allowing for a continuation of the fit.
"""

import json
import logging
import os
from pathlib import Path
import platform
import subprocess
import time

try:
    from bson import SON, ObjectId
    from hyperopt.mongoexp import MongoTrials
except ModuleNotFoundError:
    SON = object()
    from . import HyperoptDependencyMissing as MongoTrials

    ObjectId = object()

from n3fit.backends import get_physical_gpus
from n3fit.hyper_optimization.filetrials import space_eval_trial

# Module-level logger used by the classes below for status and error messages
log = logging.getLogger(__name__)


def convert_bson_to_dict(obj):
    """
    Recursively convert a BSON object to a standard Python dictionary.

    This function is particularly useful for converting MongoDB query results,
    which may contain BSON types like ObjectId and SON, into a more manageable
    dictionary format.

    Parameters
    ----------
    obj : dict or bson.SON or list or any
        The object to convert. Can be a BSON object (like SON), a dictionary
        containing BSON types, a list of such objects, or any other type.

    Returns
    -------
    dict or list or any
        A Python dictionary with all BSON types converted to standard Python
        types (e.g., ObjectId converted to string). If the input is a list,
        returns a list of converted elements. For other types, returns the
        object as is.

    Examples
    --------
    >>> from bson import ObjectId, SON
    >>> sample_son = SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')])
    >>> convert_bson_to_dict(sample_son)
    {'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}

    >>> sample_list = [SON([('_id', ObjectId('507f1f77bcf86cd799439011')), ('name', 'John Doe')]), {'age': 30}]
    >>> convert_bson_to_dict(sample_list)
    [{'_id': '507f1f77bcf86cd799439011', 'name': 'John Doe'}, {'age': 30}]
    """
    # ``bson.SON`` subclasses ``dict``, so this check also covers SON documents
    # and does not depend on the ``SON = object()`` placeholder used when bson is missing.
    if isinstance(obj, dict):
        return {k: convert_bson_to_dict(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_bson_to_dict(v) for v in obj]
    # When bson is not installed ``ObjectId`` is a placeholder instance, not a class:
    # it cannot be used with isinstance (and no real ObjectId can appear anyway)
    if isinstance(ObjectId, type) and isinstance(obj, ObjectId):
        return str(obj)
    return obj
class MongodRunner:
    """Class to manage a MongoDB instance.

    This class is responsible for automatically creating and managing a MongoDB database
    using the `mongod` command. It allows for starting and stopping a MongoDB instance
    programmatically. It can also be used as a context manager: on entry the database is
    started unless one is already reachable, and on exit the ``mongod`` process started by
    this instance (if any) is stopped.

    Parameters
    ----------
        db_path: str or pathlib.Path
            Folder holding the MongoDB data files; its last component is used as the
            database name by :class:`MongoFileTrials`. Defaults to "hyperopt-db".
        db_host: str
            hostname of the database. If None, it is read from ``<db_path>.hostname`` when
            looking for a running database, and set to this machine's name when starting one.
        db_port: int
            MongoDB database connection port. Defaults to 27017.
    """

    def __init__(self, db_path="hyperopt-db", db_host=None, db_port=27017):
        # Always store a Path: is_up/start rely on Path methods (exists, mkdir, with_suffix),
        # which would fail for a plain string such as the default value
        self.db_path = Path(db_path)
        self.db_port = db_port
        self.db_host = db_host
        # Popen handle of the mongod process started by this instance, if any
        self._runner_job = None

    def is_up(self):
        """Checks whether the database is up.

        Returns
        -------
            bool
                True if a MongoDB server answered at the resolved host and ``self.db_port``
                (``self.db_host`` is then set to that host), False otherwise.
        """
        if self.db_host is not None:
            possible_host = self.db_host
        else:
            # Without an explicit host we can only find a database that exists here and whose
            # host was written down by the job that started it (see ``start``)
            hostfile = self.db_path.with_suffix(".hostname")
            if not (self.db_path.exists() and hostfile.exists()):
                return False
            possible_host = hostfile.read_text().strip()
            if not possible_host:
                return False

        # Imported here so pymongo is only required when a connection is actually attempted
        from pymongo import MongoClient
        from pymongo.errors import ServerSelectionTimeoutError

        mc = MongoClient(host=possible_host, port=self.db_port, serverSelectionTimeoutMS=100)
        try:
            mc.admin.command("ping")
        except ServerSelectionTimeoutError:
            return False
        finally:
            # The client is only needed for this check: always release its resources
            mc.close()

        self.db_host = possible_host
        log.info(f"Database up at {self.db_host}:{self.db_port}")
        return True

    def start(self):
        """Starts the MongoDB instance via `mongod` command.

        Creates ``db_path`` if needed and, unless ``db_host`` was set explicitly, writes this
        machine's name to ``<db_path>.hostname`` so that other jobs can find the database.

        Raises
        ------
            EnvironmentError
                if ``mongod`` cannot be executed (e.g., MongoDB is not installed)
        """
        args = [
            "mongod",
            "--quiet",
            "--wiredTigerCacheSizeGB",
            # NB: 16GB seemed reasonable for cineca's cluster, not benchmarked beyond that
            os.environ.get("HYPEROPT_MONGO_CACHE_SIZE", "16"),
            "--dbpath",
            str(self.db_path),
            "--port",
            str(self.db_port),
            "--bind_ip_all",
        ]
        cmd_str = " ".join(args)

        self.db_path.mkdir(exist_ok=True, parents=True)
        try:
            mongod = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError as err:
            msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
            self.db_path.with_suffix(".hostname").unlink(missing_ok=True)
            raise EnvironmentError(msg) from err
        # Keep the handle right away so that stop() can always terminate the process
        self._runner_job = mongod

        # The job starting the database gets to write the hostname down
        # unless it has been set explicitly, in that case trust the user and use that
        if self.db_host is None:
            self.db_host = platform.node()
            self.db_path.with_suffix(".hostname").write_text(self.db_host)
        log.info(f"Started MongoDB database at {self.db_host}:{self.db_port} with {cmd_str}")

    def stop(self):
        """Stops the `mongod` process started by this instance, if any."""
        if self._runner_job is None:
            return
        try:
            self._runner_job.terminate()
            self._runner_job.wait()
            log.info("Stopped mongod")
        except Exception as err:
            # Best-effort shutdown: report the problem but do not raise during cleanup
            log.error(f"Failed to stop mongod: {err}")
        else:
            self._runner_job = None

    def __enter__(self):
        """Starts the database unless one is already reachable."""
        if not self.is_up():
            self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stops the database started on entry (exceptions are not suppressed)."""
        self.stop()
class MongoFileTrials(MongoTrials):
    """
    MongoDB implementation of :class:`n3fit.hyper_optimization.filetrials.FileTrials`.

    Parameters
    ----------
        replica_path: str or pathlib.Path
            Replica folder as generated by n3fit. Trials are dumped to ``replica_path/tries.json``.
        mongod_runner: :py:class:`MongodRunner`
            Instance of MongodRunner with parameters such as db_host or db_port defining the database
        num_workers: int
            Number of MongoDB workers expected to run concurrently (used in log messages).
            Defaults to 1.
        parameters: dict
            Dictionary of parameters on which we are doing hyperoptimization. Default to None.
        *args, **kwargs:
            Forwarded to ``hyperopt.mongoexp.MongoTrials``.
    """

    def __init__(
        self, replica_path, mongod_runner, *args, num_workers=1, parameters=None, **kwargs
    ):
        replica_path = Path(replica_path)
        self.num_workers = num_workers

        # Define the connection string; the database name is the last component of db_path
        db_name = Path(mongod_runner.db_path).name
        host = mongod_runner.db_host
        port = mongod_runner.db_port
        self._mongo_str = f"{host}:{port}/{db_name}"

        self.workers = []
        self._runname = replica_path.parts[-3]  # corresponds to the runcard/runfolder name
        self._store_trial = False
        self._json_file = replica_path / "tries.json"
        self._parameters = parameters
        self._rstate = None
        self._dynamic_trials = []
        # NB: the attributes above are set before the parent constructor runs since it may
        # already call self.refresh(), which reads them
        super().__init__(f"mongo://{self._mongo_str}/jobs", *args, **kwargs)

    @property
    def rstate(self):
        """Returns the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
        return self._rstate

    @rstate.setter
    def rstate(self, random_generator):
        """Sets the rstate attribute; see :class:`n3fit.hyper_optimization.filetrials.FileTrials`."""
        self._rstate = random_generator

    def _set_dynamic_trials(self):
        """Converts self._trials to a dictionary and stores it in self._dynamic_trials."""
        self._dynamic_trials = [convert_bson_to_dict(item) for item in self._trials]

    def refresh(self):
        """Fetches data from mongo database and, if trials were inserted, saves them to a json file."""
        super().refresh()

        # convert BSON objects to plain Python dictionaries
        self._set_dynamic_trials()

        # write json to disk
        if self._store_trial:
            # The converted trials are fresh copies, so they can be annotated in place
            for trial in self._dynamic_trials:
                trial["misc"]["space_vals"] = space_eval_trial(self._parameters, trial)
            all_to_str = json.dumps(self._dynamic_trials, default=str)
            with open(self._json_file, "w", encoding="utf-8") as f:
                f.write(all_to_str)

    # like in `FileTrials` the two methods below are implemented to avoid writing to the database twice
    def new_trial_ids(self, n):
        """Disables the json dump on refresh, then delegates to the parent class."""
        self._store_trial = False
        return super().new_trial_ids(n)

    def _insert_trial_docs(self, docs):
        """Enables the json dump on refresh, then delegates to the parent class."""
        self._store_trial = True
        return super()._insert_trial_docs(docs)

    def start_mongo_workers(
        self,
        workdir=None,
        exp_key=None,
        poll_interval=0.1,
        no_subprocesses=False,
        max_consecutive_failures=10,
        reserve_timeout=600,
    ):
        """Starts one ``hyperopt-mongo-worker`` subprocess connected to this database.

        The worker is appended to ``self.workers`` and its stdout/stderr are redirected to a
        ``mongo-worker_<runname>_<timestamp>.log`` file in the current working directory.
        Falsy arguments are not passed to the worker command line.

        Parameters
        ----------
            workdir: str or pathlib.Path
                passed as ``--workdir``
            exp_key: str
                passed as ``--exp-key``
            poll_interval: float
                passed as ``--poll-interval``
            no_subprocesses: bool
                if True, ``--no-subprocesses`` is passed
            max_consecutive_failures: int
                passed as ``--max-consecutive-failures``
            reserve_timeout: float
                passed as ``--reserve-timeout``

        Raises
        ------
            EnvironmentError
                if the ``hyperopt-mongo-worker`` command cannot be executed
        """
        # check whether there are gpu cards available, only to warn the user
        if not get_physical_gpus():
            log.warning("No GPUs found in the system.")

        # construct the command to start a hyperopt-mongo-worker
        args = ["hyperopt-mongo-worker", "--mongo", self._mongo_str]
        if workdir:
            args.extend(["--workdir", str(workdir)])
        if exp_key:
            args.extend(["--exp-key", exp_key])
        if poll_interval:
            args.extend(["--poll-interval", str(poll_interval)])
        if max_consecutive_failures:
            args.extend(["--max-consecutive-failures", str(max_consecutive_failures)])
        if reserve_timeout:
            args.extend(["--reserve-timeout", str(reserve_timeout)])
        if no_subprocesses:
            args.append("--no-subprocesses")

        my_env = os.environ.copy()
        my_env["TF_FORCE_GPU_ALLOW_GROWTH"] = "true"

        # create log files to redirect the mongo-workers output
        mongo_workers_logfile = f"mongo-worker_{self._runname}_{time.time()}.log"
        with open(mongo_workers_logfile, mode='w', encoding="utf-8") as log_file:
            # start the worker as a subprocess; the child keeps its own copy of the log fd
            try:
                worker = subprocess.Popen(
                    args, env=my_env, stdout=log_file, stderr=subprocess.STDOUT
                )
            except OSError as err:
                msg = f"Failed to execute {args}. Make sure you have MongoDB installed."
                raise EnvironmentError(msg) from err
        self.workers.append(worker)
        log.info("Started mongo worker")

    def stop_mongo_workers(self):
        """Terminates all active mongo workers (best effort: failures are logged, not raised)."""
        for idx, worker in enumerate(self.workers, start=1):
            try:
                worker.terminate()
                worker.wait()
                log.info(f"Stopped mongo worker {idx}/{self.num_workers}")
            except Exception as err:
                log.error(f"Failed to stop mongo worker {idx}/{self.num_workers}: {err}")