Source code for validphys.fkparser

"""
This module implements parsers that turn FKTable and CFactor files into useful
data structures, contained in the :py:mod:`validphys.coredata` module, which can
be easily pickled and interfaced with common Python libraries.

Most users will be interested in using the high level interface
:py:func:`load_fktable`.  Given a :py:class:`validphys.core.FKTableSpec`
object, it returns an instance of :py:class:`validphys.coredata.FKTableData`,
an object with the required information to compute a convolution, with the
CFactors applied.

.. code-block:: python

    from validphys.fkparser import load_fktable
    from validphys.loader import Loader
    l = Loader()
    fk = l.check_fktable(setname="ATLASTTBARTOT", theoryID=53, cfac=('QCD',))
    res = load_fktable(fk)
"""

# TODO: this module is deprecated and support for older theories is not guaranteed

import dataclasses
import functools
import io
import tarfile

import numpy as np
import pandas as pd

from validphys.coredata import CFactorData, FKTableData
from validphys.pineparser import pineappl_reader


class BadCFactorError(Exception):
    """Raised when a CFactor file cannot be parsed correctly."""
class BadFKTableError(Exception):
    """Raised when an FKTable file cannot be parsed correctly."""
@dataclasses.dataclass(frozen=True)
class GridInfo:
    """Immutable record holding the basic properties of an FKTable grid.

    Instances are built by ``_parse_gridinfo`` from the options found in the
    ``GridInfo`` section of an FKTable, with the option names lower-cased.
    """

    # Value of the SETNAME option, kept as a string.
    setname: str
    # Parsed from the HADRONIC option; selects the hadronic or the DIS
    # FastKernel parser.
    hadronic: bool
    # Integer value of the NDATA option.
    ndata: int
    # Integer value of the NX option.
    nx: int
@functools.lru_cache()
def load_fktable(spec):
    """Load the data corresponding to a FKSpec object, with cfactors applied.

    Legacy specs are read with the parser in this module (see
    :py:func:`open_fkpath` and :py:func:`parse_fktable`); any other spec is
    handed to ``pineappl_reader``.

    Results are memoised with :py:func:`functools.lru_cache`, so ``spec`` must
    be hashable.
    """
    if spec.legacy:
        with open_fkpath(spec.fkpath) as stream:
            table = parse_fktable(stream)
    else:
        table = pineappl_reader(spec)

    # In the new theories the cfactors get applied as the fktables are loaded,
    # so only legacy tables that actually declare cfactors need further work.
    if not (spec.cfactors and spec.legacy):
        return table

    product = 1.0
    for cfactor_path in spec.cfactors:
        with open(cfactor_path, "rb") as cfactor_file:
            cfactor = parse_cfactor(cfactor_file)
        product *= cfactor.central_value
    return table.with_cfactor(product)
def _get_compressed_buffer(path):
    """Return a binary file-like object for the single member of a tarball.

    Parameters
    ----------
    path : Path or str
        Location of a tar archive expected to contain exactly one file.

    Returns
    -------
    f : file
        Readable binary stream over the contents of the only archive member.

    Raises
    ------
    BadFKTableError
        If the archive does not contain exactly one member, or if that member
        cannot be read as a file.
    """
    archive = tarfile.open(path)
    try:
        members = archive.getmembers()
        nmembers = len(members)
        if nmembers != 1:
            raise BadFKTableError(
                f"Archive {path} should contain one file, but it contains {nmembers}."
            )
        # ``extractfile`` yields None for members that are not regular files
        # or links (e.g. a directory); report that here rather than handing
        # None to the caller.
        buffer = archive.extractfile(members[0])
        if buffer is None:
            raise BadFKTableError(
                f"Archive {path} contains {members[0].name}, which is not a readable file."
            )
    except BaseException:
        # Do not leak the archive handle when no buffer is returned.
        archive.close()
        raise
    # On success the archive is intentionally left open: the returned buffer
    # reads from it.
    return buffer
def open_fkpath(path):
    """Open an FKTable for binary reading, whether or not it is compressed.

    Parameters
    ----------
    path : Path or str
        Path-like location of a valid FKTable. It can be either inside a
        tarball or in plain text.

    Returns
    -------
    f : file
        A file-like object for further processing.
    """
    if not tarfile.is_tarfile(path):
        return open(path, 'rb')
    return _get_compressed_buffer(path)
def _is_header_line(line):
    """Tell whether ``line`` (bytes) opens a section, i.e. starts with ``_`` or ``{``."""
    return line.startswith((b'_', b'{'))


def _bytes_to_bool(x):
    """Interpret ``x`` as an integer and return whether it is non-zero."""
    return int(x) != 0


def _parse_fk_options(line_and_stream, value_parsers=None):
    """Parse a sequence of lines of the form *OPTION: VALUE into a dictionary.

    Parameters
    ----------
    line_and_stream : iterator
        Yields ``(lineno, line)`` pairs, with ``line`` given as bytes.
    value_parsers : dict, optional
        Maps option names to callables applied to the raw value bytes. Options
        not listed are stored as stripped, decoded strings.

    Returns
    -------
    tuple
        ``(options, lineno, header)``, where ``header`` is the header line
        that terminated the option block and ``lineno`` its line number.

    Raises
    ------
    BadFKTableError
        If a line is not a well formed option, a value cannot be converted,
        or the stream ends before another header is found.
    """
    parsers = {} if value_parsers is None else value_parsers
    options = {}
    for lineno, line in line_and_stream:
        if _is_header_line(line):
            return options, lineno, line
        if not line.startswith(b'*'):
            raise BadFKTableError(f"Error on line {lineno}: Expecting an option starting with '*'")
        try:
            raw_key, raw_value = line.split(b':', maxsplit=1)
        except ValueError:
            raise BadFKTableError(f"Error on line {lineno}: Expecting an option containing ':'")
        # Drop the leading '*' before cleaning up the option name.
        key = raw_key[1:].strip().decode()
        if key not in parsers:
            options[key] = raw_value.strip().decode()
            continue
        try:
            options[key] = parsers[key](raw_value)
        except Exception as e:
            raise BadFKTableError(f"Could not parse key {key} on line {lineno}") from e

    raise BadFKTableError("FKTable should end with FastKernel spec, not with a set of options")


def _segment_parser(f):
    """Turn ``f``, which processes a bytes buffer, into a section parser.

    The returned function consumes ``(lineno, line)`` pairs, collecting the
    lines in an :py:class:`io.BytesIO` until the next header line shows up,
    and then returns ``(f(buffer), lineno, header)``.
    """

    @functools.wraps(f)
    def wrapper(line_and_stream):
        collected = io.BytesIO()
        for lineno, line in line_and_stream:
            if _is_header_line(line):
                return f(collected), lineno, line
            collected.write(line)
        raise BadFKTableError("FKTable should end with FastKernel spec, not with a segment string")

    return wrapper


@_segment_parser
def _parse_string(buf):
    """Return the whole content of the section decoded as text."""
    return buf.getvalue().decode()


@_segment_parser
def _parse_flavour_map(buf):
    """Load the section as a boolean array with :py:func:`numpy.loadtxt`."""
    # The buffer was just written to, so rewind before reading it back.
    buf.seek(0)
    return np.loadtxt(buf, dtype=bool)


@_segment_parser
def _parse_xgrid(buf):
    """Read the newline separated numbers of the section into an array."""
    return np.fromstring(buf.getvalue(), sep='\n')


# The FastKernel parsers use a different interface from the segment parsers
# because we want them to be fast.
# We assume FastKernel is going to be the last section.
def _read_fast_kernel(f, index_names, ncolumns):
    """Read a FastKernel section, indexed by its first ``len(index_names)`` columns."""
    # Note that we need the slower whitespace regex here because it turns out
    # that there are fktables where space and tab are used as separators
    # within the same table.
    frame = pd.read_csv(f, sep=r'\s+', header=None, index_col=tuple(range(len(index_names))))
    frame.columns = list(range(ncolumns))
    frame.index.names = list(index_names)
    return frame


def _parse_hadronic_fast_kernel(f):
    """Parse the FastKernel section of an hadronic FKTable into a DataFrame.
    ``f`` should be a stream containing only the section"""
    return _read_fast_kernel(f, ('data', 'x1', 'x2'), 14 * 14)


def _parse_dis_fast_kernel(f):
    """Parse the FastKernel section of a DIS FKTable into a DataFrame.
    ``f`` should be a stream containing only the section"""
    return _read_fast_kernel(f, ('data', 'x'), 14)


def _parse_gridinfo(line_and_stream):
    """Parse the ``GridInfo`` options and pack them into a ``GridInfo`` object.

    Returns ``(gridinfo, lineno, header)``, following the convention of
    ``_parse_fk_options``.
    """
    options, lineno, header = _parse_fk_options(
        line_and_stream, value_parsers={"HADRONIC": _bytes_to_bool, "NDATA": int, "NX": int}
    )
    fields = {name.lower(): value for name, value in options.items()}
    return GridInfo(**fields), lineno, header


def _parse_header(lineno, header):
    """Split a header line into its marker (``b'_'`` or ``b'{'``) and section name.

    Raises ``BadFKTableError`` if the line does not start with a marker or if
    no ``_`` follows the name.
    """
    if not _is_header_line(header):
        raise BadFKTableError(
            f"Bad header at line {lineno}: First character should be either '_' or '{{'"
        )
    try:
        name_end = header.index(b'_', 1)
    except ValueError:
        raise BadFKTableError(f"Bad header at line {lineno}: Expected '_' after name") from None
    # Note: the slice is not the same as header[0]. Bytes iterate as ints.
    marker = header[0:1]
    return marker, header[1:name_end].decode()


def _build_sigma(f, res):
    """Parse the FastKernel section from ``f``, keeping the flavour-map columns.

    ``res`` must already contain the parsed ``GridInfo`` and ``FlavourMap``
    sections.
    """
    grid_info = res["GridInfo"]
    flavour_map = res["FlavourMap"]
    reader = _parse_hadronic_fast_kernel if grid_info.hadronic else _parse_dis_fast_kernel
    # Filter out empty flavour indices
    return reader(f).loc[:, flavour_map.ravel()]


# Converters for the values of the TheoryInfo section, grouped by target type.
_THEORY_INFO_PARSERS = {
    **dict.fromkeys(("ID", "PTO", "NfFF", "MaxNfAs", "MaxNfPdf", "global_nx"), int),
    **dict.fromkeys(
        (
            "XIR", "XIF", "Q0", "alphas", "Qref", "alphaqed", "Qedref",
            "mc", "Qmc", "kcThr", "mb", "Qmb", "kbThr", "mt", "Qmt", "ktThr",
            "MZ", "MW", "GF", "SIN2TW", "MP",
        ),
        float,
    ),
    **dict.fromkeys(("DAMP", "IC", "QED", "SxRes", "TMC", "EScaleVar"), _bytes_to_bool),
}

# Sections that get a dedicated parser. The insertion order is also the order
# in which _check_required_sections reports a missing section.
_KNOWN_SEGMENTS = {
    "GridDesc": _parse_string,
    "VersionInfo": _parse_fk_options,
    "GridInfo": _parse_gridinfo,
    "FlavourMap": _parse_flavour_map,
    "xGrid": _parse_xgrid,
    "TheoryInfo": functools.partial(_parse_fk_options, value_parsers=_THEORY_INFO_PARSERS),
}


def _check_required_sections(res, lineno):
    """Check that we have found all the required sections by the time we
    reach 'FastKernel'"""
    missing = next((name for name in _KNOWN_SEGMENTS if name not in res), None)
    if missing is not None:
        raise BadFKTableError(f"{missing} must come before 'FastKernel' section at {lineno}")
def parse_fktable(f):
    """Parse an open byte stream into an FKTableData. Raise a BadFKTableError
    if problems are encountered.

    Parameters
    ----------
    f : file
        Open file-like object. See :func:`open_fkpath` to obtain it.

    Returns
    -------
    fktable : FKTableData
        An object containing the FKTable data and information.

    Raises
    ------
    BadFKTableError
        If the stream is empty, a header is malformed, a section cannot be
        parsed, or a required section is missing when ``FastKernel`` is
        reached.

    Notes
    -----
    This function operates at the level of a single file, and therefore it does
    not apply CFactors (see :py:func:`load_fktable` for that) or handle operations
    within COMPOUND ensembles.
    """
    line_and_stream = enumerate(f, start=1)
    try:
        lineno, header = next(line_and_stream)
    except StopIteration:
        # An empty stream must be reported as a parsing problem rather than
        # leaking a bare StopIteration to the caller.
        raise BadFKTableError("FKTable is empty: expected a section header on line 1") from None

    res = {}
    while True:
        marker, header_name = _parse_header(lineno, header)
        if header_name == "FastKernel":
            _check_required_sections(res, lineno)
            Q0 = res['TheoryInfo']['Q0']
            # The kernel is read straight from ``f`` instead of going through
            # the line iterator: it is taken to be the last section of the file.
            sigma = _build_sigma(f, res)
            hadronic = res['GridInfo'].hadronic
            ndata = res['GridInfo'].ndata
            # The x grid is passed on its own, so take it out of the metadata.
            xgrid = res.pop('xGrid')
            data_idx = sigma.index.get_level_values("data").unique().to_series()
            return FKTableData(
                sigma=sigma,
                ndata=ndata,
                Q0=Q0,
                metadata=res,
                hadronic=hadronic,
                xgrid=xgrid,
                data_index=data_idx,
                legacy=True,
            )
        elif header_name in _KNOWN_SEGMENTS:
            parser = _KNOWN_SEGMENTS[header_name]
        elif marker == b'{':
            # Unknown '{' sections are kept as plain text.
            parser = _parse_string
        elif marker == b'_':
            # Unknown '_' sections are parsed as generic option blocks.
            parser = _parse_fk_options
        else:
            raise RuntimeError("Should not be here")
        try:
            out, lineno, header = parser(line_and_stream)
        except Exception as e:
            # Note that the old lineno is the one we want
            raise BadFKTableError(f"Failed processing header {header_name} on line {lineno}") from e
        res[header_name] = out
def parse_cfactor(f):
    """Parse an open byte stream into a :py:class:`CFactorData`. Raise a
    BadCFactorError if problems are encountered.

    The stream must start with a line beginning with ``*``, followed by a
    free-form description that is closed by another line beginning with
    ``*``. The remainder is read as numeric data and arranged in two columns:
    central value and uncertainty.

    Parameters
    ----------
    f : file
        Binary file-like object

    Returns
    -------
    cfac : CFactorData
        An object containing the data on the cfactor for each point.

    Raises
    ------
    BadCFactorError
        If the first line does not start with ``*``, the description is not
        closed by a ``*`` line, or the data cannot be read as two columns.
    """
    stars = f.readline()
    if not stars.startswith(b'*'):
        raise BadCFactorError("First line should start with '*'.")

    descbytes = io.BytesIO()
    for line in f:
        if line.startswith(b'*'):
            break
        descbytes.write(line)
    else:
        # Without the closing delimiter every remaining line, data included,
        # was swallowed into the description and no values are left to read.
        raise BadCFactorError("Description should be closed by a line starting with '*'.")
    description = descbytes.getvalue().decode()

    try:
        # The reshape is kept inside the ``try`` so that a payload that cannot
        # be arranged in two columns is also reported as a BadCFactorError.
        data = np.loadtxt(f).reshape(-1, 2)
    except Exception as e:
        raise BadCFactorError(e) from e

    return CFactorData(
        description=description, central_value=data[:, 0], uncertainty=data[:, 1]
    )