Source code for n3fit.scripts.n3fit_exec

#!/usr/bin/env python
"""
    n3fit - performs fit using ml external frameworks
"""

import argparse
import logging
import pathlib
import re
import shutil
import sys

from ruamel.yaml import error

from reportengine import colors
from reportengine.namespaces import NSList
from validphys.app import App
from validphys.config import Config, ConfigError, Environment, EnvironmentError_
from validphys.core import FitSpec
from validphys.utils import yaml_safe

N3FIT_FIXED_CONFIG = dict(use_cuts='internal', use_t0=True, actions_=[])

FIT_NAMESPACE = "datacuts::theory::fitting "
CLOSURE_NAMESPACE = "datacuts::theory::closuretest::fitting "

N3FIT_PROVIDERS = [
    "n3fit.performfit",
    "n3fit.n3fit_checks_provider",
    "validphys.results",
    "validphys.n3fit_data",
    "validphys.pseudodata",
    "validphys.covmats",
    "validphys.commondata",
]

log = logging.getLogger(__name__)

RUNCARD_COPY_FILENAME = "filter.yml"
INPUT_FOLDER = "input"
TAB_FOLDER = "tables"


[docs] class N3FitError(Exception): """Exception raised when n3fit cannot succeed and knows why""" pass
[docs] class N3FitEnvironment(Environment): """Container for information to be filled at run time"""
[docs] def init_output(self): # check file exists, is a file, has extension. if not self.config_yml.exists(): raise N3FitError("Invalid runcard. File not found.") else: if not self.config_yml.is_file(): raise N3FitError("Invalid runcard. Must be a file.") # check if results folder exists self.output_path = pathlib.Path(self.output_path).absolute() if not (self.output_path / "nnfit").is_dir(): if not re.fullmatch(r"[\w.\-]+", self.output_path.name): raise N3FitError("Invalid output folder name. Must be alphanumeric.") try: self.output_path.mkdir(exist_ok=True) (self.output_path / "nnfit").mkdir(exist_ok=True) except OSError as e: raise EnvironmentError_(e) from e try: shutil.copy2(self.config_yml, self.output_path / RUNCARD_COPY_FILENAME) except shutil.SameFileError: pass # create output folder for the fit self.replica_path = self.output_path / "nnfit" for replica in self.replicas: path = self.replica_path / f"replica_{replica}" log.info(f"Creating replica output folder in {path}") try: path.mkdir(exist_ok=True) except OSError as e: raise EnvironmentError_(e) from e # place tables in last replica path, folder already exists. self.table_folder = path # make lockfile input inside of replica folder # avoid conflict with setupfit self.input_folder = self.replica_path / INPUT_FOLDER self.input_folder.mkdir(exist_ok=True)
[docs] @classmethod def ns_dump_description(cls): return { "replicas": "The MC replica number/s", "replica_path": "The replica output path", "output_path": "The runcard name", "hyperopt": "The hyperopt flag", **super().ns_dump_description(), }
[docs] class N3FitConfig(Config): """Specialization for yaml parsing"""
[docs] @classmethod def from_yaml(cls, o, *args, **kwargs): try: file_content = yaml_safe.load(o) except error.YAMLError as e: raise ConfigError(f"Failed to parse yaml file: {e}") if not isinstance(file_content, dict): raise ConfigError( f"Expecting input runcard to be a mapping, not '{type(file_content)}'." ) if file_content.get('closuretest') is not None: namespace = CLOSURE_NAMESPACE else: namespace = FIT_NAMESPACE N3FIT_FIXED_CONFIG['actions_'].append(namespace + "performfit") if fps := file_content["fitting"].get("savepseudodata", True): if fps != True: raise TypeError(f"fitting::savepseudodata is neither True nor False ({fps})") if len(kwargs["environment"].replicas) != 1: raise ConfigError( "Cannot request that multiple replicas are fitted and that " "pseudodata is saved. Either set `fitting::savepseudodata` " "to `false` or fit replicas one at a time." ) # take same namespace configuration on the pseudodata_table action. training_action = namespace + "training_pseudodata" validation_action = namespace + "validation_pseudodata" N3FIT_FIXED_CONFIG['actions_'].extend((training_action, validation_action)) if thconfig := file_content.get('fiatlux'): N3FIT_FIXED_CONFIG['fiatlux'] = thconfig else: N3FIT_FIXED_CONFIG['fiatlux'] = None if thconfig := file_content.get('positivity_bound'): N3FIT_FIXED_CONFIG['positivity_bound'] = thconfig else: N3FIT_FIXED_CONFIG['positivity_bound'] = None # Theorycovmat flags and defaults N3FIT_FIXED_CONFIG['use_thcovmat_in_fitting'] = False N3FIT_FIXED_CONFIG['use_thcovmat_in_sampling'] = False if (thconfig := file_content.get('theorycovmatconfig')) is not None: N3FIT_FIXED_CONFIG['use_thcovmat_in_fitting'] = thconfig.get( 'use_thcovmat_in_fitting', True ) N3FIT_FIXED_CONFIG['use_thcovmat_in_sampling'] = thconfig.get( 'use_thcovmat_in_sampling', True ) # Fitting flag file_content.update(N3FIT_FIXED_CONFIG) return cls(file_content, *args, **kwargs)
[docs] def produce_fit(self): """Produces a FitSpec which points at the current fit, to load fit_commondata from. """ fitpath = self.environment.output_path return FitSpec(fitpath.name, fitpath)
[docs] def parse_fakedata(self, fakedata: bool): """Parses the `fakedata` key from the closuretest namespace, if True then use generated closure test in fit """ if fakedata: log.warning("using filtered closure data") if not (self.environment.output_path / 'filter').is_dir(): raise ConfigError( "Could not find filter result at " f"{self.environment.output_path/'filter'} " "to load commondata from. Did you run filter on the " "runcard?" ) return fakedata
[docs] def produce_use_fitcommondata(self, fakedata): """Produces the `use_fitcommondata` key from the `fakedata` key in `closuretest` namespace """ return fakedata
[docs] def produce_kfold_parameters(self, kfold=None, hyperopt=None): """Return None even if there are kfolds in the runcard if the hyperopt flag is not active""" if hyperopt is not None: return kfold return None
[docs] def produce_kpartitions(self, kfold_parameters): if kfold_parameters: partitions = kfold_parameters["partitions"] # Note that one of the partitions could be empty ([]) or, by yaml usual notation, None for partition in partitions: if partition["datasets"] is None: partition["datasets"] = [] return partitions return None
[docs] def produce_hyperscanner(self, parameters, hyperscan_config=None, hyperopt=None): """For a hyperparameter scan to be run, a hyperscanner must be constructed from the original hyperscan_config""" # This needs to be imported here because it needs Tensorflow and n3fit from n3fit.hyper_optimization.hyper_scan import HyperScanner if hyperscan_config is None or hyperopt is None: return None if hyperopt and self.environment.restart: hyperscan_config.update({'restart': 'true'}) if hyperopt and self.environment.parallel_hyperopt: hyperscan_config.update({'parallel': 'true'}) hyperscan_config.update( { 'db_host': self.environment.db_host, 'db_port': self.environment.db_port, 'db_name': self.environment.db_name, 'output_path': self.environment.output_path.name, 'num_mongo_workers': self.environment.num_mongo_workers, } ) return HyperScanner(parameters, hyperscan_config)
[docs] class N3FitApp(App): """The class which parsers and performs the fit""" environment_class = N3FitEnvironment config_class = N3FitConfig def __init__(self): super().__init__(name="n3fit", providers=N3FIT_PROVIDERS) @property def argparser(self): parser = super().argparser parser.add_argument( "-o", "--output", help="Output folder and name of the fit", default=None ) def check_positive(value): ivalue = int(value) if ivalue <= 0: raise argparse.ArgumentTypeError("%s is an invalid positive int value." % value) return ivalue parser.add_argument("--hyperopt", help="Enable hyperopt scan", default=None, type=int) parser.add_argument("--restart", help="Enable hyperopt restarts", action="store_true") parser.add_argument( "--parallel-hyperopt", help="Enable hyperopt run in parallel with MongoDB", action="store_true", ) parser.add_argument("--db-host", help="MongoDB host", default="localhost") parser.add_argument("--db-port", help="MongoDB port", default=27017) parser.add_argument("--db-name", help="MongoDB dataset name", default="hyperopt-db") parser.add_argument( "--num-mongo-workers", help="Number of mongo workers to be launched simultaneously", type=check_positive, default=1, ) parser.add_argument("replica", help="MC replica number", type=check_positive) parser.add_argument( "-r", "--replica_range", help="End of the range of replicas to compute", type=check_positive, ) return parser
[docs] def get_commandline_arguments(self, cmdline=None): args = super().get_commandline_arguments(cmdline) # Validate dependencies related to the --hyperopt argument if args["hyperopt"] is None: if args["restart"]: raise argparse.ArgumentError( None, "The --restart option requires --hyperopt to be set." ) if args["parallel_hyperopt"]: raise argparse.ArgumentError( None, "The --parallel-hyperopt option requires --hyperopt to be set." ) if args["output"] is None: args["output"] = pathlib.Path(args["config_yml"]).stem return args
[docs] def run(self): try: self.environment.config_yml = pathlib.Path(self.args["config_yml"]).absolute() replica = self.args["replica"] if self.args["replica_range"]: replicas = list(range(replica, self.args["replica_range"] + 1)) else: replicas = [replica] self.environment.replicas = NSList(replicas, nskey="replica") self.environment.hyperopt = self.args["hyperopt"] self.environment.restart = self.args["restart"] self.environment.parallel_hyperopt = self.args["parallel_hyperopt"] self.environment.db_host = self.args["db_host"] self.environment.db_port = self.args["db_port"] self.environment.db_name = self.args["db_name"] self.environment.num_mongo_workers = self.args["num_mongo_workers"] super().run() except N3FitError as e: log.error(f"Error in n3fit:\n{e}") sys.exit(1) except Exception as e: log.critical(f"Bug in n3fit ocurred. Please report it.") print(colors.color_exception(e.__class__, e, e.__traceback__), file=sys.stderr) sys.exit(1)
[docs] def main(): a = N3FitApp() a.main()
if __name__ == "__main__": main()