Source code for n3fit.model_trainer

"""
    The ModelTrainer class is the true driver around the n3fit code

    This class is initialized with all information about the NN, inputs and outputs.
    The construction of the NN and the fitting is performed at the same time when the
    hyperparametrizable method of the function is called.

    This allows to use hyperscanning libraries, that need to change the parameters of the network
    between iterations while at the same time keeping the amount of redundant calls to a minimum
"""

from collections import namedtuple
from itertools import zip_longest
import logging

import numpy as np

from n3fit import model_gen
from n3fit.backends import NN_LAYER_ALL_REPLICAS, MetaModel, callbacks, clear_backend_state
from n3fit.backends import operations as op
from n3fit.hyper_optimization.hyper_scan import HYPEROPT_STATUSES
import n3fit.hyper_optimization.penalties
import n3fit.hyper_optimization.rewards
from n3fit.hyper_optimization.rewards import HyperLoss
from n3fit.scaler import generate_scaler
from n3fit.stopping import Stopping
from n3fit.vpinterface import N3PDF, compute_phi
from validphys.core import DataGroupSpec
from validphys.photon.compute import Photon

log = logging.getLogger(__name__)

# Threshold defaults
# Any partition with a chi2 over the threshold will discard its hyperparameters
HYPER_THRESHOLD = 50.0
CHI2_THRESHOLD = 10.0
# Each how many epochs do we increase the positivitiy Lagrange Multiplier
PUSH_POSITIVITY_EACH = 100

# Each how many epochs do we increase the integrability Lagrange Multiplier
PUSH_INTEGRABILITY_EACH = 100

# See ModelTrainer::_xgrid_generation for the definition of each field and how they are generated
InputInfo = namedtuple("InputInfo", ["input", "split", "idx"])


def _pdf_injection(pdf_layers, observables, masks):
    """
    Takes as input a list of PDF layers each corresponding to one observable (also given as a list)
    And (where neded) a mask to select the output.
    Returns a list of obs(pdf).
    Note that the list of masks don't need to be the same size as the list of layers/observables
    """
    return [f(x, mask=m) for f, x, m in zip_longest(observables, pdf_layers, masks)]


def _LM_initial_and_multiplier(input_initial, input_multiplier, max_lambda, steps):
    """
    If any of input_initial or input_multiplier is None this function computes
    the missing values taking as input the maximum lambda multiplier and the number of steps needed
    to reach the maximum number of epochs
    """
    initial = input_initial
    multiplier = input_multiplier
    # If the multiplier is None, compute it from known values
    if multiplier is None:
        # If the initial value is also None, set it to one
        if initial is None:
            initial = 1.0
        multiplier = pow(max_lambda / initial, 1 / max(steps, 1))
    elif initial is None:
        # Select the necessary initial value to get to max_lambda after all steps
        initial = max_lambda / pow(multiplier, steps)
    return initial, multiplier


[docs] class ModelTrainer: """ ModelTrainer Class: Wrapper around the fitting code and the generation of the Neural Network When the "hyperparametrizable"* function is called with a dictionary of parameters, it generates a NN and subsequentially performs a fit. The motivation behind this class is minimising the amount of redundant calls of each hyperopt run, in particular this allows to completely reset the NN at the beginning of each iteration reusing some of the previous work. *called in this way because it accept a dictionary of hyper-parameters which defines the Neural Network """ def __init__( self, experiments_data, exp_info, pos_info, integ_info, flavinfo, fitbasis, nnseeds, boundary_condition, debug=False, kfold_parameters=None, max_cores=None, model_file=None, sum_rules=None, theoryid=None, lux_params=None, replicas=None, ): """ Parameters ---------- experiments_data: list list of `validphys.core.DataGroupSpec` containing experiments exp_info: list list of dictionaries containing experiments pos_info: list list of dictionaries containing positivity sets integ_info: list list of dictionaries containing integrability sets flavinfo: list the object returned by fitting['basis'] fitbasis: str the name of the basis being fitted nnseeds: list(int) the seed used to initialise the NN for each model to be passed to model_gen debug: bool flag to activate some debug options kfold_parameters: dict parameters defining the kfolding method max_cores: int maximum number of cores the fitting can use to run model_file: str whether to save the models sum_rules: str whether sum rules should be enabled (All, MSR, VSR, False) theoryid: validphys.core.TheoryIDSpec object contining info for generating the photon lux_params: dict dictionary containing the params needed from LuxQED replicas: list list with the replicas ids to be fitted """ # Save all input information self.exp_info = list(exp_info) self.pos_info = [] if pos_info is None else pos_info self.integ_info = [] if integ_info is None else integ_info self.all_info = self.exp_info[0] + self.pos_info + self.integ_info self.boundary_condition = boundary_condition self.flavinfo = flavinfo self.fitbasis = fitbasis self._nn_seeds = nnseeds self.debug = debug self.all_datasets = [] self._scaler = None self.theoryid = theoryid self.lux_params = lux_params self.replicas = replicas self.experiments_data = experiments_data # Initialise internal variables which define behaviour if debug: self.max_cores = 1 else: self.max_cores = max_cores self.model_file = model_file self.print_summary = True self.mode_hyperopt = False self.impose_sumrule = sum_rules self._hyperkeys = None if kfold_parameters is None: self.kpartitions = [None] self.hyper_threshold = None else: self.kpartitions = kfold_parameters["partitions"] self.hyper_threshold = kfold_parameters.get("threshold", HYPER_THRESHOLD) # if there are penalties enabled, set them up penalties = kfold_parameters.get("penalties", []) self.hyper_penalties = [] for penalty in penalties: pen_fun = getattr(n3fit.hyper_optimization.penalties, penalty) self.hyper_penalties.append(pen_fun) log.info("Adding penalty: %s", penalty) # Check what is the hyperoptimization target function replica_statistic = kfold_parameters.get("replica_statistic", None) fold_statistic = kfold_parameters.get("fold_statistic", None) loss_type = kfold_parameters.get("loss_type", None) self._hyper_loss = HyperLoss( loss_type=loss_type, replica_statistic=replica_statistic, fold_statistic=fold_statistic, penalties_in_loss=kfold_parameters.get("penalties_in_loss", False), ) # Initialize the dictionaries which contain all fitting information self.input_list = [] self.training = { "output": [], "expdata": [], "ndata": 0, "model": None, "posdatasets": [], "posmultipliers": [], "posinitials": [], "integdatasets": [], "integmultipliers": [], "integinitials": [], "folds": [], } self.validation = { "output": [], "expdata": [], "ndata": 0, "model": None, "folds": [], "posdatasets": [], } self.experimental = {"output": [], "expdata": [], "ndata": 0, "model": None, "folds": []} self.tr_masks = [] self._fill_the_dictionaries() if self.validation["ndata"] == 0: # If there is no validation, the validation chi2 = training chi2 self.no_validation = True self.validation["expdata"] = self.training["expdata"] else: # Consider the validation only if there is validation (of course) self.no_validation = False self.callbacks = [] if debug: self.callbacks.append(callbacks.TimerCallback())
[docs] def set_hyperopt(self, hyperopt_on, keys=None): """Set hyperopt options on and off (mostly suppresses some printing)""" if keys is None: keys = [] self._hyperkeys = keys if hyperopt_on: self.print_summary = False self.mode_hyperopt = True else: self.print_summary = True self.mode_hyperopt = False
########################################################################### # # Internal functions # # Never to be called from the dark and cold outside world # ########################################################################### def _fill_the_dictionaries(self): """ This function fills the following dictionaries -``training``: data for the fit -``validation``: data which for the stopping -``experimental``: 'true' data, only used for reporting purposes with fixed information. Fixed information: information which will not change between different runs of the code. This information does not depend on the parameters of the fit at any stage and so it will remain unchanged between different runs of the hyperoptimizer. The aforementioned information corresponds to: - ``expdata``: experimental data - ``name``: names of the experiment - ``ndata``: number of experimental points """ for exp_dict in self.exp_info[0]: self.training["expdata"].append(exp_dict["expdata"]) self.validation["expdata"].append(exp_dict["expdata_vl"]) self.experimental["expdata"].append(exp_dict["expdata_true"]) self.training["folds"].append(exp_dict["folds"]["training"]) self.validation["folds"].append(exp_dict["folds"]["validation"]) self.experimental["folds"].append(exp_dict["folds"]["experimental"]) nd_tr = exp_dict["ndata"] nd_vl = exp_dict["ndata_vl"] self.training["ndata"] += nd_tr self.validation["ndata"] += nd_vl self.experimental["ndata"] += nd_tr + nd_vl for dataset in exp_dict["datasets"]: self.all_datasets.append(dataset.name) self.all_datasets = set(self.all_datasets) for pos_dict in self.pos_info: self.training["expdata"].append(pos_dict["expdata"]) self.training["posdatasets"].append(pos_dict["name"]) self.validation["expdata"].append(pos_dict["expdata"]) self.validation["posdatasets"].append(pos_dict["name"]) for integ_dict in self.integ_info: self.training["expdata"].append(integ_dict["expdata"]) self.training["integdatasets"].append(integ_dict["name"]) def _xgrid_generation(self): """ Generates the full x-grid pertaining to the complete set of observables to be fitted. To first approximation, the full x-grid is a concatenation of all x-grid requested by all fk-tables. In the case of pineappl models all fktables ask for the same grid in x and so the input can be simplified to be a single grid for all (or most) datasets. However, this is not a _strict_ requirement for pineappl and was not a requirement before so the solution below must be kept general enough. Detailed implementation of the union of xgrids: let's assume an input [x1, x1, x1, x2, x2, x3] where each xi is a different grid, this will be broken into two lists: [x1, x2, x3] (unique grids) and [0,0,0,1,1,2] (index of the grid per dataset) The pdf will then be evaluated to concatenate([x1,x2,x3]) and then split (x1, x2, x3) Then each of the experiment, looking at the indexes, will receive one of the 3 PDFs The decision whether two grids (x1 and x1) are really the same is decided below The necessary information to redistribute the x-grid is held by a ``InputInfo`` tuple which is returned by this function. Returns ------ Instance of ``InputInfo`` containing the input information necessary for the PDF model: - input: backend input layer with an array attached which is a concatenation of the unique inputs of the Model two inputs are the same if and only if they have the same shape, values and order - split: backend layer which splits the aforementioned concatenation back into the separate unique inputs, to be applied after the PDF is called - idx: indices of the observables to which the split PDF must be distributed """ log.info("Generating the input grid") inputs_unique = [] inputs_idx = [] for igrid in self.input_list: for idx, arr in enumerate(inputs_unique): if igrid.size == arr.size and np.allclose(igrid, arr): inputs_idx.append(idx) break else: inputs_idx.append(len(inputs_unique)) inputs_unique.append(igrid) # Concatenate the unique inputs input_arr = np.concatenate(inputs_unique, axis=1).T if self._scaler: # Apply feature scaling if given input_arr = self._scaler(input_arr) input_layer = op.numpy_to_input(input_arr, name="pdf_input") # The PDF model will be called with a concatenation of all inputs # now the output needs to be splitted so that each experiment takes its corresponding input sp_ar = [[i.shape[1] for i in inputs_unique]] sp_kw = {"axis": 2} sp_layer = op.as_layer(op.split, op_args=sp_ar, op_kwargs=sp_kw, name="pdf_split") return InputInfo(input_layer, sp_layer, inputs_idx) def _model_generation(self, xinput, pdf_model, partition, partition_idx): """ Fills the three dictionaries (``training``, ``validation``, ``experimental``) with the ``model`` entry Compiles the validation and experimental models with fakes optimizers and learning rate as they are never trained, but this is needed by some backends in order to run evaluate on them. Before entering this function we have the input of the model and a list of outputs, but they are not connected. This function connects inputs with outputs by injecting the PDF. At this point we have a PDF model that takes an input (1, None, 1) and outputs in return (1, none, 14). The injection of the PDF is done by concatenating all inputs and calling pdf_model on it. This in turn generates an output_layer that needs to be splitted for every experiment as we have a set of observable "functions" that each take (1, exp_xgrid_size, 14) and output (1, masked_ndata) where masked_ndata can be the training/validation or the experimental mask (in which cased masked_ndata == ndata). Several models can be fitted at once by passing a list of models with a shared input so that every mode receives the same input and the output will be concatenated at the end the final output of the model is then (1, None, 14, n) (with n=number of parallel models). Parameters ---------- xinput: InputInfo a tuple containing the input layer (with all values of x), and the information (in the form of a splitting layer and a list of indices) to distribute the results of the PDF (PDF(xgrid)) among the different observables pdf_model: n3fit.backend.MetaModel a model that produces PDF values partition: dict Only active during k-folding, information about the partition to be fitted partition_idx: int Index of the partition Returns ------- models: dict dict of MetaModels for training, validation and experimental """ log.info("Generating the Model") # For multireplica fits: # The trainable part of the n3fit framework is a concatenation of all PDF models # We apply the Model as Layers and save for later the model (full_pdf) full_model_input_dict, full_pdf = pdf_model.apply_as_layer({"pdf_input": xinput.input}) split_pdf_unique = xinput.split(full_pdf) # Now reorganize the uniques PDF so that each experiment receives its corresponding PDF split_pdf = [split_pdf_unique[i] for i in xinput.idx] # If we are in a kfolding partition, select which datasets are out training_mask = validation_mask = experimental_mask = [None] if partition and partition["datasets"]: # If we want to overfit the fold, leave the training and validation masks as [None] # otherwise, use the mask generated for the fold. # The experimental model instead is always limited to the fold if not partition.get("overfit", False): training_mask = [i[partition_idx] for i in self.training["folds"]] validation_mask = [i[partition_idx] for i in self.validation["folds"]] experimental_mask = [i[partition_idx] for i in self.experimental["folds"]] # Training and validation leave out the kofld dataset # experiment leaves out the negation output_tr = _pdf_injection(split_pdf, self.training["output"], training_mask) training = MetaModel(full_model_input_dict, output_tr) # Validation skips integrability and the "true" chi2 skips also positivity, # so we must only use the corresponding subset of PDF functions val_pdfs = [] exp_pdfs = [] for partial_pdf, obs in zip(split_pdf, self.training["output"]): if not obs.positivity and not obs.integrability: val_pdfs.append(partial_pdf) exp_pdfs.append(partial_pdf) elif not obs.integrability and obs.positivity: val_pdfs.append(partial_pdf) # We don't want to included the integrablity in the validation output_vl = _pdf_injection(val_pdfs, self.validation["output"], validation_mask) validation = MetaModel(full_model_input_dict, output_vl) # Or the positivity in the total chi2 output_ex = _pdf_injection(exp_pdfs, self.experimental["output"], experimental_mask) experimental = MetaModel(full_model_input_dict, output_ex) if self.print_summary: training.summary() pdf_model = training.get_layer("PDFs") pdf_model.summary() nn_model = pdf_model.get_layer(NN_LAYER_ALL_REPLICAS) nn_model.summary() # We may have fits without sumrules imposed try: msr_model = pdf_model.get_layer("impose_msr") msr_model.summary() except ValueError: pass models = {"training": training, "validation": validation, "experimental": experimental} return models def _reset_observables(self): """ Resets the 'output' and 'losses' entries of all 3 dictionaries: (``training``, ``validation``, ``experimental``) as well as the input_list this is necessary as these can either depend on the parametrization of the NN or be obliterated when/if the backend state is reset """ self.input_list = [] for key in ["output", "posmultipliers", "integmultipliers"]: self.training[key] = [] self.validation[key] = [] self.experimental[key] = [] ############################################################################ # # Parameterizable functions # # # # The functions defined in this block accept a 'params' dictionary which # # defines the fit and the behaviours of the Neural Networks # # # # These are all called by the function hyperparamizable below # # i.e., the most important function is hyperparametrizable, which is a # # wrapper around all of these # ############################################################################ def _generate_observables( self, all_pos_multiplier, all_pos_initial, all_integ_multiplier, all_integ_initial, epochs, interpolation_points, ): """ This functions fills the 3 dictionaries (training, validation, experimental) with the output layers and the loss functions It also fill the list of input tensors (input_list) The arguments of this function are used to define the initial positivity of the positivity observables and the multiplier to be applied at each step. Parameters ---------- all_pos_multiplier: float, None multiplier to be applied to the positivity each ``PUSH_POSITIVITY_EACH`` epochs all_pos_initial: float, None initial value for the positivity lambda epochs: int total number of epochs for the run """ # First reset the dictionaries self._reset_observables() log.info("Generating layers") # We need to transpose Experimental data, stacking over replicas experiment_data = { "trmask": [], "expdata": [], "expdata_vl": [], "invcovmat": [], "invcovmat_vl": [], } # Loop over datasets for i in range(len(self.exp_info[0])): # Loop over data fields for key, value in experiment_data.items(): replica_data = [] # Loop over replicas for replica in self.exp_info: if key in ["expdata", "expdata_vl"]: # Save the data with shape (ndata) instead of (1, ndata) replica_data.append(replica[i][key][0]) else: replica_data.append(replica[i][key]) # Stack value.append(np.stack(replica_data)) # Now we need to loop over all dictionaries (First exp_info, then pos_info and integ_info) for i, exp_dict in enumerate(self.exp_info[0]): if not self.mode_hyperopt: log.info("Generating layers for experiment %s", exp_dict["name"]) # Stacked tr-vl mask array for all replicas for this dataset exp_layer = model_gen.observable_generator( exp_dict, self.boundary_condition, mask_array=experiment_data["trmask"][i], training_data=experiment_data["expdata"][i], validation_data=experiment_data["expdata_vl"][i], invcovmat_tr=experiment_data["invcovmat"][i], invcovmat_vl=experiment_data["invcovmat_vl"][i], n_replicas=len(self.replicas), ) # Save the input(s) corresponding to this experiment self.input_list.append(exp_layer["inputs"]) # Now save the observable layer, the losses and the experimental data self.training["output"].append(exp_layer["output_tr"]) self.validation["output"].append(exp_layer["output_vl"]) self.experimental["output"].append(exp_layer["output"]) # Generate the positivity penalty for pos_dict in self.pos_info: if not self.mode_hyperopt: log.info("Generating positivity penalty for %s", pos_dict["name"]) positivity_steps = int(epochs / PUSH_POSITIVITY_EACH) max_lambda = pos_dict["lambda"] pos_initial, pos_multiplier = _LM_initial_and_multiplier( all_pos_initial, all_pos_multiplier, max_lambda, positivity_steps ) num_experiments = len(self.exp_info) replica_masks = np.stack([pos_dict["trmask"]] * num_experiments) training_data = np.stack([pos_dict["expdata"].flatten()] * num_experiments) pos_layer = model_gen.observable_generator( pos_dict, self.boundary_condition, positivity_initial=pos_initial, mask_array=replica_masks, training_data=training_data, validation_data=training_data, n_replicas=len(self.replicas), ) # The input list is still common self.input_list.append(pos_layer["inputs"]) # The positivity should be on both training and validation models self.training["output"].append(pos_layer["output_tr"]) self.validation["output"].append(pos_layer["output_tr"]) self.training["posmultipliers"].append(pos_multiplier) self.training["posinitials"].append(pos_initial) # Finally generate the integrability penalty for integ_dict in self.integ_info: if not self.mode_hyperopt: log.info("Generating integrability penalty for %s", integ_dict["name"]) integrability_steps = int(epochs / PUSH_INTEGRABILITY_EACH) max_lambda = integ_dict["lambda"] integ_initial, integ_multiplier = _LM_initial_and_multiplier( all_integ_initial, all_integ_multiplier, max_lambda, integrability_steps ) integ_layer = model_gen.observable_generator( integ_dict, self.boundary_condition, positivity_initial=integ_initial, integrability=True, n_replicas=len(self.replicas), ) # The input list is still common self.input_list.append(integ_layer["inputs"]) # The integrability all falls to the training self.training["output"].append(integ_layer["output_tr"]) self.training["integmultipliers"].append(integ_multiplier) self.training["integinitials"].append(integ_initial) # Store a reference to the interpolator as self._scaler if interpolation_points: self._scaler = generate_scaler(self.input_list, interpolation_points) def _generate_pdf( self, nodes_per_layer, activation_per_layer, initializer, layer_type, dropout, regularizer, regularizer_args, seed, photons, ): """ Defines the internal variable layer_pdf this layer takes any input (x) and returns the pdf value for that x if the sumrule is being imposed, it also updates input_list with the integrator_input tensor used to calculate the sumrule Parameters: ----------- nodes_per_layer: list list of nodes each layer has activation_per_layer: list list of the activation function for each layer initializer: str initializer for the weights of the NN layer_type: str type of layer to be used dropout: float dropout to add at the end of the NN regularizer: str choice of regularizer to add to the dense layers of the NN regularizer_args: dict dictionary of arguments for the regularizer seed: int seed for the NN photons: :py:class:`validphys.photon.compute.Photon` function to compute the photon PDF see model_gen.pdfNN_layer_generator for more information Returns ------- pdf_model: MetaModel pdf model """ log.info("Generating PDF models") pdf_model = model_gen.generate_pdf_model( nodes=nodes_per_layer, activations=activation_per_layer, layer_type=layer_type, flav_info=self.flavinfo, fitbasis=self.fitbasis, seed=seed, initializer_name=initializer, dropout=dropout, regularizer=regularizer, regularizer_args=regularizer_args, impose_sumrule=self.impose_sumrule, scaler=self._scaler, num_replicas=len(self.replicas), photons=photons, ) return pdf_model def _prepare_reporting(self, partition): """Parses the information received by the :py:class:`n3fit.ModelTrainer.ModelTrainer` to select the bits necessary for reporting the chi2. Receives the chi2 partition data to see whether any dataset is to be left out """ reported_keys = ["name", "count_chi2", "positivity", "integrability", "ndata", "ndata_vl"] reporting_list = [] for exp_dict in self.all_info: reporting_dict = {k: exp_dict.get(k) for k in reported_keys} if partition: # If we are in a partition we need to remove the number of datapoints # in order to avoid calculating the chi2 wrong for dataset in exp_dict["datasets"]: if dataset in partition["datasets"]: ndata = dataset["ndata"] frac = dataset["frac"] reporting_dict["ndata"] -= int(ndata * frac) reporting_dict["ndata_vl"] = int(ndata * (1 - frac)) reporting_list.append(reporting_dict) return reporting_list def _train_and_fit(self, training_model, stopping_object, epochs=100) -> bool: """ Trains the NN for the number of epochs given using stopping_object as the stopping criteria Every ``PUSH_POSITIVITY_EACH`` epochs the positivity will be multiplied by their respective positivity multipliers. In the same way, every ``PUSH_INTEGRABILITY_EACH`` epochs the integrability will be multiplied by their respective integrability multipliers """ callback_st = callbacks.StoppingCallback(stopping_object) callback_pos = callbacks.LagrangeCallback( self.training["posdatasets"], self.training["posmultipliers"], update_freq=PUSH_POSITIVITY_EACH, ) callback_integ = callbacks.LagrangeCallback( self.training["integdatasets"], self.training["integmultipliers"], update_freq=PUSH_INTEGRABILITY_EACH, ) training_model.perform_fit( epochs=epochs, verbose=False, callbacks=self.callbacks + [callback_st, callback_pos, callback_integ], ) def _hyperopt_override(self, params): """Unrolls complicated hyperopt structures into very simple dictionaries""" # If the input contains all parameters, then that's your dictionary of hyperparameters hyperparameters = params.get("parameters") if hyperparameters is not None: return hyperparameters # Else, loop over all different keys and unroll the dictionaries within hyperparameters for hyperkey in self._hyperkeys: item = params[hyperkey] if isinstance(item, dict): params.update(item) return params
[docs] def enable_tensorboard(self, logdir, weight_freq=0, profiling=False): """Enables tensorboard callback for further runs of the fitting procedure Parameters ---------- logdir: Path path where to save the tensorboard logs weight_freq: int frequency (in epochs) at which to save weight histograms profiling: bool flag to enable the tensorboard profiler """ callback_tb = callbacks.gen_tensorboard_callback( logdir, profiling=profiling, histogram_freq=weight_freq ) self.callbacks.append(callback_tb)
[docs] def evaluate(self, stopping_object): """Returns the training, validation and experimental chi2 Parameters ---------- stopping_object A Stopping intance which will have associated a validation model and the list of output layers that should contribute to the training chi2 Returns ------- train_chi2: chi2 of the trainining set val_chi2 : chi2 of the validation set exp_chi2: chi2 of the experimental data (without replica or tr/vl split) """ if self.training["model"] is None: raise RuntimeError("Modeltrainer.evaluate was called before any training") # Needs to receive a `stopping_object` in order to select the part of the # training and the validation which are actually `chi2` and not part of the penalty train_chi2 = stopping_object.evaluate_training(self.training["model"]) val_chi2 = stopping_object.vl_chi2 exp_chi2 = self.experimental["model"].compute_losses()["loss"] / self.experimental["ndata"] return train_chi2, val_chi2, exp_chi2
def _filter_datagroupspec(self, datasets_partition): """Takes a list of all input exp datasets as :class:`validphys.core.DataGroupSpec` and select `DataSetSpec`s whose names are in datasets_partition. Parameters ---------- datasets_partition: List[str] List with names of the datasets you want to select. Returns ------- filtered_datagroupspec: List[validphys.core.DataGroupSpec] List of filtered exp datasets whose names are in datasets_partition. """ filtered_datagroupspec = [] # self.experiments_data is composed of a list of `DataGroupSpec` objects # These represent a group of related exp data sets # Loop over this list for datagroup in self.experiments_data: filtered_datasetspec = [] # Each `DataGroupSpec` is composed by several `DataSetSpec` objects # `DataSetSpec` represents each exp dataset # Now, loop over them for dataset in datagroup.datasets: # Include `DataSetSpec`s whose names are in datasets_partition if dataset.name in datasets_partition: filtered_datasetspec.append(dataset) # List of filtered experiments as `DataGroupSpec` filtered_datagroupspec.append( DataGroupSpec(name=f"{datagroup.name}_exp", datasets=filtered_datasetspec) ) return filtered_datagroupspec
[docs] def hyperparametrizable(self, params): """ Wrapper around all the functions defining the fit. After the ModelTrainer class has been instantiated, a call to this function (with a ``params`` dictionary) is necessary in order to generate the whole PDF model and perform a fit. This is a necessary step for hyperopt to work Parameters used only here: - ``epochs``: maximum number of iterations for the fit to run - ``stopping_patience``: patience of the stopper after finding a new minimum All other parameters are passed to the corresponding functions """ # Reset the internal state of the backend every time this function is called print("") clear_backend_state() # When doing hyperopt some entries in the params dictionary # can bring with them overriding arguments if self.mode_hyperopt: log.info("Performing hyperparameter scan") for key in self._hyperkeys: log.info(" > > Testing %s = %s", key, params[key]) params = self._hyperopt_override(params) # Preprocess some hyperparameters epochs = int(params["epochs"]) stopping_patience = params["stopping_patience"] stopping_epochs = int(epochs * stopping_patience) # Fill the 3 dictionaries (training, validation, experimental) with the layers and losses # when k-folding, these are the same for all folds positivity_dict = params.get("positivity", {}) integrability_dict = params.get("integrability", {}) self._generate_observables( positivity_dict.get("multiplier"), positivity_dict.get("initial"), integrability_dict.get("multiplier"), integrability_dict.get("initial"), epochs, params.get("interpolation_points"), ) threshold_pos = positivity_dict.get("threshold", 1e-6) threshold_chi2 = params.get("threshold_chi2", CHI2_THRESHOLD) # Initialize the chi2 dictionaries l_valid = [] l_exper = [] l_hyper = [] # And lists to save hyperopt utilities pdfs_per_fold = [] exp_models = [] # phi evaluated over training/validation exp data trvl_phi_per_fold = [] # Generate the grid in x, note this is the same for all partitions xinput = self._xgrid_generation() # Initialize all photon classes for the different replicas: if self.lux_params: photons = Photon( theoryid=self.theoryid, lux_params=self.lux_params, replicas=self.replicas ) else: photons = None ### Training loop for k, partition in enumerate(self.kpartitions): # Each partition of the kfolding needs to have its own separate model # and the seed needs to be updated accordingly seeds = self._nn_seeds if k > 0: # generate random integers for each k-fold from the input `nnseeds` # we generate new seeds to avoid the integer overflow that may # occur when doing k*nnseeds rngs = [np.random.default_rng(seed=seed) for seed in seeds] seeds = [generator.integers(1, pow(2, 30)) * k for generator in rngs] # Generate the pdf model pdf_model = self._generate_pdf( params["nodes_per_layer"], params["activation_per_layer"], params["initializer"], params["layer_type"], params["dropout"], params.get("regularizer", None), # regularizer optional params.get("regularizer_args", None), seeds, photons, ) if photons: if self._scaler: # select only the non-scaled input pdf_model.get_layer("add_photon").register_photon(xinput.input.tensor_content[:,:,1:]) else: pdf_model.get_layer("add_photon").register_photon(xinput.input.tensor_content) # Model generation joins all the different observable layers # together with pdf model generated above models = self._model_generation(xinput, pdf_model, partition, k) # Only after model generation, apply possible weight file # Starting every replica with the same weights if self.model_file: log.info("Applying model file %s", self.model_file) pdf_model.load_identical_replicas(self.model_file) if k > 0: # Reset the positivity and integrability multipliers pos_and_int = self.training["posdatasets"] + self.training["integdatasets"] initial_values = self.training["posinitials"] + self.training["posinitials"] models["training"].reset_layer_weights_to(pos_and_int, initial_values) # Generate the list containing reporting info necessary for chi2 reporting = self._prepare_reporting(partition) if self.no_validation: # Substitute the validation model with the training model models["validation"] = models["training"] validation_model = models["training"] else: validation_model = models["validation"] # Generate the stopping_object this object holds statistical information about the fit # it is used to perform stopping stopping_object = Stopping( validation_model, reporting, pdf_model, total_epochs=epochs, stopping_patience=stopping_epochs, threshold_positivity=threshold_pos, threshold_chi2=threshold_chi2, ) # Compile each of the models with the right parameters for model in models.values(): model.compile(**params["optimizer"]) self._train_and_fit(models["training"], stopping_object, epochs=epochs) if self.mode_hyperopt: validation_loss = stopping_object.vl_chi2 # number of active points in this fold # it would be nice to have a ndata_per_fold variable coming in the vp object... ndata = np.sum([np.count_nonzero(i[k]) for i in self.experimental["folds"]]) # If ndata == 0 then it's the opposite, all data is in! if ndata == 0: ndata = self.experimental["ndata"] # Compute experimental loss, over excluded datasets exp_loss_raw = models["experimental"].compute_losses()["loss"] experimental_loss = exp_loss_raw / ndata # Compute penalties per replica penalties = { penalty.__name__: penalty(pdf_model=pdf_model, stopping_object=stopping_object) for penalty in self.hyper_penalties } # Extracting the necessary data to compute phi # First, create a list of `validphys.core.DataGroupSpec` # containing only exp datasets within the held out fold experimental_data = self._filter_datagroupspec(partition["datasets"]) vplike_pdf = N3PDF(pdf_model.split_replicas()) if self.boundary_condition is not None: vplike_pdf.register_boundary(self.boundary_condition["unpolarized_bc"]) # Compute per replica hyper losses hyper_loss = self._hyper_loss.compute_loss( penalties=penalties, experimental_loss=experimental_loss, pdf_object=vplike_pdf, experimental_data=experimental_data, fold_idx=k, ) # Create another list of `validphys.core.DataGroupSpec` # containing now exp datasets that are included in the training/validation dataset trvl_partitions = list(self.kpartitions) trvl_partitions.pop(k) trvl_exp_names = [ exp_name for item in trvl_partitions for exp_name in item['datasets'] ] trvl_data = self._filter_datagroupspec(trvl_exp_names) # evaluate phi on training/validation exp set trvl_phi = compute_phi(vplike_pdf, trvl_data) # Now save all information from this fold l_hyper.append(hyper_loss) l_valid.append(validation_loss) l_exper.append(experimental_loss) trvl_phi_per_fold.append(trvl_phi) pdfs_per_fold.append(pdf_model) exp_models.append(models["experimental"]) if hyper_loss > self.hyper_threshold: log.info( "Loss above threshold (%.1f > %.1f), breaking", hyper_loss, self.hyper_threshold, ) # Apply a penalty proportional to the number of folds not computed pen_mul = len(self.kpartitions) - k l_hyper = [i * pen_mul for i in l_hyper] passed = False break else: passed = True log.info("Fold %d finished, loss=%.1f, pass=%s", k + 1, hyper_loss, passed) # endfor if self.mode_hyperopt: # turn losses into arrays l_hyper = np.array(l_hyper) l_valid = np.array(l_valid) l_exper = np.array(l_exper) # Compute the loss over all folds for hyperopt final_hyper_loss = self._hyper_loss.reduce_over_folds(l_hyper) # Hyperopt needs a dictionary with information about the losses # it is possible to store arbitrary information in the trial file # by adding it to this dictionary dict_out = { "status": HYPEROPT_STATUSES[passed], "loss": final_hyper_loss, "validation_loss": np.average(l_valid), "experimental_loss": np.average(l_exper), "kfold_meta": { "validation_losses": l_valid, "trvl_losses_phi": np.array(trvl_phi_per_fold), "experimental_losses": l_exper, "hyper_losses": np.array(self._hyper_loss.chi2_matrix), "hyper_losses_phi": np.array(self._hyper_loss.phi_vector), "penalties": { name: np.array(values) for name, values in self._hyper_loss.penalties.items() }, }, } return dict_out # Keep a reference to the models after training for future reporting self.training["model"] = models["training"] self.experimental["model"] = models["experimental"] self.validation["model"] = models["validation"] # In a normal run, the only information we need to output is the stopping object # (which contains metadata about the stopping) # and the pdf model (which are used to generate the PDF grids and compute arclengths) if not self.mode_hyperopt: passed = any(bool(i) for i in stopping_object.e_best_chi2) dict_out = {"status": passed, "stopping_object": stopping_object, "pdf_model": pdf_model} return dict_out