"""
Tools to obtain and analyse the pseudodata that was seen by the neural
networks during the fitting.
"""
from collections import namedtuple
import hashlib
import logging
import numpy as np
import pandas as pd
from nnpdf_data import legacy_to_new_map
from reportengine import collect
from validphys.covmats import (
INTRA_DATASET_SYS_NAME,
dataset_inputs_covmat_from_systematics,
sqrt_covmat,
)
log = logging.getLogger(__name__)
DataTrValSpec = namedtuple('DataTrValSpec', ['pseudodata', 'tr_idx', 'val_idx'])
context_index = collect("groups_index", ("fitcontext",))
read_fit_pseudodata = collect('read_replica_pseudodata', ('fitreplicas', 'fitcontextwithcuts'))
read_pdf_pseudodata = collect('read_replica_pseudodata', ('pdfreplicas', 'fitcontextwithcuts'))
[docs]
class ReplicaGenerationError(Exception):
pass
[docs]
def read_replica_pseudodata(fit, context_index, replica):
"""Function to handle the reading of training and validation splits for a fit that has been
produced with the ``savepseudodata`` flag set to ``True``.
The data is read from the PDF to handle the mixing introduced by ``postfit``.
The data files are concatenated to return all the data that went into a fit. The training and validation
indices are also returned so one can access the splits using pandas indexing.
Raises
------
FileNotFoundError
If the training or validation files for the PDF set cannot be found.
CheckError
If the ``use_cuts`` flag is not set to ``fromfit``
Returns
-------
data_indices_list: list[namedtuple]
List of ``namedtuple`` where each entry corresponds to a given replica. Each element contains
attributes ``pseudodata``, ``tr_idx``, and ``val_idx``. The latter two being used to slice
the former to return training and validation data respectively.
Example
-------
>>> from validphys.api import API
>>> data_indices_list = API.read_fit_pseudodata(fit="pseudodata_test_fit_n3fit")
>>> len(data_indices_list) # Same as nrep
10
>>> rep_info = data_indices_list[0]
>>> rep_info.pseudodata.loc[rep_info.tr_idx].head()
replica 1
group dataset id
ATLAS ATLASZPT8TEVMDIST 1 30.665835
3 15.795880
4 8.769734
5 3.117819
6 0.771079
"""
# List of length 1 due to the collect
context_index = context_index[0]
# The [0] is because of how pandas handles sorting a MultiIndex
sorted_index = context_index.sortlevel(level=range(1, 3))[0]
log.debug(f"Reading pseudodata & training/validation splits from {fit.name}.")
replica_path = fit.path / "nnfit" / f"replica_{replica}"
# If it's a closure test fit the pseudodata is fakedata stored under a different filename
fakedata = fit.as_input().get("closuretest", {}).get("fakedata", False)
if fakedata:
tr_pseudodatafile = "datacuts_theory_closuretest_fitting_training_pseudodata.csv"
vl_pseudodatafile = "datacuts_theory_closuretest_fitting_validation_pseudodata.csv"
else:
tr_pseudodatafile = "datacuts_theory_fitting_training_pseudodata.csv"
vl_pseudodatafile = "datacuts_theory_fitting_validation_pseudodata.csv"
try:
tr = pd.read_csv(replica_path / tr_pseudodatafile, index_col=[0, 1, 2], sep="\t", header=0)
val = pd.read_csv(replica_path / vl_pseudodatafile, index_col=[0, 1, 2], sep="\t", header=0)
except FileNotFoundError as e:
raise FileNotFoundError(
"Could not find saved training and validation data files. "
f"Please ensure {fit} was generated with the savepseudodata flag set to true"
) from e
tr["type"], val["type"] = "training", "validation"
pseudodata = pd.concat((tr, val))
# In order for this function to work also with old fit, it is necessary to remap the names
# being read (since the names in the context have already been remapped)
# The following checks whether a given name is in both the context and the fit, and if not
# tries to get it from the old_to_new mapping.
mapping = {}
context_datasets = context_index.get_level_values("dataset").unique()
for dsname in pseudodata.index.get_level_values("dataset").unique():
if dsname not in context_datasets:
new_name, _ = legacy_to_new_map(dsname)
mapping[dsname] = new_name
pseudodata.rename(mapping, level=1, inplace=True)
pseudodata.sort_index(level=range(1, 3), inplace=True)
pseudodata.index = sorted_index
tr = pseudodata[pseudodata["type"] == "training"]
val = pseudodata[pseudodata["type"] == "validation"]
return DataTrValSpec(pseudodata.drop("type", axis=1), tr.index, val.index)
[docs]
def make_replica(
groups_dataset_inputs_loaded_cd_with_cuts,
replica_mcseed,
dataset_inputs_sampling_covmat,
sep_mult=False,
genrep=True,
max_tries=int(1e6),
resample_negative_pseudodata=True,
):
"""Function that takes in a list of :py:class:`validphys.coredata.CommonData`
objects and returns a pseudodata replica accounting for
possible correlations between systematic uncertainties.
The function loops until positive definite pseudodata is generated for any
non-asymmetry datasets. In the case of an asymmetry dataset negative values are
permitted so the loop block executes only once.
Parameters
---------
groups_dataset_inputs_loaded_cd_with_cuts: list[:py:class:`validphys.coredata.CommonData`]
List of CommonData objects which stores information about systematic errors,
their treatment and description, for each dataset.
replica_mcseed: int, None
Seed used to initialise the numpy random number generator. If ``None`` then a random seed is
allocated using the default numpy behaviour.
dataset_inputs_sampling_covmat: np.array
Full covmat to be used. It can be either only experimental or also theoretical.
sep_mult: bool
Specifies whether computing the shifts with the full covmat
or whether multiplicative errors should be separated
genrep: bool
Specifies whether computing replicas or not
max_tries: int
The stochastic nature of replica generation means one can obtain (unphysical) negative predictions.
If after max_tries (default=1e6) no physical configuration is found,
it will raise a :py:class:`ReplicaGenerationError`
resample_negative_pseudodata: bool
When True, replicas that produce negative predictions will be resampled for ``max_tries`` until all points are positive (default: True)
Returns
-------
pseudodata: np.array
Numpy array which is N_dat (where N_dat is the combined number of data points after cuts)
containing monte carlo samples of data centered around the data central value.
Example
-------
>>> from validphys.api import API
>>> pseudodata = API.make_replica(
dataset_inputs=[{"dataset":"NMC"}, {"dataset": "NMCPD"}],
use_cuts="nocuts",
theoryid=53,
replica=1,
mcseed=123,
genrep=True,
)
array([0.25640033, 0.25986534, 0.27165461, 0.29001009, 0.30863588,
0.30100351, 0.31781208, 0.30827054, 0.30258217, 0.32116842,
0.34206012, 0.31866286, 0.2790856 , 0.33257621, 0.33680007,
"""
if not genrep:
return np.concatenate(
[cd.central_values for cd in groups_dataset_inputs_loaded_cd_with_cuts]
)
# Seed the numpy RNG with the seed and the name of the datasets in this run
# TODO: to be simplified after the reader is merged, together with an update of the regression tests
# this is necessary to reproduce exactly the results due to the replicas being generated with a hash
# Only when the sets are legacy (or coming from a legacy runcard) this shall be used
names_for_salt = []
for loaded_cd in groups_dataset_inputs_loaded_cd_with_cuts:
if loaded_cd.legacy_names is None:
names_for_salt.append(loaded_cd.setname)
else:
names_for_salt.append(loaded_cd.legacy_names[0])
name_salt = "-".join(names_for_salt)
name_seed = int(hashlib.sha256(name_salt.encode()).hexdigest(), 16) % 10**8
rng = np.random.default_rng(seed=replica_mcseed + name_seed)
# construct covmat
covmat = dataset_inputs_sampling_covmat
covmat_sqrt = sqrt_covmat(covmat)
# Loading the data
pseudodatas = []
check_positive_masks = []
nonspecial_mult = []
special_mult = []
for cd in groups_dataset_inputs_loaded_cd_with_cuts:
# copy here to avoid mutating the central values.
pseudodata = cd.central_values.to_numpy()
pseudodatas.append(pseudodata)
# Separation of multiplicative errors. If sep_mult is True also the exp_covmat is produced
# without multiplicative errors
if sep_mult:
mult_errors = cd.multiplicative_errors
mult_uncorr_errors = mult_errors.loc[:, mult_errors.columns == "UNCORR"].to_numpy()
mult_corr_errors = mult_errors.loc[:, mult_errors.columns == "CORR"].to_numpy()
nonspecial_mult.append((mult_uncorr_errors, mult_corr_errors))
special_mult.append(
mult_errors.loc[:, ~mult_errors.columns.isin(INTRA_DATASET_SYS_NAME)]
)
if "ASY" in cd.commondataproc or cd.commondataproc.endswith("_POL"):
check_positive_masks.append(np.zeros_like(pseudodata, dtype=bool))
else:
check_positive_masks.append(np.ones_like(pseudodata, dtype=bool))
# concatenating special multiplicative errors, pseudodatas and positive mask
if sep_mult:
special_mult_errors = pd.concat(special_mult, axis=0, sort=True).fillna(0).to_numpy()
all_pseudodata = np.concatenate(pseudodatas, axis=0)
full_mask = np.concatenate(check_positive_masks, axis=0)
# The inner while True loop is for ensuring a positive definite
# pseudodata replica
for _ in range(max_tries):
mult_shifts = []
# Prepare the per-dataset multiplicative shifts
for mult_uncorr_errors, mult_corr_errors in nonspecial_mult:
# convert to from percent to fraction
mult_shift = (
1 + mult_uncorr_errors * rng.normal(size=mult_uncorr_errors.shape) / 100
).prod(axis=1)
mult_shift *= (
1 + mult_corr_errors * rng.normal(size=(1, mult_corr_errors.shape[1])) / 100
).prod(axis=1)
mult_shifts.append(mult_shift)
# If sep_mult is true then the multiplicative shifts were not included in the covmat
shifts = covmat_sqrt @ rng.normal(size=covmat.shape[1])
mult_part = 1.0
if sep_mult:
special_mult = (
1 + special_mult_errors * rng.normal(size=(1, special_mult_errors.shape[1])) / 100
).prod(axis=1)
mult_part = np.concatenate(mult_shifts, axis=0) * special_mult
# Shifting pseudodata
shifted_pseudodata = (all_pseudodata + shifts) * mult_part
# positivity control
if np.all(shifted_pseudodata[full_mask] >= 0) or not resample_negative_pseudodata:
return shifted_pseudodata
dfail = " ".join(i.setname for i in groups_dataset_inputs_loaded_cd_with_cuts)
log.error(f"Error generating replicas for the group: {dfail}")
raise ReplicaGenerationError(f"No valid replica found after {max_tries} attempts")
[docs]
def indexed_make_replica(groups_index, make_replica):
"""Index the make_replica pseudodata appropriately"""
return pd.DataFrame(make_replica, index=groups_index, columns=["data"])
[docs]
def level0_commondata_wc(data, fakepdf):
"""
Given a validphys.core.DataGroupSpec object, load commondata and
generate a new commondata instance with central values replaced
by fakepdf prediction
Parameters
----------
data : validphys.core.DataGroupSpec
fakepdf: validphys.core.PDF
Returns
-------
list
list of validphys.coredata.CommonData instances corresponding to
all datasets within one experiment. The central value is replaced
by Level 0 fake data.
Example
-------
>>> from validphys.api import API
>>> API.level0_commondata_wc(dataset_inputs = [{"dataset":"NMC"}], use_cuts="internal", theoryid=200,fakepdf = "NNPDF40_nnlo_as_01180")
[CommonData(setname='NMC', ndata=204, commondataproc='DIS_NCE', nkin=3, nsys=16)]
"""
from validphys.covmats import dataset_t0_predictions
level0_commondata_instances_wc = []
# ==== Load validphys.coredata.CommonData instance with cuts ====#
for dataset in data.datasets:
commondata_wc = dataset.commondata.load()
if dataset.cuts is not None:
cuts = dataset.cuts.load()
commondata_wc = commondata_wc.with_cuts(cuts)
# == Generate a new CommonData instance with central value given by Level 0 data generated with fakepdf ==#
t0_prediction = dataset_t0_predictions(
t0dataset=dataset, t0set=fakepdf
) # N.B. cuts already applied to th. pred.
level0_commondata_instances_wc.append(commondata_wc.with_central_value(t0_prediction))
return level0_commondata_instances_wc
[docs]
def make_level1_data(data, level0_commondata_wc, filterseed, data_index, sep_mult):
"""
Given a list of Level 0 commondata instances, return the
same list with central values replaced by Level 1 data.
Level 1 data is generated using validphys.make_replica.
The covariance matrix, from which the stochastic Level 1
noise is sampled, is built from Level 0 commondata
instances (level0_commondata_wc). This, in particular,
means that the multiplicative systematics are generated
from the Level 0 central values.
Note that the covariance matrix used to generate Level 2
pseudodata is consistent with the one used at Level 1
up to corrections of the order eta * eps, where eta and
eps are defined as shown below:
Generate L1 data: L1 = L0 + eta, eta ~ N(0,CL0)
Generate L2 data: L2_k = L1 + eps_k, eps_k ~ N(0,CL1)
where CL0 and CL1 means that the multiplicative entries
have been constructed from Level 0 and Level 1 central
values respectively.
Parameters
----------
data : validphys.core.DataGroupSpec
level0_commondata_wc : list
list of validphys.coredata.CommonData instances corresponding to
all datasets within one experiment. The central value is replaced
by Level 0 fake data. Cuts already applied.
filterseed : int
random seed used for the generation of Level 1 data
data_index : pandas.MultiIndex
Returns
-------
list
list of validphys.coredata.CommonData instances corresponding to
all datasets within one experiment. The central value is replaced
by Level 1 fake data.
Example
-------
>>> from validphys.api import API
>>> dataset='NMC'
>>> l1_cd = API.make_level1_data(dataset_inputs = [{"dataset":dataset}],use_cuts="internal", theoryid=200,
fakepdf = "NNPDF40_nnlo_as_01180",filterseed=1)
>>> l1_cd
[CommonData(setname='NMC', ndata=204, commondataproc='DIS_NCE', nkin=3, nsys=16)]
"""
dataset_input_list = list(data.dsinputs)
covmat = dataset_inputs_covmat_from_systematics(
level0_commondata_wc,
dataset_input_list,
use_weights_in_covmat=False,
norm_threshold=None,
_list_of_central_values=None,
_only_additive=sep_mult,
)
# ================== generation of Level1 data ======================#
level1_data = make_replica(
level0_commondata_wc, filterseed, covmat, sep_mult=sep_mult, genrep=True
)
indexed_level1_data = indexed_make_replica(data_index, level1_data)
dataset_order = {cd.setname: i for i, cd in enumerate(level0_commondata_wc)}
# ===== create commondata instances with central values given by pseudo_data =====#
level1_commondata_dict = {c.setname: c for c in level0_commondata_wc}
level1_commondata_instances_wc = []
for xx, grp in indexed_level1_data.groupby('dataset'):
level1_commondata_instances_wc.append(
level1_commondata_dict[xx].with_central_value(grp.values)
)
# sort back so as to mantain same order as in level0_commondata_wc
level1_commondata_instances_wc.sort(key=lambda x: dataset_order[x.setname])
return level1_commondata_instances_wc
_group_recreate_pseudodata = collect(
'indexed_make_replica', ('group_dataset_inputs_by_experiment',)
)
_recreate_fit_pseudodata = collect('_group_recreate_pseudodata', ('fitreplicas', 'fitenvironment'))
_recreate_pdf_pseudodata = collect('_group_recreate_pseudodata', ('pdfreplicas', 'fitenvironment'))
fit_tr_masks = collect('replica_training_mask_table', ('fitreplicas', 'fitenvironment'))
pdf_tr_masks = collect('replica_training_mask_table', ('pdfreplicas', 'fitenvironment'))
make_replicas = collect('make_replica', ('replicas',))
fitted_make_replicas = collect('make_replica', ('pdfreplicas',))
indexed_make_replicas = collect('indexed_make_replica', ('replicas',))
[docs]
def recreate_fit_pseudodata(_recreate_fit_pseudodata, fitreplicas, fit_tr_masks):
"""Function used to reconstruct the pseudodata seen by each of the
Monte Carlo fit replicas.
Returns
-------
res : list[namedtuple]
List of namedtuples, each of which contains a dataframe
containing all the data points, the training indices, and
the validation indices.
Example
-------
>>> from validphys.api import API
>>> API.recreate_fit_pseudodata(fit="pseudodata_test_fit_n3fit")
Notes
-----
- This function does not account for the postfit reshuffling.
See Also
--------
:py:func:`validphys.pseudodata.recreate_pdf_pseudodata`
"""
res = []
for pseudodata, mask, rep in zip(_recreate_fit_pseudodata, fit_tr_masks, fitreplicas):
df = pd.concat(pseudodata)
df.columns = [f"replica {rep}"]
tr_idx = df.loc[mask.values].index
val_idx = df.loc[~mask.values].index
res.append(DataTrValSpec(df, tr_idx, val_idx))
return res
[docs]
def recreate_pdf_pseudodata(_recreate_pdf_pseudodata, pdfreplicas, pdf_tr_masks):
"""Like :py:func:`validphys.pseudodata.recreate_fit_pseudodata`
but accounts for the postfit reshuffling of replicas.
Returns
-------
res : list[namedtuple]
List of namedtuples, each of which contains a dataframe
containing all the data points, the training indices, and
the validation indices.
Example
-------
>>> from validphys.api import API
>>> API.recreate_pdf_pseudodata(fit="pseudodata_test_fit_n3fit")
See Also
--------
:py:func:`validphys.pseudodata.recreate_fit_pseudodata`
"""
return recreate_fit_pseudodata(_recreate_pdf_pseudodata, pdfreplicas, pdf_tr_masks)
pdf_tr_masks_no_table = collect('replica_training_mask', ('pdfreplicas', 'fitenvironment'))
[docs]
def recreate_pdf_pseudodata_no_table(_recreate_pdf_pseudodata, pdfreplicas, pdf_tr_masks_no_table):
return recreate_pdf_pseudodata(_recreate_pdf_pseudodata, pdfreplicas, pdf_tr_masks_no_table)