Source code for n3fit.hyper_optimization.rewards

"""
    Target functions to minimize during hyperparameter scan

    These are implemented in the HyperLoss class which incorporates
    various statistics (average, standard deviation, best/worst case)
    both across multiple replicas of a model and across different folds.

    Key functionalities include:
    - Support for different loss types such as Chi-square (chi2) and phi-square (phi2).
    - Calculation of statistical measures (average, best_worst, std) over replicas and folds.
    - Incorporation of penalties into the loss computation.
    - Detailed tracking and storage of loss metrics for further analysis.

    New statistics can be added directly in this class as staticmethods and
    via `IMPLEMENTED_STATS`; their name in the runcard must
    match the name in the module

    Example
    -------
    >>> import numpy as np
    >>> from n3fit.hyper_optimization.rewards import HyperLoss
    >>> losses = np.array([1.0, 2.0, 3.0])
    >>> loss_average = HyperLoss(fold_statistic="average")
    >>> loss_best_worst = HyperLoss(fold_statistic="best_worst")
    >>> loss_std = HyperLoss(fold_statistic="std")
    >>> print(f"{loss_average.reduce_over_folds.__name__} {loss_average.reduce_over_folds(losses)}")
    >>> print(f"{loss_best_worst.reduce_over_folds.__name__} {loss_best_worst.reduce_over_folds(losses)}")
    >>> print(f"{loss_std.reduce_over_folds.__name__} {loss_std.reduce_over_folds(losses)}")
    _average 2.0
    _best_worst 3.0
    _std 0.816496580927726
"""

import logging
from typing import Callable

import numpy as np

from n3fit.backends import MetaModel
from n3fit.vpinterface import N3PDF, compute_phi
from validphys.core import DataGroupSpec
from validphys.pdfgrids import distance_grids, xplotting_grid

log = logging.getLogger(__name__)


def _average_best(fold_losses: np.ndarray, proportion: float = 0.9, axis: int = 0) -> float:
    """
    Compute the average of the input array along the specified axis, among the best `proportion`
    of replicas.

    Parameters
    ----------
        fold_losses: np.ndarray
            Per replica losses for a single fold.
        proportion: float
            The proportion of best replicas to take into account (rounded up).
        axis: int, optional
            Axis along which the mean is computed. Default is 0.

    Returns
    -------
        float: The average along the specified axis.
    """
    # TODO: use directly `validphys.fitveto.determine_vetoes`
    num_best = int(np.ceil(proportion * len(fold_losses)))

    if np.isnan(fold_losses).any():
        log.warning(f"{np.isnan(fold_losses).sum()} replicas have NaNs losses")
    sorted_losses = np.sort(fold_losses, axis=axis)
    best_losses = sorted_losses[:num_best]
    return _average(best_losses, axis=axis)


def _average(fold_losses: np.ndarray, axis: int = 0) -> float:
    """
    Compute the average of the input array along the specified axis.

    Parameters
    ----------
        fold_losses: np.ndarray
            Input array.
        axis: int, optional
            Axis along which the mean is computed. Default is 0.

    Returns
    -------
        float: The average along the specified axis.
    """
    return np.average(fold_losses, axis=axis).item()


def _best_worst(fold_losses: np.ndarray, axis: int = 0) -> float:
    """
    Compute the maximum value of the input array along the specified axis.

    Parameters
    ----------
        fold_losses: np.ndarray
            Input array.
        axis: int, optional
            Axis along which the maximum is computed. Default is 0.

    Returns
    -------
        float: The maximum value along the specified axis.
    """
    return np.max(fold_losses, axis=axis).item()


def _std(fold_losses: np.ndarray, axis: int = 0) -> float:
    """
    Compute the standard deviation of the input array along the specified axis.

    Parameters
    ----------
        fold_losses: np.ndarray
            Input array.
        axis: int, optional
            Axis along which the standard deviation is computed. Default is 0.

    Returns
    -------
        float: The standard deviation along the specified axis.
    """
    return np.std(fold_losses, axis=axis).item()


IMPLEMENTED_STATS = {
    "average": _average,
    "average_best": _average_best,
    "best_worst": _best_worst,
    "std": _std,
}
IMPLEMENTED_LOSSES = ["chi2", "phi2"]


def _pdfs_to_n3pdfs(pdfs_per_fold):
    """Convert a list of multi-replica PDFs to a list of N3PDFs"""
    return [N3PDF(pdf.split_replicas(), name=f"fold_{k}") for k, pdf in enumerate(pdfs_per_fold)]


[docs] class HyperLoss: """ Class to compute the hyper_loss based on the individual replica losses. Computes the statistic over the replicas and then over the folds, both statistics default to the average. The ``compute_loss`` method saves intermediate metrics such as the chi2 of the folds or the phi regardless of the loss type that has been selected. These metrics are saved in the properties ``phi_vector``: list of phi per fold ``chi2_matrix``: list of chi2 per fold, per replica Parameters ---------- loss_type: str the type of loss over the replicas to use. Options are "chi2" and "phi2". replica_statistic: str the statistic over the replicas to use, for per replica losses. Options are "average", "best_worst", and "std". fold_statistic: str the statistic over the folds to use. Options are "average", "best_worst", and "std". penalties_in_loss: bool whether the penalties should be included in the output of ``compute_loss`` """ def __init__( self, loss_type: str = None, replica_statistic: str = None, fold_statistic: str = None, penalties_in_loss: bool = False, ): self._default_loss = "chi2" self._penalties_in_loss = penalties_in_loss self.loss_type = self._parse_loss(loss_type) self.reduce_over_replicas = self._parse_statistic( replica_statistic, "replica_statistic", default="average_best" ) self.reduce_over_folds = self._parse_statistic( fold_statistic, "fold_statistic", default="average" ) self.phi_vector = [] self.chi2_matrix = [] self.penalties = {}
[docs] def compute_loss( self, penalties: dict[str, np.ndarray], experimental_loss: np.ndarray, pdf_object: N3PDF, experimental_data: list[DataGroupSpec], fold_idx: int = 0, ) -> float: """ Compute the loss, including added penalties, for a single fold. Save the phi of the assemble and the chi2 of the separate replicas, and the penalties into the ``phi_vector``, ``chi2_matrix`` and ``penalties`` attributes. Parameters ---------- penalties: Dict[str, NDArray(replicas)] Dict of penalties for each replica. Possible keys are 'saturation', 'patience' and 'integrability' as defined in 'penalties.py' and instantiated within :class:`~n3fit.model_trainer.ModelTrainer`. experimental_loss: NDArray(replicas) Experimental loss for each replica. pdf_object: :class:`n3fit.vpinterface.N3PDF` N3fitted PDF experimental_data: List[validphys.core.DataGroupSpec] List of tuples containing `validphys.core.DataGroupSpec` instances for each group data set fold_idx: int k-fold index. Defaults to 0. include_penalties: float Whether to include the penalties in the returned loss value Returns ------- loss: float The computed loss over the replicas. Example ------- >>> import numpy as np >>> from n3fit.hyper_optimization.rewards import HyperLoss >>> from n3fit.model_gen import generate_pdf_model >>> from n3fit.vpinterface import N3PDF >>> from validphys.loader import Loader >>> hyper = HyperLoss(loss_type="chi2", replica_statistic="average", fold_statistic="average") >>> penalties = {'saturation': np.array([1.0, 2.0]), 'patience': np.array([3.0, 4.0]), 'integrability': np.array([5.0, 6.0]),} >>> experimental_loss = np.array([0.1, 0.2]) >>> ds = Loader().check_dataset("NMC_NC_NOTFIXED_P_EM-SIGMARED", variant="legacy", theoryid=399, cuts="internal") >>> experimental_data = [Loader().check_experiment("My DataGroupSpec", [ds])] >>> fake_fl = [{'fl' : i, 'largex' : [0,1], 'smallx': [1,2]} for i in ['u', 'ubar', 'd', 'dbar', 'c', 'g', 's', 'sbar']] >>> pdf_model = generate_pdf_model(nodes=[8], activations=['linear'], seed=0, num_replicas=2, flav_info=fake_fl, fitbasis="FLAVOUR") >>> pdf = N3PDF(pdf_model.split_replicas()) >>> loss = hyper.compute_loss(penalties, experimental_loss, pdf, experimental_data) """ # calculate phi for a given k-fold using vpinterface and validphys phi_per_fold = compute_phi(pdf_object, experimental_data) # update hyperopt metrics # these are saved in the phi_vector and chi2_matrix attributes, excluding penalties self._save_hyperopt_metrics(phi_per_fold, experimental_loss, penalties, fold_idx) # Prepare the output loss, including penalties if necessary if self._penalties_in_loss: # include penalties to experimental loss experimental_loss += sum(penalties.values()) # add penalties to phi in the form of a sum of per-replicas averages phi_per_fold += sum(np.mean(penalty) for penalty in penalties.values()) # define loss for hyperopt according to the chosen loss_type if self.loss_type == "chi2": # calculate statistics of chi2 over replicas for a given k-fold loss = self.reduce_over_replicas(experimental_loss) elif self.loss_type == "phi2": loss = phi_per_fold**2 return loss
def _save_hyperopt_metrics( self, phi_per_fold: float, chi2_per_fold: np.ndarray, penalties: dict[str, np.ndarray], fold_idx: int = 0, ) -> None: """ Save all chi2 and phi calculated metrics per replica and per fold, including penalties. Parameters ---------- phi_per_fold: float Computed phi for a given k-fold chi2_per_fold: np.ndarray Computed chi2 for each replica for a given k-fold penalties: Dict[str, np.ndarray] dictionary of all penalties with their names fold_idx: int k-fold index. Defaults to 0. """ # reset chi2 and phi arrays for every trial if fold_idx == 0: self.phi_vector = [] self.chi2_matrix = [] self.penalties = {} # populate chi2 matrix and phi vector calculated for a given k-fold self.chi2_matrix.append(chi2_per_fold) self.phi_vector.append(phi_per_fold) # save penalties per replica for a given k-fold for name, values in penalties.items(): temp = self.penalties.get(name, []) temp.append(values) self.penalties[name] = temp def _parse_loss(self, loss_type: str) -> str: """ Parse the type of loss and return the default if None. Parameters ---------- loss_type: str The loss type to parse. Returns ------- loss_type: str The parsed loss type. Raises ------ ValueError: If an invalid loss type is provided. """ if loss_type is None: loss_type = self._default_loss log.warning(f"No loss_type selected in HyperLoss, defaulting to {loss_type}") if loss_type not in IMPLEMENTED_LOSSES: valid_options = ", ".join(IMPLEMENTED_LOSSES) raise ValueError(f"Invalid loss type '{loss_type}'. Valid options are: {valid_options}") log.info(f"Setting '{loss_type}' as the loss type for hyperoptimization") return loss_type def _parse_statistic(self, statistic: str, name: str, default: str) -> Callable: """ Parse the statistic and return the default if None. Parameters ---------- statistic: str The statistic to parse. name: str The name of the statistic. Returns ------- Callable: The parsed statistic method. Raises ------ ValueError: If an invalid statistic is provided. Notes ----- For loss type equal to phi2, the applied fold statistics is always the reciprocal of the selected stats. """ if statistic is None: statistic = default log.warning(f"No {name} selected in HyperLoss, defaulting to {statistic}") if statistic not in IMPLEMENTED_STATS: valid_options = ", ".join(IMPLEMENTED_STATS.keys()) raise ValueError(f"Invalid {name} '{statistic}'. Valid options are: {valid_options}") log.info(f"Using '{statistic}' as the {name} for hyperoptimization") selected_statistic = IMPLEMENTED_STATS[statistic] if self.loss_type == "chi2": return selected_statistic elif self.loss_type == "phi2": # In case of phi2, calculate the inverse of the applied statistics # This is only used when calculating statistics over folds return lambda x: np.reciprocal(selected_statistic(x))
[docs] def fit_distance(pdfs_per_fold=None, **_kwargs): """Loss function for hyperoptimization based on the distance of the fits of all folds to the first fold """ n3pdfs = _pdfs_to_n3pdfs(pdfs_per_fold) if n3pdfs is None: raise ValueError("fit_distance needs n3pdf models to act upon") xgrid = np.concatenate([np.logspace(-6, -1, 20), np.linspace(0.11, 0.9, 30)]) plotting_grids = [xplotting_grid(pdf, 1.6, xgrid) for pdf in n3pdfs] distances = distance_grids(n3pdfs, plotting_grids, 0) # The first distance will obviously be 0 # TODO: define this more sensibly, for now it is just a template max_distance = 0 for distance in distances: max_distance = max(max_distance, distance.grid_values.max()) return max_distance
def _set_central_value(n3pdf, model): """Given an N3PDF object and a MetaModel, set the PDF_0 layer to be the central value of the n3pdf object""" from n3fit.backends import operations as op # Get the input x for key, grid in model.x_in.items(): if key != "xgrid_integration": input_x = grid.numpy() break # Compute the central value of the PDF full_pdf = n3pdf(input_x) cv_pdf = op.numpy_to_tensor(np.mean(full_pdf, axis=0, keepdims=True)) def central_value(x, training=None): # pylint: disable=unused-argument return cv_pdf model.get_layer("PDF_0").call = central_value # This model won't be trainable ever again model.trainable = False model.compile()
[docs] def fit_future_tests(n3pdfs=None, experimental_models=None, **_kwargs): """Use the future tests as a metric for hyperopt NOTE: this function should only be called once at the end of every hyperopt iteration, as it is destructive for the models """ if n3pdfs is None: raise ValueError("fit_future_test needs n3pdf models to act upon") if experimental_models is None: raise ValueError("fit_future_test needs experimental_models to compute chi2") from n3fit.backends import MetaModel compatibility_mode = False try: import tensorflow as tf from n3fit.backends import set_eager tf_version = tf.__version__.split(".") if int(tf_version[0]) == 2 and int(tf_version[1]) < 4: set_eager(True) compatibility_mode = True except ImportError: pass # For the last model the PDF covmat doesn't need to be computed. # This is because the last model corresponds to an empty k-fold partition, # meaning that all datasets were used during training. # but the mask needs to be flipped in the folding for the appropiate datasets last_model = experimental_models[-1] _set_central_value(n3pdfs[-1], last_model) # Loop over all models but the last (our reference!) total_loss = 0.0 for n3pdf, exp_model in zip(n3pdfs[:-1], experimental_models[:-1]): _set_central_value(n3pdf, exp_model) # Get the full input and the total chi2 full_input = exp_model.input # Now update the loss with the PDF covmat for layer in exp_model.get_layer_re(".*_exp$"): # Get the input to the loss layer model_output = MetaModel(full_input, layer.input) # Get the full predictions and generate the PDF covmat y = model_output.predict()[0] # The first is a dummy-dim pdf_covmat = np.cov(y, rowvar=False) # Update the covmat of the loss layer.add_covmat(pdf_covmat) # Update the mask of the last_model so that its synced with this layer last_model.get_layer(layer.name).update_mask(layer.mask) # Compute the loss with pdf errors pdf_chi2 = exp_model.compute_losses()["loss"][0] # And the loss of the best (most complete) fit best_chi2 = last_model.compute_losses()["loss"][0] # Now make this into a measure of the total loss # for instance, any deviation from the "best" value is bad total_loss += np.abs(best_chi2 - pdf_chi2) if compatibility_mode: set_eager(False) return total_loss