"""
This module implements parsers for FKtable and CFactor files into useful
datastructures, contained in the :py:mod:`validphys.coredata` module, which can
be easily pickled and interfaced with common Python libraries.
Most users will be interested in using the high level interface
:py:func:`load_fktable`. Given a :py:class:`validphys.core.FKTableSpec`
object, it returns an instance of :py:class:`validphys.coredata.FKTableData`,
an object with the required information to compute a convolution, with the
CFactors applied.

.. code-block:: python

    from validphys.fkparser import load_fktable
    from validphys.loader import Loader

    l = Loader()
    fk = l.check_fktable(setname="ATLASTTBARTOT", theoryID=53, cfac=('QCD',))
    res = load_fktable(fk)
"""
import dataclasses
import functools
import io
import tarfile
import numpy as np
import pandas as pd
from validphys.coredata import CFactorData, FKTableData
from validphys.pineparser import pineappl_reader
class BadCFactorError(Exception):
    """Exception raised when a CFactor cannot be parsed correctly."""
class BadFKTableError(Exception):
    """Exception raised when an FKTable cannot be parsed correctly."""
@dataclasses.dataclass(frozen=True)
class GridInfo:
    """Immutable record with the basic properties of an FKTable grid.

    The fields are filled from the options of the ``GridInfo`` segment of an
    FKTable file, with the option names lower-cased.
    """

    # Value of the SETNAME option (presumably the dataset name; kept as text).
    setname: str
    # True for hadronic tables; selects which FastKernel parser is used.
    hadronic: bool
    # Value of the NDATA option; forwarded to FKTableData as ``ndata``.
    ndata: int
    # Value of the NX option (presumably the number of x-grid points).
    nx: int
@functools.lru_cache()
def load_fktable(spec):
    """Load the FKTable described by ``spec``, with its CFactors applied.

    New-type (non-legacy) tables are read directly by the pineappl reader,
    which already applies the CFactors while loading. Legacy tables go
    through the old text parser and have the product of all their CFactor
    central values applied afterwards.

    Results are memoized on ``spec`` through :func:`functools.lru_cache`.
    """
    if not spec.legacy:
        # In the new theories, the cfactors get applied as the fktables are loaded
        return pineappl_reader(spec)

    with open_fkpath(spec.fkpath) as stream:
        tabledata = parse_fktable(stream)

    if not spec.cfactors:
        return tabledata

    # Accumulate the product of the central values of every CFactor file
    cfprod = 1.0
    for cfpath in spec.cfactors:
        with open(cfpath, "rb") as cfstream:
            cfprod *= parse_cfactor(cfstream).central_value
    return tabledata.with_cfactor(cfprod)
def _get_compressed_buffer(path):
archive = tarfile.open(path)
members = archive.getmembers()
l = len(members)
if l != 1:
raise BadFKTableError(f"Archive {path} should contain one file, but it contains {l}.")
return archive.extractfile(members[0])
def open_fkpath(path):
    """Open an FKTable path as a binary stream, whether or not it is compressed.

    Parameters
    ----------
    path : Path or str
        Path to a file containing a valid FKTable. It can be either inside a
        tarball or in plain text.

    Returns
    -------
    f : file
        A file-like object for further processing.
    """
    if not tarfile.is_tarfile(path):
        # Plain file: hand back the raw binary handle
        return open(path, 'rb')
    return _get_compressed_buffer(path)
def _is_header_line(line):
return line.startswith((b'_', b'{'))
def _bytes_to_bool(x):
return bool(int(x))
def _parse_fk_options(line_and_stream, value_parsers=None):
    """Parse a sequence of lines of the form

        *OPTION: VALUE

    into a dictionary.

    Parameters
    ----------
    line_and_stream : iterator
        Yields ``(lineno, line)`` pairs, where ``line`` is a bytes object.
    value_parsers : dict, optional
        Maps an option name to a callable that converts the raw value bytes.
        Options without an entry are stored as stripped, decoded strings.

    Returns
    -------
    res : dict
        The parsed options.
    lineno : int
        Line number of the header line that terminated the option block.
    next_line : bytes
        That header line.

    Raises
    ------
    BadFKTableError
        If a line does not start with ``*``, lacks a ``:`` separator, cannot
        be converted by its value parser, or if the stream ends before a
        header line is found.
    """
    if value_parsers is None:
        value_parsers = {}
    res = {}
    for lineno, next_line in line_and_stream:
        if _is_header_line(next_line):
            return res, lineno, next_line
        if not next_line.startswith(b'*'):
            raise BadFKTableError(f"Error on line {lineno}: Expecting an option starting with '*'")
        # partition leaves the separator empty when ':' is absent, so no
        # exception needs to be caught (and no unrelated context is chained).
        keybytes, colon, valuebytes = next_line.partition(b':')
        if not colon:
            raise BadFKTableError(f"Error on line {lineno}: Expecting an option containing ':'")
        # Drop the leading '*' before cleaning up the key
        key = keybytes[1:].strip().decode()
        if key in value_parsers:
            try:
                value = value_parsers[key](valuebytes)
            except Exception as e:
                raise BadFKTableError(f"Could not parse key {key} on line {lineno}") from e
        else:
            value = valuebytes.strip().decode()
        res[key] = value
    raise BadFKTableError("FKTable should end with FastKernel spec, not with a set of options")
def _segment_parser(f):
    """Decorator turning a buffer parser into a segment parser.

    The wrapped function consumes ``(lineno, line)`` pairs, accumulating the
    lines in an in-memory bytes buffer until a header line is reached. It
    then returns ``(f(buffer), lineno, header_line)``. Note that ``f``
    receives the buffer positioned at its end. If the stream is exhausted
    before any header line shows up, a ``BadFKTableError`` is raised.
    """

    @functools.wraps(f)
    def wrapper(line_and_stream):
        segment = io.BytesIO()
        for lineno, next_line in line_and_stream:
            if not _is_header_line(next_line):
                segment.write(next_line)
                continue
            return f(segment), lineno, next_line
        raise BadFKTableError("FKTable should end with FastKernel spec, not with a segment string")

    return wrapper
@_segment_parser
def _parse_string(buf):
    """Return the whole content of the segment buffer decoded as text."""
    raw = buf.getvalue()
    return raw.decode()
@_segment_parser
def _parse_flavour_map(buf):
    """Read the segment buffer as a whitespace-separated table of booleans."""
    # Rewind first: loadtxt reads from the current position of the buffer
    buf.seek(0)
    flavour_map = np.loadtxt(buf, dtype=bool)
    return flavour_map
@_segment_parser
def _parse_xgrid(buf):
    """Read the newline-separated numbers of the segment buffer into an array."""
    contents = buf.getvalue()
    return np.fromstring(contents, sep='\n')
# This used a different interface from segment parser because we want it to
# be fast.
# We assume it is going to be the last section.
def _parse_hadronic_fast_kernel(f):
"""Parse the FastKernel secrion of an hadronic FKTable into a DataFrame.
``f`` should be a stream containing only the section"""
# Note that we need the slower whitespace here because it turns out
# that there are fktables where space and tab are used as separators
# within the same table.
df = pd.read_csv(f, sep=r'\s+', header=None, index_col=(0, 1, 2))
df.columns = list(range(14 * 14))
df.index.names = ['data', 'x1', 'x2']
return df
def _parse_dis_fast_kernel(f):
"""Parse the FastKernel section of a DIS FKTable into a DataFrame.
``f`` should be a stream containing only the section"""
df = pd.read_csv(f, sep=r'\s+', header=None, index_col=(0, 1))
df.columns = list(range(14))
df.index.names = ['data', 'x']
return df
def _parse_gridinfo(line_and_stream):
    """Parse the options of the ``GridInfo`` segment into a :class:`GridInfo`.

    Returns the ``GridInfo`` instance together with the line number and the
    content of the header line that ended the segment.
    """
    converters = {"HADRONIC": _bytes_to_bool, "NDATA": int, "NX": int}
    options, line_number, next_line = _parse_fk_options(
        line_and_stream, value_parsers=converters
    )
    # Option names are upper case in the file; GridInfo fields are lower case
    fields = {name.lower(): value for name, value in options.items()}
    return GridInfo(**fields), line_number, next_line
def _parse_header(lineno, header):
    """Split a header line into its marker and its segment name.

    The marker is the first byte of the line (``_`` or ``{``), returned as a
    length-one bytes object. The name is the text between that marker and
    the next ``_``, returned decoded. ``lineno`` is only used in error
    messages. Raises ``BadFKTableError`` on malformed headers.
    """
    if not _is_header_line(header):
        raise BadFKTableError(
            f"Bad header at line {lineno}: First character should be either '_' or '{{'"
        )
    try:
        name_end = header.index(b'_', 1)
    except ValueError:
        raise BadFKTableError(f"Bad header at line {lineno}: Expected '_' after name") from None
    # Slice rather than index: indexing bytes yields an int, not bytes.
    marker = header[:1]
    name = header[1:name_end].decode()
    return marker, name
def _build_sigma(f, res):
    """Read the FastKernel section from ``f`` and keep the active flavour columns.

    ``res`` holds the segments parsed so far; its ``GridInfo`` entry selects
    the hadronic or DIS parser, and its ``FlavourMap`` entry, flattened, is
    used as a boolean column mask.
    """
    grid_info = res["GridInfo"]
    flavour_map = res["FlavourMap"]
    if grid_info.hadronic:
        table = _parse_hadronic_fast_kernel(f)
    else:
        table = _parse_dis_fast_kernel(f)
    # Filter out empty flavour indices
    return table.loc[:, flavour_map.ravel()]
# Parser for each named segment of an FKTable file. Every key listed here is
# also required to appear before the 'FastKernel' section.
_KNOWN_SEGMENTS = {
    "GridDesc": _parse_string,
    "VersionInfo": _parse_fk_options,
    "GridInfo": _parse_gridinfo,
    "FlavourMap": _parse_flavour_map,
    "xGrid": _parse_xgrid,
    "TheoryInfo": functools.partial(
        _parse_fk_options,
        # Converters grouped by target type; unlisted options stay strings
        value_parsers={
            **dict.fromkeys(
                ("ID", "PTO", "NfFF", "MaxNfAs", "MaxNfPdf", "global_nx"),
                int,
            ),
            **dict.fromkeys(
                ("DAMP", "IC", "QED", "SxRes", "TMC", "EScaleVar"),
                _bytes_to_bool,
            ),
            **dict.fromkeys(
                (
                    "XIR",
                    "XIF",
                    "Q0",
                    "alphas",
                    "Qref",
                    "alphaqed",
                    "Qedref",
                    "mc",
                    "Qmc",
                    "kcThr",
                    "mb",
                    "Qmb",
                    "kbThr",
                    "mt",
                    "Qmt",
                    "ktThr",
                    "MZ",
                    "MW",
                    "GF",
                    "SIN2TW",
                    "MP",
                ),
                float,
            ),
        },
    ),
}
def _check_required_sections(res, lineno):
    """Ensure every known segment was parsed before the 'FastKernel' section.

    ``res`` maps the names of the segments found so far to their parsed
    content. A ``BadFKTableError`` naming the first missing segment is
    raised if any is absent; ``lineno`` is only used in that message.
    """
    missing = (name for name in _KNOWN_SEGMENTS if name not in res)
    section = next(missing, None)
    if section is not None:
        raise BadFKTableError(f"{section} must come before 'FastKernel' section at {lineno}")
def parse_fktable(f):
    """Parse an open byte stream into an FKTableData. Raise a BadFKTableError
    if problems are encountered.

    Parameters
    ----------
    f : file
        Open file-like object. See :func:`open_fkpath` to obtain it.

    Returns
    -------
    fktable : FKTableData
        An object containing the FKTable data and information.

    Raises
    ------
    BadFKTableError
        If the stream is empty, a header or segment is malformed, a required
        segment is missing, or the ``TheoryInfo`` segment does not define
        ``Q0``.

    Notes
    -----
    This function operates at the level of a single file, and therefore it does
    not apply CFactors (see :py:func:`load_fktable` for that) or handle operations
    within COMPOUND ensembles.
    """
    line_and_stream = enumerate(f, start=1)
    res = {}
    try:
        lineno, header = next(line_and_stream)
    except StopIteration:
        # An empty stream must be reported as a parsing problem, not leak
        # a bare StopIteration to the caller.
        raise BadFKTableError("Empty FKTable: expected a segment header on line 1") from None
    while True:
        marker, header_name = _parse_header(lineno, header)
        if header_name == "FastKernel":
            _check_required_sections(res, lineno)
            try:
                Q0 = res['TheoryInfo']['Q0']
            except KeyError:
                raise BadFKTableError(
                    "'TheoryInfo' section does not define the 'Q0' option"
                ) from None
            # The rest of the stream is the FastKernel section itself, so it
            # is read straight from ``f`` rather than line by line.
            sigma = _build_sigma(f, res)
            hadronic = res['GridInfo'].hadronic
            ndata = res['GridInfo'].ndata
            # The x-grid gets its own field, so it is taken out of the metadata
            xgrid = res.pop('xGrid')
            return FKTableData(
                sigma=sigma, ndata=ndata, Q0=Q0, metadata=res, hadronic=hadronic, xgrid=xgrid
            )
        elif header_name in _KNOWN_SEGMENTS:
            parser = _KNOWN_SEGMENTS[header_name]
        elif marker == b'{':
            # Unknown '{' segments are kept as free text
            parser = _parse_string
        elif marker == b'_':
            # Unknown '_' segments are parsed as option blocks
            parser = _parse_fk_options
        else:
            raise RuntimeError("Should not be here")
        try:
            out, lineno, header = parser(line_and_stream)
        except Exception as e:
            # Note that the old lineno is the one we want: the assignment
            # above did not happen, so it still points at this header.
            raise BadFKTableError(f"Failed processing header {header_name} on line {lineno}") from e
        res[header_name] = out
def parse_cfactor(f):
    """Parse an open byte stream into a :py:class:`CFactorData`. Raise a
    BadCFactorError if problems are encountered.

    The expected layout is a line starting with ``*``, a free-text
    description, another line starting with ``*``, and then numeric rows
    with two columns: central value and uncertainty.

    Parameters
    ----------
    f : file
        Binary file-like object

    Returns
    -------
    cfac : CFactorData
        An object containing the data on the cfactor for each point.

    Raises
    ------
    BadCFactorError
        If the first line does not start with ``*``, the description is not
        closed by a line starting with ``*``, the numeric block cannot be
        read, or it does not have exactly two columns.
    """
    stars = f.readline()
    if not stars.startswith(b'*'):
        raise BadCFactorError("First line should start with '*'.")
    descbytes = io.BytesIO()
    for line in f:
        if line.startswith(b'*'):
            break
        descbytes.write(line)
    else:
        # Without the closing line the whole file was taken as description,
        # which would otherwise yield an empty CFactor without any error.
        raise BadCFactorError("Description should be closed by a line starting with '*'.")
    description = descbytes.getvalue().decode()
    try:
        # ndmin=2 keeps a single row as shape (1, ncols), so that the number
        # of columns can be checked below.
        data = np.loadtxt(f, ndmin=2)
    except Exception as e:
        raise BadCFactorError(e) from e
    # A blind reshape would scramble tables with a different column count
    if data.size and data.shape[1] != 2:
        raise BadCFactorError(
            f"Expected 2 columns (central value and uncertainty), found {data.shape[1]}."
        )
    # Also normalizes an empty numeric block to shape (0, 2)
    data = data.reshape(-1, 2)
    central_value = data[:, 0]
    uncertainty = data[:, 1]
    return CFactorData(
        description=description, central_value=central_value, uncertainty=uncertainty
    )