# Source code for validphys.closuretest.multiclosure

"""
closuretest/multiclosure.py

Module containing all of the statistical estimators which are
averaged across multiple fits or a single replica proxy fit. The actions
in this module are used to produce results which are plotted in
``multiclosure_output.py``

"""

import numpy as np
import dataclasses
import pandas as pd
import scipy.linalg as la
import scipy.special as special

from reportengine import collect
from validphys.calcutils import calc_chi2
from validphys import covmats
from validphys.checks import check_use_t0
from validphys.closuretest.closure_checks import (
    check_at_least_10_fits,
    check_fits_areclosures,
    check_fits_different_filterseed,
    check_fits_underlying_law_match,
    check_multifit_replicas,
    check_t0pdfset_matches_multiclosure_law,
)
from validphys.results import ThPredictionsResult


# Default seed for the bootstrap random number generator.
# NOTE(review): not referenced by any definition visible in this part of the
# module - presumably consumed elsewhere; confirm before removing.
DEFAULT_SEED = 9689372
# Step size, in number of fits or replicas, between successive sample sizes
# used for finite size bootstraps (see ``n_fit_samples`` and
# ``n_replica_samples``).
SAMPLING_INTERVAL = 5


# TODO: deprecate this at some point
@check_fits_underlying_law_match
@check_fits_areclosures
@check_fits_different_filterseed
@check_use_t0
@check_t0pdfset_matches_multiclosure_law
def internal_multiclosure_dataset_loader(
    dataset, fits_pdf, multiclosure_underlyinglaw, fits, t0_covmat_from_systematics
):
    """Load the theory predictions of every closure fit, the underlying law
    predictions and the t0 covariance matrix for a single dataset.

    This is the common input of the multiclosure statistical estimators. It
    replicates behaviour found elsewhere in validphys on purpose: with the
    default caching, loading the theory predictions for the number of fits
    typically used in these studies can run into memory issues.

    Parameters
    ----------
    dataset: (DataSetSpec, DataGroupSpec)
        dataset for which the theory predictions and t0 covariance matrix
        will be loaded. Due to the structure of `validphys` a DataGroupSpec
        is accepted as well.
    fits_pdf: list
        PDF objects produced by the closure test fits. Each fit should have a
        different filterseed but the same underlying law used to generate the
        pseudodata.
    multiclosure_underlyinglaw: PDF
        PDF used to generate the pseudodata which the closure tests fitted.
        This is inferred from the fit runcards.
    fits: list
        list of closure test fits, used to collect ``fits_pdf``
    t0_covmat_from_systematics: np.ndarray
        covariance matrix; its lower Cholesky factor is returned alongside it.

    Returns
    -------
    multiclosure_results: tuple
        tuple of length 4, in order: closure fits theory predictions,
        underlying law theory predictions, covariance matrix,
        sqrt (lower Cholesky factor) of the covariance matrix.
    """
    closures_th = []
    for fit_pdf in fits_pdf:
        closures_th.append(ThPredictionsResult.from_convolution(fit_pdf, dataset))

    law_th = ThPredictionsResult.from_convolution(multiclosure_underlyinglaw, dataset)

    cholesky_factor = la.cholesky(t0_covmat_from_systematics, lower=True)
    # TODO: support covmat reg and theory covariance matrix
    # possibly make this a named tuple
    return (closures_th, law_th, t0_covmat_from_systematics, cholesky_factor)
@check_fits_underlying_law_match
@check_fits_areclosures
@check_fits_different_filterseed
@check_t0pdfset_matches_multiclosure_law
@check_use_t0
def internal_multiclosure_data_loader(
    data, fits_pdf, multiclosure_underlyinglaw, fits, dataset_inputs_t0_covmat_from_systematics
):
    """Like `internal_multiclosure_dataset_loader` except for all data.

    The arguments are forwarded positionally, with ``data`` taking the place
    of the dataset and ``dataset_inputs_t0_covmat_from_systematics`` the place
    of the single-dataset covariance matrix.
    """
    loader_args = (
        data,
        fits_pdf,
        multiclosure_underlyinglaw,
        fits,
        dataset_inputs_t0_covmat_from_systematics,
    )
    return internal_multiclosure_dataset_loader(*loader_args)
def eigendecomposition(covmat):
    """
    Diagonalise a symmetric (covariance) matrix.

    Parameters
    ----------
    covmat: np.array
        covariance matrix

    Returns
    -------
    tuple
        tuple of length 3 holding, in order, the eigenvalues, the eigenvectors
        (as columns) and the eigenvalues divided by their sum. All three are
        ordered from the largest to the smallest eigenvalue.
    """
    ascending_vals, ascending_vecs = np.linalg.eigh(covmat)

    # eigh does the work; this permutation puts the largest eigenvalue first
    descending = np.argsort(ascending_vals)[::-1]
    eigvals = ascending_vals[descending]
    eigvecs = ascending_vecs[:, descending]

    return eigvals, eigvecs, eigvals / eigvals.sum()
@dataclasses.dataclass(frozen=True)
class PCAInternalMulticlosureLoader:
    """
    Immutable container for the PCA regularised inputs of the multiclosure
    estimators.

    Parameters
    ----------
    closures_th: list
        list containing validphys.results.ThPredictionsResult objects for each fit

    law_th: ThPredictionsResult object
        underlying law theory predictions

    pc_basis: np.ndarray
        basis of principal components (the loaders in this module store the
        plain integer ``1`` here when there is a single data point)

    n_comp: int
        number of principal components kept after regularisation

    covmat_pca: np.ndarray
        regularised covariance matrix computed from replicas
        of theory predictions

    sqrt_covmat_pca: np.ndarray
        cholesky decomposed covariance matrix
    """

    closures_th: list
    law_th: ThPredictionsResult
    # ``np.ndarray`` is the array type; ``np.array`` is only a factory function
    pc_basis: np.ndarray
    n_comp: int
    covmat_pca: np.ndarray
    sqrt_covmat_pca: np.ndarray
@check_multifit_replicas
def internal_multiclosure_dataset_loader_pca(
    internal_multiclosure_dataset_loader,
    explained_variance_ratio=0.99,
    _internal_max_reps=None,
    _internal_min_reps=20,
):
    """
    Similar to multiclosure.internal_multiclosure_dataset_loader but returns
    PCA regularised covariance matrix, where the covariance matrix has been
    computed from the replicas of the theory predictions.

    Parameters
    ----------
    internal_multiclosure_dataset_loader: tuple
        closure fits theory predictions,
        underlying law theory predictions,
        covariance matrix,
        sqrt covariance matrix

    explained_variance_ratio: float, default is 0.99
        smallest fraction of the total variance that the retained principal
        components have to explain

    _internal_max_reps: int, default is None
        Maximum number of replicas used in the fits.
        Not used in the body; it is part of the signature for the
        ``check_multifit_replicas`` check.

    _internal_min_reps: int, default is 20
        Minimum number of replicas used in the fits.
        Not used in the body; it is part of the signature for the
        ``check_multifit_replicas`` check.

    Returns
    -------
    PCAInternalMulticlosureLoader
    """
    closures_th, law_th, _, _ = internal_multiclosure_dataset_loader

    reps = np.asarray([th.error_members for th in closures_th])

    # covariance matrix of the theory predictions of each fit, then their mean
    _covmats = np.array([np.cov(rep, rowvar=True, bias=True) for rep in reps])
    _covmat_mean = np.mean(_covmats, axis=0)

    # a single data point gives a 0-d "matrix": there is nothing to diagonalise
    if _covmat_mean.shape == ():
        return PCAInternalMulticlosureLoader(
            closures_th=closures_th,
            law_th=law_th,
            pc_basis=1,
            n_comp=1,
            covmat_pca=_covmat_mean,
            sqrt_covmat_pca=np.sqrt(_covmat_mean),
        )

    eighvals, eigvecs, eighvals_norm = eigendecomposition(_covmat_mean)

    # Keep the smallest number of leading components reaching the requested
    # explained variance. If the threshold is never reached (e.g. a ratio of
    # 1.0 missed by rounding) keep all of them, so that n_comp can never
    # exceed the number of columns of the basis.
    n_eigen = eighvals.shape[0]
    n_comp = n_eigen
    for n_kept in range(1, n_eigen + 1):
        if np.sum(eighvals_norm[:n_kept]) >= explained_variance_ratio:
            n_comp = n_kept
            break

    # principal components and the covariance matrix projected onto them
    pc_basis = eigvecs[:, :n_comp]
    covmat_pca = pc_basis.T @ _covmat_mean @ pc_basis

    if n_comp == 1:
        # 1x1 matrix: the elementwise sqrt is the matrix sqrt
        sqrt_covmat_pca = np.sqrt(covmat_pca)
    else:
        sqrt_covmat_pca = covmats.sqrt_covmat(covmat_pca)

    return PCAInternalMulticlosureLoader(
        closures_th=closures_th,
        law_th=law_th,
        pc_basis=pc_basis,
        n_comp=n_comp,
        covmat_pca=covmat_pca,
        sqrt_covmat_pca=sqrt_covmat_pca,
    )
@check_multifit_replicas
def internal_multiclosure_data_loader_pca(
    internal_multiclosure_data_loader,
    explained_variance_ratio=0.99,
    _internal_max_reps=None,
    _internal_min_reps=20,
):
    """
    Like multiclosure.internal_multiclosure_dataset_loader_pca except for all
    data. Here the principal components are those of the mean *correlation*
    matrix of the theory predictions, while the regularised covariance matrix
    is the mean covariance matrix projected onto them.

    Parameters
    ----------
    internal_multiclosure_data_loader: tuple
        closure fits theory predictions,
        underlying law theory predictions,
        covariance matrix,
        sqrt covariance matrix

    explained_variance_ratio: float, default is 0.99
        smallest fraction of the total variance that the retained principal
        components have to explain

    _internal_max_reps: int, default is None
        Maximum number of replicas used in the fits.
        Not used in the body; it is part of the signature for the
        ``check_multifit_replicas`` check.

    _internal_min_reps: int, default is 20
        Minimum number of replicas used in the fits.
        Not used in the body; it is part of the signature for the
        ``check_multifit_replicas`` check.

    Returns
    -------
    PCAInternalMulticlosureLoader
    """
    closures_th, law_th, _, _ = internal_multiclosure_data_loader

    reps = np.asarray([th.error_members for th in closures_th])

    # covariance matrix of the theory predictions of each fit, then their mean
    _covmats = np.array([np.cov(rep, rowvar=True, bias=True) for rep in reps])
    _covmat_mean = np.mean(_covmats, axis=0)

    # A single data point gives a 0-d "matrix". This has to be handled before
    # np.diag below, which only accepts 1-d or 2-d input.
    if _covmat_mean.shape == ():
        return PCAInternalMulticlosureLoader(
            closures_th=closures_th,
            law_th=law_th,
            pc_basis=1,
            n_comp=1,
            covmat_pca=_covmat_mean,
            sqrt_covmat_pca=np.sqrt(_covmat_mean),
        )

    # standard deviations, used to turn the covariance into a correlation matrix
    D = np.sqrt(np.diag(_covmat_mean))
    _corrmat_mean = _covmat_mean / np.outer(D, D)

    # diagonalize the mean correlation matrix
    eighvals, eigvecs, eighvals_norm = eigendecomposition(_corrmat_mean)

    # Keep the smallest number of leading components reaching the requested
    # explained variance. If the threshold is never reached (e.g. a ratio of
    # 1.0 missed by rounding) keep all of them, so that n_comp can never
    # exceed the number of columns of the basis.
    n_eigen = eighvals.shape[0]
    n_comp = n_eigen
    for n_kept in range(1, n_eigen + 1):
        if np.sum(eighvals_norm[:n_kept]) >= explained_variance_ratio:
            n_comp = n_kept
            break

    pc_basis = eigvecs[:, :n_comp]

    # (PCA) regularized covariance matrix: the mean covariance matrix
    # projected onto the principal components
    covmat_pca = pc_basis.T @ _covmat_mean @ pc_basis

    if n_comp == 1:
        # 1x1 matrix: the elementwise sqrt is the matrix sqrt
        sqrt_covmat_pca = np.sqrt(covmat_pca)
    else:
        sqrt_covmat_pca = covmats.sqrt_covmat(covmat_pca)

    return PCAInternalMulticlosureLoader(
        closures_th=closures_th,
        law_th=law_th,
        pc_basis=pc_basis,
        n_comp=n_comp,
        covmat_pca=covmat_pca,
        sqrt_covmat_pca=sqrt_covmat_pca,
    )
def bootstrapped_internal_multiclosure_dataset_loader_pca(
    internal_multiclosure_dataset_loader,
    n_fit_max,
    n_fit,
    n_rep_max,
    n_rep,
    n_boot_multiclosure,
    rng_seed_mct_boot,
    use_repeats=True,
    explained_variance_ratio=0.99,
    _internal_max_reps=None,
    _internal_min_reps=20,
):
    """
    Bootstrap the dataset loader with
    ``bootstrapped_internal_multiclosure_dataset_loader`` and PCA regularise
    every resample with ``internal_multiclosure_dataset_loader_pca``.

    The resampling arguments are forwarded to the former, while
    ``explained_variance_ratio``, ``_internal_max_reps`` and
    ``_internal_min_reps`` are forwarded to the latter.

    Returns
    -------
    tuple
        one PCA regularised loader per bootstrap resample
    """
    resampling_options = {
        "n_fit_max": n_fit_max,
        "n_fit": n_fit,
        "n_rep_max": n_rep_max,
        "n_rep": n_rep,
        "n_boot_multiclosure": n_boot_multiclosure,
        "rng_seed_mct_boot": rng_seed_mct_boot,
        "use_repeats": use_repeats,
    }
    resamples = bootstrapped_internal_multiclosure_dataset_loader(
        internal_multiclosure_dataset_loader, **resampling_options
    )

    # PCA regularise each of the bootstrap resamples
    return tuple(
        internal_multiclosure_dataset_loader_pca(
            resample, explained_variance_ratio, _internal_max_reps, _internal_min_reps
        )
        for resample in resamples
    )
def bootstrapped_internal_multiclosure_data_loader_pca(
    internal_multiclosure_data_loader,
    n_fit_max,
    n_fit,
    n_rep_max,
    n_rep,
    n_boot_multiclosure,
    rng_seed_mct_boot,
    use_repeats=True,
    explained_variance_ratio=0.99,
    _internal_max_reps=None,
    _internal_min_reps=20,
):
    """
    Same as bootstrapped_internal_multiclosure_dataset_loader_pca but for all
    the data: the resamples come from
    ``bootstrapped_internal_multiclosure_data_loader`` and each of them is PCA
    regularised with ``internal_multiclosure_data_loader_pca``.

    Returns
    -------
    tuple
        one PCA regularised loader per bootstrap resample
    """
    resampling_options = {
        "n_fit_max": n_fit_max,
        "n_fit": n_fit,
        "n_rep_max": n_rep_max,
        "n_rep": n_rep,
        "n_boot_multiclosure": n_boot_multiclosure,
        "rng_seed_mct_boot": rng_seed_mct_boot,
        "use_repeats": use_repeats,
    }
    resamples = bootstrapped_internal_multiclosure_data_loader(
        internal_multiclosure_data_loader, **resampling_options
    )

    # PCA regularise each of the bootstrap resamples
    return tuple(
        internal_multiclosure_data_loader_pca(
            resample, explained_variance_ratio, _internal_max_reps, _internal_min_reps
        )
        for resample in resamples
    )
def principal_components_bias_variance_dataset(internal_multiclosure_dataset_loader_pca):
    """
    Compute the bias and variance for one dataset
    using the principal component reduced covariance matrix.

    Parameters
    ----------
    internal_multiclosure_dataset_loader_pca : PCAInternalMulticlosureLoader
        PCA regularised results of the multiclosure fits. The attributes
        ``closures_th``, ``law_th``, ``pc_basis``, ``n_comp`` and
        ``sqrt_covmat_pca`` are read.

    Returns
    -------
    tuple
        3D tuple:
        - biases: 1-D array of shape (Nfits,)
        - variances: 1-D array of shape (Nfits, )
        - n_comp: number of principal components kept
    """
    pca_loader = internal_multiclosure_dataset_loader_pca

    # axes are (fit, data point, replica)
    reps = np.asarray([th.error_members for th in pca_loader.closures_th])

    # difference between the mean over replicas and the underlying law,
    # with the data points on the first axis
    delta_bias = reps.mean(axis=2).T - pca_loader.law_th.central_value[:, np.newaxis]

    if pca_loader.n_comp == 1:
        # With a single component pc_basis is either the scalar 1 (one data
        # point) or a (Ndata, 1) column and sqrt_covmat_pca is 0-d or (1, 1).
        # Bring both to a common shape so that the deviations are *projected*
        # onto the component; an elementwise product would give one number per
        # data point instead of one per fit.
        basis = np.reshape(pca_loader.pc_basis, (delta_bias.shape[0], 1))
        sigma = np.reshape(pca_loader.sqrt_covmat_pca, ())

        biases = ((basis.T @ delta_bias) / sigma).ravel() ** 2
        variances = [
            np.mean(((basis.T @ (rep - rep.mean(axis=1, keepdims=True))) / sigma) ** 2)
            for rep in reps
        ]
    else:
        # project onto the space spanned by the PCs and compute the chi2 there
        delta_bias = pca_loader.pc_basis.T @ delta_bias
        biases = calc_chi2(pca_loader.sqrt_covmat_pca, delta_bias)
        variances = [
            np.mean(
                calc_chi2(
                    pca_loader.sqrt_covmat_pca,
                    pca_loader.pc_basis.T @ (rep - rep.mean(axis=1, keepdims=True)),
                )
            )
            for rep in reps
        ]

    return biases, np.asarray(variances), pca_loader.n_comp
def principal_components_bias_variance_data(internal_multiclosure_data_loader_pca):
    """
    Like principal_components_bias_variance_dataset but for all data.

    The computation is the same as for a single dataset, so this delegates to
    ``principal_components_bias_variance_dataset`` instead of keeping a second
    copy of it.

    Parameters
    ----------
    internal_multiclosure_data_loader_pca : PCAInternalMulticlosureLoader
        PCA regularised results of the multiclosure fits for all data

    Returns
    -------
    tuple
        3D tuple:
        - biases: array with the bias of each fit
        - variances: 1-D array of shape (Nfits, )
        - n_comp: number of principal components kept
    """
    return principal_components_bias_variance_dataset(internal_multiclosure_data_loader_pca)
def principal_components_normalized_delta_data(internal_multiclosure_data_loader_pca):
    """
    Compute for all data only the normalized delta after PCA regularization.

    The difference between the mean prediction of each fit and the underlying
    law is projected onto the principal components, rotated to the basis which
    diagonalises the PCA covariance matrix and divided by the square root of
    the corresponding eigenvalue. The same formula is used for any number of
    components, including a single one.

    Parameters
    ----------
    internal_multiclosure_data_loader_pca : PCAInternalMulticlosureLoader
        PCA regularised results of the multiclosure fits. The attributes
        ``closures_th``, ``law_th``, ``pc_basis``, ``n_comp`` and
        ``covmat_pca`` are read.

    Returns
    -------
    tuple
        - deltas: 1-D array with n_comp * Nfits entries
        - n_comp: number of principal components kept
    """
    pca_loader = internal_multiclosure_data_loader_pca

    # axes are (fit, data point, replica)
    reps = np.asarray([th.error_members for th in pca_loader.closures_th])

    # difference between the mean over replicas and the underlying law,
    # with the data points on the first axis
    delta_bias = reps.mean(axis=2).T - pca_loader.law_th.central_value[:, np.newaxis]

    # pc_basis may be the scalar 1 and covmat_pca 0-d when there is a single
    # data point: bring both to 2-D so that one code path covers every case
    basis = np.reshape(pca_loader.pc_basis, (delta_bias.shape[0], -1))
    covmat_pca = np.atleast_2d(pca_loader.covmat_pca)

    # find basis that diagonalise covmat pca
    eigvals, eigenvects = np.linalg.eigh(covmat_pca)

    # project onto the PCs, then rotate to the diagonal basis
    projected = eigenvects.T @ (basis.T @ delta_bias)
    std_deviations = np.sqrt(eigvals)[:, None]

    return (projected / std_deviations).flatten(), pca_loader.n_comp
# reportengine ``collect`` of ``principal_components_bias_variance_dataset``
# over the ``data`` spec.
# NOTE(review): presumably this yields one result per dataset - confirm.
principal_components_bias_variance_datasets = collect(
    "principal_components_bias_variance_dataset", ("data",)
)
def bootstrapped_principal_components_normalized_delta_data(
    bootstrapped_internal_multiclosure_data_loader_pca,
):
    """
    Apply ``principal_components_normalized_delta_data`` to every bootstrap
    sample.

    Parameters
    ----------
    bootstrapped_internal_multiclosure_data_loader_pca: list
        PCA regularised results of the multiclosure fits, one entry per
        bootstrap sample

    Returns
    -------
    list
        list with one tuple per bootstrap sample, each holding the normalized
        deltas and the number of principal components.
    """
    return [
        principal_components_normalized_delta_data(boot_loader)
        for boot_loader in bootstrapped_internal_multiclosure_data_loader_pca
    ]
def bootstrapped_indicator_function_data(
    bootstrapped_principal_components_normalized_delta_data, nsigma=1
):
    """
    Evaluate ``standard_indicator_function`` on the normalized deltas of every
    bootstrap sample.

    Parameters
    ----------
    bootstrapped_principal_components_normalized_delta_data: list
        list of tuples containing the normalized deltas and the number of
        principal components. Each tuple corresponds to a bootstrap sample.

    nsigma: int, default is 1
        forwarded to ``standard_indicator_function``

    Returns
    -------
    2-D tuple:
        list
            list of length N_boot holding the output of the indicator function
            for each bootstrap sample.

        float
            average number of degrees of freedom, i.e. the mean over bootstrap
            samples of the number of principal components
    """
    evaluated = [
        (standard_indicator_function(deltas, nsigma), n_comp)
        for deltas, n_comp in bootstrapped_principal_components_normalized_delta_data
    ]
    indicator_list = [indicator for indicator, _ in evaluated]
    ndof_list = [n_comp for _, n_comp in evaluated]
    return indicator_list, np.mean(np.asarray(ndof_list))
def bootstrapped_principal_components_bias_variance_dataset(
    bootstrapped_internal_multiclosure_dataset_loader_pca, dataset
):
    """
    Compute bias and variance for each bootstrap sample of a dataset.

    Returns
    -------
    pd.DataFrame
        indexed by ``bootstrap_index``, with columns ``dataset``, ``n_comp``,
        ``bias`` and ``variance``. Bias and variance are the means of the
        values returned by ``principal_components_bias_variance_dataset``.
    """
    records = []
    for boot_index, boot_loader in enumerate(
        bootstrapped_internal_multiclosure_dataset_loader_pca
    ):
        biases, variances, n_comp = principal_components_bias_variance_dataset(boot_loader)
        records.append(
            {
                "bootstrap_index": boot_index,
                "dataset": str(dataset),
                "n_comp": n_comp,
                "bias": np.mean(biases),
                "variance": np.mean(variances),
            }
        )

    frame = pd.DataFrame.from_records(
        records,
        index="bootstrap_index",
        columns=("bootstrap_index", "dataset", "n_comp", "bias", "variance"),
    )
    frame.columns = ["dataset", "n_comp", "bias", "variance"]
    return frame
# reportengine ``collect`` of
# ``bootstrapped_principal_components_bias_variance_dataset`` over the
# ``data`` spec.
# NOTE(review): presumably this yields one DataFrame per dataset - confirm.
bootstrapped_principal_components_bias_variance_datasets = collect(
    "bootstrapped_principal_components_bias_variance_dataset", ("data",)
)
def bootstrapped_principal_components_bias_variance_data(
    bootstrapped_internal_multiclosure_data_loader_pca,
):
    """
    Compute bias and variance for each bootstrap sample of the full data.

    Returns
    -------
    pd.DataFrame
        indexed by ``bootstrap_index``, with columns ``dataset`` (always
        ``"Full dataset"``), ``n_comp``, ``bias`` and ``variance``. Bias and
        variance are the means of the values returned by
        ``principal_components_bias_variance_data``.
    """
    boot_bias_var_samples = []
    for i, boot_imdl_pca in enumerate(bootstrapped_internal_multiclosure_data_loader_pca):
        bias, var, n_comp = principal_components_bias_variance_data(boot_imdl_pca)
        boot_bias_var_samples.append(
            {
                "bias": np.mean(bias),
                "variance": np.mean(var),
                "n_comp": n_comp,
                # the key has to match the "dataset" column selected below,
                # otherwise that column is filled with NaN
                "dataset": "Full dataset",
                "bootstrap_index": i,
            }
        )

    # the index column is removed from the columns, leaving
    # ("dataset", "n_comp", "bias", "variance")
    return pd.DataFrame.from_records(
        boot_bias_var_samples,
        index="bootstrap_index",
        columns=("bootstrap_index", "dataset", "n_comp", "bias", "variance"),
    )
@check_multifit_replicas
def fits_dataset_bias_variance(
    internal_multiclosure_dataset_loader, _internal_max_reps=None, _internal_min_reps=20
):
    """For a single dataset, calculate the bias and variance for each fit
    and return tuple (bias, variance, n_data), where bias and variance are
    1-D arrays of length ``len(fits)``.

    For more information on bias see closuretest.bias_dataset and for more
    information on variance see
    :py:func:`validphys.closuretest.closure_results.variance_dataset`.

    The fits should each have the same underlying law and t0 PDF, but have
    different filterseeds, so that the level 1 shift is different.

    Can control the number of replicas taken from each fit with
    ``_internal_max_reps``.
    """
    closures_th, law_th, _, sqrtcov = internal_multiclosure_dataset_loader

    # Axes are (fit, data point, replica); at most _internal_max_reps
    # replicas are kept from each fit
    reps = np.asarray([th.error_members[:, :_internal_max_reps] for th in closures_th])

    # the central values are recomputed as the mean over the retained
    # replicas, since their number might have changed
    fit_means = reps.mean(axis=2)

    # bias: chi2 of (law - central), with the data points on the first axis
    biases = calc_chi2(sqrtcov, law_th.central_value[:, np.newaxis] - fit_means.T)

    # variance: mean chi2 of (replica - central). Done fit by fit: this seems
    # slow but breaks for datasets with single data point otherwise
    variances = [
        np.mean(calc_chi2(sqrtcov, fit_reps - fit_reps.mean(axis=1, keepdims=True)))
        for fit_reps in reps
    ]
    return biases, np.asarray(variances), len(law_th)
@check_multifit_replicas
def fits_normed_dataset_central_delta(
    internal_multiclosure_dataset_loader, _internal_max_reps=None, _internal_min_reps=20
):
    """
    For each fit calculate the difference between the central expectation
    value and the true value, normalised by the standard deviation of the
    replicas around the central expectation value (different for each fit but
    expected to vary only a little). Each observable central exp value is
    expected to be gaussianly distributed around the true value set by the
    fakepdf.

    Parameters
    ----------
    internal_multiclosure_dataset_loader: tuple
        closure fits theory predictions,
        underlying law theory predictions,
        covariance matrix,
        sqrt covariance matrix

    _internal_max_reps: int
        maximum number of replicas to use for each fit

    _internal_min_reps: int
        minimum number of replicas to use for each fit

    Returns
    -------
    deltas: np.array
        2-D array with shape (n_fits, n_obs)
    """
    closures_th, law_th, _, _ = internal_multiclosure_dataset_loader

    # Axes are (fit, data point, replica)
    reps = np.asarray([th.error_members[:, :_internal_max_reps] for th in closures_th])
    # One could mask here some reps in order to avoid redundancy of information
    # TODO

    # For each fit both the numerator (mean over replicas minus the law) and
    # the denominator (std over replicas) have shape (n_obs,)
    deltas = [
        (
            (np.mean(rep, axis=1) - law_th.central_value) / np.sqrt(np.var(rep, axis=1))
        ).tolist()
        for rep in reps
    ]
    return np.asarray(deltas)
# reportengine ``collect`` of ``fits_dataset_bias_variance`` over the ``data``
# spec.
# NOTE(review): presumably this yields one result per dataset - confirm.
fits_datasets_bias_variance = collect("fits_dataset_bias_variance", ("data",))
def expected_dataset_bias_variance(fits_dataset_bias_variance):
    """Average the per-fit bias and variance of a dataset across fits.

    Returns the tuple (expected bias, expected variance, n_data), where
    n_data is passed through unchanged.
    """
    per_fit_bias, per_fit_variance, n_data = fits_dataset_bias_variance
    expected_bias = np.mean(per_fit_bias)
    expected_variance = np.mean(per_fit_variance)
    return expected_bias, expected_variance, n_data
@check_multifit_replicas
def fits_data_bias_variance(
    internal_multiclosure_data_loader, _internal_max_reps=None, _internal_min_reps=20
):
    """Like `fits_dataset_bias_variance` but for all data: the loader for all
    data and the replica limits are forwarded positionally.
    """
    forwarded = (internal_multiclosure_data_loader, _internal_max_reps, _internal_min_reps)
    return fits_dataset_bias_variance(*forwarded)
def expected_data_bias_variance(fits_data_bias_variance):
    """Like `expected_dataset_bias_variance` except for all data: returns the
    tuple (expected bias, expected variance, n_data).
    """
    expected = expected_dataset_bias_variance(fits_data_bias_variance)
    return expected
# reportengine ``collect`` of ``fits_data_bias_variance`` over the
# ``group_dataset_inputs_by_experiment`` spec.
# NOTE(review): presumably this yields one result per experiment - confirm.
fits_experiments_bias_variance = collect(
    "fits_data_bias_variance", ("group_dataset_inputs_by_experiment",)
)
def fits_total_bias_variance(fits_experiments_bias_variance):
    """Like `fits_dataset_bias_variance` except for all data, assumes there are
    no inter-experiment correlations. That assumption is broken if a theory
    covariance matrix is used.

    Parameters
    ----------
    fits_experiments_bias_variance: list
        one tuple (biases, variances, n_data) per experiment, where biases and
        variances are 1-D arrays with one entry per fit

    Returns
    -------
    tuple
        (bias_total, variance_total, n_total): the per-fit biases and
        variances summed over experiments and the total number of data points
    """
    # The input is ragged (two arrays and an integer per experiment), so it
    # cannot be summed as a single array: recent NumPy versions refuse to
    # build an array out of it. Sum each of the three entries separately.
    biases, variances, n_data = zip(*fits_experiments_bias_variance)
    return np.sum(biases, axis=0), np.sum(variances, axis=0), sum(n_data)
# reportengine ``collect`` of ``expected_dataset_bias_variance`` over the
# ``data`` spec.
# NOTE(review): presumably this yields one result per dataset - confirm.
datasets_expected_bias_variance = collect("expected_dataset_bias_variance", ("data",))

# reportengine ``collect`` of ``expected_data_bias_variance`` over the
# ``group_dataset_inputs_by_experiment`` spec.
# NOTE(review): presumably this yields one result per experiment - confirm.
experiments_expected_bias_variance = collect(
    "expected_data_bias_variance", ("group_dataset_inputs_by_experiment",)
)
def expected_total_bias_variance(fits_total_bias_variance):
    """Like `expected_dataset_bias_variance` except for all data: returns the
    tuple (expected bias, expected variance, n_data) computed from the totals.
    """
    expected = expected_dataset_bias_variance(fits_total_bias_variance)
    return expected
def dataset_replica_and_central_diff(internal_multiclosure_dataset_loader, diagonal_basis=True):
    """For a given dataset calculate sigma, the RMS difference between
    replica predictions and central predictions, and delta, the difference
    between the central prediction and the underlying prediction.

    If ``diagonal_basis`` is ``True`` the differences are calculated in the
    basis which would diagonalise the dataset's covariance matrix. This is the
    default behaviour.

    Parameters
    ----------
    internal_multiclosure_dataset_loader: tuple
        closure fits theory predictions,
        underlying law theory predictions,
        covariance matrix,
        sqrt covariance matrix
    diagonal_basis: bool, default is True
        whether to rotate the differences to the eigenbasis of the
        covariance matrix

    Returns
    -------
    tuple
        (sigma, central_diff), both with the data points (or eigenvectors) on
        the first axis and the fits on the second
    """
    closures_th, law_th, covmat, _ = internal_multiclosure_dataset_loader

    # axes are (fit, data point, replica)
    replicas = np.asarray([th.error_members for th in closures_th])
    centrals = np.mean(replicas, axis=-1)
    underlying = law_th.central_value

    # put the data points on the axis that the rotation below acts on
    central_diff = (centrals - underlying[np.newaxis, :]).T
    var_diff_sqrt = (centrals[:, :, np.newaxis] - replicas).transpose(2, 1, 0)

    if diagonal_basis:
        # the eigendecomposition is only needed (and only computed) here
        _, e_vec = la.eigh(covmat)
        # project into basis which diagonalises covariance matrix
        var_diff_sqrt = e_vec.T @ var_diff_sqrt
        central_diff = e_vec.T @ central_diff

    # RMS over replicas; sigma is always positive
    sigma = np.sqrt((var_diff_sqrt**2).mean(axis=0))
    return sigma, central_diff
def dataset_xi(dataset_replica_and_central_diff):
    r"""Take sigma and delta for a dataset, where sigma is the RMS difference
    between replica predictions and central predictions, and delta is the
    difference between the central prediction and the underlying prediction.

    Then the indicator function is evaluated elementwise for sigma and delta

        :math:`I_{[-\sigma_j, \sigma_j]}(\delta_j)`

    which is 1 when :math:`|\delta_j| < \sigma_j` and 0 otherwise. Finally, take the
    mean across fits.

    Parameters
    ----------
    dataset_replica_and_central_diff: tuple
        (sigma, delta) with the fits on the second axis

    Returns
    -------
    xi_1sigma_i: np.array
        a 1-D array where each element is the value of xi_1sigma for that
        particular eigenvector. We note that the eigenvectors are ordered by
        ascending eigenvalues
    """
    sigma, central_diff = dataset_replica_and_central_diff
    # sigma is always positive, so comparing with |delta| is enough
    in_1_sigma = np.array(np.abs(central_diff) < sigma, dtype=int)
    # mean across fits
    return in_1_sigma.mean(axis=1)
def data_replica_and_central_diff(internal_multiclosure_data_loader, diagonal_basis=True):
    """Like ``dataset_replica_and_central_diff`` but for all data: returns
    the tuple (sigma, central_diff) computed from the loader for all data.
    """
    sigma_and_delta = dataset_replica_and_central_diff(
        internal_multiclosure_data_loader, diagonal_basis
    )
    return sigma_and_delta
def data_xi(data_replica_and_central_diff):
    """Like dataset_xi but for all data: evaluates the indicator function on
    the sigma and delta of all data and averages it across fits.
    """
    xi_per_eigenvector = dataset_xi(data_replica_and_central_diff)
    return xi_per_eigenvector
# reportengine ``collect`` of ``data_xi`` over the
# ``group_dataset_inputs_by_experiment`` spec.
# NOTE(review): presumably this yields one result per experiment - confirm.
experiments_xi_measured = collect("data_xi", ("group_dataset_inputs_by_experiment",))

# reportengine ``collect`` of ``data_replica_and_central_diff`` over the
# ``group_dataset_inputs_by_experiment`` spec.
# NOTE(review): presumably this yields one result per experiment - confirm.
experiments_replica_central_diff = collect(
    "data_replica_and_central_diff", ("group_dataset_inputs_by_experiment",)
)
@check_at_least_10_fits
def n_fit_samples(fits):
    """Return a list where each item is a number of fits to use for
    resampling a multiclosure quantity.

    The values start at 10 and increase in steps of ``SAMPLING_INTERVAL``
    while they stay below ``len(fits) + SAMPLING_INTERVAL``. The last value
    is therefore larger than ``len(fits)`` whenever ``len(fits) - 10`` is not
    a multiple of the step.

    User must provide at least 10 fits.
    """
    first_sample = 10
    exclusive_stop = len(fits) + SAMPLING_INTERVAL
    return list(range(first_sample, exclusive_stop, SAMPLING_INTERVAL))
# NOTE: check_multifit_replicas can fill in _internal_max_reps and # _internal_min_reps if they are None which means by default the values are # filled but this value can be overridden in specific studies. Both keys # must be present in signature for the check to work
@check_multifit_replicas
def n_replica_samples(fits_pdf, _internal_max_reps=None, _internal_min_reps=20):
    """Return a list where each item is a number of replicas to use for
    resampling a multiclosure quantity.

    The values start at ``_internal_min_reps`` and increase in steps of
    ``SAMPLING_INTERVAL`` while they stay below
    ``_internal_max_reps + SAMPLING_INTERVAL``.

    All provided fits must have the same number of replicas and that number
    must be at least 20.

    The number of replicas used from each fit can be overridden by supplying
    _internal_max_reps.
    """
    exclusive_stop = _internal_max_reps + SAMPLING_INTERVAL
    return list(range(_internal_min_reps, exclusive_stop, SAMPLING_INTERVAL))
class BootstrappedTheoryResult:
    """Proxy class which mimics results.ThPredictionsResult so that
    pre-existing bias/variance actions can be used with bootstrapped replicas.

    Attributes set from the ``data`` array given to the constructor:

    - ``error_members``: ``data`` itself
    - ``central_value``: mean of ``data`` along axis 1
    - ``rawdata``: ``central_value`` as a column followed by ``data``
    """

    def __init__(self, data):
        central = data.mean(axis=1)
        central_column = central.reshape(-1, 1)

        self.error_members = data
        self.central_value = central
        self.rawdata = np.concatenate([central_column, data], axis=-1)
def _bootstrap_multiclosure_fits(
    internal_multiclosure_dataset_loader, rng, n_fit_max, n_fit, n_rep_max, n_rep, use_repeats
):
    """Perform a single bootstrap resample of the multiclosure fits.

    A proxy of the base internal object used by the relevant estimator
    actions is returned, with the fits and replicas resampled. If
    ``use_repeats`` is False then each fit, and each replica within a fit,
    can be chosen at most once.

    The various n_fit* and n_rep* choices are for finite size effect studies.
    To perform a simple bootstrap set ``n_fit`` and ``n_fit_max`` to the
    number of closure fits (len(fits)) and ``n_rep`` and ``n_rep_max`` to the
    number of replicas in each of the closure tests.

    Returns
    -------
    resampled_multiclosure:
        like internal_multiclosure_dataset_loader but with the first element
        replaced by a list of ``BootstrappedTheoryResult``, one per drawn
        fit, built from randomly drawn replicas. Indices are drawn with
        ``rng.choice``.

    See also:

    np.random.choice
    """
    closure_th, *remaining = internal_multiclosure_dataset_loader

    def _resample_replicas(fit_th):
        # an independent draw of replica indices is made for every drawn fit
        chosen_reps = rng.choice(n_rep_max, size=n_rep, replace=use_repeats)
        return BootstrappedTheoryResult(fit_th.error_members[:, chosen_reps])

    # fits are drawn first, then replicas within each drawn fit in turn
    chosen_fits = rng.choice(n_fit_max, size=n_fit, replace=use_repeats)
    drawn_fit_ths = [closure_th[fit_index] for fit_index in chosen_fits]
    resampled_ths = [_resample_replicas(fit_th) for fit_th in drawn_fit_ths]
    return (resampled_ths, *remaining)
def bootstrapped_internal_multiclosure_dataset_loader(
    internal_multiclosure_dataset_loader,
    n_fit_max,
    n_fit,
    n_rep_max,
    n_rep,
    n_boot_multiclosure,
    rng_seed_mct_boot,
    use_repeats=True,
):
    """
    Returns a tuple of internal_multiclosure_dataset_loader objects, each of
    which is a bootstrap resample of the original dataset.

    Parameters
    ----------
    internal_multiclosure_dataset_loader: tuple
        closure fits theory predictions,
        underlying law theory predictions,
        covariance matrix,
        sqrt covariance matrix
    n_fit_max: int
        maximum number of fits, should be smaller or equal to number of
        multiclosure fits
    n_fit: int
        number of fits to draw for each resample
    n_rep_max: int
        maximum number of replicas, should be smaller or equal to number of
        replicas in each fit
    n_rep: int
        number of replicas to draw for each resample
    n_boot_multiclosure: int
        number of bootstrap resamples to perform
    rng_seed_mct_boot: int
        seed for random number generator
    use_repeats: bool, default is True
        whether to allow repeated fits and replicas in each resample

    Returns
    -------
    resampled_multiclosure: tuple of shape (n_boot_multiclosure,)
        tuple of internal_multiclosure_dataset_loader objects each of which
        is a bootstrap resample of the original dataset
    """
    # a single generator is shared by every resample, so the whole sequence
    # of draws is fixed by the seed
    rng = np.random.RandomState(seed=rng_seed_mct_boot)
    resamples = []
    for _ in range(n_boot_multiclosure):
        resamples.append(
            _bootstrap_multiclosure_fits(
                internal_multiclosure_dataset_loader,
                rng=rng,
                n_fit_max=n_fit_max,
                n_fit=n_fit,
                n_rep_max=n_rep_max,
                n_rep=n_rep,
                use_repeats=use_repeats,
            )
        )
    return tuple(resamples)
def bootstrapped_internal_multiclosure_data_loader(
    internal_multiclosure_data_loader,
    n_fit_max,
    n_fit,
    n_rep_max,
    n_rep,
    n_boot_multiclosure,
    rng_seed_mct_boot,
    use_repeats=True,
):
    """Like bootstrapped_internal_multiclosure_dataset_loader except for all
    data.

    Every argument is forwarded unchanged.
    """
    return bootstrapped_internal_multiclosure_dataset_loader(
        internal_multiclosure_data_loader,
        n_fit_max=n_fit_max,
        n_fit=n_fit,
        n_rep_max=n_rep_max,
        n_rep=n_rep,
        n_boot_multiclosure=n_boot_multiclosure,
        rng_seed_mct_boot=rng_seed_mct_boot,
        use_repeats=use_repeats,
    )
def bias_variance_resampling_dataset(
    internal_multiclosure_dataset_loader,
    n_fit_samples,
    n_replica_samples,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
    use_repeats=True,
):
    """For a single dataset, create bootstrap distributions of bias and
    variance varying the number of fits and replicas drawn for each resample.

    Return two 3-D arrays with dimensions
    (number of n_rep samples, number of n_fit samples, n_boot)
    filled with resampled bias and variance respectively.

    The numbers of replicas and fits to draw are taken from
    ``n_replica_samples`` and ``n_fit_samples``. The last element of each is
    used as the size of the pool of replicas and fits which is drawn from.
    The number of bootstrap_samples is 100 by default.

    Returns
    -------
    resamples: tuple
        tuple of two 3-D arrays with resampled bias and variance respectively
        for each n_rep samples and each n_fit samples

    Notes
    -----
    The bootstrap samples are seeded in this function. If this action is
    collected over multiple datasets then the set of resamples all used
    corresponding replicas and fits.

    """
    # the same seed is used for every dataset so results can be aggregated
    rng = np.random.RandomState(seed=boot_seed)

    def _resample_once(fit_count, rep_count):
        boot_loader = _bootstrap_multiclosure_fits(
            internal_multiclosure_dataset_loader,
            rng,
            n_fit_samples[-1],
            fit_count,
            n_replica_samples[-1],
            rep_count,
            use_repeats,
        )
        # n_rep is passed explicitly to fits_dataset_bias_variance so that
        # it uses the full subsample
        bias, variance, _ = expected_dataset_bias_variance(
            fits_dataset_bias_variance(boot_loader, rep_count)
        )
        return bias, variance

    bias_grid = []
    variance_grid = []
    for rep_count in n_replica_samples:
        # results at fixed number of replicas, varying the number of fits
        bias_row = []
        variance_row = []
        for fit_count in n_fit_samples:
            # one entry per bootstrap resample at this (n_rep, n_fit)
            bias_cell = []
            variance_cell = []
            for _ in range(bootstrap_samples):
                bias, variance = _resample_once(fit_count, rep_count)
                bias_cell.append(bias)
                variance_cell.append(variance)
            bias_row.append(bias_cell)
            variance_row.append(variance_cell)
        bias_grid.append(bias_row)
        variance_grid.append(variance_row)
    return np.array(bias_grid), np.array(variance_grid)
def bias_variance_resampling_data(
    internal_multiclosure_data_loader,
    n_fit_samples,
    n_replica_samples,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
    use_repeats=True,
):
    """Like bias_variance_resampling_dataset except for all data.

    Notes
    -----
    The bootstrap samples are seeded in this function. If this action is
    collected over multiple experiments then the set of resamples all used
    corresponding fits/replicas and can be added together.

    """
    all_data_loader = internal_multiclosure_data_loader
    return bias_variance_resampling_dataset(
        all_data_loader,
        n_fit_samples,
        n_replica_samples,
        bootstrap_samples,
        boot_seed=boot_seed,
        use_repeats=use_repeats,
    )
# ``bias_variance_resampling_data`` collected over
# ``group_dataset_inputs_by_experiment``.
exps_bias_var_resample = collect(
    "bias_variance_resampling_data", ("group_dataset_inputs_by_experiment",)
)
def bias_variance_resampling_total(exps_bias_var_resample):
    """Sum the bias_variance_resampling_data for all experiments, giving
    the total bias and variance resamples. This relies on the bootstrap seed
    being the same for all experiments, such that the fits/replicas are the
    same, and there being no inter-experiment correlations.

    Returns
    -------
    bias_var_total: tuple
        the summed bias resamples followed by the summed variance resamples.
    """
    # the leading axis runs over experiments and is the one summed away
    summed = np.asarray(exps_bias_var_resample).sum(axis=0)
    bias_total, var_total = summed
    return bias_total, var_total
def xi_resampling_dataset(
    internal_multiclosure_dataset_loader,
    n_fit_samples,
    n_replica_samples,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
    use_repeats=True,
):
    """For a single dataset, create bootstrap distributions of xi_1sigma
    varying the number of fits and replicas drawn for each resample.

    Return a 4-D array with dimensions
    (number of n_rep samples, number of n_fit samples, n_boot, n_data)
    filled with resampled xi.

    The numbers of replicas and fits to draw are taken from
    ``n_replica_samples`` and ``n_fit_samples``. The last element of each is
    used as the size of the pool of replicas and fits which is drawn from.
    The number of bootstrap_samples is 100 by default.

    Returns
    -------
    resamples: array
        4-D array with resampled xi for each n_rep samples and each
        n_fit samples

    Notes
    -----
    The bootstrap samples are seeded in this function. If this action is
    collected over multiple datasets then the set of resamples all used
    corresponding replicas.

    """
    # the same seed is used every time so results can be aggregated
    rng = np.random.RandomState(seed=boot_seed)

    def _resample_once(fit_count, rep_count):
        boot_loader = _bootstrap_multiclosure_fits(
            internal_multiclosure_dataset_loader,
            rng,
            n_fit_samples[-1],
            fit_count,
            n_replica_samples[-1],
            rep_count,
            use_repeats,
        )
        # 1-D array of xi for the individual eigenvectors
        return dataset_xi(dataset_replica_and_central_diff(boot_loader))

    xi_grid = []
    for rep_count in n_replica_samples:
        # results at fixed number of replicas, varying the number of fits
        xi_row = []
        for fit_count in n_fit_samples:
            # one entry per bootstrap resample at this (n_rep, n_fit)
            xi_cell = [_resample_once(fit_count, rep_count) for _ in range(bootstrap_samples)]
            xi_row.append(xi_cell)
        xi_grid.append(xi_row)
    return np.array(xi_grid)
def xi_resampling_data(
    internal_multiclosure_data_loader,
    n_fit_samples,
    n_replica_samples,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
    use_repeats=True,
):
    """Like xi_resampling_dataset except for all data.

    Notes
    -----
    The bootstrap samples are seeded in this function. If this action is
    collected over multiple experiments then the set of resamples all used
    corresponding replicas and can be added together.

    """
    all_data_loader = internal_multiclosure_data_loader
    return xi_resampling_dataset(
        all_data_loader,
        n_fit_samples,
        n_replica_samples,
        bootstrap_samples,
        boot_seed=boot_seed,
        use_repeats=use_repeats,
    )
# ``xi_resampling_data`` collected over ``group_dataset_inputs_by_experiment``.
exps_xi_resample = collect("xi_resampling_data", ("group_dataset_inputs_by_experiment",))
def total_xi_resample(exps_xi_resample):
    """Concatenate the xi for each datapoint for all data.

    The per-experiment arrays are joined along their last axis.
    """
    per_experiment_xi = exps_xi_resample
    return np.concatenate(per_experiment_xi, axis=-1)
def total_expected_xi_resample(bias_variance_resampling_total):
    """Using the bias and variance resample, return a resample of expected xi
    using the method outlined in
    :py:func:`validphys.closuretest.multiclosure_output.expected_xi_from_bias_variance`.

    The general concept is based on assuming all of the distributions are
    gaussians and using the ratio of bias/variance to predict the
    corresponding integral. To see a more in depth explanation, see
    :py:func:`validphys.closuretest.multiclosure_output.expected_xi_from_bias_variance`.

    """
    bias_total, var_total = bias_variance_resampling_total
    # one over sqrt(bias/variance) is the number of sigma which is then fed
    # to the error function
    n_sigma_in_variance = 1 / np.sqrt(bias_total / var_total)
    # pylint can't find erf here, disable error in this function
    # pylint: disable=no-member
    expected_xi = special.erf(n_sigma_in_variance / np.sqrt(2))
    return expected_xi
@check_multifit_replicas
def fits_bootstrap_data_bias_variance(
    internal_multiclosure_data_loader,
    fits,
    _internal_max_reps=None,
    _internal_min_reps=20,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
):
    """Perform bootstrap resample of `fits_data_bias_variance`, returns
    tuple of bias_samples, variance_samples where each element is a 1-D
    np.array of length bootstrap_samples. The elements of the arrays are
    bootstrap samples of bias and variance respectively.

    """
    # the same seed is used every time so results can be aggregated
    rng = np.random.RandomState(seed=boot_seed)
    n_fits = len(fits)
    bias_samples = []
    variance_samples = []
    for _ in range(bootstrap_samples):
        # every fit and, by default, every replica is in the pool and the
        # resample is drawn with repeats allowed
        boot_loader = _bootstrap_multiclosure_fits(
            internal_multiclosure_data_loader,
            rng,
            n_fits,
            n_fits,
            _internal_max_reps,
            _internal_max_reps,
            True,
        )
        # n_rep is passed explicitly to fits_dataset_bias_variance so that
        # it uses the full subsample
        per_fit_bias_variance = fits_dataset_bias_variance(
            boot_loader, _internal_max_reps, _internal_min_reps
        )
        bias, variance, _ = expected_dataset_bias_variance(per_fit_bias_variance)
        bias_samples.append(bias)
        variance_samples.append(variance)
    return np.array(bias_samples), np.array(variance_samples)
# ``fits_bootstrap_data_bias_variance`` collected over
# ``group_dataset_inputs_by_experiment``.
experiments_bootstrap_bias_variance = collect(
    "fits_bootstrap_data_bias_variance", ("group_dataset_inputs_by_experiment",)
)
def total_bootstrap_ratio(experiments_bootstrap_bias_variance):
    """Calculate the total bootstrap ratio for all data. Leverages the
    fact that the covariance matrix is block diagonal in experiments so

        Total ratio = sum(bias) / sum(variance)

    Which is valid provided there are no inter-experimental correlations.

    Returns
    -------
    bias_var_total: tuple
        tuple of the total bias and variance

    """
    # the leading axis runs over experiments and is the one summed away
    summed = np.asarray(experiments_bootstrap_bias_variance).sum(axis=0)
    bias_tot, var_tot = summed
    return bias_tot, var_tot
def experiments_bootstrap_ratio(experiments_bootstrap_bias_variance, total_bootstrap_ratio):
    """Returns a bootstrap resampling of the ratio of bias/variance for
    each experiment and total. Total is calculated as sum(bias)/sum(variance)
    where each sum refers to the sum across experiments.

    Returns
    -------
    ratios_resampled: list
        list of bootstrap samples of ratio of bias/var, length of list is
        len(experiments) + 1 because the final element is the total ratio
        resampled.

    """
    per_experiment = []
    for bias, var in experiments_bootstrap_bias_variance:
        per_experiment.append(bias / var)
    bias_tot, var_tot = total_bootstrap_ratio
    # the ratio of the totals always goes last
    return [*per_experiment, bias_tot / var_tot]
def experiments_bootstrap_sqrt_ratio(experiments_bootstrap_ratio):
    """Square root of experiments_bootstrap_ratio.

    The square root is taken elementwise with ``np.sqrt``.
    """
    sqrt_ratios = np.sqrt(experiments_bootstrap_ratio)
    return sqrt_ratios
def experiments_bootstrap_expected_xi(experiments_bootstrap_sqrt_ratio):
    """Calculate a bootstrap resampling of the expected xi from
    ``experiments_bootstrap_sqrt_ratio``, using the same formula as
    :py:func:`validphys.closuretest.multiclosure_output.expected_xi_from_bias_variance`.

    """
    # the inverse of sqrt(bias/variance) is the number of sigma which is
    # then fed to the error function
    n_sigma_in_variance = 1 / experiments_bootstrap_sqrt_ratio
    # pylint can't find erf here, disable error in this function
    # pylint: disable=no-member
    return special.erf(n_sigma_in_variance / np.sqrt(2))
# ``fits_bootstrap_data_bias_variance`` collected over
# ``group_dataset_inputs_by_metadata``.
groups_bootstrap_bias_variance = collect(
    "fits_bootstrap_data_bias_variance", ("group_dataset_inputs_by_metadata",)
)
def groups_bootstrap_ratio(groups_bootstrap_bias_variance, total_bootstrap_ratio):
    """Like :py:func:`experiments_bootstrap_ratio` but for metadata groups.

    Both arguments are forwarded unchanged.
    """
    per_group_bias_variance = groups_bootstrap_bias_variance
    return experiments_bootstrap_ratio(per_group_bias_variance, total_bootstrap_ratio)
def groups_bootstrap_sqrt_ratio(groups_bootstrap_ratio):
    """Like :py:func:`experiments_bootstrap_sqrt_ratio` but for metadata
    groups.

    The argument is forwarded unchanged.
    """
    per_group_ratio = groups_bootstrap_ratio
    return experiments_bootstrap_sqrt_ratio(per_group_ratio)
def groups_bootstrap_expected_xi(groups_bootstrap_sqrt_ratio):
    """Like :py:func:`experiments_bootstrap_expected_xi` but for metadata
    groups.

    The argument is forwarded unchanged.
    """
    per_group_sqrt_ratio = groups_bootstrap_sqrt_ratio
    return experiments_bootstrap_expected_xi(per_group_sqrt_ratio)
@check_multifit_replicas
def fits_bootstrap_data_xi(
    internal_multiclosure_data_loader,
    fits,
    _internal_max_reps=None,
    _internal_min_reps=20,
    bootstrap_samples=100,
    boot_seed=DEFAULT_SEED,
):
    """Perform bootstrap resample of ``data_xi``, returns a list
    where each element is an independent resampling of ``data_xi``.

    For more information on bootstrapping see _bootstrap_multiclosure_fits.
    For more information on xi see dataset_xi.

    """
    # the same seed is used every time so results can be aggregated
    rng = np.random.RandomState(seed=boot_seed)
    n_fits = len(fits)

    def _resample_once():
        # every fit and, by default, every replica is in the pool and the
        # resample is drawn with repeats allowed
        boot_loader = _bootstrap_multiclosure_fits(
            internal_multiclosure_data_loader,
            rng,
            n_fits,
            n_fits,
            _internal_max_reps,
            _internal_max_reps,
            True,
        )
        return dataset_xi(dataset_replica_and_central_diff(boot_loader))

    return [_resample_once() for _ in range(bootstrap_samples)]
# ``fits_bootstrap_data_xi`` collected over
# ``group_dataset_inputs_by_experiment``.
experiments_bootstrap_xi = collect(
    "fits_bootstrap_data_xi", ("group_dataset_inputs_by_experiment",)
)
def total_bootstrap_xi(experiments_bootstrap_xi):
    """Given the bootstrap samples of xi_1sigma for all experiments,
    concatenate the result to get xi_1sigma for all data points in a single
    array.

    The per-experiment results are joined along axis 1.
    """
    per_experiment_xi = experiments_bootstrap_xi
    return np.concatenate(per_experiment_xi, axis=1)
# ``fits_bootstrap_data_xi`` collected over ``group_dataset_inputs_by_metadata``.
groups_bootstrap_xi = collect("fits_bootstrap_data_xi", ("group_dataset_inputs_by_metadata",))
def dataset_fits_bias_replicas_variance_samples(
    internal_multiclosure_dataset_loader, _internal_max_reps=None, _internal_min_reps=20
):
    """For a single dataset, calculate the samples of chi2-quantities which
    are used to calculate the bias and variance for each fit. The output of
    this function is similar to :py:func:`fits_dataset_bias_variance` except
    that the mean is not taken across replicas when calculating the mean
    squared difference between replica predictions and central predictions
    and instead the results are concatenated. The mean of this array would be
    the expected value of the variance across fits.

    Return tuple (fits_bias, fits_replica_variance, n_data), where fits_bias
    is 1-D array of length N_fits and fits_replica_variance is 1-D array
    length N_fits * N_replicas.

    For more information on bias see closuretest.bias_dataset and for more
    information on variance see
    :py:func:`validphys.closuretest.closure_results.variance_dataset`.

    The fits should each have the same underlying law and t0 PDF, but have
    different filterseeds, so that the level 1 shift is different.

    Can control the number of replicas taken from each fit with
    ``_internal_max_reps``.

    """
    closures_th, law_th, _, sqrtcov = internal_multiclosure_dataset_loader
    # The dimensions here are (fit, data point, replica)
    reps = np.asarray([th.error_members[:, :_internal_max_reps] for th in closures_th])
    # central values are the mean across the retained replicas, since the
    # number of replicas might have been changed
    centrals = reps.mean(axis=2)
    # place bins on first axis
    law_minus_central = law_th.central_value[:, np.newaxis] - centrals.T
    biases = calc_chi2(sqrtcov, law_minus_central)
    # going fit by fit seems slow but breaks for datasets with single data
    # point otherwise
    variances = [
        calc_chi2(sqrtcov, fit_reps - fit_reps.mean(axis=1, keepdims=True)) for fit_reps in reps
    ]
    return biases, np.concatenate(variances), len(law_th)
def dataset_inputs_fits_bias_replicas_variance_samples(
    internal_multiclosure_data_loader, _internal_max_reps=None, _internal_min_reps=20
):
    """Like :py:func:`dataset_fits_bias_replicas_variance_samples` but for
    all data.

    Parameters
    ----------
    internal_multiclosure_data_loader: tuple
        forwarded as the loader of
        :py:func:`dataset_fits_bias_replicas_variance_samples`.
    _internal_max_reps: int, default is None
        forwarded unchanged.
    _internal_min_reps: int, default is 20
        forwarded unchanged.

    Returns
    -------
    samples: tuple
        the return value of
        :py:func:`dataset_fits_bias_replicas_variance_samples`.

    """
    # forward the values which were actually received, rather than the
    # defaults, so that an overridden number of replicas is respected
    return dataset_fits_bias_replicas_variance_samples(
        internal_multiclosure_data_loader,
        _internal_max_reps=_internal_max_reps,
        _internal_min_reps=_internal_min_reps,
    )
# ``dataset_inputs_fits_bias_replicas_variance_samples`` collected over
# ``group_dataset_inputs_by_experiment``.
experiments_fits_bias_replicas_variance_samples = collect(
    "dataset_inputs_fits_bias_replicas_variance_samples", ("group_dataset_inputs_by_experiment",)
)
def xq2_dataset_map(
    xq2map_with_cuts,
    internal_multiclosure_dataset_loader,
    _internal_max_reps=None,
    _internal_min_reps=20,
):
    """
    Load in a dictionary all the specs of a dataset meaning:
    - ds name
    - ds coords
    - standard deviation (in multiclosure)
    - mean (in multiclosure again)
    - (x,Q^2) coords
    - xi

    Returns
    -------
    dataset_map: dict
        dictionary with keys ``x_coords``, ``Q_coords``, ``std_devs``,
        ``name``, ``process``, ``means`` and ``xi``.
    """
    commondata = xq2map_with_cuts.commondata
    coords = xq2map_with_cuts[2]
    x_coords = coords[0]
    central_deltas = fits_normed_dataset_central_delta(internal_multiclosure_dataset_loader)
    # root mean square of the deltas, i.e. the spread is measured about zero
    # and not about the mean of the deltas
    root_mean_square = np.sqrt(np.mean(central_deltas**2, axis=0))
    mean_deltas = np.mean(central_deltas, axis=0)
    xi = dataset_xi(dataset_replica_and_central_diff(internal_multiclosure_dataset_loader, False))

    # for the case of double-hadronic observables we have 2 (x,Q) for each
    # experimental point, so every per-point quantity is repeated
    if x_coords.shape[0] != root_mean_square.shape[0]:
        root_mean_square = np.concatenate([root_mean_square] * 2)
        mean_deltas = np.concatenate([mean_deltas] * 2)
        xi = np.concatenate([xi] * 2)

    return {
        'x_coords': x_coords,
        'Q_coords': coords[1],
        'std_devs': root_mean_square,
        'name': commondata.name,
        'process': commondata.process_type,
        'means': mean_deltas,
        'xi': xi,
    }
# ``xq2_dataset_map`` collected over ``data``.
xq2_data_map = collect("xq2_dataset_map", ("data",))
def standard_indicator_function(standard_variable, nsigma=1):
    """
    Calculate the indicator function for a standardised variable.

    Parameters
    ----------
    standard_variable: np.array
        array of variables that have been standardised: (x - mu)/sigma

    nsigma: float
        number of standard deviations to consider

    Returns
    -------
    np.array
        array of ones and zeros. An entry is 1 if the variable is strictly
        within nsigma standard deviations from the mean, otherwise it is 0.

    """
    is_within = abs(standard_variable) < nsigma
    # the boolean mask is converted to integers
    return np.array(is_within, dtype=int)