Source code for validphys.closuretest.closure_results

"""
closuretest/results.py

underlying actions to calculate closure test estimators plus some table actions
"""
from collections import namedtuple

import numpy as np
import pandas as pd

from reportengine import collect
from reportengine.table import table
from validphys.calcutils import bootstrap_values, calc_chi2
from validphys.checks import check_pdf_is_montecarlo
from validphys.closuretest.closure_checks import (
    check_fit_isclosure,
    check_fits_areclosures,
    check_fits_same_filterseed,
    check_fits_underlying_law_match,
    check_use_fitcommondata,
)

BiasData = namedtuple("BiasData", ("bias", "ndata"))

underlying_results = collect("results", ("fitunderlyinglaw",))


[docs]@check_fit_isclosure @check_use_fitcommondata def bias_dataset(results, underlying_results, fit, use_fitcommondata): """Calculate the bias for a given dataset and fit. The bias is defined as chi2 between the prediction from the underlying PDF (which was used to generate the closure pseudodata), also known as level zero closure data, and the central prediction calculated from the fitted PDF. we require that use_fitcommondata is true because the generated closure data is used to generate the multiplicative contributions to the covariance matrix """ dt_ct, th_ct = results # does collect need to collect a list even with one element? ((_, th_ul),) = underlying_results central_diff = th_ct.central_value - th_ul.central_value bias_out = calc_chi2(dt_ct.sqrtcovmat, central_diff) # unnormalised return BiasData(bias_out, len(th_ct))
underlying_dataset_inputs_results = collect("dataset_inputs_results", ("fitunderlyinglaw",))
[docs]@check_fit_isclosure @check_use_fitcommondata def bias_experiment( dataset_inputs_results, underlying_dataset_inputs_results, fit, use_fitcommondata ): """Like `bias_dataset` but for a whole experiment.""" return bias_dataset( dataset_inputs_results, underlying_dataset_inputs_results, fit, use_fitcommondata, )
experiments_bias = collect("bias_experiment", ("group_dataset_inputs_by_experiment",)) fits_experiments_bias = collect("experiments_bias", ("fits", "fitcontext")) fits_experiments = collect("experiments_data", ("fits", "fitcontext"))
[docs]@table @check_fits_same_filterseed @check_fits_underlying_law_match def biases_table(fits_experiments, fits_experiments_bias, fits, show_total: bool = False): """Creates a table with fits as the columns and the experiments from both fits as the row index. """ col = ["bias"] dfs = [] for fit, experiments, biases in zip(fits, fits_experiments, fits_experiments_bias): total = 0 total_points = 0 records = [] for biasres, experiment in zip(biases, experiments): records.append( dict( experiment=str(experiment), bias=biasres.bias / biasres.ndata, # normalised bias ) ) if show_total: total += biasres.bias total_points += biasres.ndata if show_total: total /= total_points records.append(dict(experiment="Total", bias=total)) df = pd.DataFrame.from_records( records, columns=("experiment", "bias"), index=("experiment") ) df.columns = pd.MultiIndex.from_product(([str(fit)], col)) dfs.append(df) return pd.concat(dfs, axis=1, sort=True)
[docs]@check_pdf_is_montecarlo @check_fit_isclosure def bootstrap_bias_experiment( dataset_inputs_results, underlying_dataset_inputs_results, bootstrap_samples=500 ): """Calculates bias as per `bias_experiment` but performs bootstrap sample across replicas. note that bias_experiment returns a named tuple like (unnormalised_bias, ndata) whereas this actions simply returns an array `boostrap_bias` with length bootstrap_samples. Each element of returned array is bias/n_data (bias normalised by number of datapoints) """ dt_ct, th_ct = dataset_inputs_results ((_, th_ul),) = underlying_dataset_inputs_results th_ct_boot_cv = bootstrap_values(th_ct.error_members, bootstrap_samples) boot_diffs = th_ct_boot_cv - th_ul.central_value[:, np.newaxis] boot_bias = calc_chi2(dt_ct.sqrtcovmat, boot_diffs) / len(dt_ct) return boot_bias
experiments_bootstrap_bias = collect( "bootstrap_bias_experiment", ("group_dataset_inputs_by_experiment",) ) fits_experiments_bootstrap_bias = collect("experiments_bootstrap_bias", ("fits", "fitcontext"))
[docs]@table @check_use_fitcommondata @check_fits_areclosures @check_fits_same_filterseed @check_fits_underlying_law_match def fits_bootstrap_bias_table( fits_experiments_bootstrap_bias, fits_name_with_covmat_label, fits_experiments, fits, use_fitcommondata, ): """Produce a table with bias for each experiment for each fit, along with variance calculated from doing a bootstrap sample """ col = ["bias", "bias std. dev."] dfs = [] for fit, experiments, biases in zip( fits_name_with_covmat_label, fits_experiments, fits_experiments_bootstrap_bias ): records = [] for biasres, experiment in zip(biases, experiments): records.append( dict( experiment=str(experiment), bias=biasres.mean(), bias_std=biasres.std(), ) ) df = pd.DataFrame.from_records( records, columns=("experiment", "bias", "bias_std"), index=("experiment") ) df.columns = pd.MultiIndex.from_product(([str(fit)], col)) dfs.append(df) return pd.concat(dfs, axis=1, sort=True)
VarianceData = namedtuple("VarianceData", ("variance", "ndata"))
[docs]@check_fit_isclosure @check_use_fitcommondata def variance_dataset(results, fit, use_fitcommondata): """calculate the variance for a given dataset, which is the spread of replicas measured in the space of the covariance matrix. Given by: E_rep [ (T - E_rep[T])_i C^{-1}_ij (T - E_rep[T])_j ] where E_rep is the expectation value across replicas. The quantity is the same as squaring `phi_data`, however it is redefined here in a way which can be made fully independent of the closure data. This is useful when checking the variance of data which was not included in the fit. """ dt, th = results diff = th.central_value[:, np.newaxis] - th.error_members var_unnorm = calc_chi2(dt.sqrtcovmat, diff).mean() return VarianceData(var_unnorm, len(th))
[docs]@check_fit_isclosure @check_use_fitcommondata def variance_experiment(dataset_inputs_results, fit, use_fitcommondata): """Like variance_dataset but for a whole experiment""" return variance_dataset(dataset_inputs_results, fit, use_fitcommondata)
[docs]@check_fit_isclosure def bootstrap_variance_experiment(dataset_inputs_results, bootstrap_samples=500): """Calculate the variance as in `variance_experiment` but performs bootstrap sample of the estimator. Returns an array of variance for each resample, normalised to the number of data in the experiment. """ dt_ct, th_ct = dataset_inputs_results diff = th_ct.central_value[:, np.newaxis] - th_ct.error_members var_unnorm_boot = bootstrap_values( diff, bootstrap_samples, apply_func=(lambda x, y: calc_chi2(y, x)), args=[dt_ct.sqrtcovmat], ).mean( axis=0 ) # mean across replicas return var_unnorm_boot / len(th_ct) # normalise by n_data
experiments_boostrap_variance = collect( "bootstrap_variance_experiment", ("group_dataset_inputs_by_experiment",) ) fits_exps_bootstrap_var = collect("experiments_boostrap_variance", ("fits", "fitcontext"))
[docs]@table @check_use_fitcommondata @check_fits_areclosures @check_fits_same_filterseed @check_fits_underlying_law_match def fits_bootstrap_variance_table( fits_exps_bootstrap_var, fits_name_with_covmat_label, fits_experiments, fits, use_fitcommondata, ): """Produce a table with variance and its error. Variance is defined as var = sum_ij E_rep[(E_rep[T_i], T_i) invcov_ij (E_rep[T_j], T_j)] / N_data which is the expectation value across replicas of the chi2 between the central theory predictions and replica theory predictions. It is the same as phi^2 and gives the variance of the theory predictions in units of the covariance matrix, normalised by the number of data points. The error is the standard deviation across bootstrap samples. """ col = ["variance", "variance std. dev."] dfs = [] for fit, experiments, fit_vars in zip( fits_name_with_covmat_label, fits_experiments, fits_exps_bootstrap_var ): records = [] for var_res, experiment in zip(fit_vars, experiments): records.append( dict( experiment=str(experiment), variance=var_res.mean(), variance_std=var_res.std(), ) ) df = pd.DataFrame.from_records( records, columns=("experiment", "variance", "variance_std"), index=("experiment"), ) df.columns = pd.MultiIndex.from_product(([str(fit)], col)) dfs.append(df) return pd.concat(dfs, axis=1, sort=True)
experiments_bootstrap_chi2_central = collect( "dataset_inputs_bootstrap_chi2_central", ("group_dataset_inputs_by_experiment",) ) fits_exps_bootstrap_chi2_central = collect( "experiments_bootstrap_chi2_central", ("fits", "fitcontext") ) fits_level_1_noise = collect("total_chi2_data", ("fits", "fitinputcontext", "fitunderlyinglaw"))
[docs]@check_use_fitcommondata @check_fits_areclosures @check_fits_same_filterseed @check_fits_underlying_law_match def delta_chi2_bootstrap( fits_level_1_noise, fits_exps_bootstrap_chi2_central, fits, use_fitcommondata ): """Bootstraps delta chi2 for specified fits. Delta chi2 measures whether the level one data is fitted better by the underlying law or the specified fit, it is a measure of overfitting. delta chi2 = (chi2(T[<f>], D_1) - chi2(T[f_in], D_1))/chi2(T[f_in], D_1) where T[<f>] is central theory prediction from fit, T[f_in] is theory prediction from t0 pdf (input) and D_1 is level 1 closure data Exact details on delta chi2 can be found in 1410.8849 eq (28). """ closure_total_chi2_boot = np.sum(fits_exps_bootstrap_chi2_central, axis=1) t0_pseudodata_chi2 = np.array([chi2.central_result for chi2 in fits_level_1_noise]) deltachi2boot = ( closure_total_chi2_boot - t0_pseudodata_chi2[:, np.newaxis] ) / t0_pseudodata_chi2[:, np.newaxis] return deltachi2boot
# Note that these collect over the experiments as specified in fit in case of # TEST set fits_exps_level_1_noise = collect( "experiments_chi2_data", ("fits", "fitinputcontext", "fitunderlyinglaw") ) fits_exps_chi2 = collect("experiments_chi2_data", ("fits", "fitcontext"))
[docs]@table @check_use_fitcommondata @check_fits_areclosures @check_fits_same_filterseed @check_fits_underlying_law_match def delta_chi2_table( fits_exps_chi2, fits_exps_level_1_noise, fits_name_with_covmat_label, fits_experiments, fits, use_fitcommondata, ): """Calculated delta chi2 per experiment and put in table Here delta chi2 is just normalised by ndata and is equal to delta_chi2 = (chi2(T[<f>], D_1) - chi2(T[f_in], D_1))/ndata """ dfs = [] cols = ("ndata", r"$\Delta_{chi^2}$ (normalised by ndata)") for label, experiments, exps_chi2, exps_level_1_noise in zip( fits_name_with_covmat_label, fits_experiments, fits_exps_chi2, fits_exps_level_1_noise, ): records = [] for experiment, exp_chi2, level_1_noise in zip(experiments, exps_chi2, exps_level_1_noise): delta_chi2 = (exp_chi2.central_result - level_1_noise.central_result) / exp_chi2.ndata npoints = exp_chi2.ndata records.append(dict(experiment=str(experiment), npoints=npoints, delta_chi2=delta_chi2)) df = pd.DataFrame.from_records( records, columns=("experiment", "npoints", "delta_chi2"), index=("experiment",), ) df.columns = pd.MultiIndex.from_product(([label], cols)) dfs.append(df) res = pd.concat(dfs, axis=1) return res
[docs]@check_fit_isclosure def fit_underlying_pdfs_summary(fit, fitunderlyinglaw): """Returns a table with a single column for the `fit` with a row indication the PDF used to generate the data and the t0 pdf """ t0name = fit.as_input()["datacuts"]["t0pdfset"] df = pd.DataFrame( [str(fitunderlyinglaw["pdf"]), t0name], columns=[fit.label], index=["underlying PDF", "t0 PDF"], ) return df
fits_underlying_pdfs_summary = collect("fit_underlying_pdfs_summary", ("fits",))
[docs]@table def summarise_closure_underlying_pdfs(fits_underlying_pdfs_summary): """Collects the underlying pdfs for all fits and concatenates them into a single table""" return pd.concat(fits_underlying_pdfs_summary, axis=1)