Source code for validphys.commondataparser
"""
This module implements parsers for commondata and its associated metadata and uncertainties files
into useful structures that can be fed to the main :py:class:`validphys.coredata.CommonData` class.
A CommonData file is completely defined by a dataset name
(which defines the folder in which the information is)
and observable name (which defines the specific data, fktables and plotting settings to read).
<experiment>_<process>_<energy>{_<extras>}_<observable>
Where the folder name is ``<experiment>_<process>_<energy>{_<extras>}``
The definition of all information for a given dataset (and all its observables) is in the
``metadata.yaml`` file and its ``implemented_observables``.
This module defines a number of parsers using the ``validobj`` library.
The full ``metadata.yaml`` is read as a ``SetMetaData`` object
which contains a list of ``ObservableMetaData``.
These ``ObservableMetaData`` are the "datasets" of NNPDF for all intents and purposes.
The parent ``SetMetaData`` collects some shared variables such as the version of the dataset,
arxiv, inspire or hepdata ids, the folder in which the data is, etc.
The main class in this module is thus ``ObservableMetaData`` which holds _all_ information
about the particular dataset-observable that we are interested in (and a reference to its parent).
Inside the ``ObservableMetaData`` we can find:
- ``TheoryMeta``: contains the necessary information to read the (new style) fktables
- ``ValidKinematics``: contains metadata about the kinematics
- ``PlottingOptions``: plotting style and information for validphys
- ``Variant``: variant to be used
The ``ObservableMetaData`` defines how the CommonData file is to be loaded;
by modifying it with one of the loaded ``Variant`` objects one can change the resulting
:py:class:`validphys.coredata.CommonData` object.
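
Example
-------
A minimal sketch of the typical entry point; the dataset and observable names below are
illustrative and assumed to be implemented in the local ``commondata`` folder:

>>> from nnpdf_data import path_commondata
>>> from validphys.commondataparser import parse_new_metadata, load_commondata_new
>>> metadata_file = path_commondata / "LHCB_Z0_7TEV_DIELECTRON" / "metadata.yaml"
>>> obs_metadata = parse_new_metadata(metadata_file, "Y")  # one ObservableMetaData
>>> cd = load_commondata_new(obs_metadata)  # -> validphys.coredata.CommonData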
"""
import dataclasses
from functools import cached_property, lru_cache
import logging
from operator import attrgetter
from pathlib import Path
from typing import Any, Dict, Optional
import numpy as np
import pandas as pd
from validobj import ValidationError, parse_input
from validobj.custom import Parser
from nnpdf_data import new_to_legacy_map, path_commondata
from nnpdf_data.utils import parse_yaml_inp
# We cannot use ruamel directly due to the ambiguity ruamel.yaml / ruamel_yaml
# of some versions which are pinned in some of the conda packages we use...
from reportengine.compat import yaml
from validphys.coredata import KIN_NAMES, CommonData
from validphys.plotoptions.plottingoptions import PlottingOptions, labeler_functions
from validphys.process_options import ValidProcess
try:
# If libyaml is available, use the C loader to speed up some of the read
# https://pyyaml.org/wiki/LibYAML
# libyaml is available for most linux distributions
Loader = yaml.CLoader
except AttributeError:
# fallback to the slow loader
Loader = yaml.Loader
def _quick_yaml_load(filepath):
return yaml.load(filepath.read_text(encoding="utf-8"), Loader=Loader)
# JCM:
# Some notes for developers
# The usage of `frozen` in the definitions of the dataclasses is not strictly necessary;
# however, changing the metadata can have side effects in many parts of validphys.
# By freezing the overall class (and leaving only specific attributes unfrozen) we have more
# granular control. Please use a setter to modify a frozen class instead of removing `frozen`.
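# For instance, this module itself sets the (otherwise frozen) ``_parent`` attribute via
#   object.__setattr__(observable, "_parent", self)
# in ``SetMetaData.select_observable`` below, rather than unfreezing the dataclass.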
EXT = "pineappl.lz4"
_INDEX_NAME = "entry"
log = logging.getLogger(__name__)
KINLABEL_LATEX = {
"DIJET": ("\\eta", "$\\m_{1,2} (GeV)", "$\\sqrt{s} (GeV)"),
"DIS": ("$x$", "$Q^2 (GeV^2)$", "$y$"),
"DYP": ("$y$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_JPT": ("$p_T (GeV)$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_JRAP": ("$\\eta/y$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_MLL": ("$M_{ll} (GeV)$", "$M_{ll}^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_PT": ("$p_T (GeV)$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_PTRAP": ("$\\eta/y$", "$p_T^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWJ_RAP": ("$\\eta/y$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWK_MLL": ("$M_{ll} (GeV)$", "$M_{ll}^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWK_PT": ("$p_T$ (GeV)", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWK_PTRAP": ("$\\eta/y$", "$p_T^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWK_RAP": ("$\\eta/y$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"EWK_RAP_ASY": ("$\\eta/y$", "$M^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HIG_RAP": ("$y$", "$M_H^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HQP_MQQ": ("$M^{QQ} (GeV)$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HQP_PTQ": ("$p_T^Q (GeV)$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HQP_PTQQ": ("$p_T^{QQ} (GeV)$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HQP_YQ": ("$y^Q$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"HQP_YQQ": ("$y^{QQ}$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"INC": ("$0$", "$\\mu^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"JET": ("$\\eta$", "$p_T^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"PHT": ("$\\eta_\\gamma$", "$E_{T,\\gamma}^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"SIA": ("$z$", "$Q^2 (GeV^2)$", "$y$"),
"SHP_ASY": ("$\\eta$", "$p_T (GeV)$", "$\\sqrt{s} (GeV)$"),
"JET_POL": ("$\\eta$", "$p_T^2 (GeV^2)$", "$\\sqrt{s} (GeV)$"),
"DIJET_POL": ("$\\m_{1,2} (GeV)", "$\\eta_1$", "$\\eta_2$"),
}
PROCESS_DESCRIPTION_LABEL = {
"EWJ_JRAP": "Jet Rapidity Distribution",
"EWK_RAP": "Drell-Yan Rapidity Distribution",
"EWJ_RAP": "Jet Rapidity Distribution",
"HQP_PTQ": "Heavy Quarks Production Single Quark Transverse Momentum Distribution",
"JET": "Jets Rapidity Distribution",
"HIG_RAP": "Higgs Rapidity Distribution",
"HQP_YQ": "Heavy Quarks Production Single Quark Rapidity Distribution",
"EWJ_JPT": "Jet Transverse Momentum Distribution",
"DIS": "Deep Inelastic Scattering",
"HQP_PTQQ": "Heavy Quarks Production Transverse Momentum Distribution",
"EWK_PT": "Drell-Yan Transverse Momentum Distribution",
"EWJ_PT": "Jet Transverse Momentum Distribution",
"PHT": "Photon Production",
"HQP_MQQ": "Heavy Quarks Production Mass Distribution",
"EWK_PTRAP": "Drell-Yan Transverse Momentum Distribution",
"HQP_YQQ": "Heavy Quarks Production Rapidity Distribution",
"INC": "Heavy Quarks Total Cross Section",
"EWJ_MLL": "Jet Mass Distribution",
"EWK_MLL": "Drell-Yan Mass Distribution",
"DIJET": "Dijets Invariant Mass and Rapidity Distribution",
"DYP": "Fixed-Target Drell-Yan",
"JET_POL": "Inclusive Jet longitudinal double-spin asymmetry",
"DIJET_POL": "Dijets longitudinal double-spin asymmetry",
"SHP_ASY": "double spin asymmetry in single hadron production",
}
def _get_ported_kinlabel(process_type):
"""Get the kinematic label for ported datasets
In principle there is a one-to-one correspondence between the process label in the kinematics and
``KINLABEL_LATEX``; however, there are some special cases that need to be taken into account
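
For instance, a label with a DIS-like prefix falls back to the generic DIS labels:

>>> _get_ported_kinlabel("DIS_NC")
('$x$', '$Q^2 (GeV^2)$', '$y$')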
"""
process_type = str(process_type)
if process_type in KINLABEL_LATEX:
return KINLABEL_LATEX[process_type]
# special case in which the process is DIS- or DYP-like
if process_type[:3] in ("DIS", "DYP"):
return _get_ported_kinlabel(process_type[:3])
if len(process_type.split("_")) > 1:
return _get_ported_kinlabel(process_type.rsplit("_", 1)[0])
raise KeyError(f"Label {process_type} not recognized in KINLABEL_LATEX")
def _get_process_description(process_type):
"""Get the process description string for a given process type
As with the kinematic labels, some special cases are taken into account.
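
For instance, with a legacy process label given as a plain string:

>>> _get_process_description("EWK_RAP_ASY")
'Drell-Yan Rapidity Distribution'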
"""
try:
return process_type.description
except AttributeError:
# This process needs to be updated
pass
if process_type in PROCESS_DESCRIPTION_LABEL:
return PROCESS_DESCRIPTION_LABEL[process_type]
# If not, is this a DYP- or DIS-like dataset?
if process_type[:3] in ("DIS", "DYP"):
return _get_process_description(process_type[:3])
# Remove pieces of "_" until it is found
if len(process_type.split("_")) > 1:
return _get_process_description(process_type.rsplit("_", 1)[0])
raise KeyError(f"Label {process_type} not found in PROCESS_DESCRIPTION_LABEL")
@Parser
def ValidPath(path_str: str) -> Path:
"""Parse strings into paths"""
return Path(path_str)
### Theory metadata
@Parser
def ValidOperation(op_str: Optional[str]) -> str:
"""Ensures that the operation defined in the commondata file is implemented in validphys"""
if op_str is None:
op_str = "NONE"
ret = op_str.upper()
# TODO: move accepted operations to this module so that the convolution receives an operation to apply
# instead of an operation to understand
from validphys.convolution import OP
if ret not in OP:
raise ValidationError(f"The operation '{op_str}' is not implemented in validphys")
return str(ret)
@dataclasses.dataclass(frozen=True)
class TheoryMeta:
"""Contains the necessary information to load the associated fktables
The theory metadata must always contain a key ``FK_tables`` which defines
the fktables to be loaded.
The ``FK_tables`` is organized as a double list such that:
- the inner lists are concatenated: in practice these are different fktables that might
  refer to the same observable but that are divided in subgrids for practical reasons;
- the outer list contains the operands for whatever operation needs to be computed
  in order to match the experimental data.
In addition there are other flags that can affect how the fktables are read or used:
- operation: defines the operation to apply to the outer list
- shifts: mapping from single fktables to their respective shifts,
  useful to create "gaps" so that the fktables and the respective experimental data
  are ordered in the same way (for instance, when some points are missing from a grid)
This class is immutable; what is read from the commondata metadata should be considered final.
Example
-------
>>> from validphys.commondataparser import TheoryMeta
... from validobj import parse_input
... from reportengine.compat import yaml
... theory_raw = '''
... FK_tables:
... - - fk1
... - - fk2
... - fk3
... operation: ratio
... '''
... theory = yaml.safe_load(theory_raw)
... parse_input(theory, TheoryMeta)
TheoryMeta(FK_tables=[['fk1'], ['fk2', 'fk3']], operation='RATIO', conversion_factor=1.0, shifts=None, normalization=None, comment=None)
"""
FK_tables: list[tuple]
operation: ValidOperation = "NULL"
conversion_factor: float = 1.0
shifts: Optional[dict] = None
normalization: Optional[dict] = None
comment: Optional[str] = None
def fktables_to_paths(self, grids_folder):
"""Given a source for pineappl grids, constructs the lists of fktables
to be loaded"""
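# e.g. (illustrative) FK_tables = [["fk1"], ["fk2", "fk3"]] with grids_folder Path("grids")
# yields [[grids/fk1.pineappl.lz4], [grids/fk2.pineappl.lz4, grids/fk3.pineappl.lz4]]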
ret = []
for operand in self.FK_tables:
ret.append([grids_folder / f"{m}.{EXT}" for m in operand])
return ret
@classmethod
def parser(cls, yaml_file):
"""The yaml databases in the server use "operands" as key instead of "FK_tables" """
if not yaml_file.exists():
raise FileNotFoundError(yaml_file)
meta = yaml.safe_load(yaml_file.read_text())
# Make sure the operations are upper-cased for compound-compatibility
meta["operation"] = "NULL" if meta["operation"] is None else meta["operation"].upper()
if "operands" in meta:
meta["FK_tables"] = meta.pop("operands")
return parse_input(meta, cls)
def __hash__(self):
"""Include in the hash all pieces of information available for functions using a cache.
This includes also any possible comment in the off-chance it could have a meaning."""
to_be_hashed = [self.operation, self.conversion_factor]
to_be_hashed.append(tuple([tuple(i) for i in self.FK_tables]))
if self.shifts is not None:
to_be_hashed.append(tuple(self.shifts.keys()))
to_be_hashed.append(tuple(self.shifts.values()))
if self.normalization is not None:
to_be_hashed.append(tuple(self.normalization.keys()))
to_be_hashed.append(tuple(self.normalization.values()))
if self.comment is not None:
to_be_hashed.append(self.comment)
return hash(tuple(to_be_hashed))
## Theory end
@dataclasses.dataclass(frozen=True)
class Variant:
"""The new commondata format allow the usage of variants
A variant can overwrite a number of keys, as defined by this dataclass
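
For instance (illustrative), a variant in ``metadata.yaml`` could overwrite the
uncertainty files of an observable::

    variants:
      legacy:
        data_uncertainties:
          - uncertainties_legacy.yaml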
"""
data_uncertainties: Optional[list[ValidPath]] = None
theory: Optional[TheoryMeta] = None
data_central: Optional[ValidPath] = None
ValidVariants = Dict[str, Variant]
### Kinematic data
@dataclasses.dataclass(frozen=True)
class ValidVariable:
"""Defines the variables"""
label: str
description: str = ""
units: str = ""
def full_label(self):
    """Return the label formatted as ``label (units)``"""
    if self.units:
        return f"{self.label} ({self.units})"
    return self.label
def apply_label(self, value):
"""Return a string formatted as label = value (units)"""
tmp = f"{self.label} = {value}"
if self.units:
tmp += f" ({self.units})"
return tmp
@dataclasses.dataclass(frozen=True)
class ValidKinematics:
"""Contains the metadata necessary to load the kinematics of the dataset.
The variables should be a dictionary with the key naming the variable
and the content complying with the ``ValidVariable`` spec.
Only the kinematics defined by the key ``kinematic_coverage`` will be loaded,
which must be three.
Three shall be the number of the counting and the number of the counting shall be three.
Four shalt thou not count, neither shalt thou count two,
excepting that thou then proceedeth to three.
Once the number three, being the number of the counting, be reached,
then the kinematics be loaded in the direction of thine validobject.
"""
file: ValidPath
variables: Dict[str, ValidVariable]
def get_label(self, var):
"""For the given variable, return the label as label (unit)
If the label is an "extra" return the last one
"""
if var.startswith("extra_"):
return list(self.variables.values())[-1]
return self.variables[var].full_label()
def apply_label(self, var, value):
"""For a given value for a given variable, return the labels
as label = value (unit)
If the variable is not included in the list of variables, returns None
as the variable could've been transformed by a kinematic transformation
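
For instance (illustrative):

>>> vk = ValidKinematics(file="kinematics.yaml", variables={"pT": ValidVariable(label="$p_T$", units="GeV")})
>>> vk.apply_label("pT", 45.0)
'$p_T$ = 45.0 (GeV)'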
"""
if var not in self.variables:
return None
return self.variables[var].apply_label(value)
### kinematics end
### Observable and dataset definitions
@dataclasses.dataclass(frozen=True, eq=True)
class ObservableMetaData:
observable_name: str
observable: dict
ndata: int
# Plotting options
plotting: PlottingOptions
process_type: ValidProcess
kinematic_coverage: list[str]
# Data itself
kinematics: ValidKinematics
data_uncertainties: list[ValidPath]
# The central data is optional _only_ for
# positivity datasets, and will be checked as soon as the class is instantiated
data_central: Optional[ValidPath] = None
# Optional data
theory: Optional[TheoryMeta] = None
tables: Optional[list] = dataclasses.field(default_factory=list)
npoints: Optional[list] = dataclasses.field(default_factory=list)
variants: Optional[ValidVariants] = dataclasses.field(default_factory=dict)
applied_variant: Optional[str] = None
ported_from: Optional[str] = None
# Derived quantities:
# Note that an observable without a parent will fail in many different ways
_parent: Optional[Any] = None
def __post_init__(self):
"""
Small modifications for better compatibility with the rest of validphys
"""
# Since vp will rely on the kinematics being 3 variables,
# fill the extra with whatever can be found in the kinematics dictionary
# otherwise just fill with extra_x
if len(self.kinematic_coverage) < 3:
unused = list(set(self.kinematics.variables) - set(self.kinematic_coverage))
diff_to_3 = 3 - len(self.kinematic_coverage)
if unused:
nkincov = self.kinematic_coverage + unused[:diff_to_3]
else:
nkincov = self.kinematic_coverage + [f"extra_{i}" for i in range(diff_to_3)]
object.__setattr__(self, 'kinematic_coverage', nkincov)
def __hash__(self):
"""ObservableMetaData is defined by:
- the setname
- the variant used
- the data
"""
return hash((self.name, self.applied_variant, self.data_central))
def check(self):
"""Various checks to apply manually to the observable before it is used anywhere
These are not part of the __post_init__ call since they can only happen after the metadata
has been read, the observable selected and (likely) variants applied.
"""
# Check whether the data central or the uncertainties are empty for a non-positivity/integrability set
if not self.is_lagrange_multiplier:
if self.data_central is None:
raise ValidationError(f"Missing `data_central` field for {self.name}")
if not self.data_uncertainties:
ermsg = f"Missing `data_uncertainties` for {self.name}."
# be polite
if "legacy" in self.variants:
ermsg += " Maybe you intended to use `variant: legacy`?"
raise ValidationError(ermsg)
# Check that plotting.plot_x is being filled
if self.plotting.plot_x is None:
ermsg = f"No variable selected as x-axis in the plot for {self.name}. Please add `plotting::plot_x`."
if self.plotting.x is not None:
ermsg += "Please replace `plotting::x` with `plotting::plot_x`."
raise ValidationError(ermsg)
# Ensure that all variables in the kinematic coverage exist
for var in self.kinematic_coverage:
if var not in self.kinematics.variables and not var.startswith("extra_"):
raise ValidationError(
f"Variable {var} is in `kinematic_coverage` but not included in `kinematics`"
f", nor it is a variable of type `extra_` for {self.name} dataset."
)
if len(self.kinematic_coverage) > 3:
raise ValidationError(
"Only a maximum of 3 variables can be used for `kinematic_coverage`"
)
def apply_variant(self, variant_name):
"""Return a new instance of this class with the variant applied
This class also defines how the variant is applied to the commondata
"""
try:
variant = self.variants[variant_name]
except KeyError as e:
raise ValueError(f"The requested variant does not exist {variant_name}") from e
variant_replacement = {}
if variant.data_uncertainties is not None:
variant_replacement["data_uncertainties"] = variant.data_uncertainties
if variant.theory is not None:
variant_replacement["theory"] = variant.theory
if variant.data_central is not None:
variant_replacement["data_central"] = variant.data_central
return dataclasses.replace(self, applied_variant=variant_name, **variant_replacement)
@property
def is_positivity(self):
return self.setname.startswith("NNPDF_POS")
@property
def is_integrability(self):
return self.setname.startswith("NNPDF_INTEG")
@property
def is_lagrange_multiplier(self):
return self.is_positivity or self.is_integrability
@property
def path_data_central(self):
return self._parent.folder / self.data_central
def load_data_central(self):
"""Loads the data for this commondata returns a dataframe
Returns
-------
pd.DataFrame
a dataframe containing the data
"""
if self.is_lagrange_multiplier:
data = np.zeros(self.ndata)
else:
datayaml = _quick_yaml_load(self.path_data_central)
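# the data file is expected to be a mapping with a single ``data_central`` list of
# ndata values, e.g. (illustrative): data_central: [0.123, 0.456, ...]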
data = datayaml["data_central"]
if len(data) != self.ndata:
raise ValueError(
f"The number of bins in {self.path_data_central} does not match ndata={self.ndata}"
)
data_df = pd.DataFrame(data, index=range(1, self.ndata + 1), columns=["data"])
data_df.index.name = _INDEX_NAME
return data_df
@property
def paths_uncertainties(self):
return [self._parent.folder / i for i in self.data_uncertainties]
def load_uncertainties(self):
"""Returns a dataframe with all appropiate uncertainties
Returns
-------
pd.DataFrame
a dataframe containing the uncertainties
"""
if self.is_lagrange_multiplier:
return pd.DataFrame([{}] * self.ndata, index=range(1, self.ndata + 1))
all_df = []
for ufile in self.paths_uncertainties:
uncyaml = _quick_yaml_load(ufile)
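# Illustrative sketch of the expected layout of an uncertainty file:
#   definitions:
#     stat:       {treatment: ADD,  type: UNCORR}
#     sys_corr_1: {treatment: MULT, type: CORR}
#   bins:
#     - {stat: 0.1, sys_corr_1: 0.05}
#     - {stat: 0.2, sys_corr_1: 0.07}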
mindex = pd.MultiIndex.from_tuples(
[(k, v["treatment"], v["type"]) for k, v in uncyaml["definitions"].items()],
names=["name", "treatment", "type"],
)
bin_list = pd.DataFrame(uncyaml["bins"]).values.astype(float)
if len(bin_list) != self.ndata:
raise ValueError(f"The number of bins in {ufile} does not match ndata={self.ndata}")
# I'm guessing there will be a better way of doing this than calling dataframe twice for the same thing?
final_df = pd.DataFrame(bin_list, columns=mindex, index=range(1, self.ndata + 1))
final_df.index.name = _INDEX_NAME
all_df.append(final_df)
return pd.concat(all_df, axis=1)
@property
def path_kinematics(self):
return self._parent.folder / self.kinematics.file
def load_kinematics(self, fill_to_three=True, drop_minmax=True):
"""Returns a dataframe with the kinematic information
Parameters
----------
fill_to_three: bool
ensure that there are always three columns (repeat the last one) in the kinematics
drop_minmax: bool
Drop the min and max value, necessary for legacy comparisons
Returns
-------
pd.DataFrame
a dataframe containing the kinematics
"""
kinematics_file = self.path_kinematics
kinyaml = _quick_yaml_load(kinematics_file)
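# Illustrative sketch of the expected layout of a kinematics file:
#   bins:
#     - x:  {min: 0.01, mid: 0.015, max: 0.02}
#       Q2: {min: null, mid: 12.0, max: null}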
kin_dict = {}
for bin_index, dbin in enumerate(kinyaml["bins"], start=1):
for d in dbin.values():
if d["mid"] is None:
d["mid"] = 0.5 * (d["max"] + d["min"])
if drop_minmax:
d["min"] = None
d["max"] = None
else:
# If we are not dropping it, ensure that it has something!
d["min"] = d["min"] if d.get("min") is not None else d["mid"]
d["max"] = d["max"] if d.get("max") is not None else d["mid"]
# The old commondata always had 3 kinematic variables and the code sometimes
# relies on this fact
# Add a fake one at the end repeating the last one
if fill_to_three and (ncol := len(dbin)) < 3:
for i in range(3 - ncol):
dbin[f"extra_{i}"] = d
kin_dict[bin_index] = pd.DataFrame(dbin).stack()
if len(kin_dict) != self.ndata:
raise ValueError(
f"The number of bins in {kinematics_file} does not match ndata={self.ndata}"
)
return pd.concat(kin_dict, axis=1, names=[_INDEX_NAME]).swaplevel(0, 1).T
# Properties inherited from parent
@property
def nnpdf_metadata(self):
return self._parent.nnpdf_metadata
@property
def setname(self):
return self._parent.setname
@property
def experiment(self):
return self.setname.split("_")[0]
@property
def process(self):
return self.setname.split("_")[1]
@property
def cm_energy(self):
return self._parent.cm_energy
@property
def name(self):
return f"{self.setname}_{self.observable_name}"
@property
def is_ported_dataset(self):
"""Return True if this is an automatically ported dataset that has not been updated"""
if self.ported_from is None:
return False
# If it is using a legacy variant and has a ported_from field, then it is a ported one
if self.applied_variant is not None and self.applied_variant.startswith("legacy"):
return True
# If not using a legacy variant, we consider it ported if the kin variables are still k1,k2,k3
return {"k1", "k2", "k3"} == set(self.kinematic_coverage)
@property
def kinlabels(self):
"""Return the kinematic labels in the same order as they are set
in ``kinematic_coverage`` (which in turns follow the key kinematic_coverage)
If this is a ported dataset, rely on the process type using the legacy labels
"""
if self.is_ported_dataset:
return _get_ported_kinlabel(self.process_type)
return [self.kinematics.get_label(i) for i in self.kinematic_coverage]
def digest_plotting_variable(self, variable):
"""Digest plotting variables in the ``line_by`` or ``figure_by`` fields
and return the appropriate ``kX`` or other label such that the plotting functions
of validphys can understand it.
These might be variables included as part of the kinematics or extra labels
defined in the plotting dictionary.
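
For instance (illustrative), if ``kinematic_coverage = ["y", "M2", "sqrts"]`` then the
variable ``"M2"`` is digested as ``"k2"``, while a variable only present in
``plotting.extra_labels`` is returned unchanged.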
"""
# If it is part of the coverage, just return the relevant KN
if variable in self.kinematic_coverage:
fig_idx = self.kinematic_coverage.index(variable)
return f"k{fig_idx + 1}"
# If it is not in the coverage, it might be a _known_ extra label
if self.plotting.extra_labels is not None and variable in self.plotting.extra_labels:
# In that case return it raw
return variable
# Or, it might be a variable that VP knows how to deal with automagically
if variable in labeler_functions:
return variable
raise ValueError(f"Don't know what to do with plotting variable {variable} for {self.name}")
def _plotting_options_set(self):
"""Set and return the PlottingOptions metadata
Fill in missing information that can be learnt from the other variables (xlabel/ylabel)
or that is shared by the whole dataset.
"""
if self.plotting.already_digested:
return self.plotting
if self.plotting.nnpdf31_process is None:
self.plotting.nnpdf31_process = self.nnpdf_metadata["nnpdf31_process"]
if self.plotting.experiment is None:
self.plotting.experiment = self.nnpdf_metadata["experiment"]
if self.plotting.process_description is None:
self.plotting.process_description = _get_process_description(self.process_type)
## Swap variables by the k_idx
# Internally validphys takes the x/y to be "k1" "k2" or "k3"
# Therefore, for the time being, swap the actual keys by k1/k2/k3
try:
x_idx = self.kinematic_coverage.index(self.plotting.plot_x)
self.plotting.x = f"k{x_idx + 1}"
if self.plotting.x_label is None and not self.is_ported_dataset:
self.plotting.x_label = self.kinematics.get_label(self.plotting.plot_x)
except ValueError:
# it is possible that the x value is an "extra", if that's the case continue
self.plotting.x = self.plotting.plot_x
self.plotting.x_label = None
# Swap the `figure_by` and `line_by` variables by k1/k2/k3
# unless this is something coming from the "extra labels"
if self.plotting.figure_by is not None:
new_fig_by = []
for var in self.plotting.figure_by:
new_fig_by.append(self.digest_plotting_variable(var))
self.plotting.figure_by = new_fig_by
if self.plotting.line_by is not None:
new_line_by = []
for var in self.plotting.line_by:
new_line_by.append(self.digest_plotting_variable(var))
self.plotting.line_by = new_line_by
self.plotting.already_digested = True
return self.plotting
@cached_property
def plotting_options(self):
try:
return self._plotting_options_set()
except Exception as e:
# There are many chances for failure here
log.error(f"Failure for: {self.name}")
raise e
@dataclasses.dataclass(frozen=True)
class ValidReference:
"""Holds literature information for the dataset"""
url: str
version: Optional[int] = None
journal: Optional[str] = None
tables: list[int] = dataclasses.field(default_factory=list)
@dataclasses.dataclass(frozen=True)
class SetMetaData:
"""Metadata of the whole set"""
setname: str
version: int
version_comment: str
nnpdf_metadata: dict
implemented_observables: list[ObservableMetaData]
arXiv: Optional[ValidReference] = None
iNSPIRE: Optional[ValidReference] = None
hepdata: Optional[ValidReference] = None
@property
def folder(self):
return path_commondata / self.setname
@property
def cm_energy(self):
"""Return the center of mass energy as GeV if it can be understood from the name
otherwise return None"""
energy_string = self.setname.split("_")[2]
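# e.g. (illustrative) "7TEV" -> 7000.0, "13P6TEV" -> 13600.0, "318GEV" -> 318.0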
if energy_string == "NOTFIXED":
return None
if energy_string.endswith("GEV"):
factor = 1.0
elif energy_string.endswith("TEV"):
factor = 1000
else:
return None
return float(energy_string[:-3].replace("P", ".")) * factor
@cached_property
def allowed_datasets(self):
"""Return the implemented datasets as a list <setname>_<observable>"""
return [f"{self.setname}_{i.observable_name}" for i in self.implemented_observables]
@cached_property
def allowed_observables(self):
"""
Returns the implemented observables as a {observable_name.upper(): observable} dictionary
"""
return {o.observable_name.upper(): o for o in self.implemented_observables}
def select_observable(self, obs_name_raw):
"""Check whether the observable is implemented and return said observable"""
obs_name = obs_name_raw.upper()
try:
observable = self.allowed_observables[obs_name]
except KeyError:
raise ValueError(
f"The selected observable {obs_name_raw} does not exist in {self.setname}"
)
# Now burn the _parent key into the observable and apply checks
object.__setattr__(observable, "_parent", self)
return observable
@lru_cache
def parse_set_metadata(metadata_file):
"""Read the metadata file"""
return parse_yaml_inp(metadata_file, SetMetaData)
@lru_cache
def parse_new_metadata(metadata_file, observable_name, variant=None):
"""Given a metadata file in the new format and the specific observable to be read
load and parse the metadata and select the observable. If any variants are selected, apply them.
The triplet (metadata_file, observable_name, variant) define unequivocally the information
to be parsed from the commondata library
"""
set_metadata = parse_set_metadata(metadata_file)
# Select one observable from the entire metadata
metadata = set_metadata.select_observable(observable_name)
# And apply variant if given
if variant is not None:
metadata = metadata.apply_variant(variant)
return metadata
def load_commondata_new(metadata):
"""
TODO: update this docstring since now the load_commondata_new takes the information from
the metadata, and the name -> split is done outside
In the current iteration of the commondata, each of the commondata
(i.e., an observable from a data publication) corresponds to one single observable
inside a folder which is named as "<experiment>_<process>_<energy>_<extra>"
The observable is defined by a last suffix of the form "_<obs>" so that the full name
of the dataset is always:
"<experiment>_<process>_<energy>{_<extra>}_<obs>"
where <extra> is optional.
This function right now works under the assumption that the folder/observable
is separated in the last _ so that:
folder_name = <experiment>_<process>_<energy>{_<extra>}
but note that this convention is still not fully defined.
This function returns a commondata object constructed by parsing the metadata.
Once a variant is selected, it can no longer be changed
Note that this function reproduces `parse_commondata` below, which parses the
_old_ file format
"""
# Before loading, apply the checks
metadata.check()
# Now parse the data
data_df = metadata.load_data_central()
# the uncertainties
uncertainties_df = metadata.load_uncertainties()
# and the kinematics
kin_df = metadata.load_kinematics()
# Once we have loaded all uncertainty files, let's check how many sys we have
nsys = len(
[i for i in uncertainties_df.columns.get_level_values(0) if not i.startswith("stat")]
)
# Backwards-compatibility
# Finally, create the commondata by merging the dataframes in the old commondata_table
procname = metadata.process_type # nnpdf_metadata["nnpdf31_process"]
kin_df = kin_df[metadata.kinematic_coverage]
kin_df.columns = KIN_NAMES
kin_df["process"] = procname
kin_df = kin_df[["process"] + KIN_NAMES]
# For the uncertainties, create a simplified version to concatenate
# and save the systype information
new_columns = []
systypes = {"treatment": [], "name": []}
for col in uncertainties_df.columns:
if col[0].startswith("stat"):
new_columns.append("stat")
else:
# if it is syst add the ADD/MULT information
new_columns.append(col[1])
systypes["treatment"].append(col[1])
systypes["name"].append(col[2])
uncertainties_df.columns = new_columns
commondata_table = pd.concat([kin_df, data_df, uncertainties_df], axis=1)
systype_table = pd.DataFrame(systypes, index=range(1, nsys + 1))
systype_table.index.name = "sys_index"
# TODO: Legacy compatibility
# 1. Add a stat column if it doesn't exist
# 2. Transform multiplicative uncertainties into % as it was done in the older version
if "stat" not in commondata_table:
commondata_table["stat"] = 0.0
if "MULT" in commondata_table:
commondata_table["MULT"] = commondata_table["MULT"].multiply(
100 / commondata_table["data"], axis="index"
)
# The old -> new map is not bijective, as different old datasets can refer to the same new one;
# therefore "legacy_names", when filled, will be a list (and None otherwise).
legacy_names = new_to_legacy_map(metadata.name, metadata.applied_variant)
return CommonData(
setname=metadata.name,
ndata=metadata.ndata,
commondataproc=procname,
nkin=3,
nsys=nsys,
commondata_table=commondata_table,
systype_table=systype_table,
legacy_names=legacy_names,
kin_variables=metadata.kinematic_coverage,
)
###########################################
@lru_cache
def load_commondata(spec):
"""
Load the data corresponding to a CommonDataSpec object.
Returns an instance of CommonData
"""
if spec.legacy:
commondatafile = spec.datafile
setname = spec.name
systypefile = spec.sysfile
return load_commondata_old(commondatafile, systypefile, setname)
return load_commondata_new(spec.metadata)
### Old commondata:
### All code below this line is deprecated and will be removed
def load_commondata_old(commondatafile, systypefile, setname):
"""Parse a commondata file and a systype file into a CommonData.
Parameters
----------
commondatafile : file or path to file
systypefile : file or path to file
Returns
-------
commondata : CommonData
An object containing the data and information from the commondata
and systype files.
"""
# First parse commondata file
commondatatable = pd.read_csv(commondatafile, sep=r"\s+", skiprows=1, header=None)
# Remove NaNs
# TODO: replace commondata files with bad formatting
commondatatable.dropna(axis="columns", inplace=True)
# Build header
commondataheader = ["entry", "process", "kin1", "kin2", "kin3", "data", "stat"]
nsys = (commondatatable.shape[1] - len(commondataheader)) // 2
commondataheader += ["ADD", "MULT"] * nsys
commondatatable.columns = commondataheader
commondatatable.set_index("entry", inplace=True)
ndata = len(commondatatable)
commondataproc = commondatatable["process"][1]
# Check for consistency with commondata metadata
cdmetadata = peek_commondata_metadata(commondatafile)
if (nsys, ndata) != attrgetter("nsys", "ndata")(cdmetadata):
raise ValueError(f"Commondata table information does not match metadata for {setname}")
# Now parse the systype file
systypetable = parse_systypes(systypefile)
# Populate CommonData object
return CommonData(
setname=setname,
ndata=ndata,
commondataproc=commondataproc,
nkin=3,
nsys=nsys,
commondata_table=commondatatable,
systype_table=systypetable,
legacy=True,
)
def parse_systypes(systypefile):
"""Parses a systype file and returns a pandas dataframe."""
systypeheader = ["sys_index", "treatment", "name"]
try:
systypetable = pd.read_csv(
systypefile, sep=r"\s+", names=systypeheader, skiprows=1, header=None
)
systypetable.dropna(axis="columns", inplace=True)
# Some datasets e.g. CMSWCHARMRAT have no systematics
except pd.errors.EmptyDataError:
systypetable = pd.DataFrame(columns=systypeheader)
systypetable.set_index("sys_index", inplace=True)
return systypetable
@dataclasses.dataclass(frozen=True)
class CommonDataMetadata:
"""Contains metadata information about the data being read"""
name: str
nsys: int
ndata: int
process_type: str
def peek_commondata_metadata(commondatafilename):
"""Read some of the properties of the commondata object as a CommonData Metadata"""
with open(commondatafilename) as f:
try:
l = f.readline()
name, nsys_str, ndata_str = l.split()
l = f.readline()
process_type_str = l.split()[1]
except Exception:
log.error(f"Error processing {commondatafilename}")
raise
return CommonDataMetadata(
name, int(nsys_str), int(ndata_str), get_kinlabel_key(process_type_str)
)
def get_plot_kinlabels(commondata):
"""Return the LaTex kinematic labels for a given Commondata"""
key = commondata.process_type
# TODO: the keys in KINLABEL_LATEX need to be updated for the new commondata
return KINLABEL_LATEX.get(key, key)
def get_kinlabel_key(process_label):
"""
Since there is no 1:1 correspondence between latex keys and the old libNNPDF names
we match the longest key such that the proc label starts with it.
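
For instance (illustrative):

>>> get_kinlabel_key("DIS_NCE")
'DIS'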
"""
l = process_label
try:
if process_label == "EWK_RAP_ASY":
# TODO this function is disappearing in this PR
l = "EWK_RAP"
return next(k for k in sorted(KINLABEL_LATEX, key=len, reverse=True) if l.startswith(k))
except StopIteration as e:
raise ValueError(
"Could not find a set of kinematic "
"variables matching the process %s Check the "
"labels defined in commondata.cc. " % (l)
) from e