Source code for validphys.coredata

"""
Data containers backed by Python managed memory (Numpy arrays and Pandas
dataframes).
"""

import dataclasses
import logging
from typing import Optional

import numpy as np
import pandas as pd

from validphys.utils import generate_path_filtered_data, yaml_safe

KIN_NAMES = ["kin1", "kin2", "kin3"]
log = logging.getLogger(__name__)


[docs] @dataclasses.dataclass(eq=False) class FKTableData: """ Data contained in an FKTable Parameters ---------- hadronic : bool Whether a hadronic (two PDFs) or a DIS (one PDF) convolution is needed. Q0 : float The scale at which the PDFs should be evaluated (in GeV). ndata : int The number of data points in the grid. xgrid : array, shape (nx) The points in x at which the PDFs should be evaluated. sigma : pd.DataFrame For hadronic data, the columns are the indexes in the ``NfxNf`` list of possible flavour combinations of two PDFs. The MultiIndex contains three keys, the data index, an index into ``xgrid`` for the first PDF and an idex into ``xgrid`` for the second PDF, indicating if the points in ``x`` where the PDF should be evaluated. For DIS data, the columns are indexes in the ``Nf`` list of flavours. The MultiIndex contains two keys, the data index and an index into ``xgrid`` indicating the points in ``x`` where the PDF should be evaluated. convolution_types: tuple[str] The type of convolution that the FkTable is expecting for each of the functions to be convolved with (usually the two types of PDF from the two incoming hadrons). metadata : dict Other information contained in the FKTable. protected: bool When a fktable is protected cuts will not be applied. The most common use-case is when a total cross section is used as a normalization table for a differential cross section, in legacy code (<= NNPDF4.0) both fktables would be cut using the differential index. """ hadronic: bool Q0: float ndata: int xgrid: np.ndarray sigma: pd.DataFrame convolution_types: tuple[str] = None metadata: dict = dataclasses.field(default_factory=dict, repr=False) protected: bool = False
[docs] def with_cfactor(self, cfactor): """Returns a copy of the FKTableData object with cfactors applied to the fktable""" if all(c == 1.0 for c in cfactor): return self if len(cfactor) != self.ndata: if self.protected: cfactor = cfactor[0] else: name = self.metadata.get("target_dataset") raise ValueError( f"The length of cfactor for {name} differs from the number of datapoints in the grid" ) new_sigma = self.sigma.multiply(pd.Series(cfactor), axis=0, level=0) return dataclasses.replace(self, sigma=new_sigma)
[docs] def with_cuts(self, cuts): """Return a copy of the FKTable with the cuts applied. The data index of the sigma operator (the outermost level), contains the data point that have been kept. The ndata property is updated to reflect the new number of datapoints. If cuts is None, return the object unmodified. Parameters ---------- cuts : array_like or validphys.core.Cuts or None. The cuts to be applied. Returns ------- res : FKTableData A copy of the FKtable with the cuts applies. Notes ----- The original number of points can be accessed with ``table.metadata['GridInfo'].ndata``. Examples -------- >>> from validphys.fkparser import load_fktable ... from validphys.loader import Loader ... l = Loader() ... ds = l.check_dataset('ATLASTTBARTOT', theoryid=53, cfac=('QCD',)) ... table = load_fktable(ds.fkspecs[0]) ... newtable = table.with_cuts([0,1]) >>> assert set(newtable.sigma.index.get_level_values(0)) == {0,1} >>> assert newtable.ndata == 2 >>> assert newtable.metadata['GridInfo'].ndata == 3 """ if hasattr(cuts, "load"): cuts = cuts.load() if cuts is None or self.protected: return self newndata = len(cuts) try: newsigma = self.sigma.loc[cuts] except KeyError as e: # This will be an ugly erorr msg, but it should be scary anyway log.error(f"Problem applying cuts to {self.metadata}") raise e return dataclasses.replace(self, ndata=newndata, sigma=newsigma)
@property def luminosity_mapping(self): """Return the flavour combinations that contribute to the fktable in the form of a single array The return shape is: (nbasis,) for DIS (nbasis*2,) for hadronic """ basis = self.sigma.columns.to_numpy() if self.hadronic: ret = np.zeros(14 * 14, dtype=bool) ret[basis] = True basis = np.array(np.where(ret.reshape(14, 14))).T.reshape(-1) return basis
[docs] def get_np_fktable(self): """Returns the fktable as a dense numpy array that can be directly manipulated with numpy The return shape is: (ndata, nx, nbasis) for DIS (ndata, nx, nx, nbasis) for hadronic where nx is the length of the xgrid and nbasis the number of flavour contributions that contribute """ # Read up the shape of the output table ndata = self.ndata nx = len(self.xgrid) nbasis = self.sigma.shape[1] if ndata == 0: if self.hadronic: return np.zeros((ndata, nbasis, nx, nx)) return np.zeros((ndata, nbasis, nx)) # Make the dataframe into a dense numpy array # First get the data index out of the way # this is necessary because cuts/shifts and for performance reasons # otherwise we will be putting things in a numpy array in very awkward orders ns = self.sigma.unstack(level=("data",), fill_value=0) x1 = ns.index.get_level_values(0) if self.hadronic: x2 = ns.index.get_level_values(1) fk_raw = np.zeros((nx, nx, ns.shape[1])) fk_raw[x2, x1, :] = ns.values # The output is (ndata, basis, x1, x2) fktable = fk_raw.reshape((nx, nx, nbasis, ndata)).T else: fk_raw = np.zeros((nx, ns.shape[1])) fk_raw[x1, :] = ns.values # The output is (ndata, basis, x1) fktable = fk_raw.reshape((nx, nbasis, ndata)).T return fktable
[docs] def determine_pdfs(self, pdf): """Determine the PDF (or PDFs) that should be used to be convoluted with this fktable. Uses the `convolution_types` key to decide the PDFs. If `convolution_types` is not defined, it returns the pdf object. """ if self.convolution_types is None: if self.hadronic: return [pdf, pdf] return [pdf] conv_pdfs = [] for convolution_type in self.convolution_types: # Check the type of convolutions that the fktable is asking for and match it to the PDF if convolution_type == "UnpolPDF": if pdf.is_polarized: if pdf.unpolarized_bc is None: raise ValueError( "The FKTable asked for an unpolarized PDF but received only polarized PDFs" ) conv_pdfs.append(pdf.unpolarized_bc.make_only_cv()) else: conv_pdfs.append(pdf) elif convolution_type == "PolPDF": if not pdf.is_polarized: raise ValueError( """The FKTable asked for a polarized PDF, but the PDF received cannot be understood as polarized. When using a polarized PDF make sure to include a boundary condition `unpolarized_bc: <pdf name>` whenever needed (`t0`, `dataspecs`...).""" ) conv_pdfs.append(pdf) return conv_pdfs
[docs] @dataclasses.dataclass(eq=False) class CFactorData: """ Data contained in a CFactor Parameters ---------- description : str Information on how the data was obtained. central_value : array, shape(ndata) The value of the cfactor for each data point. uncertainty : array, shape(ndata) The absolute uncertainty on the cfactor if available. """ description: str central_value: np.array uncertainty: np.array
[docs] @dataclasses.dataclass(eq=False) class CommonData: """ Data contained in Commondata files, relevant cuts applied. Parameters ---------- setname : str Name of the dataset ndata : int Number of data points commondataproc : str Process type, one of 21 options nkin : int Number of kinematics specified nsys : int Number of systematics commondata_table : pd.DataFrame Pandas dataframe containing the commondata systype_table : pd.DataFrame Pandas dataframe containing the systype index for each systematic alongside the uncertainty type (ADD/MULT/RAND) and name (CORR/UNCORR/THEORYCORR/SKIP) systematics_table: pd.DataFrame Panda dataframe containing the table of systematics """ setname: str ndata: int commondataproc: str nkin: int nsys: int commondata_table: pd.DataFrame = dataclasses.field(repr=False) systype_table: pd.DataFrame = dataclasses.field(repr=False) legacy: bool = False systematics_table: Optional[pd.DataFrame] = dataclasses.field(init=None, repr=False) legacy_names: Optional[list] = None kin_variables: Optional[list] = None def __post_init__(self): self.systematics_table = self.commondata_table.drop( columns=["process", "data", "stat"] + KIN_NAMES ) # TODO: set for now commondataproc as a string self.commondataproc = str(self.commondataproc)
[docs] def with_cuts(self, cuts): """A method to return a CommonData object where an integer mask has been applied, keeping only data points which pass cuts. Note if the first data point passes cuts, the first entry of ``cuts`` should be ``0``. Paramters --------- cuts: list or validphys.core.Cuts or None """ # Ensure that the cuts we're applying applies to this dataset # only check, however, if the cuts is of type :py:class:`validphys.core.Cuts` if hasattr(cuts, "name") and self.setname != cuts.name: raise ValueError( f"The cuts provided are for {cuts.name} which does not apply " f"to this commondata file: {self.setname}" ) if hasattr(cuts, "load"): cuts = cuts.load() if cuts is None: return self # We must shift the cuts up by 1 since a cut of 0 implies the first data point # while commondata indexing starts at 1. cuts = list(map(lambda x: x + 1, cuts)) newndata = len(cuts) new_commondata_table = self.commondata_table.loc[cuts] return dataclasses.replace(self, ndata=newndata, commondata_table=new_commondata_table)
@property def kinematics(self): return self.commondata_table[KIN_NAMES]
[docs] def get_kintable(self): return self.kinematics.values
@property def central_values(self): return self.commondata_table["data"]
[docs] def with_central_value(self, cv): tb = self.commondata_table.copy() tb["data"] = cv return dataclasses.replace(self, commondata_table=tb)
[docs] def get_cv(self): return self.central_values.values
@property def stat_errors(self): return self.commondata_table["stat"] @property def multiplicative_errors(self): """Returns the systematics which are multiplicative (systype is MULT) in a percentage format, with SKIP uncertainties removed. """ mult_systype = self.systype_table[self.systype_table["treatment"] == "MULT"] mult_table = self.systematics_table.filter(like="MULT") if self.legacy: # Needed in legacy because every uncertainty appears as both mult and add # so it is necessary to select the uncertainties that are to be consireded as MULT/ADD # Minus 1 because iloc starts from 0, while the systype counting starts from 1 mult_table = mult_table.iloc[:, mult_systype.index - 1] mult_table.columns = mult_systype["name"].to_numpy() return mult_table.loc[:, mult_table.columns != "SKIP"] @property def additive_errors(self): """Returns the systematics which are additive (systype is ADD) as absolute uncertainties (same units as data), with SKIP uncertainties removed. """ add_systype = self.systype_table[self.systype_table["treatment"] == "ADD"] add_table = self.systematics_table.filter(like="ADD") if self.legacy: # Minus 1 because iloc starts from 0, while the systype counting starts from 1 add_table = add_table.iloc[:, add_systype.index - 1] add_table.columns = add_systype["name"].to_numpy() return add_table.loc[:, add_table.columns != "SKIP"]
[docs] def systematic_errors(self, central_values=None): """Returns all systematic errors as absolute uncertainties, with a single column for each uncertainty. Converts :py:attr:`multiplicative_errors` to units of data and then appends onto :py:attr:`additive_errors`. By default uses the experimental central values to perform conversion, but the user can supply a 1-D array of central values, with length :py:attr:`self.ndata`, to use instead of the experimental central values to calculate the absolute contribution of the multiplicative systematics. Parameters ---------- central_values: None, np.array 1-D array containing alternative central values to combine with multiplicative uncertainties. This array must have length equal to :py:attr:`self.ndata`. By default ``central_values`` is None, and the central values of the commondata are used. Returns ------- systematic_errors: pd.DataFrame Dataframe containing systematic errors. """ if central_values is None: central_values = self.central_values.to_numpy() converted_mult_errors = self.multiplicative_errors * central_values[:, np.newaxis] / 100 return pd.concat((self.additive_errors, converted_mult_errors), axis=1)
[docs] def export_data(self, buffer): """Exports the central data defined by this commondata instance to the given buffer""" ret = {"data_central": self.central_values.tolist()} yaml_safe.dump(ret, buffer)
[docs] def export_uncertainties(self, buffer): """Exports the uncertainties defined by this commondata instance to the given buffer""" definitions = {} for idx, row in self.systype_table.iterrows(): if row["name"] != "SKIP": definitions[f"sys_{idx}"] = {"treatment": row["treatment"], "type": row["name"]} # Order the definitions by treatment as ADD, MULT # TODO: make it so that it corresponds to the original order exactly sorted_definitions = { k: v for k, v in sorted(definitions.items(), key=lambda item: item[1]["treatment"]) } bins = [] for idx, row in self.systematic_errors().iterrows(): tmp = {"stat": float(self.stat_errors[idx])} # Hope things come in the right order... for key_name, val in zip(sorted_definitions, row): tmp[key_name] = float(val) bins.append(tmp) sorted_definitions["stat"] = { "description": "Uncorrelated statistical uncertainties", "treatment": "ADD", "type": "UNCORR", } ret = {"definitions": sorted_definitions, "bins": bins} yaml_safe.dump(ret, buffer)
[docs] def export(self, folder_path): """Wrapper around export_data and export_uncertainties to write both uncertainties and data after filtering to a given folder """ folder_path.mkdir(exist_ok=True) # Get the same names as one would use for the filters data_path, unc_path = generate_path_filtered_data(folder_path, self.setname) # And attach it to the given folder data_path = folder_path / data_path.name unc_path = folder_path / unc_path.name # Export data and uncertainties with open(data_path, "w") as file: self.export_data(file) with open(unc_path, "w") as file: self.export_uncertainties(file) return data_path, unc_path