from collections import ChainMap, defaultdict
from collections.abc import Mapping, Sequence
import copy
import functools
from importlib.resources import contents, read_text
import inspect
import logging
import numbers
import pathlib
import pandas as pd
from nnpdf_data import legacy_to_new_map
from reportengine import configparser, report
from reportengine.compat import yaml
from reportengine.configparser import ConfigError, _parse_func, element_of, record_from_defaults
from reportengine.environment import Environment, EnvironmentError_
from reportengine.helputils import get_parser_type
from reportengine.namespaces import NSList
from validphys.core import (
CutsPolicy,
DataGroupSpec,
DataSetInput,
ExperimentInput,
MatchedCuts,
SimilarCuts,
ThCovMatSpec,
)
from validphys.filters import (
AddedFilterRule,
FilterDefaults,
FilterRule,
Rule,
RuleProcessingError,
default_filter_rules_input,
default_filter_settings_input,
)
from validphys.fitdata import fitted_replica_indexes, num_fitted_replicas
from validphys.gridvalues import LUMI_CHANNELS
from validphys.loader import (
DataNotFoundError,
FallbackLoader,
InconsistentMetaDataError,
Loader,
LoaderError,
LoadFailedError,
PDFNotFound,
)
from validphys.plotoptions.core import get_info
import validphys.scalevariations
log = logging.getLogger(__name__)
[docs]
class Environment(Environment):
    """Container for information to be filled at run time"""

    def __init__(self, *, this_folder=None, net=True, upload=False, dry=False, **kwargs):
        """Pick a loader class from ``net``/``dry`` and store the paths it exposes.

        ``this_folder`` is only stored (as a ``pathlib.Path``) when a truthy
        value is given. A ``LoaderError`` raised while instantiating the
        loader is logged and re-raised as ``EnvironmentError_``. The remaining
        keyword arguments are forwarded to the parent initialiser.
        """
        if this_folder:
            self.this_folder = pathlib.Path(this_folder)

        if net and dry:
            log.warning(
                "The --dry flag overrides the --net flag. No resources will be downloaded "
                "while executing a dry run"
            )
        # The downloading loader is only used when the network is allowed
        # and this is not a dry run.
        loader_class = FallbackLoader if (net and not dry) else Loader

        try:
            loader = loader_class()
        except LoaderError as e:
            log.error("Failed to find the paths. These are configured in the nnprofile settings")
            raise EnvironmentError_(e) from e

        self.loader = loader
        # NOTE(review): ``deta_path`` looks like a misspelling of ``data_path``;
        # the name is kept because other code may read this attribute.
        self.deta_path = loader.datapath
        self.results_path = loader.resultspath
        self.upload = upload
        super().__init__(**kwargs)
def _id_with_label(f):
    """Wrap a parse function so that its input may also be given as a
    mapping with the keys 'id' and, optionally, 'label'.

    When the input is not a ``dict`` it is passed straight to the wrapped
    function. When it is a ``dict``, the value under 'id' is parsed and, if
    'label' is present, its string form is stored in the ``label`` attribute
    of the parsed object. A ``ConfigError`` is raised if the mapping has
    unknown keys or lacks 'id'.

    The returned function keeps the wrapped function's metadata, exposes the
    wrapped function's extra parameters in its signature, and has a note
    about the accepted input appended to its docstring.
    """
    f = _parse_func(f)

    def parse_func(self, item, **kwargs):
        if not isinstance(item, dict):
            return f(self, item, **kwargs)
        keydiff = item.keys() - {"id", "label"}
        if keydiff or "id" not in item:
            unrecognized = f" Unrecognized: {keydiff}" if keydiff else ""
            raise ConfigError(
                f"'{item}' must be a single id, or a mapping "
                f"with keys 'id', 'label'.{unrecognized}"
            )
        val = f(self, item["id"], **kwargs)
        if "label" in item:
            val.label = str(item["label"])
        return val

    # Keep (self, item) from the wrapper and take every further parameter
    # from the wrapped function, so that those remain visible to introspection.
    currsig = inspect.signature(parse_func)
    origsig = inspect.signature(f)
    parse_func = functools.wraps(f)(parse_func)
    params = [*list(currsig.parameters.values())[:2], *list(origsig.parameters.values())[2:]]
    parse_func.__signature__ = inspect.Signature(parameters=params)

    labeldoc = (" Either just an id %s, or a mapping " "with 'id' and 'label'.") % (
        get_parser_type(f),
    )
    if parse_func.__doc__ is None:
        parse_func.__doc__ = labeldoc
    else:
        parse_func.__doc__ += labeldoc

    return parse_func
[docs]
class CoreConfig(configparser.Config):
@property
def loader(self):
return self.environment.loader
def _check_pdf_usable(self, pdf_name: str):
    """Check that the given PDF can be loaded and the error type
    is understood before continuing.

    Returns the object produced by ``self.loader.check_pdf``. Any loader
    failure, or a ``NotImplementedError`` raised when accessing
    ``stats_class``, is converted into a ``ConfigError`` chained to the
    original exception.
    """
    try:
        pdf = self.loader.check_pdf(pdf_name)
    except PDFNotFound as e:
        raise ConfigError(
            f"Bad PDF: {pdf_name} not installed", pdf_name, self.loader.available_pdfs
        ) from e
    except LoaderError as e:
        raise ConfigError(e) from e

    # Check that we know how to compute errors: accessing the attribute is
    # enough to trigger the NotImplementedError for unsupported error types.
    try:
        pdf.stats_class
    except NotImplementedError as e:
        raise ConfigError(str(e)) from e
    return pdf
[docs]
@element_of("pdfs")
@_id_with_label
def parse_pdf(self, name: str, unpolarized_bc=None):
    """A PDF set installed in LHAPDF.

    If an unpolarized boundary condition is defined, it will be registered
    as part of the PDF.
    """
    pdf = self._check_pdf_usable(name)
    if unpolarized_bc is None:
        return pdf
    pdf.register_boundary(unpolarized_bc=unpolarized_bc)
    return pdf
[docs]
@element_of("unpolarized_bcs")
@_id_with_label
def parse_unpolarized_bc(self, name):
    """Unpolarised PDF used as a Boundary Condition to impose positivity of pPDFs."""
    boundary_pdf = self.parse_pdf(name)
    return boundary_pdf
[docs]
@element_of("theoryids")
@_id_with_label
def parse_theoryid(self, theoryID: (str, int)):
    """A number corresponding to the database theory ID where the
    corresponding theory folder is installed in the data directory.

    A ``LoaderError`` from the loader is re-raised as a ``ConfigError``
    (chained to the original error) listing all the available theories.
    """
    try:
        return self.loader.check_theoryID(theoryID)
    except LoaderError as e:
        raise ConfigError(
            str(e), theoryID, self.loader.available_theories, display_alternatives="all"
        ) from e
[docs]
def parse_use_cuts(self, use_cuts: (bool, str)):
    """Whether to filter the points based on the cuts applied in the fit,
    or the whole data in the dataset. The possible options are:

    - internal: Calculate the cuts based on the existing rules. This is
      the default.

    - fromfit: Read the cuts stored in the fit.

    - nocuts: Use the whole dataset.

    Any other value defined by ``CutsPolicy`` is accepted as well. Booleans
    are still understood (``True`` maps to the ``FROMFIT`` policy and
    ``False`` to ``NOCUTS``) but are deprecated and trigger a warning.
    Anything else raises a ``ConfigError``.
    """
    valid_cuts = {c.value for c in CutsPolicy}

    if isinstance(use_cuts, bool):
        res = CutsPolicy.FROMFIT if use_cuts else CutsPolicy.NOCUTS
        # Report the policy by its value, which is what the user can
        # actually write in the runcard, rather than by the enum member.
        log.warning(
            "Setting a boolean for `use_cuts` is deprecated. "
            f"The available values are {valid_cuts} and the default "
            f"value is 'internal'. Your input ('{use_cuts}') is "
            f"equivalent to '{res.value}'."
        )
        return res

    if isinstance(use_cuts, str) and use_cuts in valid_cuts:
        return CutsPolicy(use_cuts)

    raise ConfigError(f"Invalid use_cuts setting: '{use_cuts}'.", use_cuts, valid_cuts)
[docs]
def produce_replicas(self, nreplica: int):
    """Produce a namespace list with one ``replica`` entry for each index
    from 1 to ``nreplica`` (both included)."""
    indexes = range(1, nreplica + 1)
    return NSList(indexes, nskey="replica")
[docs]
def parse_point_prescriptions(self, point_prescriptions):
    """Wrap the given prescriptions in a namespace list keyed by
    ``point_prescription``."""
    prescriptions = NSList(point_prescriptions, nskey="point_prescription")
    return prescriptions
# TODO: load fit config from here
[docs]
@element_of("fits")
@_id_with_label
def parse_fit(self, fit: str):
    """A fit in the results folder, containing at least a valid filter result.

    A ``LoadFailedError`` from the loader is re-raised as a ``ConfigError``
    (chained to the original error) that carries the available fits.
    """
    try:
        return self.loader.check_fit(fit)
    except LoadFailedError as e:
        raise ConfigError(str(e), fit, self.loader.available_fits) from e
[docs]
def produce_fitreplicas(self, fit):
    """Production rule mapping the ``replica`` key to each Monte Carlo
    fit replica.
    """
    last_replica = num_fitted_replicas(fit)
    indexes = range(1, last_replica + 1)
    return NSList(indexes, nskey="replica")
[docs]
def produce_pdfreplicas(self, fitpdf):
    """Production rule mapping the ``replica`` key to each postfit
    replica.
    """
    indexes = fitted_replica_indexes(fitpdf["pdf"])
    return NSList(indexes, nskey="replica")
[docs]
def produce_fitcontextwithcuts(self, fit, fitinputcontext):
    """Like fitinputcontext but setting the cuts policy.

    The data input is exposed as ``dataset_inputs`` and the cuts policy is
    fixed to ``CutsPolicy.FROMFIT``.
    """
    theoryid, data_input = fitinputcontext["theoryid"], fitinputcontext["data_input"]
    context = {"dataset_inputs": data_input, "theoryid": theoryid}
    context["use_cuts"] = CutsPolicy.FROMFIT
    return context
[docs]
def produce_fitenvironment(self, fit, fitinputcontext):
    """Like fitcontext, but additionally forcing various other
    parameters, such as the cuts policy and Monte Carlo seeding to be
    the same as the fit.

    Notes
    -----
    - This production rule is designed to be used as a namespace
      to collect over, for use with
      :py:func:`validphys.pseudodata.recreate_fit_pseudodata` and
      can be added to freely, e.g by setting trvlseed to be from
      the fit runcard.
    """
    log.warning(f"Using mcseed and trvlseed from fit: {fit}")

    theoryid, data_input = fitinputcontext["theoryid"], fitinputcontext["data_input"]

    # The seeds and the ``genrep`` flag are read from the runcard of the fit.
    runcard = fit.as_input()
    trvlseed, mcseed, genrep = (runcard[key] for key in ("trvlseed", "mcseed", "genrep"))

    return dict(
        dataset_inputs=data_input,
        theoryid=theoryid,
        use_cuts=CutsPolicy.FROMFIT,
        mcseed=mcseed,
        trvlseed=trvlseed,
        genrep=genrep,
    )
[docs]
def produce_fitcontext(self, fitinputcontext, fitpdf):
    """Set PDF, theory ID and data input from the fit config"""
    # dict(**a, **b) is kept on purpose: it refuses keys repeated in both inputs.
    merged = dict(**fitinputcontext, **fitpdf)
    return merged
[docs]
def produce_fitinputcontext(self, fit):
    """Like ``fitcontext`` but without setting the PDF"""
    _key, theory = self.parse_from_("fit", "theory", write=False)
    thid = theory["theoryid"]
    context = {"theoryid": thid}
    context["data_input"] = self._parse_data_input_from_("fit", {"theoryid": thid})
    return context
[docs]
def produce_fitpdf(self, fit):
    """Like ``fitcontext`` only setting the PDF"""
    fit_ns = self._curr_ns.new_child({"fit": fit})
    with self.set_context(ns=fit_ns):
        pdf = self.parse_from_("fit", "pdf", write=False)[1]

        # Register possible boundaries. A ConfigError raised here is ignored,
        # in which case the PDF is returned without a registered boundary.
        try:
            _key, bound = self.parse_from_("fit", "positivity_bound", write=False)
            pdf.register_boundary(unpolarized_bc=bound["unpolarized_bc"])
        except ConfigError:
            pass

    return {"pdf": pdf}
[docs]
def produce_fitunderlyinglaw(self, fit):
    """Reads closuretest: fakepdf from fit config file and passes as
    pdf
    """
    fit_ns = self._curr_ns.new_child({"fit": fit})
    with self.set_context(ns=fit_ns):
        closuretest = self.parse_from_("fit", "closuretest", write=False)[1]
    return {"pdf": closuretest["fakepdf"]}
[docs]
@element_of("hyperscans")
def parse_hyperscan(self, hyperscan):
    """A hyperscan in the hyperscan_results folder, containing at least one tries.json file"""
    loader = self.loader
    try:
        checked = loader.check_hyperscan(hyperscan)
    except LoadFailedError as e:
        raise ConfigError(str(e), hyperscan, self.loader.available_hyperscans) from e
    return checked
[docs]
def parse_hyperscan_config(self, hyperscan_config, hyperopt=None):
    """Configuration of the hyperscan

    - With the key ``from_hyperscan`` the configuration stored in that
      previous hyperscan is returned.
    - With the key ``use_tries_from`` the parameters are sampled from the
      trials of that hyperscan, using ``hyperopt`` as the number of samples.
    - Otherwise the input is returned untouched.
    """
    if "from_hyperscan" in hyperscan_config:
        previous = self.parse_hyperscan(hyperscan_config["from_hyperscan"])
        log.info("Using previous hyperscan: '%s' to generate the search space", previous)
        return previous.as_input().get("hyperscan_config")

    if "use_tries_from" in hyperscan_config:
        source = self.parse_hyperscan(hyperscan_config["use_tries_from"])
        log.info("Reusing tries from: %s", source)
        sampled = source.sample_trials(n=hyperopt)
        return {"parameters": sampled}

    return hyperscan_config
[docs]
def produce_multiclosure_underlyinglaw(self, fits):
    """Produce the underlying law for a set of fits. This allows a single t0
    like covariance matrix to be loaded for all fits, for use with
    statistical estimators on multiple closure fits. If the fits don't all
    have the same underlying law then an error is raised, offending fit is
    identified.
    """
    # An explicit loop (rather than a comprehension) makes it possible to
    # name the fit whose runcard is missing the required entries.
    found_laws = set()
    for fit in fits:
        try:
            closure_ns = fit.as_input()["closuretest"]
        except KeyError as e:
            raise ConfigError(
                f"fit: {fit} does not have a `closuretest` namespace in runcard"
            ) from e
        try:
            fakepdf = closure_ns["fakepdf"]
            found_laws.add(fakepdf)
        except KeyError as e:
            raise ConfigError(
                f"fit: {fit} does not have `fakepdf` specified in the "
                "closuretest namespace in runcard."
            ) from e

    if len(found_laws) != 1:
        raise ConfigError(
            f"Did not find unique underlying law from fits, instead found: {found_laws}"
        )
    (law,) = found_laws
    return self.parse_pdf(law)
[docs]
def produce_fitq0fromfit(self, fitinputcontext):
    """Given a fit, return the fitting scale according to the theory"""
    description = fitinputcontext["theoryid"].get_description()
    return description["Q0"]
[docs]
def produce_basisfromfit(self, fit):
    """Set the basis from fit config. In the fit config file the basis
    is set using the key ``fitbasis``, but it is exposed to validphys
    as ``basis``.

    The name of this production rule is intentionally
    set to not conflict with the existing ``fitbasis`` runcard key.
    """
    fit_ns = self._curr_ns.new_child({"fit": fit})
    with self.set_context(ns=fit_ns):
        fitting = self.parse_from_("fit", "fitting", write=False)[1]
    return {"basis": fitting["fitbasis"]}
[docs]
def produce_fitpdfandbasis(self, fitpdf, basisfromfit):
    """Set the PDF and basis from the fit config."""
    combined = {}
    combined.update(fitpdf)
    # Entries from ``basisfromfit`` take precedence on repeated keys.
    combined.update(basisfromfit)
    return combined
[docs]
def parse_use_fitcommondata(self, do_use: bool):
    """Use the commondata files in the fit instead of those in the data
    directory."""
    # Nothing to transform: the flag is handed back as given.
    use_flag = do_use
    return use_flag
[docs]
def produce_commondata(self, *, dataset_input, use_fitcommondata=False, fit=None):
    """Produce a CommondataSpec from a dataset input

    Loader failures are re-raised as ``ConfigError``; when the data is not
    found the error also carries the available datasets.
    """
    name = dataset_input.name
    sysnum = dataset_input.sys
    request = dict(
        setname=name,
        sysnum=sysnum,
        use_fitcommondata=use_fitcommondata,
        fit=fit,
    )
    try:
        return self.loader.check_commondata(variant=dataset_input.variant, **request)
    except DataNotFoundError as e:
        raise ConfigError(str(e), name, self.loader.available_datasets) from e
    except (LoadFailedError, InconsistentMetaDataError) as e:
        raise ConfigError(e) from e
[docs]
def parse_cut_similarity_threshold(self, th: numbers.Real):
    """Maximum relative ratio when using the similar-predictions cuts.

    The value is returned as given.
    """
    threshold = th
    return threshold
def _produce_fit_cuts(self, commondata):
    """Produce fit and then attempt to load cuts from that fit."""
    fit = self.parse_from_(None, "fit", write=False)[1]
    try:
        fit_cuts = self.loader.check_fit_cuts(commondata, fit)
    except LoadFailedError as e:
        raise ConfigError(e) from e
    return fit_cuts
def _produce_internal_cuts(self, commondata):
    """Produce internal cut rules and then load cuts from those rules."""
    rules = self.parse_from_(None, "rules", write=False)[1]
    internal_cuts = self.loader.check_internal_cuts(commondata, rules)
    return internal_cuts
def _produce_matched_cuts(self, commondata):
    """Compute the internal cuts as per `use_cuts: 'internal'` within each
    namespace in a namespace list called `cuts_intersection_spec` and take
    the intersection of the results as the cuts for the given dataset. This
    is useful for example for requiring the common subset of points that
    pass the cuts at NLO and NNLO.
    """
    per_ns_cuts = []
    _key, nss = self.parse_from_(None, "cuts_intersection_spec", write=False)
    self._check_dataspecs_type(nss)
    if not nss:
        raise ConfigError("'cuts_intersection_spec' must contain at least one namespace.")

    for ns in nss:
        child_ns = self._curr_ns.new_child(ns).new_child({"use_cuts": CutsPolicy.INTERNAL})
        with self.set_context(ns=child_ns):
            # Note: Do not call _produce_internal_cuts directly here:
            # That doesn't correctly set the namespace in a way that `rules`
            # can be recovered, as there is no dataset_input object.
            _cuts_key, ns_cuts = self.parse_from_(None, "cuts", write=False)[:2]
            per_ns_cuts.append(ns_cuts)

    return MatchedCuts(per_ns_cuts, ndata=commondata.ndata)
def _produce_similarity_cuts(self, commondata):
    """Compute the intersection between two namespaces (similar to
    `fromintersection`) but additionally require that the predictions
    computed for each dataset across the namespaces are *similar*,
    specifically that the ratio between the absolute difference in the
    predictions and the total experimental uncertainty is smaller than a
    given value, `cut_similarity_threshold` that must be provided. Note
    that for this to work with different cfactors across the namespaces,
    one must provide a different `dataset_inputs` list for each.

    This mechanism can be sidetracked selectively for specific datasets.
    To do that, add their names to a list called
    `do_not_require_similarity_for`. The datasets in the list do not need
    to appear in the `cuts_intersection_spec` name space and will be filtered
    according to the internal cuts unconditionally.
    """
    _, nss = self.parse_from_(None, "cuts_intersection_spec", write=False)
    if len(nss) != 2:
        raise ConfigError("Can only work with two namespaces")

    _, cut_similarity_threshold = self.parse_from_(
        None, "cut_similarity_threshold", write=False
    )

    # The exclusion list is optional: default to excluding nothing.
    try:
        _, exempt_names = self.parse_from_(None, "do_not_require_similarity_for", write=False)
    except configparser.InputNotFoundError:
        exempt_names = []

    name = commondata.name

    # Exempt datasets fall back to the plain internal cuts.
    if name in exempt_names:
        internal_ns = self._curr_ns.new_child({"use_cuts": CutsPolicy.INTERNAL})
        with self.set_context(ns=internal_ns):
            return self.parse_from_(None, "cuts", write=False)[1]

    # slightly circular here, since matched cuts will re-produce nss
    matched_cuts = self._produce_matched_cuts(commondata)

    pairs = []
    for index, ns in enumerate(nss):
        with self.set_context(ns=self._curr_ns.new_child({**ns})):
            # TODO: find a way to not duplicate this and use a dict
            # instead of a linear search
            _, candidates = self.parse_from_(None, "dataset_inputs", write=False)
        try:
            dsinput = next(d for d in candidates if d.name == name)
        except StopIteration as e:
            raise ConfigError(
                f"cuts_intersection_spec namespace {index}: dataset inputs must define {name}"
            ) from e

        # Keys of ``ns`` come last, so they override the three set here.
        overrides = {
            "dataset_input": dsinput,
            "use_cuts": CutsPolicy.FROM_CUT_INTERSECTION_NAMESPACE,
            "cuts": matched_cuts,
            **ns,
        }
        with self.set_context(ns=self._curr_ns.new_child(overrides)):
            _, ds = self.parse_from_(None, "dataset", write=False)
            _, pdf = self.parse_from_(None, "pdf", write=False)
        pairs.append((ds, pdf))

    return SimilarCuts(tuple(pairs), cut_similarity_threshold)
[docs]
def produce_cuts(self, *, commondata, use_cuts):
    """Obtain cuts for a given dataset input, based on the
    appropriate policy.

    Returns ``None`` for the ``NOCUTS`` policy and raises ``TypeError`` when
    ``use_cuts`` is not one of the handled ``CutsPolicy`` members.
    """
    if use_cuts is CutsPolicy.NOCUTS:
        return None
    if use_cuts is CutsPolicy.FROMFIT:
        return self._produce_fit_cuts(commondata)
    if use_cuts is CutsPolicy.INTERNAL:
        return self._produce_internal_cuts(commondata)
    if use_cuts is CutsPolicy.FROM_CUT_INTERSECTION_NAMESPACE:
        return self._produce_matched_cuts(commondata)
    if use_cuts is CutsPolicy.FROM_SIMILAR_PREDICTIONS_NAMESPACE:
        return self._produce_similarity_cuts(commondata)
    raise TypeError("Wrong use_cuts")
[docs]
def produce_dataset(
    self,
    *,
    dataset_input,
    theoryid,
    cuts,
    use_fitcommondata=False,
    fit=None,
    check_plotting: bool = False,
):
    """Dataset specification from the theory and CommonData.
    Use the cuts from the fit, if provided. If check_plotting is set to
    True, attempt to load and check the PLOTTING files
    (note this may cause a noticeable slowdown in general).

    Loader failures are re-raised as ``ConfigError`` chained to the original
    exception; when the data is not found the error also carries the
    available datasets.
    """
    name = dataset_input.name

    try:
        ds = self.loader.check_dataset(
            name=name,
            sysnum=dataset_input.sys,
            theoryid=theoryid,
            cfac=dataset_input.cfac,
            cuts=cuts,
            frac=dataset_input.frac,
            use_fitcommondata=use_fitcommondata,
            fit=fit,
            weight=dataset_input.weight,
            variant=dataset_input.variant,
        )
    except DataNotFoundError as e:
        raise ConfigError(str(e), name, self.loader.available_datasets) from e
    except LoadFailedError as e:
        raise ConfigError(e) from e

    if check_plotting:
        # normalize=True should check for more stuff
        get_info(ds, normalize=True)
        if not ds.commondata.plotfiles:
            log.warning(f"Plotting files not found for: {ds}")
    return ds
[docs]
def produce_t0dataset(
    self,
    *,
    dataset_input,
    t0id,
    cuts,
    use_fitcommondata=False,
    fit=None,
    check_plotting: bool = False,
):
    """
    Same as produce_dataset, but if a ``t0theoryid`` has been defined in the
    runcard then those corresponding fktables will be linked.
    """
    # Only the theory changes with respect to ``produce_dataset``:
    # ``t0id`` is passed in place of ``theoryid``.
    forwarded = dict(
        dataset_input=dataset_input,
        theoryid=t0id,
        cuts=cuts,
        use_fitcommondata=use_fitcommondata,
        fit=fit,
        check_plotting=check_plotting,
    )
    return self.produce_dataset(**forwarded)
[docs]
@configparser.element_of("experiments")
def parse_experiment(self, experiment: dict):
    """A set of datasets where correlated systematics are taken
    into account. It is a mapping where the keys are the experiment
    name 'experiment' and a list of datasets."""
    try:
        name = experiment["experiment"]
        datasets = experiment["datasets"]
    except KeyError as e:
        raise ConfigError(
            "'experiment' must be a mapping with 'experiment' and 'datasets', but %s is missing"
            % e
        ) from e

    dsinputs = []
    for entry in datasets:
        dsinputs.append(self.parse_dataset_input(entry))

    return self.produce_data(group_name=name, data_input=dsinputs)
# TODO: Do away with the mapping and make the conversion implicitly
[docs]
def produce_sep_mult(self, separate_multiplicative=False):
    """Return ``False`` only when ``separate_multiplicative`` is the object
    ``False``; any other value gives ``True``."""
    return separate_multiplicative is not False
[docs]
def produce_loaded_theory_covmat(
    self,
    output_path,
    data_input,
    user_covmat_path=None,
    point_prescriptions=None,
    use_thcovmat_in_sampling=False,
    use_thcovmat_in_fitting=False,
):
    """
    Loads the theory covmat from the correct file according to how it
    was generated by vp-setupfit.

    Returns ``0.0`` when the theory covmat is used neither in sampling nor
    in fitting.
    """
    if not (use_thcovmat_in_sampling or use_thcovmat_in_fitting):
        return 0.0

    # Load correct file according to how the thcovmat was generated by vp-setupfit
    if user_covmat_path is None:
        filename = "datacuts_theory_theorycovmatconfig_theory_covmat_custom.csv"
    elif point_prescriptions is None:
        filename = "datacuts_theory_theorycovmatconfig_user_covmat.csv"
    else:
        filename = "datacuts_theory_theorycovmatconfig_total_theory_covmat.csv"

    table_file = output_path / "tables" / filename
    loaded = pd.read_csv(
        table_file, index_col=[0, 1, 2], header=[0, 1, 2], sep="\t|,", engine="python"
    )
    theory_covmat = loaded.fillna(0)

    # change ordering according to exp_covmat (so according to runcard order)
    without_outer = theory_covmat.droplevel(0, axis=0).droplevel(0, axis=1)
    ordered_names = [str(entry) for entry in data_input]
    reordered = without_outer.reindex(index=ordered_names, columns=ordered_names, level=0)
    return reordered.values
[docs]
@configparser.explicit_node
def produce_covmat_t0_considered(self, use_t0: bool = False):
    """Modifies which action is used as covariance_matrix depending on
    the flag `use_t0`
    """
    from validphys import covmats

    action = covmats.t0_covmat_from_systematics if use_t0 else covmats.covmat_from_systematics
    return action
[docs]
@configparser.explicit_node
def produce_covariance_matrix(self, use_pdferr: bool = False):
    """Modifies which action is used as covariance_matrix depending on
    the flag `use_pdferr`
    """
    from validphys import covmats

    action = covmats.pdferr_plus_covmat if use_pdferr else covmats._covmat_t0_considered
    return action
# TODO: Do this better and elsewhere
@staticmethod
def _check_dataspecs_type(dataspecs):
if not isinstance(dataspecs, Sequence):
raise ConfigError(
"dataspecs should be a sequence of mappings, not " f"{type(dataspecs).__name__}"
)
for spec in dataspecs:
if not isinstance(spec, Mapping):
raise ConfigError(
"dataspecs should be a sequence of mappings, "
f" but {spec} is {type(spec).__name__}"
)
[docs]
def produce_matched_datasets_from_dataspecs(self, dataspecs):
"""Take an arbitrary list of mappings called dataspecs and
return a new list of mappings called dataspecs constructed as follows.
From each of the original dataspecs, resolve the key `process`, and
all the experiments and datasets therein.
Compute the intersection of the dataset names, and for each element in
the intersection construct a mapping with the follwing keys:
- process : A string with the common process name.
- experiment_name : A string with the common experiment name.
- dataset_name : A string with the common dataset name.
- dataspecs : A list of mappinngs matching the original
"dataspecs". Each mapping contains:
* dataset: A dataset with the name data_set name and the
properties (cuts, theory, etc) corresponding to the original
dataspec.
* dataset_input: The input line used to build dataset.
* All the other keys in the original dataspec.
"""
self._check_dataspecs_type(dataspecs)
all_names = []
for spec in dataspecs:
with self.set_context(ns=self._curr_ns.new_child(spec)):
_, data_input = self.parse_from_(None, "data_input", write=False)
names = {}
for dsin in data_input:
cd = self.produce_commondata(dataset_input=dsin)
proc = get_info(cd).nnpdf31_process
ds = dsin.name
names[(proc, ds)] = dsin
all_names.append(names)
used_set = set.intersection(*(set(d) for d in all_names))
res = []
for k in used_set:
inres = {"process": k[0], "dataset_name": k[1]}
# TODO: Should this have the same name?
inner_spec_list = inres["dataspecs"] = []
for ispec, spec in enumerate(dataspecs):
# Passing spec by referene
d = ChainMap({"dataset_input": all_names[ispec][k]}, spec)
inner_spec_list.append(d)
res.append(inres)
res.sort(key=lambda x: (x["process"], x["dataset_name"]))
return res
[docs]
def produce_matched_positivity_from_dataspecs(self, dataspecs):
"""Like produce_matched_datasets_from_dataspecs but for positivity datasets."""
self._check_dataspecs_type(dataspecs)
all_names = []
for spec in dataspecs:
with self.set_context(ns=self._curr_ns.new_child(spec)):
_, pos = self.parse_from_(None, "posdatasets", write=False)
names = {(p.name): (p) for p in pos}
all_names.append(names)
used_set = set.intersection(*(set(d) for d in all_names))
res = []
for k in used_set:
inres = {"posdataset_name": k}
# TODO: Should this have the same name?
l = inres["dataspecs"] = []
for ispec, spec in enumerate(dataspecs):
# Passing spec by referene
d = ChainMap({"posdataset": all_names[ispec][k]}, spec)
l.append(d)
res.append(inres)
res.sort(key=lambda x: (x["posdataset_name"]))
return res
[docs]
def produce_dataspecs_with_matched_cuts(self, dataspecs):
"""Take a list of namespaces (dataspecs), resolve ``dataset`` within
each of them, and return another list of dataspecs where the datasets
all have the same cuts, corresponding to the intersection of the
selected points. All the datasets must have the same name (i.e.
correspond with the same experimental measurement), but can otherwise
differ, for example in the theory used for the experimental
predictions.
This rule can be combined with ``matched_datasets_from_dataspecs``.
"""
self._check_dataspecs_type(dataspecs)
if not dataspecs:
return dataspecs
# Can now assume we have at least one element
cutlist = []
dslist = []
names = set()
for spec in dataspecs:
with self.set_context(ns=self._curr_ns.new_child(spec)):
_, ds = self.parse_from_(None, "dataset", write=False)
dslist.append(ds)
cutlist.append(ds.cuts)
names.add(ds.name)
lnames = len(names)
if lnames != 1:
raise ConfigError(
"Each dataspec must have a dataset with the same"
f"name, but got {lnames} different ones: {names}"
)
ndata = ds.commondata.ndata
matched_cuts = MatchedCuts(cutlist, ndata=ndata)
res = []
for spec, ds in zip(dataspecs, dslist):
newds = copy.copy(ds)
newds.cuts = matched_cuts
res.append(ChainMap({"dataset": newds}, spec))
return res
[docs]
def produce_theory_database(self):
    """Produces path to the folder of the theory runcards"""
    loader = self.loader
    return loader.theorydb_folder
[docs]
def produce_combined_shift_and_theory_dataspecs(self, dataspecs):
    """Match the datasets across ``dataspecs``, give them common cuts and
    return a mapping whose ``dataspecs`` key holds both the processed
    list (``dataspecs``) and the input (``original``).
    """
    matched = self.produce_matched_datasets_from_dataspecs(dataspecs)
    for entry in matched:
        entry["dataspecs"] = self.produce_dataspecs_with_matched_cuts(entry["dataspecs"])

    # NOTE(review): each inner list is sliced from ``len(dataspecs)`` on;
    # confirm this offset is the intended one.
    n_specs = len(dataspecs)
    trimmed = [
        ChainMap({"dataspecs": entry["dataspecs"][n_specs:]}, entry) for entry in matched
    ]
    return {"dataspecs": {"dataspecs": trimmed, "original": dataspecs}}
# TODO: Worth it to do some black magic to not pass params explicitly?
# Note that `parse_experiments` doesn't exist yet.
[docs]
def parse_reweighting_experiments(self, experiments, *, theoryid, use_cuts, fit=None):
    """A list of experiments to be used for reweighting."""
    options = {"theoryid": theoryid, "use_cuts": use_cuts, "fit": fit}
    return self.parse_experiments(experiments, **options)
[docs]
def parse_t0pdfset(self, name, unpolarized_bc=None):
    """PDF set used to generate the t0 covmat."""
    t0pdf = self.parse_pdf(name, unpolarized_bc=unpolarized_bc)
    return t0pdf
[docs]
def parse_use_t0(self, do_use_t0: bool):
    """Whether to use the t0 PDF set to generate covariance matrices."""
    # Nothing to transform: the flag is handed back as given.
    use_flag = do_use_t0
    return use_flag
[docs]
def produce_t0set(self, t0pdfset=None, use_t0=False):
    """Return the t0set if use_t0 is True and None otherwise. Raises an
    error if t0 is requested but no t0set is given.
    """
    if not use_t0:
        return None
    if t0pdfset:
        return t0pdfset
    raise ConfigError("Setting use_t0 requires specifying a valid t0pdfset")
[docs]
def parse_t0theoryid(self, theoryID: (str, int)):
    """A number corresponding to the database theory ID where the
    corresponding theory folder is installed in the data directory.

    The t0theoryid is specifically used for SM parameter determinations (e.g.
    alphas) using the correlated replicas method of arXiv: 1802.03398. To do
    an alphas determination we perform multiple fits, each with a different
    value of alphas in the DGLAP kernel and hard scattering cross section.
    Then we compute the chi2 for each fit to determine which alphas best
    describes the data, however, to make a fair comparison we need to ensure
    that the chi2 (and thus the t0 covariance matrix) has to be exactly the
    same for each fit. This requires not only to fix the t0pdfset between
    the different fits, but also to fix the t0theoryid.
    """
    t0theory = self.parse_theoryid(theoryID)
    return t0theory
[docs]
def produce_t0id(self, theoryid, t0theoryid=None):
    """Return the t0id if t0theoryid is set and return theoryid otherwise."""
    return t0theoryid if t0theoryid else theoryid
[docs]
def parse_luxset(self, name):
    """PDF set used to generate the photon with fiatlux."""
    luxpdf = self.parse_pdf(name)
    return luxpdf
[docs]
def parse_additional_errors(self, bool):
    """PDF set used to generate the photon additional errors:
    they are constructed using the replicas 101-107 of the PDF set
    LUXqed17_plus_PDF4LHC15_nnlo_100 (that are obtained varying some
    parameters of the LuxQED approach) in the way described
    in sec. 2.5 of https://arxiv.org/pdf/1712.07053.pdf

    Returns ``False`` when the flag is falsy.
    """
    # NOTE: the parameter name shadows the builtin ``bool``; it is kept
    # because it is part of the signature.
    if not bool:
        return False
    return self.parse_pdf("LUXqed17_plus_PDF4LHC15_nnlo_100")
[docs]
def parse_fakepdf(self, name):
    """PDF set used to generate the fake data in a closure test."""
    fakepdf = self.parse_pdf(name)
    return fakepdf
def _parse_lagrange_multiplier(self, kind, theoryid, setdict, rules):
    """Lagrange multiplier constraints are mappings
    containing a `dataset` and a `maxlambda` argument which
    defines the maximum value allowed for the multiplier.

    ``kind`` selects the loader check to use and must be either
    'posdataset' or 'integdataset'. A ``ConfigError`` is raised when a
    required key is missing, when the multiplier cannot be converted to a
    float or when ``kind`` is not understood.
    """
    bad_msg = (
        f"{kind} must be a mapping with a name ('dataset') and a float multiplier (maxlambda)"
    )
    theoryno, _ = theoryid
    lambda_key = "maxlambda"
    # BCH allow for old-style runcards with 'poslambda' instead of 'maxlambda'
    if "poslambda" in setdict and "maxlambda" not in setdict:
        log.warning("The `poslambda` argument has been deprecated in favour of `maxlambda`")
        lambda_key = "poslambda"

    try:
        name = setdict["dataset"]
        # Swap a possibly old name with the new one
        name, _ = legacy_to_new_map(name, None)
        maxlambda = float(setdict[lambda_key])
    except KeyError as e:
        # The missing key is the offending item and the keys that were
        # actually given are the alternatives.
        raise ConfigError(bad_msg, e.args[0], setdict.keys()) from e
    except (TypeError, ValueError) as e:
        # float() raises TypeError for values such as None.
        raise ConfigError(bad_msg) from e

    if kind == "posdataset":
        return self.loader.check_posset(theoryno, name, maxlambda, rules)
    elif kind == "integdataset":
        return self.loader.check_integset(theoryno, name, maxlambda, rules)
    else:
        raise ConfigError(f"The lagrange multiplier type {kind} is not understood")
[docs]
@element_of("posdatasets")
def parse_posdataset(self, posset: dict, *, theoryid, rules):
    """An observable used as positivity constrain in the fit.
    It is a mapping containing 'dataset' and 'maxlambda'."""
    kind = "posdataset"
    return self._parse_lagrange_multiplier(kind, theoryid, posset, rules)
[docs]
def produce_posdatasets(self, positivity):
    """Extract the ``posdatasets`` entry from the ``positivity`` mapping,
    raising ``ConfigError`` if it cannot be found."""
    has_key = isinstance(positivity, dict) and "posdatasets" in positivity
    if has_key:
        return positivity["posdatasets"]
    raise ConfigError(
        "Failed to get 'posdatasets' from positivity. Expected that key to be present."
    )
[docs]
@element_of("integdatasets")
def parse_integdataset(self, integset: dict, *, theoryid, rules):
    """An observable corresponding to a PDF in the evolution basis,
    used as integrability constrain in the fit.
    It is a mapping containing 'dataset' and 'maxlambda'."""
    kind = "integdataset"
    return self._parse_lagrange_multiplier(kind, theoryid, integset, rules)
[docs]
def produce_integdatasets(self, integrability):
    """Extract the ``integdatasets`` entry from the ``integrability``
    mapping, raising ``ConfigError`` if it cannot be found."""
    has_key = isinstance(integrability, dict) and "integdatasets" in integrability
    if has_key:
        return integrability["integdatasets"]
    raise ConfigError(
        "Failed to get 'integdatasets' from integrability. Expected that key to be present."
    )
[docs]
def produce_reweight_all_datasets(self, experiments):
    """Build one namespace per dataset, each holding a single-dataset
    group (under ``reweighting_experiments``) and the matching
    ``dataset_input``."""
    namespaces = []
    for experiment in experiments:
        pairs = zip(experiment, experiment.datasets)
        for dsinput, dataset in pairs:
            group = DataGroupSpec(experiment.name, datasets=[dataset], dsinputs=[dsinput])
            namespaces.append({"reweighting_experiments": [group], "dataset_input": dsinput})
    return namespaces
[docs]
def produce_pdf_id(self, pdf) -> str:
    """Return a string containing the PDF's LHAPDF ID"""
    pdf_name = pdf.name
    return pdf_name
[docs]
def produce_fit_id(self, fit) -> str:
    """Return a string containing the ID of the fit"""
    fit_name = fit.name
    return fit_name
[docs]
@element_of("lumi_channels")
def parse_lumi_channel(self, ch: str):
    """A luminosity channel, which must be one of ``LUMI_CHANNELS``."""
    if ch in LUMI_CHANNELS:
        return ch
    raise ConfigError(
        "lumi_channel not understood: %s" % ch,
        ch,
        alternatives=LUMI_CHANNELS,
        display_alternatives="all",
    )
[docs]
def produce_all_lumi_channels(self):
    """Namespace with ``lumi_channels`` set to every entry of ``LUMI_CHANNELS``."""
    every_channel = list(LUMI_CHANNELS)
    parsed = self.parse_lumi_channels(every_channel)
    return {"lumi_channels": parsed}
[docs]
def produce_loaded_user_covmat_path(self, user_covmat_path: str = ""):
    """
    Path to the user covmat provided by user_covmat_path in the runcard.
    If no path is provided (``None`` or the empty-string default), returns None.
    For use in theorycovariance.construction.user_covmat.

    Otherwise the path is resolved through the loader's
    ``check_vp_output_file`` and the resulting location is returned.
    """
    # The default value is "" rather than None, so an identity check against
    # None would let the "nothing provided" case fall through to the loader.
    if not user_covmat_path:
        return None
    return self.loader.check_vp_output_file(user_covmat_path)
[docs]
@configparser.explicit_node
def produce_nnfit_theory_covmat(
    self, point_prescriptions: list = None, user_covmat_path: str = None
):
    """
    Return the theory covariance matrix used in the fit.

    This function is only used in vp-setupfit to store the necessary covmats as .csv files in
    the tables directory.

    The provider returned depends on which inputs are set:

    - both ``point_prescriptions`` and ``user_covmat_path``:
      ``total_theory_covmat_fitting`` (scale variations plus user uncertainties)
    - only ``point_prescriptions``: ``theory_covmat_custom``
    - only ``user_covmat_path``: ``user_covmat_fitting``

    Raises
    ------
    ConfigError
        If neither ``point_prescriptions`` nor ``user_covmat_path`` is given,
        since there is then no theory covariance matrix to select.
    """
    has_scalevar = point_prescriptions is not None
    has_user = user_covmat_path is not None

    if not (has_scalevar or has_user):
        # Previously this case fell through to an unbound local variable.
        raise ConfigError(
            "Cannot select a theory covariance matrix for the fit: neither "
            "`point_prescriptions` nor `user_covmat_path` was provided."
        )

    # The imports are kept local, as in the rest of this method's history,
    # so the theorycovariance module is only loaded when actually needed.
    if has_scalevar and has_user:
        # Both scalevar and user uncertainties
        from validphys.theorycovariance.construction import total_theory_covmat_fitting

        return total_theory_covmat_fitting
    if has_scalevar:
        # Only scalevar uncertainties
        from validphys.theorycovariance.construction import theory_covmat_custom

        return theory_covmat_custom
    # Only user uncertainties
    from validphys.theorycovariance.construction import user_covmat_fitting

    return user_covmat_fitting
[docs]
def produce_fitthcovmat(
    self, use_thcovmat_if_present: bool = False, fit: (str, type(None)) = None
):
    """Locate the theory covariance matrix stored with ``fit``, if requested.

    When ``use_thcovmat_if_present`` is ``True`` and the runcard of ``fit``
    has ``theorycovmatconfig.use_thcovmat_in_fitting`` set, a ``ThCovMatSpec``
    pointing at the csv file in the fit's ``tables`` folder is returned.
    In every other non-error case the result is ``None``.

    Raises
    ------
    ConfigError
        If ``use_thcovmat_if_present`` is not a boolean, if it is true but no
        ``fit`` is given, or if the fit claims to use a theory covmat but the
        file is not at the expected location.
    """
    if not isinstance(use_thcovmat_if_present, bool):
        raise ConfigError("use_thcovmat_if_present should be a boolean, by default it is False")

    if not use_thcovmat_if_present:
        return None

    if not fit:
        raise ConfigError("`use_thcovmat_if_present` was true but no `fit` was specified.")

    try:
        thcovmat_present = fit.as_input()["theorycovmatconfig"]["use_thcovmat_in_fitting"]
    except KeyError:
        # assume covmat wasn't used and fill in key accordingly but warn user
        log.warning(
            "use_thcovmat_if_present was true but the flag "
            "`use_thcovmat_in_fitting` didn't exist in the runcard for "
            f"{fit.name}. Theory covariance matrix will not be used "
            "in any statistical estimators."
        )
        thcovmat_present = False

    if not thcovmat_present:
        return None

    # Expected path of theory covmat hardcoded
    tables_dir = fit.path / "tables"
    covmat_path = tables_dir / "datacuts_theory_theorycovmatconfig_theory_covmat_custom.csv"
    if not covmat_path.exists():
        raise ConfigError(
            "Fit appeared to use theory covmat in fit but the file was not at the "
            f"usual location: {covmat_path}."
        )
    return ThCovMatSpec(covmat_path)
[docs]
def parse_speclabel(self, label: (str, type(None))):
    """Label attached to a dataspec, for use in some plots.

    The value (a string or ``None``) is returned unchanged.
    """
    speclabel = label
    return speclabel
[docs]
@element_of("fitdeclarations")
def parse_fitdeclaration(self, label: str):
    """Name of a fit, accepted as-is without downloading the fit.

    It lets other providers guess some information from the fit name
    alone, e.g.:

    {@with fits_as_from_fitdeclarations::fits_name_from_fitdeclarations@}
    {@ ...do stuff... @}
    {@endwith@}
    """
    declared_name = label
    return declared_name
[docs]
def produce_all_commondata(self):
    """Produce the commondata of every dataset the loader reports as available.

    All names in ``self.loader.available_datasets`` are first parsed into
    dataset inputs; each input is then handed to ``produce_commondata``.
    """
    parsed_inputs = []
    for name in self.loader.available_datasets:
        parsed_inputs.append(self.parse_dataset_input({"dataset": name}))
    return [self.produce_commondata(dataset_input=parsed) for parsed in parsed_inputs]
[docs]
def parse_groupby(self, grouping: str):
    """Check that ``grouping`` is one of the allowed groupby keys and return it.

    Raises a ``ConfigError`` for any other value.
    """
    # TODO: think if better way to do this properly
    allowed_groupings = ["experiment", "nnpdf31_process"]
    if grouping in allowed_groupings:
        return grouping
    raise ConfigError(f"Grouping not available: {grouping}, did you spell it " "correctly?")
[docs]
def parse_norm_threshold(self, val: (numbers.Number, type(None))):
    """Threshold used for covariance matrix normalisation.

    It sets the maximum l2 norm of the inverse covariance matrix, by
    clipping the smallest eigenvalues. ``None`` means that no covmat
    regularization is performed and is returned as-is.

    Raises
    ------
    ConfigError
        If a threshold is given and it is not greater than zero.
    """
    if val is None:
        return val
    if val <= 0:
        raise ConfigError("norm_threshold must be greater than zero.")
    log.info(f"Regularizing covariance matrices with norm threshold: {val}")
    return val
[docs]
def produce_no_covmat_reg(self):
    """Namespace that switches covariance matrix regularization off.

    It explicitly sets ``norm_threshold`` to ``None``.
    """
    return dict(norm_threshold=None)
[docs]
@configparser.record_from_defaults
def parse_default_filter_rules(self, spec: (str, type(None))):
    """Key naming the set of default filter rules; returned unchanged.

    The ``record_from_defaults`` decorator handles the value from there.
    """
    rules_key = spec
    return rules_key
[docs]
def load_default_default_filter_rules(self, spec):
    """Load the default filter rules stored in the lockfile named after ``spec``.

    The file ``<spec>_filters.lock.yaml`` is read from the
    ``validphys.cuts.lockfiles`` package and parsed as YAML.

    Raises
    ------
    ConfigError
        If no such lockfile exists. The error lists the specs for which a
        filter-rules lockfile is available.
    """
    import validphys.cuts.lockfiles

    lock_token = "_filters.lock.yaml"
    try:
        return yaml.safe_load(read_text(validphys.cuts.lockfiles, f"{spec}{lock_token}"))
    except FileNotFoundError as e:
        # Slice the suffix off: ``str.strip`` would instead remove any of the
        # token's characters from both ends and mangle the spec names.
        alternatives = [
            el[: -len(lock_token)]
            for el in contents(validphys.cuts.lockfiles)
            if el.endswith(lock_token)
        ]
        raise ConfigError(
            f"Default filter rules not found: {spec}",
            bad_item=spec,
            alternatives=alternatives,
            display_alternatives="all",
        ) from e
[docs]
def parse_filter_rules(self, filter_rules: (list, type(None))):
    """Turn the user supplied rules into a tuple of FilterRule objects.

    Rules are immutable after parsing. An empty or missing list yields
    ``None``. A warning is logged every time this parser runs.
    See https://docs.nnpdf.science/vp/filters.html for details on the syntax"""
    log.warning("Overwriting filter rules")
    if not filter_rules:
        return None
    return tuple(FilterRule(**raw_rule) for raw_rule in filter_rules)
[docs]
def parse_default_filter_rules_recorded_spec_(self, spec):
    """Pass the recorded spec of the filter rules through untouched.

    This is a hacky workaround: without an explicit parser for this key,
    reportengine detects a conflict in the `dataset` key.
    """
    recorded_spec = spec
    return recorded_spec
[docs]
def parse_added_filter_rules(self, rules: (list, type(None)) = None):
    """
    Build a tuple of AddedFilterRule objects from ``rules``.

    Rules are immutable after parsing. AddedFilterRule objects inherit from
    FilterRule objects. An empty or missing list yields ``None``.
    """
    if not rules:
        return None
    return tuple(AddedFilterRule(**raw_rule) for raw_rule in rules)
[docs]
@functools.lru_cache
def produce_rules(
    self,
    theoryid,
    use_cuts,
    defaults,
    default_filter_rules=None,
    filter_rules=None,
    default_filter_rules_recorded_spec_=None,
    added_filter_rules: (tuple, type(None)) = None,
):
    """Build the tuple of ``Rule`` objects from user input and defaults.

    If ``filter_rules`` is not given, the rules come from the recorded spec
    (indexed by ``default_filter_rules``) when one is present, and from
    ``default_filter_rules_input()`` otherwise; in that situation ``None``
    is returned straight away unless ``use_cuts`` is ``CutsPolicy.INTERNAL``.
    Any ``added_filter_rules`` are appended after the main rules.

    Raises
    ------
    ConfigError
        If a rule cannot be processed.
    """
    theory_parameters = theoryid.get_description()

    def _make_rule(raw_rule):
        # Every rule shares the same defaults, theory parameters and loader.
        return Rule(
            initial_data=raw_rule,
            defaults=defaults,
            theory_parameters=theory_parameters,
            loader=self.loader,
        )

    if filter_rules is None:
        # Don't bother loading the rules if we are not using them.
        if use_cuts is not CutsPolicy.INTERNAL:
            return None
        if default_filter_rules_recorded_spec_ is None:
            filter_rules = default_filter_rules_input()
        else:
            filter_rules = default_filter_rules_recorded_spec_[default_filter_rules]

    try:
        rule_list = [_make_rule(raw_rule) for raw_rule in filter_rules]
    except RuleProcessingError as e:
        raise ConfigError(f"Error Processing filter rules: {e}") from e

    for i, raw_rule in enumerate(added_filter_rules or ()):
        try:
            rule_list.append(_make_rule(raw_rule))
        except RuleProcessingError as e:
            raise ConfigError(f"Error processing added rule {i}: {e}") from e

    return tuple(rule_list)
[docs]
@configparser.record_from_defaults
def parse_default_filter_settings(self, spec: (str, type(None))):
    """Key naming the set of default filter settings; returned unchanged.

    The ``record_from_defaults`` decorator handles the value from there.
    """
    settings_key = spec
    return settings_key
[docs]
def load_default_default_filter_settings(self, spec):
    """Load the default filter settings stored in the lockfile named after ``spec``.

    The file ``<spec>_defaults.lock.yaml`` is read from the
    ``validphys.cuts.lockfiles`` package and parsed as YAML.

    Raises
    ------
    ConfigError
        If no such lockfile exists. The error lists the specs for which a
        filter-settings lockfile is available.
    """
    import validphys.cuts.lockfiles

    lock_token = "_defaults.lock.yaml"
    try:
        return yaml.safe_load(read_text(validphys.cuts.lockfiles, f"{spec}{lock_token}"))
    except FileNotFoundError as e:
        # Slice the suffix off: ``str.strip`` would instead remove any of the
        # token's characters from both ends and mangle the spec names.
        alternatives = [
            el[: -len(lock_token)]
            for el in contents(validphys.cuts.lockfiles)
            if el.endswith(lock_token)
        ]
        raise ConfigError(
            f"Default filter settings not found: {spec}",
            bad_item=spec,
            alternatives=alternatives,
            display_alternatives="all",
        ) from e
[docs]
def parse_filter_defaults(self, filter_defaults: (dict, type(None))):
    """A mapping containing the default kinematic limits to be used when
    filtering data (when using internal cuts).
    Currently these limits are ``q2min``, ``w2min``, and ``maxTau``.

    Parameters
    ----------
    filter_defaults: dict, None
        A mapping containing the default kinematic limits to be used when
        filtering data (when using internal cuts).
        Currently these limits are ``q2min``, ``w2min``, and ``maxTau``.
        ``None`` is accepted and means that nothing is being overridden.

    Returns
    -------
    FilterDefaults, None
        A hashable object containing the default kinematic limits to be used when
        filtering data (when using internal cuts), or ``None`` when the input
        is ``None``.
    """
    # The declared type allows None, but unpacking None with ``**`` raises a
    # TypeError; hand it back so downstream code sees "not set".
    if filter_defaults is None:
        return None
    log.warning("Overwriting filter defaults")
    return FilterDefaults(**filter_defaults)
[docs]
def produce_defaults(
    self,
    q2min=None,
    w2min=None,
    maxTau=None,
    default_filter_settings=None,
    filter_defaults=None,
    default_filter_settings_recorded_spec_=None,
):
    """Combine the filter defaults coming from the different sources.

    The values of ``q2min``, ``w2min`` and ``maxTau`` can be defined at
    namespace level and inside a ``filter_defaults`` mapping; defining one of
    them in both places with different values is an error.

    The hashable FilterDefaults input is turned into a dictionary so the
    individual values can be overwritten, and the result is turned back into
    a FilterDefaults object before being returned.

    If a recorded spec is present it is used directly and the namespace level
    values are not read. If no ``filter_defaults`` are set, the defaults from
    ``default_filter_settings_input`` are loaded and the namespace level
    values, when given, overwrite them.
    """
    if filter_defaults is None:
        filter_defaults = {}
    if isinstance(filter_defaults, FilterDefaults):
        filter_defaults = filter_defaults.to_dict()

    # Namespace level values, in the order in which they are checked/applied.
    runcard_values = {"q2min": q2min, "w2min": w2min, "maxTau": maxTau}

    for key, value in runcard_values.items():
        if value is None or key not in filter_defaults:
            continue
        if value != filter_defaults[key]:
            raise ConfigError(f"{key} defined multiple times with different values")

    if default_filter_settings_recorded_spec_ is not None:
        # If we find recorded specs return immediately and don't read q2min and w2min
        # from runcard
        recorded = default_filter_settings_recorded_spec_[default_filter_settings]
        return FilterDefaults(**recorded)

    # if filter_defaults have not been set, load the defaults with default_filter_settings_input
    defaults_loaded = not filter_defaults
    if defaults_loaded:
        filter_defaults = default_filter_settings_input().to_dict()
        for key, value in runcard_values.items():
            if value is not None:
                log.warning(f"Using {key} from runcard")
                filter_defaults[key] = value

    # Turn the dictionary back into a hashable FilterDefaults object
    return FilterDefaults(**filter_defaults)
[docs]
def produce_data(self, data_input, *, group_name="data"):
    """Gather the datasets of ``data_input`` into a single ``DataGroupSpec``.

    This is a set of datasets where correlated systematics are taken into
    account. Each dataset is resolved in a child namespace in which
    ``dataset_input`` is set to the corresponding input.
    """

    def _resolve(dsinp):
        child_ns = self._curr_ns.new_child({"dataset_input": dsinp})
        with self.set_context(ns=child_ns):
            # parse_from_ returns a pair; the second item is the value.
            return self.parse_from_(None, "dataset", write=False)[1]

    datasets = [_resolve(dsinp) for dsinp in data_input]
    return DataGroupSpec(name=group_name, datasets=datasets, dsinputs=data_input)
def _parse_data_input_from_(
    self, parse_from_value: (str, type(None)), additional_context: (dict, type(None)) = None
):
    """Parse the ``data_input`` from a namespace, bridging the new and old
    ways of specifying data. Usage is similar to :py:meth:`self.parse_from_`.

    ``dataset_inputs`` is looked up first in the namespace given by
    ``parse_from_value``. If that fails, ``experiments`` is tried instead and
    its dataset inputs are flattened into a single list, ignoring the
    grouping. If both fail, the error of the first attempt is raised, chained
    to the error of the second one so that the cause can be surfaced in
    ``debug`` mode.

    Parameters
    ----------
    parse_from_value: str, None
        value which will be passed to :py:meth:`self.parse_from_`. If None
        then parses from the current namespace but can also be another
        input resource which can be resolved as a ``dict``.
    additional_context: dict, None
        additional context to update the namespace specified by
        ``parse_from_value``.

        Parsing ``experiments`` additionally requires ``theoryid`` and
        ``use_cuts``, which should either already be present in
        ``parse_from_value`` or be passed as a ``dict`` using this
        parameter i.e ``additional_context={"theoryid": 53}``.
    """
    child_ns = self._curr_ns.new_child(additional_context)
    with self.set_context(ns=child_ns):
        # new fits have dataset_inputs, old fits have experiments
        try:
            _, data_val = self.parse_from_(parse_from_value, "dataset_inputs", write=False)
        except ConfigError as outer_error:
            log.warning(
                "`experiments` has been deprecated, specify data using `dataset_inputs`. "
                "Any grouping defined by `experiments` is being ignored."
            )
            # We need to make theoryid available if using experiments
            try:
                _, experiments = self.parse_from_(parse_from_value, "experiments", write=False)
                flattened = [
                    dsinput for experiment in experiments for dsinput in experiment.dsinputs
                ]
                data_val = NSList(flattened, nskey="dataset_input")
            except ConfigError as inner_error:
                log.error(inner_error)
                raise outer_error from inner_error
    return data_val
[docs]
@record_from_defaults
def parse_data_grouping(self, key):
    """Key selecting which default grouping to use; returned unchanged.

    Mainly for internal use. It allows the default grouping of experiment
    to be applied to runcards which don't specify `metadata_group` without
    there being a namespace conflict in the lockfile.
    """
    grouping_key = key
    return grouping_key
[docs]
def load_default_data_grouping(self, spec):
    """Look up the default grouping of data registered under ``spec``.

    An unknown ``spec`` results in a ``KeyError``.
    """
    # slightly superfluous, only a couple of defaults at present but perhaps
    # somebody will want to add to this at some point e.g for th. uncertainties
    allowed = dict(standard_report="experiment", thcovmat_fit="ALL")
    grouping = allowed[spec]
    return grouping
[docs]
def produce_processed_data_grouping(
    self,
    use_thcovmat_in_fitting=False,
    use_thcovmat_in_sampling=False,
    data_grouping=None,
    data_grouping_recorded_spec_=None,
):
    """Resolve the data_grouping key from the runcard, or lockfile.

    If `data_grouping_recorded_spec_` is present then its value is taken, and
    the runcard is assumed to be a lockfile.

    If data_grouping is None and either use_thcovmat_in_fitting or
    use_thcovmat_in_sampling (or both) are true (which means that the fit is
    a thcovmat fit), all the datasets are grouped together; otherwise the
    default behaviour of grouping by experiment (called standard_report) is
    used.

    Else, the user can specify their own grouping, for example metadata_process.
    """
    if data_grouping is None:
        # fallback to old default behaviour, but still record to lockfile
        data_grouping = self.parse_data_grouping("standard_report")
        is_thcovmat_fit = use_thcovmat_in_fitting or use_thcovmat_in_sampling
        if is_thcovmat_fit:
            data_grouping = self.parse_data_grouping("thcovmat_fit")

    if data_grouping_recorded_spec_ is None:
        return self.load_default_data_grouping(data_grouping)
    return data_grouping_recorded_spec_[data_grouping]
[docs]
def produce_theoryids(self, t0id, point_prescription):
    """Produces a list of theoryids given a theoryid at central scales and a point
    prescription. The options for the latter are defined in pointprescriptions.yaml.
    This hard codes the theories needed for each prescription to avoid user error.

    Parameters
    ----------
    t0id:
        Central theory; only its ``id`` attribute is read.
    point_prescription:
        Key of ``pointprescriptions.yaml`` selecting the scale variations.

    Returns
    -------
    NSList
        The theories checked by the loader, with ``nskey="theoryid"``.

    Raises
    ------
    ConfigError
        If no scale variations are defined for the central theory, if the
        point prescription is unknown, or if the central theory lacks some
        of the variations that the prescription needs.
    """
    # Normalise the central id once so that every comparison below uses the
    # same type (ids may be stored as ints or as strings).
    try:
        th = int(t0id.id)
    except (TypeError, ValueError):
        th = t0id.id

    lsv = yaml.safe_load(read_text(validphys.scalevariations, "scalevariationtheoryids.yaml"))

    # Allowed central theoryids, mapped to their scale variation entry
    variations_by_thid = {int(entry["theoryid"]): entry for entry in lsv["scale_variations_for"]}

    if th not in variations_by_thid:
        valid_thids = ", ".join(str(i) for i in variations_by_thid)
        raise ConfigError(
            "Scale variations are not currently defined for this central theoryid. It is "
            + f"currently only possible to use one of the following as the central theory: {valid_thids}. "
            + "Please use one of these instead if you wish to include theory uncertainties here."
        )

    # Find scales that correspond to this point prescription
    pp_scales_dict = yaml.safe_load(
        read_text(validphys.scalevariations, "pointprescriptions.yaml")
    )
    try:
        scales = pp_scales_dict[point_prescription]
    except KeyError as e:
        valid_pps = ", ".join(pp_scales_dict.keys())
        raise ConfigError(
            "Scale variations are not currently defined for this point prescription. This "
            + "configuration only works when 'point_prescription' is equal to one of the "
            + f"following: {valid_pps}. Please use one of these instead if you wish to "
            + "include theory uncertainties here."
        ) from e

    # Entry containing theoryid and variations for the central theory
    theoryid_variations = variations_by_thid[th]

    # Find theoryids for given point prescription for given central theoryid
    try:
        thids = [theoryid_variations["variations"][scale] for scale in scales]
    except KeyError as e:
        available_scales = list(theoryid_variations["variations"])
        missing_scales_string = ", ".join(
            scale for scale in scales if scale not in available_scales
        )
        raise ConfigError(
            "For this central theoryid, the requested point prescription is not currently "
            + "available. To use this point prescription for this central theoryid, theoryids "
            + "that correspond to the following scale choices must be created and added to "
            + "validphys2/src/validphys/scalevariations/scalevariationtheoryids.yaml: "
            + f"missing variations = {missing_scales_string}."
        ) from e

    # Check each theory is loaded
    theoryids = [self.loader.check_theoryID(thid) for thid in thids]
    return NSList(theoryids, nskey="theoryid")
[docs]
@configparser.explicit_node
def produce_filter_data(self, fakedata: bool = False, theorycovmatconfig=None):
    """Select the action used to filter the data.

    Real data is filtered with ``filter_real_data``. For closure (fake) data
    the by-experiment filter ``filter_closure_data_by_experiment`` is used,
    for efficiency, provided that the theory covariance matrix is not being
    closure tested.

    Raises
    ------
    ConfigError
        If closure data is requested together with
        ``use_thcovmat_in_sampling`` in ``theorycovmatconfig``.
    """
    import validphys.filters

    if not fakedata:
        return validphys.filters.filter_real_data

    samples_thcovmat = theorycovmatconfig is not None and theorycovmatconfig.get(
        "use_thcovmat_in_sampling"
    )
    if samples_thcovmat:
        # NOTE: By the time we run theory covmat closure tests,
        # hopefully the generation of pseudodata will be done in python.
        raise ConfigError(
            "Generating closure test data which samples from the theory "
            "covariance matrix has not been implemented yet."
        )
    return validphys.filters.filter_closure_data_by_experiment
[docs]
@configparser.explicit_node
def produce_total_chi2_data(self, fitthcovmat):
    """Select the provider of the total chi2.

    With a theory covmat for the fit, ``dataset_inputs_abs_chi2_data`` is
    used. Without one, the total chi2 is calculated by summing the chi2
    from each experiment.
    """
    import validphys.results

    if fitthcovmat is not None:
        return validphys.results.dataset_inputs_abs_chi2_data
    return validphys.results.total_chi2_data_from_experiments
[docs]
@configparser.explicit_node
def produce_total_phi_data(self, fitthcovmat):
    """Select the provider of the total phi.

    With a theory covmat for the fit, ``dataset_inputs_phi_data`` is used.
    Without one, the total phi is calculated using contributions from each
    experiment.
    """
    import validphys.results

    if fitthcovmat is not None:
        return validphys.results.dataset_inputs_phi_data
    return validphys.results.total_phi_data_from_experiments
[docs]
class Config(report.Config, CoreConfig):
"""The effective configuration parser class."""