
"""
construction.py
Tools for constructing theory covariance matrices and computing their chi2s.
"""

from collections import defaultdict, namedtuple
import logging

import numpy as np
import pandas as pd

from reportengine import collect
from reportengine.table import table
from validphys.checks import check_using_theory_covmat
from validphys.results import results, results_central
from validphys.theorycovariance.theorycovarianceutils import (
    check_correct_theory_combination,
    check_fit_dataset_order_matches_grouped,
    process_lookup,
)

log = logging.getLogger(__name__)

results_central_bytheoryids = collect(results_central, ("theoryids",))
each_dataset_results_central_bytheory = collect("results_central_bytheoryids", ("data",))


@check_using_theory_covmat
def theory_covmat_dataset(
    results, results_central_bytheoryids, point_prescription, use_theorycovmat  # for the check
):
    """
    Compute the theory covmat for a collection of theoryids for a single dataset.

    In general this will come from some point prescription; while it could be guessed
    from the input, it takes all relevant variables as input for generality.
    """
    _, theory_results = zip(*results_central_bytheoryids)
    _, central_th_result = results
    # Remove the central theory from the list if it was included
    theory_results = [i for i in theory_results if i._theoryid != central_th_result._theoryid]
    cv = central_th_result.central_value
    # Compute the theory contribution to the covmats
    deltas = list(t.central_value - cv for t in theory_results)
    thcovmat = compute_covs_pt_prescrip(point_prescription, "A", deltas)
    return thcovmat

ProcessInfo = namedtuple("ProcessInfo", ("preds", "namelist", "sizes"))

def combine_by_type(each_dataset_results_central_bytheory):
    """Groups the datasets by process and returns an instance of the ProcessInfo class.

    Parameters
    ----------
    each_dataset_results_central_bytheory: list[list[(DataResult, ThPredictionsResult)]]
        Tuples of DataResult and ThPredictionsResult (where only the second is used for the
        construction of the theory covariance matrix), wrapped in a list such that there is a
        tuple per theoryid, wrapped in another list per dataset.

    Returns
    -------
    process_info: :py:class:`validphys.theorycovariance.construction.ProcessInfo`
        Class with info needed to construct the theory covmat.
    """
    dataset_size = defaultdict(list)
    theories_by_process = defaultdict(list)
    ordered_names = defaultdict(list)
    for dataset in each_dataset_results_central_bytheory:
        name = dataset[0][0].name
        theory_centrals = [x[1].central_value for x in dataset]
        dataset_size[name] = len(theory_centrals[0])
        proc_type = process_lookup(name)
        ordered_names[proc_type].append(name)
        theories_by_process[proc_type].append(theory_centrals)
    for key, item in theories_by_process.items():
        theories_by_process[key] = np.concatenate(item, axis=1)
    process_info = ProcessInfo(
        preds=theories_by_process, namelist=ordered_names, sizes=dataset_size
    )
    return process_info


def covmat_3fpt(deltas1, deltas2):
    """Returns theory covariance sub-matrix for 3pt factorisation scale variation
    *only*, given two dataset names and collections of scale variation shifts"""
    s = 0.5 * (np.outer(deltas1[0], deltas2[0]) + np.outer(deltas1[1], deltas2[1]))
    return s


def covmat_3pt(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for 3pt prescription,
    given two dataset names and collections of scale variation shifts"""
    if name1 == name2:
        s = 0.5 * sum(np.outer(d, d) for d in deltas1)
    else:
        s = 0.25 * (np.outer((deltas1[0] + deltas1[1]), (deltas2[0] + deltas2[1])))
    return s

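
# Illustrative sketch (not part of the original module): a toy check of the 3pt
# prescription above. The process names and shift vectors are fabricated; in
# production the deltas are scale-varied predictions minus the central prediction.
def _example_covmat_3pt():
    """Toy demonstration of ``covmat_3pt`` on two fabricated shift vectors."""
    deltas = [np.array([0.1, -0.2]), np.array([-0.1, 0.3])]
    # Same process: diagonal block, 0.5 * sum of outer products of each shift
    s_diag = covmat_3pt("PROC_A", "PROC_A", deltas, deltas)
    # Different processes: off-diagonal block built from the summed shifts
    s_off = covmat_3pt("PROC_A", "PROC_B", deltas, deltas)
    return s_diag, s_off
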

def covmat_5pt(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for 5pt prescription,
    given two dataset names and collections of scale variation shifts"""
    if name1 == name2:
        s = 0.5 * sum(np.outer(d, d) for d in deltas1)
    else:
        s = 0.5 * (
            np.outer(deltas1[0], deltas2[0]) + np.outer(deltas1[1], deltas2[1])
        ) + 0.25 * (np.outer((deltas1[2] + deltas1[3]), (deltas2[2] + deltas2[3])))
    return s


def covmat_5barpt(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for 5barpt prescription,
    given two dataset names and collections of scale variation shifts"""
    if name1 == name2:
        s = 0.5 * sum(np.outer(d, d) for d in deltas1)
    else:
        s = 0.25 * (
            np.outer((deltas1[0] + deltas1[2]), (deltas2[0] + deltas2[2]))
            + np.outer((deltas1[1] + deltas1[3]), (deltas2[1] + deltas2[3]))
        )
    return s


def covmat_7pt(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for 7pt prescription (Gavin),
    given two dataset names and collections of scale variation shifts"""
    if name1 == name2:
        s = (1 / 3) * sum(np.outer(d, d) for d in deltas1)
    else:
        s = (1 / 6) * (
            2 * (np.outer(deltas1[0], deltas2[0]) + np.outer(deltas1[1], deltas2[1]))
            + (
                np.outer((deltas1[2] + deltas1[3]), (deltas2[2] + deltas2[3]))
                + np.outer((deltas1[4] + deltas1[5]), (deltas2[4] + deltas2[5]))
            )
        )
    return s


def covmat_9pt(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for 9pt prescription,
    given two dataset names and collections of scale variation shifts"""
    if name1 == name2:
        s = 0.25 * sum(np.outer(d, d) for d in deltas1)
    else:
        s = (1 / 12) * (
            np.outer(
                (deltas1[0] + deltas1[4] + deltas1[6]), (deltas2[0] + deltas2[4] + deltas2[6])
            )
            + np.outer(
                (deltas1[1] + deltas1[5] + deltas1[7]), (deltas2[1] + deltas2[5] + deltas2[7])
            )
        ) + (1 / 8) * (np.outer((deltas1[2] + deltas1[3]), (deltas2[2] + deltas2[3])))
    return s


def covmat_n3lo_singlet(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for all the singlet
    splitting function variations.
    """
    n3lo_vars_dict = {"gg": 19, "gq": 21, "qg": 15, "qq": 6}
    s_singlet_ad = 0
    cnt = 0
    for n_var in n3lo_vars_dict.values():
        s_singlet_ad += covmat_n3lo_ad(
            name1, name2, deltas1[cnt : cnt + n_var], deltas2[cnt : cnt + n_var]
        )
        cnt += n_var
    return s_singlet_ad


def covmat_n3lo_fhmv(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for all the FHMV
    splitting function variations.
    """
    s_ad = 0
    n_var = 2
    # loop over the 7 splitting function variations
    for cnt in range(0, 14, 2):
        s_ad += covmat_n3lo_ad(
            name1, name2, deltas1[cnt : cnt + n_var], deltas2[cnt : cnt + n_var]
        )
    return s_ad


def covmat_n3lo_ad(name1, name2, deltas1, deltas2):
    """Returns theory covariance sub-matrix for each of the
    singlet splitting function variations.

    Normalization is given by (n_pt - 1),
    where n_pt is the number of points in the prescription.
    """
    norm = len(deltas1)
    if name1 == name2:
        s = sum(np.outer(d, d) for d in deltas1)
    else:
        s = 0
        for i, d1 in enumerate(deltas1):
            for j, d2 in enumerate(deltas2):
                if i == j:
                    s += np.outer(d1, d2)
    return 1 / norm * s

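
# Illustrative sketch (not part of the original module): ``covmat_n3lo_ad`` on two
# fabricated variations. With name1 == name2 the block is sum(outer(d, d)) / n_var;
# across different processes only matching variation indices are paired.
def _example_covmat_n3lo_ad():
    deltas = [np.array([0.2, 0.0]), np.array([-0.1, 0.1])]
    same = covmat_n3lo_ad("PROC_A", "PROC_A", deltas, deltas)
    cross = covmat_n3lo_ad("PROC_A", "PROC_B", deltas, deltas)
    return same, cross
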

def compute_covs_pt_prescrip(point_prescription, name1, deltas1, name2=None, deltas2=None):
    """Utility to compute the covariance matrix by prescription given the
    shifts with respect to the central value for a pair of processes.

    The processes are defined by the variables ``name1`` and ``name2`` with
    ``deltas1`` and ``deltas2`` the associated shifts wrt the central prediction.

    This utility also allows for the computation of the theory covmat for a
    single process or dataset if ``name2=deltas2=None``.

    Parameters
    ----------
    point_prescription: str
        defines the point prescription to be utilized
    name1: str
        Process name of the first set of shifts
    deltas1: list(np.ndarray)
        list of shifts for each of the non-central theories
    name2: str
        Process name of the second set of shifts
    deltas2: list(np.ndarray)
        list of shifts for each of the non-central theories
    """
    if name2 is None and deltas2 is not None:
        raise ValueError(
            "Error building theory covmat: predictions have been given with no associated process/dataset name"
        )
    elif deltas2 is None and name2 is not None:
        raise ValueError(
            f"Error building theory covmat: a process/dataset name has been given ({name2}) with no predictions"
        )

    if name2 is None:
        name2 = name1
        deltas2 = deltas1

    if point_prescription == "3f point":
        s = covmat_3fpt(deltas1, deltas2)
    elif point_prescription == "3r point":
        s = covmat_3pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "3 point":
        s = covmat_3pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "5 point":
        s = covmat_5pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "5bar point":
        s = covmat_5barpt(name1, name2, deltas1, deltas2)
    elif point_prescription == "7 point":
        # 7pt (Gavin)
        s = covmat_7pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "9 point":
        s = covmat_9pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "ad ihou":
        # n3lo ad variation prescription
        s = covmat_n3lo_singlet(name1, name2, deltas1, deltas2)
    elif point_prescription == "dis ihou":
        # n3lo ihou prescription
        s = covmat_3pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "3pt missing":
        # 3 point renormalization scale variations for subset of data
        s = covmat_3pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "3pt hadronic":
        # N3LO 3 point scale variations for hadronic datasets
        s = covmat_3pt(name1, name2, deltas1, deltas2)
    elif point_prescription == "fhmv ihou":
        # n3lo full covmat prescription
        s = covmat_n3lo_fhmv(name1, name2, deltas1, deltas2)
    else:
        raise ValueError(f"Point prescription not recognised: {point_prescription}")
    return s

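
# Illustrative sketch (not part of the original module): calling the dispatcher
# above for a single dataset, i.e. with name2=deltas2=None, mirroring what
# ``theory_covmat_dataset`` does. The name and prediction arrays are fabricated.
def _example_compute_covs_pt_prescrip():
    cv = np.array([1.0, 2.0, 3.0])  # hypothetical central prediction
    varied = [np.array([1.1, 2.1, 2.9]), np.array([0.9, 1.8, 3.2])]  # two scale variations
    deltas = [t - cv for t in varied]
    # Single-process call: the deltas are paired with themselves (3pt prescription)
    return compute_covs_pt_prescrip("3 point", "PROC_A", deltas)
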

@check_correct_theory_combination
def covs_pt_prescrip(combine_by_type, point_prescription):
    """Produces the sub-matrices of the theory covariance matrix according to a point
    prescription which matches the number of input theories. If 5 theories are
    provided, a scheme 'bar' or 'nobar' must be chosen in the runcard in order to
    specify the prescription. Sub-matrices correspond to applying the scale variation
    prescription to each pair of processes in turn, using a different procedure for
    the case where the processes are the same relative to when they are different."""
    process_info = combine_by_type
    running_index = 0
    start_proc = defaultdict(list)
    for name in process_info.preds:
        size = len(process_info.preds[name][0])
        start_proc[name] = running_index
        running_index += size
    covmats = defaultdict(list)
    for name1 in process_info.preds:
        for name2 in process_info.preds:
            central1, *others1 = process_info.preds[name1]
            deltas1 = list(other - central1 for other in others1)
            central2, *others2 = process_info.preds[name2]
            deltas2 = list(other - central2 for other in others2)
            s = compute_covs_pt_prescrip(point_prescription, name1, deltas1, name2, deltas2)
            start_locs = (start_proc[name1], start_proc[name2])
            covmats[start_locs] = s
    return covmats


@table
def theory_covmat_custom_per_prescription(covs_pt_prescrip, procs_index, combine_by_type):
    """Takes the individual sub-covmats between each two processes and assembles them
    into a full covmat. Then reshuffles the order from ordering by process to ordering
    by experiment as listed in the runcard"""
    process_info = combine_by_type
    # Construct a covmat_index based on the order of experiments as they are in combine_by_type
    # NOTE: maybe the ordering of covmat_index is always the same as that of procs_index?
    # Regardless, we don't want to open ourselves up to the risk of the ordering of procs_index
    # changing and breaking this function
    indexlist = []
    for procname in process_info.preds:
        for datasetname in process_info.namelist[procname]:
            slicer = procs_index.get_locs((procname, datasetname))
            indexlist += procs_index[slicer].to_list()
    covmat_index = pd.MultiIndex.from_tuples(indexlist, names=procs_index.names)
    # Put the covariance matrices between two processes into a single covariance matrix
    total_datapoints = sum(combine_by_type.sizes.values())
    mat = np.zeros((total_datapoints, total_datapoints), dtype=np.float32)
    for locs, cov in covs_pt_prescrip.items():
        xsize, ysize = cov.shape
        mat[locs[0] : locs[0] + xsize, locs[1] : locs[1] + ysize] = cov
    df = pd.DataFrame(mat, index=covmat_index, columns=covmat_index)
    return df

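
# Illustrative sketch (not part of the original module): the block-placement step
# used above, shown on fabricated data. Each sub-covmat is written into the full
# matrix at the (row, column) offsets where its pair of processes starts.
def _example_block_assembly():
    blocks = {
        (0, 0): np.ones((2, 2)),  # process A with itself (2 datapoints)
        (0, 2): np.full((2, 3), 0.5),  # A with B
        (2, 0): np.full((3, 2), 0.5),  # B with A
        (2, 2): np.ones((3, 3)),  # B with itself (3 datapoints)
    }
    mat = np.zeros((5, 5))
    for (i, j), cov in blocks.items():
        xsize, ysize = cov.shape
        mat[i : i + xsize, j : j + ysize] = cov
    return mat
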

@table
def fromfile_covmat(covmatpath, procs_data, procs_index):
    """Reads a general theory covariance matrix from file. Then
    1: Applies cuts to match experiment covariance matrix
    2: Expands dimensions to match experiment covariance matrix
    by filling additional entries with 0."""
    # Load covmat as pandas DataFrame
    filecovmat = pd.read_csv(
        covmatpath, index_col=[0, 1, 2], header=[0, 1, 2], sep="\t|,", engine="python"
    )
    # Remove string in column id
    filecovmat.columns = filecovmat.index
    # Reordering covmat to match exp order in runcard
    # Datasets in exp covmat
    dslist = []
    for group in procs_data:
        for ds in group.datasets:
            dslist.append(ds.name)
    # Datasets in filecovmat in exp covmat order
    shortlist = []
    for ds in dslist:
        if ds in filecovmat.index.get_level_values(level="dataset"):
            shortlist.append(ds)
    filecovmat = filecovmat.reindex(shortlist, level="dataset")
    filecovmat = filecovmat.reindex(shortlist, level="dataset", axis=1)
    # ------------- #
    # 1: Apply cuts #
    # ------------- #
    # Loading cuts to apply to covariance matrix
    indextuples = []
    for group in procs_data:
        for ds in group.datasets:
            # Load cuts for each dataset in the covmat
            if ds.name in filecovmat.index.get_level_values(1):
                cuts = ds.cuts
                # Creating new index for post cuts
                lcuts = cuts.load()
                if lcuts is not None:
                    for keeploc in lcuts:
                        indextuples.append((group.name, ds.name, keeploc))
                # If loaded cuts are None, keep all index points for that dataset
                else:
                    for ind in filecovmat.index:
                        if ind[1] == ds:
                            indextuples.append(ind)
    newindex = pd.MultiIndex.from_tuples(
        indextuples, names=["group", "dataset", "index"], sortorder=0
    )
    # Reindex covmat with the new cut index
    cut_df = filecovmat.reindex(newindex).T
    cut_df = cut_df.reindex(newindex).T
    # Elements where cuts are applied will become NaN - remove these rows and columns
    cut_df = cut_df.dropna(axis=0).dropna(axis=1)
    # -------------------- #
    # 2: Expand dimensions #
    # -------------------- #
    # First make empty df of exp covmat dimensions
    empty_df = pd.DataFrame(0, index=procs_index, columns=procs_index)
    covmats = []
    # Make a piece of the covmat for each combination of two datasets
    for ds1 in dslist:
        for ds2 in dslist:
            if (ds1 in shortlist) and (ds2 in shortlist):
                # If both datasets are in the fromfile covmat, use the piece of the fromfile covmat
                covmat = (
                    cut_df.xs(ds1, level=1, drop_level=False)
                    .T.xs(ds2, level=1, drop_level=False)
                    .T
                )
            else:
                # Otherwise use a covmat of 0s
                covmat = (
                    empty_df.xs(ds1, level=1, drop_level=False)
                    .T.xs(ds2, level=1, drop_level=False)
                    .T
                )
            covmats.append(covmat)
    chunks = []
    # Arrange into chunks, each chunk is a list of pieces of covmat which are associated with
    # one dataset in particular
    for x in range(0, len(covmats), len(dslist)):
        chunk = covmats[x : x + len(dslist)]
        chunks.append(chunk)
    strips = []
    # Concatenate each chunk into a strip of the covariance matrix
    for chunk in chunks:
        strip = pd.concat(chunk, axis=1)
        strips.append(strip.T)
    # Concatenate the strips to make the full matrix
    full_df = pd.concat(strips, axis=1)
    # Reindex to align with experiment covmat index
    full_df = full_df.reindex(procs_index)
    full_df = full_df.reindex(procs_index, axis=1)
    return full_df

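
# Illustrative sketch (not part of the original module): the "expand dimensions"
# idea from step 2 above, shown with a simpler technique than the piecewise
# assembly the function actually uses. Reindexing a toy covmat against a larger
# MultiIndex leaves NaN for the datasets absent from the file covmat, which are
# then filled with 0. All index tuples and dataset names here are fabricated.
def _example_expand_covmat():
    small_idx = pd.MultiIndex.from_tuples(
        [("DIS", "ds1", 0), ("DIS", "ds1", 1)], names=["group", "dataset", "index"]
    )
    small = pd.DataFrame(np.eye(2), index=small_idx, columns=small_idx)
    full_idx = pd.MultiIndex.from_tuples(
        [("DIS", "ds1", 0), ("DIS", "ds1", 1), ("DY", "ds2", 0)],
        names=["group", "dataset", "index"],
    )
    # Rows/columns for ds2 appear as NaN after reindexing and are zero-filled
    return small.reindex(full_idx).reindex(full_idx, axis=1).fillna(0)
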

@table
def user_covmat(procs_data, procs_index, loaded_user_covmat_path):
    """
    General theory covariance matrix provided by the user. Useful for testing
    the impact of externally produced covariance matrices.

    Matrices must be produced as a csv of a pandas DataFrame, and uploaded to
    the validphys server. The server path is then provided via
    ``user_covmat_path`` in ``theorycovmatconfig`` in the runcard.
    For more information see documentation.
    """
    return fromfile_covmat(loaded_user_covmat_path, procs_data, procs_index)


@table
@check_fit_dataset_order_matches_grouped
def total_theory_covmat(theory_covmat_custom, user_covmat):
    """
    Sum of scale variation and user covmat, where both are used.
    """
    return theory_covmat_custom + user_covmat


def theory_covmat_custom_fitting(theory_covmat_custom_per_prescription, procs_index_matched):
    """theory_covmat_custom_per_prescription but reindexed so the order of the datasets
    matches those in the experiment covmat so they are aligned when fitting."""
    df = theory_covmat_custom_per_prescription.reindex(procs_index_matched).T.reindex(
        procs_index_matched
    )
    return df

theory_covmats_fitting = collect(theory_covmat_custom_per_prescription, ("point_prescriptions",))

@table
def theory_covmat_custom(theory_covmats_fitting):
    """Sum all the theory covmats listed in ``point_prescriptions``."""
    return sum(theory_covmats_fitting)


def total_theory_covmat_fitting(total_theory_covmat, procs_index_matched):
    """total_theory_covmat but reindexed so the order of the datasets matches
    those in the experiment covmat so they are aligned when fitting."""
    return theory_covmat_custom_fitting(total_theory_covmat, procs_index_matched)


def user_covmat_fitting(user_covmat, procs_index_matched):
    """user_covmat but reindexed so the order of the datasets matches
    those in the experiment covmat so they are aligned when fitting."""
    return theory_covmat_custom_fitting(user_covmat, procs_index_matched)


def procs_index_matched(groups_index, procs_index):
    """procs_index but matched to the dataset order given by groups_index."""
    # Making list with exps ordered like in groups_index
    groups_ds_order = groups_index.get_level_values(level=1).unique().tolist()
    # Tuples to make multiindex, ordered like in groups_index
    tups = []
    for ds in groups_ds_order:
        for orig in procs_index:
            if orig[1] == ds:
                tups.append(orig)
    return pd.MultiIndex.from_tuples(tups, names=("process", "dataset", "id"))

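
# Illustrative sketch (not part of the original module): reordering a toy
# procs_index to follow a groups_index dataset ordering, as done above.
# All index tuples and dataset names are fabricated.
def _example_procs_index_matched():
    procs_idx = pd.MultiIndex.from_tuples(
        [("DIS", "ds_b", 0), ("DY", "ds_a", 0)], names=("process", "dataset", "id")
    )
    groups_idx = pd.MultiIndex.from_tuples(
        [("grp1", "ds_a", 0), ("grp2", "ds_b", 0)], names=("group", "dataset", "id")
    )
    # Result lists the ds_a tuple first, matching the groups_index ordering
    return procs_index_matched(groups_idx, procs_idx)
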

@table
def theory_corrmat_custom(theory_covmat_custom):
    """Calculates the theory correlation matrix for scale variations
    with variations by process type"""
    df = theory_covmat_custom
    covmat = df.values
    diag_minus_half = (np.diagonal(covmat)) ** (-0.5)
    mat = diag_minus_half[:, np.newaxis] * df * diag_minus_half
    return mat

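
# Illustrative sketch (not part of the original module): the covariance-to-
# correlation normalisation used above, D^(-1/2) C D^(-1/2), on a toy matrix.
def _example_corrmat():
    cov = pd.DataFrame([[4.0, 2.0], [2.0, 9.0]])
    d = np.diagonal(cov.values) ** (-0.5)
    # Unit diagonal; off-diagonal becomes 2 / (2 * 3) = 1/3
    return d[:, np.newaxis] * cov * d
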

@table
def theory_normcovmat_custom(theory_covmat_custom, procs_data_values):
    """Calculates the theory covariance matrix for scale variations normalised
    to data, with variations according to the relevant prescription."""
    df = theory_covmat_custom
    vals = procs_data_values.reindex(df.index)
    mat = df / np.outer(vals, vals)
    return mat


@table
def experimentplustheory_corrmat_custom(procs_covmat, theory_covmat_custom):
    """Calculates the correlation matrix for the experimental plus theory
    covariance matrices, correlations by prescription."""
    total_df = procs_covmat + theory_covmat_custom
    diag_minus_half = (np.diagonal(total_df.values)) ** (-0.5)
    corrmat = diag_minus_half[:, np.newaxis] * total_df * diag_minus_half
    return corrmat

each_dataset_results = collect(results, ("group_dataset_inputs_by_process", "data"))