"""
Resolve paths to useful objects, and query the existence of different resources
within the specified paths.
"""
import functools
import logging
import mimetypes
import os
import pathlib
import pkgutil
import re
import shutil
import sys
import tarfile
import tempfile
import urllib.parse as urls
import requests
from nnpdf_data import legacy_to_new_mapping, path_vpdata
from reportengine import filefinder
from reportengine.compat import yaml
from validphys import lhaindex
from validphys.commondataparser import load_commondata_old, parse_new_metadata, parse_set_metadata
from validphys.core import (
PDF,
CommonDataSpec,
Cuts,
CutsPolicy,
DataGroupSpec,
DataSetSpec,
FitSpec,
FKTableSpec,
HyperscanSpec,
IntegrabilitySetSpec,
InternalCutsWrapper,
PositivitySetSpec,
TheoryIDSpec,
peek_commondata_metadata,
)
from validphys.utils import generate_path_filtered_data, tempfile_cleaner
log = logging.getLogger(__name__)
NNPDF_DIR = "NNPDF"
class LoaderError(Exception):
pass
class LoadFailedError(FileNotFoundError, LoaderError):
pass
class DataNotFoundError(LoadFailedError):
pass
class SysNotFoundError(LoadFailedError):
pass
class FKTableNotFound(LoadFailedError):
pass
class CfactorNotFound(LoadFailedError):
pass
class CompoundNotFound(LoadFailedError):
pass
class TheoryNotFound(LoadFailedError):
pass
class EkoNotFound(LoadFailedError):
pass
class TheoryDataBaseNotFound(LoadFailedError):
pass
class FitNotFound(LoadFailedError):
pass
class HyperscanNotFound(LoadFailedError):
pass
class CutsNotFound(LoadFailedError):
pass
class PDFNotFound(LoadFailedError):
pass
class ProfileNotFound(LoadFailedError):
pass
class RemoteLoaderError(LoaderError):
pass
def _get_nnpdf_profile(profile_path=None):
"""Returns the NNPDF profile as a dictionary
If no ``profile_path`` is provided it will be autodiscovered in the following order:
1. Environment variable $NNPDF_PROFILE_PATH
2. ${XDG_CONFIG_HOME}/NNPDF/nnprofile.yaml (usually ~/.config/NNPDF/nnprofile.yaml)
Any value not filled by 1 or 2 will then be filled by the default values
found within the validphys python package ``nnprofile_default.yaml``
If ``nnpdf_share`` is set to the special key ``RELATIVE_TO_PYTHON``
the python prefix (``Path(sys.prefix)/"share"/"NNPDF"``) will be used
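Example
-------
A minimal usage sketch; the returned dictionary contains at least the
``data_path``, ``results_path``, ``theories_path`` and ``hyperscan_path`` keys:

>>> from validphys.loader import _get_nnpdf_profile
>>> profile = _get_nnpdf_profile()
>>> results_path = profile["results_path"]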
"""
yaml_reader = yaml.YAML(typ='safe', pure=True)
home_config = pathlib.Path().home() / ".config"
config_folder = pathlib.Path(os.environ.get("XDG_CONFIG_HOME", home_config)) / NNPDF_DIR
# Set all default values
profile_content = pkgutil.get_data("validphys", "nnprofile_default.yaml")
profile_dict = yaml_reader.load(profile_content)
# including the data_path to the validphys package
profile_dict.setdefault("data_path", path_vpdata)
# Look at profile path
if profile_path is None:
profile_path = os.environ.get("NNPDF_PROFILE_PATH", profile_path)
# If profile_path is still none and there is a .config/NNPDF/nnprofile.yaml, read that
if profile_path is None:
if (config_nnprofile := config_folder / "nnprofile.yaml").exists():
profile_path = config_nnprofile
elif (config_nnprofile := config_folder / "nnprofile.yml").exists():
profile_path = config_nnprofile
if profile_path is not None:
with open(profile_path, encoding="utf-8") as f:
profile_entries = yaml_reader.load(f)
if profile_entries is not None:
profile_dict.update(profile_entries)
nnpdf_share = profile_dict.get("nnpdf_share")
if nnpdf_share is None:
if profile_path is not None:
raise ValueError(
f"`nnpdf_share` is not set in {profile_path}, please set it, e.g.: nnpdf_share: `.local/share/NNPDF`"
)
raise ValueError(
"`nnpdf_share` not found in validphys, something is very wrong with the installation"
)
if nnpdf_share == "RELATIVE_TO_PYTHON":
nnpdf_share = pathlib.Path(sys.prefix) / "share" / NNPDF_DIR
# At this point nnpdf_share needs to be a path to somewhere
nnpdf_share = pathlib.Path(nnpdf_share)
# Make sure that we expand any ~ or ~<username>
nnpdf_share = nnpdf_share.expanduser()
# Make sure we can either write to this directory or it exists
try:
nnpdf_share.mkdir(exist_ok=True, parents=True)
except PermissionError as e:
raise FileNotFoundError(
f"{nnpdf_share} does not exist and you haven't got permissions to create it!"
) from e
# Now read all paths and define them as relative to nnpdf_share (unless given as absolute)
for var in ["results_path", "theories_path", "validphys_cache_path", "hyperscan_path"]:
# if there are any problems setting or getting these variables, erroring out is more than justified
absolute_var = nnpdf_share / pathlib.Path(profile_dict[var]).expanduser()
profile_dict[var] = absolute_var.absolute().as_posix()
return profile_dict
def _use_fit_commondata_old_format_to_new_format(setname, file_path):
"""Reads an old commondata written in the old format
(e.g., a closure test run for NNPDF4.0) and creates a new-format version
in a temporary folder to be read by the commondata parser.
Note that this does not modify the fit."""
if not file_path.exists():
raise DataNotFoundError(f"Data for {setname} at {file_path} not found")
# This function (as well as the loader) is only kept during this first tag to ensure that the
# `fromfit` cuts policy can be used even with old fits... for now
log.error(
"Note, the function `_use_fit_commondata_old_format_to_new_format` is deprecated and will be removed in future releases"
)
# Try loading the data from file_path, using the systypes from there
# although they are not used
systypes = next(file_path.parent.glob("systypes/*.dat"))
commondata = load_commondata_old(file_path, systypes, setname)
# Export the data central
new_data_stream = tempfile.NamedTemporaryFile(
delete=False, prefix=f"filter_{setname}_data", suffix=".yaml", mode="w"
)
commondata.export_data(new_data_stream)
new_data_stream.close()
data_path = pathlib.Path(new_data_stream.name)
# Export the uncertainties
new_unc_stream = tempfile.NamedTemporaryFile(
delete=False, prefix=f"filter_{setname}_uncertainties", suffix=".yaml", mode="w"
)
commondata.export_uncertainties(new_unc_stream)
new_unc_stream.close()
unc_path = pathlib.Path(new_unc_stream.name)
return data_path, unc_path
class LoaderBase:
"""
Base class for the NNPDF loader.
It can take as input a profile dictionary from which all data can be read.
It is possible to override the datapath and resultpath when the class is instantiated.
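Example
-------
A minimal usage sketch through the concrete ``Loader`` subclass; the paths
are resolved from the local nnprofile:

>>> from validphys.loader import Loader
>>> l = Loader()
>>> data_folder = l.datapath
>>> results_folder = l.resultspath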
"""
def __init__(self, profile=None):
if not isinstance(profile, dict):
# If profile is a path, a str or None, read it from the default path
profile = _get_nnpdf_profile(profile)
# Retrieve important paths from the profile if not given
datapath = pathlib.Path(profile["data_path"])
theories_path = pathlib.Path(profile["theories_path"])
resultspath = pathlib.Path(profile["results_path"])
if not datapath.exists():
raise LoaderError(f"The data path {datapath} does not exist.")
# Create the theories and results paths if they don't exist already
theories_path.mkdir(exist_ok=True, parents=True)
resultspath.mkdir(exist_ok=True, parents=True)
# And save them up
self.datapath = datapath
self._theories_path = theories_path
self.resultspath = resultspath
self._extremely_old_fits = set()
self.nnprofile = profile
@property
def hyperscan_resultpath(self):
hyperscan_path = pathlib.Path(self.nnprofile["hyperscan_path"])
hyperscan_path.mkdir(parents=True, exist_ok=True)
return hyperscan_path
def _vp_cache(self):
"""Return the vp-cache path, and create it if it doesn't exist"""
vpcache = pathlib.Path(self.nnprofile['validphys_cache_path'])
if not vpcache.exists():
try:
log.info(f"Creating validphys cache directory: {vpcache}")
vpcache.mkdir(parents=True, exist_ok=True)
except Exception as e:
raise LoaderError(f"Could not create the cache directory at {vpcache}") from e
return vpcache
# TODO: Deprecate get methods?
class Loader(LoaderBase):
"""Load various resources from the NNPDF data path."""
@property
def available_fits(self):
try:
return [p.name for p in self.resultspath.iterdir() if p.is_dir()]
except OSError:
return []
@property
def available_hyperscans(self):
try:
return [p.name for p in self.hyperscan_resultpath.iterdir() if p.is_dir()]
except OSError:
return []
@property
@functools.lru_cache
def available_theories(self):
"""Return a string token for each of the available theories"""
theory_token = 'theory_'
return {
folder.name[len(theory_token) :]
for folder in self._theories_path.glob(theory_token + '*')
}
@property
@functools.lru_cache
def available_ekos(self):
"""Return a string token for each of the available ekos"""
return {
eko_path.parent.name.split("_")[1] for eko_path in self._theories_path.glob("*/eko.tar")
}
@property
@functools.lru_cache
def available_datasets(self):
"""Provide all datasets that were available before the new commondata format
was implemented and that have a translation to the new format.
Returns old names.
TODO: This should be substituted by a subset of `implemented_datasets` that returns only
complete datasets.
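Example
-------
A membership check using an old (legacy) dataset name; "NMC" is the same
example name used in ``check_experiment`` below:

>>> from validphys.loader import Loader
>>> l = Loader()
>>> has_nmc = "NMC" in l.available_datasets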
"""
# Skip Positivity and Integrability
skip = ["POS", "INTEG"]
# Skip datasets for which a translation exists but were not implemented in the old way
skip += ["STAR", "ATLAS_WJ_JET_8TEV_"]
# skip all the _DW_ datasets that are repeated
skip += [i for i in legacy_to_new_mapping.keys() if "_DW_" in i]
old_datasets = [i for i in legacy_to_new_mapping.keys() if not i.startswith(tuple(skip))]
return set(old_datasets)
@property
@functools.lru_cache
def implemented_datasets(self):
"""Provide all implemented datasets that can be found in the datafiles folder
regardless of whether they can be used for fits (i.e., whether they include a theory),
are "fake" (integrability/positivity) or are missing some information.
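Example
-------
A small sketch that only counts the implemented datasets (the exact number
depends on the data files installed locally):

>>> from validphys.loader import Loader
>>> l = Loader()
>>> n_implemented = len(l.implemented_datasets)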
"""
datasets = []
for metadata_file in self.commondata_folder.glob("*/metadata.yaml"):
datasets += parse_set_metadata(metadata_file).allowed_datasets
return datasets
@property
@functools.lru_cache
def available_pdfs(self):
return lhaindex.expand_local_names('*')
@property
def commondata_folder(self):
return self.datapath / 'commondata'
def _use_fit_commondata_old_format_to_old_format(self, basedata, fit):
"""Load pseudodata from a fit where the data was generated in the old format
and no new-format version exists.
"""
# TODO: deprecated, will be removed
setname = basedata.name
log.warning(f"Please update {basedata} to the new format to keep using it")
datafilefolder = (fit.path / 'filter') / setname
data_path = datafilefolder / f'FILTER_{setname}.dat'
if not data_path.exists():
oldpath = datafilefolder / f'DATA_{setname}.dat'
if not oldpath.exists():
raise DataNotFoundError(f"{data_path} is needed with `use_fitcommondata`")
raise DataNotFoundError(
f"""This data format: {oldpath} is no longer supported
In order to upgrade it you need to use the script `vp-rebuild-data` with a version of NNPDF < 4.0.9"""
)
return data_path
def check_commondata(
self, setname, sysnum=None, use_fitcommondata=False, fit=None, variant=None
):
"""Prepare the commondata files to be loaded.
A commondata is defined by its name (``setname``) and the variant (``variant``).
At the moment both old-format and new-format commondata can be utilized and loaded,
however old-format commondata are deprecated and will be removed in future releases.
The function ``parse_dataset_input`` in ``config.py`` translates all known old commondata
into their new names (and variants),
therefore this function should only receive requests for the new format.
Any action trying to request an old-format commondata from this function will log
an error message. This error message will eventually become an actual error.
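Example
-------
A minimal usage sketch; the dataset name below is a hypothetical placeholder
and must correspond to an existing folder and observable in the commondata path:

>>> from validphys.loader import Loader
>>> l = Loader()
>>> cd = l.check_commondata("SETNAME_OBSERVABLE")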
"""
if use_fitcommondata:
if not fit:
raise LoadFailedError("Must specify a fit when setting use_fitcommondata")
# Using commondata generated with a previous fit requires some branching since it depends on
# 1. Whether the data is now in the new commondata
# 2. Whether the data was in the old format when it was generated
# First, load the base commondata which will be used as container and to check point 1
basedata = self.check_commondata(setname, variant=variant, sysnum=sysnum)
# and the possible filename for the new data
data_path, unc_path = generate_path_filtered_data(fit.path, setname)
if not data_path.exists():
# We might be dealing with legacy names and with legacy paths
err_str = f"No fit data found for {setname} ({data_path})"
if basedata.legacy_names is None:
raise DataNotFoundError(err_str)
for legacy_name in basedata.legacy_names:
# try old commondata format
old_path = fit.path / "filter" / legacy_name / f"FILTER_{legacy_name}.dat"
if old_path.exists():
data_path, unc_path = _use_fit_commondata_old_format_to_new_format(
setname, old_path
)
break
# try new commondata format
old_path = fit.path / "filter" / legacy_name / f"filtered_uncertainties_{legacy_name}.yaml"
if old_path.exists():
data_path = old_path.with_name(f"filtered_data_{legacy_name}.yaml")
unc_path = old_path.with_name(f"filtered_uncertainties_{legacy_name}.yaml")
break
else:
# If no old path was found, then, error out
err_str += f" and no filter found for its legacy names: {basedata.legacy_names}"
raise DataNotFoundError(err_str)
return basedata.with_modified_data(data_path, uncertainties_file=unc_path)
# Get data folder and observable name and check for existence
try:
setfolder, observable_name = setname.rsplit("_", 1)
except ValueError:
raise DataNotFoundError(
f"Dataset {setname} not found. Is the name correct? Old commondata is no longer accepted"
)
set_path = self.commondata_folder / setfolder
if not set_path.exists():
raise DataNotFoundError(f"Dataset {setname} not found")
metadata_path = set_path / "metadata.yaml"
metadata = parse_new_metadata(metadata_path, observable_name, variant=variant)
return CommonDataSpec(setname, metadata)
@functools.lru_cache
def check_theoryID(self, theoryID):
theoryID = int(theoryID)
theopath = self._theories_path / f"theory_{theoryID}"
if not theopath.exists():
raise TheoryNotFound(f"Could not find theory {theoryID}. Folder '{theopath}' not found")
return TheoryIDSpec(theoryID, theopath, self.theorydb_folder)
@functools.lru_cache
def check_eko(self, theoryID):
"""Check that the eko (and its parent theory) exists and return the path to it"""
theory = self.check_theoryID(theoryID)
eko_path = theory.path / "eko.tar"
if not eko_path.exists():
raise EkoNotFound(f"Could not find eko {eko_path} in theory: {theoryID}")
return eko_path
@property
def theorydb_folder(self):
"""Check that the theory database folder exists and return the path to it"""
dbpath = self.datapath / "theory_cards"
if not dbpath.is_dir():
raise TheoryDataBaseNotFound(
f"could not find theory db folder. Directory not found at {dbpath}"
)
return dbpath
def get_commondata(self, setname, sysnum):
"""Get a Commondata from the set name and number."""
# TODO: check where this is used
# as this might ignore cfactors or variants
cd = self.check_commondata(setname, sysnum)
return cd.load()
# @functools.lru_cache()
def check_fktable(self, theoryID, setname, cfac):
_, theopath = self.check_theoryID(theoryID)
fkpath = theopath / 'fastkernel' / ('FK_%s.dat' % setname)
if not fkpath.exists():
raise FKTableNotFound(
f"Could not find FKTable for set '{setname}'. File '{fkpath}' not found"
)
cfactors = self.check_cfactor(theoryID, setname, cfac)
return FKTableSpec(fkpath, cfactors)
def check_compound(self, theoryID, setname, cfac):
thid, theopath = self.check_theoryID(theoryID)
compound_spec_path = theopath / 'compound' / ('FK_%s-COMPOUND.dat' % setname)
try:
with compound_spec_path.open() as f:
# Drop first line with comment
next(f)
txt = f.read()
except FileNotFoundError as e:
msg = "Could not find COMPOUND set '%s' for theory %d: %s" % (setname, int(thid), e)
raise CompoundNotFound(msg)
# This is a little bit funny, but is the least amount of thinking...
yaml_format = 'FK:\n' + re.sub('FK:', ' - ', txt)
data = yaml.safe_load(yaml_format)
# we have to strip the 'FK_' prefix and the '.dat' extension to get a name
# consistent with everything else
try:
tables = [self.check_fktable(theoryID, name[3:-4], cfac) for name in data['FK']]
except FKTableNotFound as e:
raise LoadFailedError(
f"Incorrect COMPOUND file '{compound_spec_path}'. "
f"Searching for non-existing FKTable:\n{e}"
) from e
op = data['OP']
return tuple(tables), op
def get_fktable(self, theoryID, setname, cfac):
fkspec = self.check_fktable(theoryID, setname, cfac)
return fkspec.load()
def check_cfactor(self, theoryID, setname, cfactors):
_, theopath = self.check_theoryID(theoryID)
cf = []
for cfactor in cfactors:
cfactorpath = theopath / "cfactor" / f"CF_{cfactor}_{setname}.dat"
if not cfactorpath.exists():
msg = (
f"Could not find cfactor '{cfactor}' for FKTable {setname}. "
f"File {cfactorpath} does not exist in {theoryID}"
)
raise CfactorNotFound(msg)
cf.append(cfactorpath)
return tuple(cf)
def _check_lagrange_multiplier_set(self, theoryID, setname):
"""Check an integrability or positivity dataset"""
cd = self.check_commondata(setname, 'DEFAULT')
th = self.check_theoryID(theoryID)
fk, _ = self._check_theory_old_or_new(th, cd, [])
return cd, fk, th
def check_posset(self, theoryID, setname, postlambda, rules):
"""Load a positivity dataset"""
cd, fk, th = self._check_lagrange_multiplier_set(theoryID, setname)
return PositivitySetSpec(setname, cd, fk, postlambda, th, rules)
def check_integset(self, theoryID, setname, postlambda, rules):
"""Load an integrability dataset"""
cd, fk, th = self._check_lagrange_multiplier_set(theoryID, setname)
return IntegrabilitySetSpec(setname, cd, fk, postlambda, th, rules)
def get_posset(self, theoryID, setname, postlambda, rules):
return self.check_posset(theoryID, setname, postlambda, rules).load()
def check_fit(self, fitname):
resultspath = self.resultspath
if fitname != pathlib.Path(fitname).name:
raise FitNotFound(
f"Could not find fit '{fitname}' in '{resultspath}' "
"because the name doesn't correspond to a valid filename"
)
p = resultspath / fitname
if p.is_dir():
return FitSpec(fitname, p)
if not p.exists():
msg = f"Could not find fit '{fitname}' in '{resultspath}'. Folder '{p}' not found"
raise FitNotFound(msg)
msg = f"Could not load fit '{fitname}' from '{resultspath}'. '{p}' must be a folder"
raise FitNotFound(msg)
def check_hyperscan(self, hyperscan_name):
"""Obtain a hyperscan run"""
resultspath = self.hyperscan_resultpath
if hyperscan_name != pathlib.Path(hyperscan_name).name:
raise HyperscanNotFound(
f"Could not find hyperscan '{hyperscan_name}' in '{resultspath}' "
"because the name doesn't correspond to a valid filename"
)
p = resultspath / hyperscan_name
if p.is_dir():
hyperspec = HyperscanSpec(hyperscan_name, p)
if hyperspec.tries_files:
return hyperspec
raise HyperscanNotFound(f"No hyperscan output found in {hyperscan_name}")
raise HyperscanNotFound(
f"Could not find hyperscan '{hyperscan_name}' in '{resultspath}'."
f" Folder '{hyperscan_name}' not found"
)
def check_default_filter_rules(self, theoryid, defaults=None):
# avoid circular import
from validphys.filters import (
Rule,
default_filter_rules_input,
default_filter_settings_input,
)
th_params = theoryid.get_description()
if defaults is None:
defaults = default_filter_settings_input()
return tuple(
Rule(inp, defaults=defaults, theory_parameters=th_params, loader=self)
for inp in default_filter_rules_input()
)
def _check_theory_old_or_new(self, theoryid, commondata, cfac):
"""Given a theory and a commondata, load the right fktable.
This checks:
1. whether the theory is a pineappl theory
2. which information to select (commondata name, legacy name or theory metadata)
"""
theoryno, _ = theoryid
name = commondata.name
if theoryid.is_pineappl():
if (thmeta := commondata.metadata.theory) is None:
# Regardless of the type of theory, request the existence of the field
raise TheoryMetadataNotFound(f"No theory metadata found for {name}")
fkspec, op = self.check_fk_from_theory_metadata(thmeta, theoryno, cfac)
elif commondata.legacy_names is None:
raise LoadFailedError(f"Cannot use an old theory with a purely new dataset ({name})")
else:
# Old theories can only be used with datasets that have a corresponding
# old name to map to, and so we need to be able to load the cd at this point
for legacy_name in commondata.legacy_names:
# This might be slow, if it becomes a problem, the map function can be used instead
try:
fkspec, op = self.check_compound(theoryno, legacy_name, cfac)
except CompoundNotFound:
fkspec = self.check_fktable(theoryno, legacy_name, cfac)
op = None
except FKTableNotFound:
continue
break
else:
# If no fktable has been found after looping over legacy names, raise an Error
raise FKTableNotFound(
f"Could not find an old FKTable or CompoundFile for any of the legacy names of {name}: {commondata.legacy_names}"
)
return fkspec, op
def check_dataset(
self,
name,
*,
rules=None,
sysnum=None,
theoryid,
cfac=(),
frac=1,
cuts=CutsPolicy.INTERNAL,
use_fitcommondata=False,
fit=None,
weight=1,
variant=None,
):
"""Loads a given dataset
If the dataset contains new-type fktables, use the
pineappl loading function, otherwise fall back to the legacy one.
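Example
-------
A minimal usage sketch, consistent with the ``check_experiment`` example
below (the dataset name and theory ID must be available locally):

>>> from validphys.loader import Loader
>>> l = Loader()
>>> ds = l.check_dataset("NMC", theoryid=53, cuts="internal")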
"""
if not isinstance(theoryid, TheoryIDSpec):
theoryid = self.check_theoryID(theoryid)
# TODO:
# The dataset is checked twice, once here
# and once by config in produce_commondata
# one of the two __must__ be superfluous
# note that both use information from dataset_input
commondata = self.check_commondata(
name, sysnum, use_fitcommondata=use_fitcommondata, fit=fit, variant=variant
)
fkspec, op = self._check_theory_old_or_new(theoryid, commondata, cfac)
# Note this is simply for convenience when scripting. The config will
# construct the actual Cuts object by itself
if isinstance(cuts, str):
cuts = CutsPolicy(cuts)
if isinstance(cuts, CutsPolicy):
if cuts is CutsPolicy.NOCUTS:
cuts = None
elif cuts is CutsPolicy.FROMFIT:
cuts = self.check_fit_cuts(commondata, fit)
elif cuts is CutsPolicy.INTERNAL:
if rules is None:
rules = self.check_default_filter_rules(theoryid)
cuts = self.check_internal_cuts(commondata, rules)
elif cuts is CutsPolicy.FROM_CUT_INTERSECTION_NAMESPACE:
raise LoaderError("Intersection cuts not supported in loader calls.")
return DataSetSpec(
name=name,
commondata=commondata,
fkspecs=fkspec,
thspec=theoryid,
cuts=cuts,
frac=frac,
op=op,
weight=weight,
rules=rules,
)
def check_experiment(self, name: str, datasets: list[DataSetSpec]) -> DataGroupSpec:
"""Loader method for instantiating DataGroupSpec objects. The NNPDF::Experiment
object can then be instantiated using the load method.
Parameters
----------
name: str
A string denoting the name of the resulting DataGroupSpec object.
datasets: list[DataSetSpec]
A list of DataSetSpec objects pre-created by the user. Note, these too
will be loaded by Loader.
Returns
-------
DataGroupSpec
Example
-------
>>> from validphys.loader import Loader
>>> l = Loader()
>>> ds = l.check_dataset("NMC", theoryid=53, cuts="internal")
>>> exp = l.check_experiment("My DataGroupSpec Name", [ds])
"""
if not isinstance(datasets, list):
raise TypeError("Must specify a list of DataSetSpec objects to use")
return DataGroupSpec(name, datasets)
def check_pdf(self, name):
if lhaindex.isinstalled(name):
return PDF(name)
raise PDFNotFound(name)
def get_pdf(self, name):
return self.check_pdf(name).load()
def check_fit_cuts(self, commondata, fit):
setname = commondata.name
if fit is None:
raise TypeError("Must specify a fit to use the cuts.")
if not isinstance(fit, FitSpec):
fit = self.check_fit(fit)
_, fitpath = fit
cuts_path = (fitpath / 'filter') / setname / ('FKMASK_' + setname + '.dat')
# After 4.0.9 we changed to a new commondata format
# In order to utilize cuts from old fits in new fits it is necessary to translate the names
# There are two translations that might be necessary:
# 1. New names in the runcard, old cuts in the 'fromfit' fit
# 2. Old names in the runcard, new cuts in the 'fromfit' fit
# In order to enforce the usage of the new names, only (1.) will be implemented
if not cuts_path.parent.exists():
if commondata.legacy_names is None:
raise CutsNotFound(f"Bad filter configuration. Could not find {cuts_path.parent}")
# Maybe we have an old-name version of the cuts?
for old_name in commondata.legacy_names:
# Then, check whether there are cuts with the corresponding old name
old_dir = cuts_path.parent.with_name(old_name)
if old_dir.exists():
cuts_path = old_dir / f"FKMASK_{old_name}.dat"
break
else:
raise CutsNotFound(f"Bad filter configuration. Could not find {cuts_path.parent}")
if not cuts_path.exists():
cuts_path = None
return Cuts(commondata, cuts_path)
def check_internal_cuts(self, commondata, rules):
return InternalCutsWrapper(commondata, rules)
def check_vp_output_file(self, filename, extra_paths=('.',)):
"""Find a file in the vp-cache folder, or (with higher priority) in
the ``extra_paths``."""
try:
vpcache = self._vp_cache()
except KeyError as e:
log.warning("Entry validphys_cache_path expected but not found in the nnprofile.")
else:
extra_paths = (*extra_paths, vpcache)
finder = filefinder.FallbackFinder(extra_paths)
try:
path, name = finder.find(filename)
except FileNotFoundError as e:
raise LoadFailedError(f"Could not find '{filename}'") from e
except filefinder.FinderError as e:
raise LoaderError(e) from e
return path / name
# http://stackoverflow.com/a/15645088/1007990
def _download_and_show(response, stream):
total_length = response.headers.get('content-length')
if total_length is None or not log.isEnabledFor(logging.INFO):
stream.write(response.content)
else:
dl = 0
prev_done = -1
total_length = int(total_length)
for data in response.iter_content(chunk_size=4096):
dl += len(data)
stream.write(data)
if sys.stdout.isatty():
done = int(50 * dl / total_length)
if prev_done != done:
sys.stdout.write(f"\r[{'=' * done}{' '*(50 - done)}] ({done * 2}%)")
prev_done = done
sys.stdout.flush()
sys.stdout.write('\n')
def download_file(url, stream_or_path, make_parents=False, delete_on_failure=False):
"""Download a file and show a progress bar if the INFO log level is
enabled. If ``make_parents`` is ``True`` and ``stream_or_path``
is path-like, all the parent folders will be created."""
# There is a bug in CERN's
# Apache that incorrectly sets the Content-Encoding header to gzip, even
# though the content is not compressed twice.
# See: http://mail-archives.apache.org/mod_mbox/httpd-dev/200207.mbox/%3C3D2D4E76.4010502@talex.com.pl%3E
# and e.g. https://bugzilla.mozilla.org/show_bug.cgi?id=610679#c30
# If it looks like the url is already encoded, we do not request
# it to be compressed
headers = {}
if mimetypes.guess_type(url)[1] is not None:
headers['Accept-Encoding'] = None
response = requests.get(url, stream=True, headers=headers)
response.raise_for_status()
if isinstance(stream_or_path, (str, bytes, os.PathLike)):
p = pathlib.Path(stream_or_path)
if p.is_dir():
raise IsADirectoryError(p)
log.info("Downloading %s to %s.", url, stream_or_path)
if make_parents:
p.parent.mkdir(exist_ok=True, parents=True)
with tempfile.NamedTemporaryFile(
delete=delete_on_failure, dir=p.parent, prefix=p.name, suffix='.part'
) as f:
_download_and_show(response, f)
shutil.move(f.name, p)
else:
log.info("Downloading %s.", url)
_download_and_show(response, stream_or_path)
def download_and_extract(url, local_path, target_name=None):
"""Download a compressed archive and then extract it to the given path"""
local_path = pathlib.Path(local_path)
if not local_path.is_dir():
raise NotADirectoryError(local_path)
name = url.split('/')[-1]
archive_dest = tempfile.NamedTemporaryFile(delete=False, suffix=name, dir=local_path)
with archive_dest as t:
log.debug("Saving data to %s", t.name)
download_file(url, t)
log.info("Extracting archive to %s", local_path)
try:
with tarfile.open(archive_dest.name) as res_tar:
# Extract to a temporary directory
folder_dest = tempfile.TemporaryDirectory(dir=local_path, suffix=name)
dest_path = pathlib.Path(folder_dest.name)
try:
res_tar.extractall(path=dest_path, filter="data")
except tarfile.LinkOutsideDestinationError as e:
if sys.version_info > (3, 11):
raise e
# For older versions of python ``filter=data`` might be too restrictive
# for the links inside the ``postfit`` folder if you are using more than one disk
res_tar.extractall(path=dest_path, filter="tar")
# Check that there is no more than one item at the top level
top_level_stuff = list(dest_path.glob("*"))
if len(top_level_stuff) > 1:
raise RemoteLoaderError(f"More than one item in the top level directory of {url}")
if target_name is None:
target_path = local_path
else:
target_path = local_path / target_name
shutil.move(top_level_stuff[0], target_path)
except Exception as e:
log.error(
f"The original archive at {t.name} was only extracted partially at \n{local_path}"
)
raise e
else:
os.unlink(archive_dest.name)
def _key_or_loader_error(f):
@functools.wraps(f)
def f_(*args, **kwargs):
try:
return f(*args, **kwargs)
except KeyError as e:
log.error(f"nnprofile is configured improperly: Key {e} is missing from the profile!")
raise LoaderError(
"Cannot attempt download because "
"nnprofile is configured improperly: "
f"Missing key '{e}'"
) from e
return f_
# TODO: Make this async someday
class RemoteLoader(LoaderBase):
@property
@_key_or_loader_error
def fit_urls(self):
return self.nnprofile['fit_urls']
@property
@_key_or_loader_error
def fit_index(self):
return self.nnprofile['fit_index']
@property
@_key_or_loader_error
def hyperscan_url(self):
return self.nnprofile['hyperscan_urls']
@property
@_key_or_loader_error
def hyperscan_index(self):
return self.nnprofile['hyperscan_index']
@property
@_key_or_loader_error
def theory_urls(self):
return self.nnprofile['theory_urls']
@property
@_key_or_loader_error
def theory_index(self):
return self.nnprofile['theory_index']
@property
@_key_or_loader_error
def eko_index(self):
return self.nnprofile['eko_index']
@property
@_key_or_loader_error
def eko_urls(self):
return self.nnprofile['eko_urls']
@property
@_key_or_loader_error
def nnpdf_pdfs_urls(self):
return self.nnprofile['nnpdf_pdfs_urls']
@property
@_key_or_loader_error
def nnpdf_pdfs_index(self):
return self.nnprofile['nnpdf_pdfs_index']
@property
@_key_or_loader_error
def lhapdf_urls(self):
urls = self.nnprofile['lhapdf_urls']
if len(urls) > 1:
log.warning("Only one lhapdf_url is supported at the moment.")
if len(urls) == 0:
raise LoaderError("The specification for lhapdf_urls is empty in nnprofile")
return urls
def _remote_files_from_url(self, url, index, thing='files'):
index_url = url + index
try:
resp = requests.get(index_url)
resp.raise_for_status()
except Exception as e:
raise RemoteLoaderError(f"Failed to fetch remote {thing} index {index_url}: {e}") from e
try:
info = resp.json()['files']
except Exception as e:
raise RemoteLoaderError(
f"Malformed index {index_url}. Expecting json with a key 'files': {e}"
) from e
return {file.split('.')[0]: url + file for file in info}
def remote_files(self, urls, index, thing='files'):
d = {}
for url in urls:
try:
d.update(self._remote_files_from_url(url, index, thing))
except RemoteLoaderError as e:
log.error(e)
return d
@property
@functools.lru_cache
def remote_fits(self):
return self.remote_files(self.fit_urls, self.fit_index, thing="fits")
@property
@functools.lru_cache
def remote_hyperscans(self):
return self.remote_files(self.hyperscan_url, self.hyperscan_index, thing="hyperscan")
@property
@functools.lru_cache
def remote_theories(self):
token = 'theory_'
rt = self.remote_files(self.theory_urls, self.theory_index, thing="theories")
return {k[len(token) :]: v for k, v in rt.items()}
@property
@functools.lru_cache
def remote_ekos(self):
token = 'eko_'
rt = self.remote_files(self.eko_urls, self.eko_index, thing="ekos")
return {k[len(token) :]: v for k, v in rt.items()}
@property
@functools.lru_cache
def remote_nnpdf_pdfs(self):
return self.remote_files(self.nnpdf_pdfs_urls, self.nnpdf_pdfs_index, thing="PDFs")
@functools.cached_property
def remote_keywords(self):
root = self.nnprofile['reports_root_url']
url = urls.urljoin(root, 'index.json')
try:
req = requests.get(url)
req.raise_for_status()
keyobjs = req.json()['keywords']
l = [k[0] for k in keyobjs]
except requests.RequestException as e:
raise RemoteLoaderError(e) from e
return l
@property
def downloadable_fits(self):
return list(self.remote_fits)
@property
def downloadable_hyperscans(self):
return list(self.remote_hyperscans)
@property
def downloadable_theories(self):
return list(self.remote_theories)
@property
def downloadable_ekos(self):
return list(self.remote_ekos)
@property
def lhapdf_pdfs(self):
return lhaindex.expand_index_names('*')
@property
def nnpdf_pdfs(self):
return list(self.remote_nnpdf_pdfs)
@property
def downloadable_pdfs(self):
return {*self.lhapdf_pdfs, *self.downloadable_fits, *self.nnpdf_pdfs}
def download_fit(self, fitname):
if fitname not in self.remote_fits:
raise FitNotFound(f"Could not find fit '{fitname}' in remote index {self.fit_index}")
with tempfile_cleaner(
root=self.resultspath,
exit_func=shutil.rmtree,
exc=KeyboardInterrupt,
prefix='fit_download_deleteme_',
) as tempdir:
download_and_extract(self.remote_fits[fitname], tempdir)
# Handle old-style fits compressed with 'results' as root.
old_style_res = tempdir / 'results'
if old_style_res.is_dir():
move_target = old_style_res / fitname
else:
move_target = tempdir / fitname
if not move_target.is_dir():
raise RemoteLoaderError(
f"Unknown format for fit in {tempdir}. Expecting a folder {move_target}"
)
fitpath = self.resultspath / fitname
shutil.move(move_target, fitpath)
if lhaindex.isinstalled(fitname):
log.warning(
f"The PDF corresponding to the downloaded fit '{fitname}' "
"exists in the LHAPDF path."
" Will be erased and replaced with the new one."
)
p = pathlib.Path(lhaindex.finddir(fitname))
if p.is_symlink():
p.unlink()
else:
shutil.rmtree(p)
else:
p = pathlib.Path(lhaindex.get_lha_datapath()) / fitname
# This is needed here as well because the path may be a
# broken symlink.
if p.is_symlink():
p.unlink()
gridpath = fitpath / 'postfit' / fitname
gridpath_old = fitpath / 'nnfit' / fitname
if gridpath.is_dir():
p.symlink_to(gridpath, target_is_directory=True)
else:
log.warning(f"Cannot find {gridpath}. Falling back to old behaviour")
p.symlink_to(gridpath_old, target_is_directory=True)
def download_hyperscan(self, hyperscan_name):
"""Download a hyperscan run from the remote server.
Downloads the run to the hyperscan results folder.
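Example
-------
A sketch, assuming the run name exists in the remote hyperscan index
(the name below is hypothetical):

>>> from validphys.loader import FallbackLoader
>>> rl = FallbackLoader()
>>> rl.download_hyperscan("some_hyperscan_run")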
"""
if hyperscan_name not in self.remote_hyperscans:
raise HyperscanNotFound(
f"Could not find hyperscan {hyperscan_name} in remote index {self.hyperscan_index}"
)
with tempfile_cleaner(
root=self.hyperscan_resultpath,
exit_func=shutil.rmtree,
exc=KeyboardInterrupt,
prefix='fit_download_deleteme_',
) as tempdir:
download_and_extract(self.remote_hyperscans[hyperscan_name], tempdir)
move_target = tempdir / hyperscan_name
if not move_target.is_dir():
raise RemoteLoaderError(
f"Unknown format for hyperscan in {tempdir}. Expecting a folder {move_target}"
)
hyperscan_path = self.hyperscan_resultpath / hyperscan_name
shutil.move(move_target, hyperscan_path)
def download_pdf(self, name):
# Check if the pdf is an existing fit first
try:
# We don't want to download the fit here
fit = Loader.check_fit(self, name)
except FitNotFound:
pass
else:
p = pathlib.Path(lhaindex.get_lha_datapath()) / fit.name
fitpath = fit.path / 'postfit'
fitpath_old = fit.path / 'nnfit'
if fitpath.exists() or fitpath_old.exists():
log.info(
"Found existing fit with the same name as the "
"requested PDF (%s). Symlinking the grid to the LHAPDF path (%s).",
name,
p,
)
# This is needed here as well because the path may be a
# broken symlink.
if p.is_symlink():
p.unlink()
if fitpath.exists():
p.symlink_to(fitpath / fit.name)
else:
p.symlink_to(fitpath_old / fit.name)
return
# It would be good to use the LHAPDF command line, except that it does
# questionable things like returning 0 exit status when it fails to
# download.
_saved_exception = False
if name in self.lhapdf_pdfs:
try:
url = self.lhapdf_urls[0] + name + '.tar.gz'
# url = 'https://data.nnpdf.science/thisisatesttodelete/NNPDF31_nlo_as_0118.tar.gz'
# url = 'https://data.nnpdf.science/patata/NNPDF31_nlo_as_0118.tar.gz'
return download_and_extract(url, lhaindex.get_lha_datapath())
except shutil.ReadError as e:
_saved_exception = e
log.error(
f"{e}. It seems the LHAPDF URLs aren't behaving, "
f"attempting to find resource in other repositories"
)
pass
except requests.RequestException as e:
_saved_exception = e
log.error(
f"There was a problem with the connection: {e}. "
f"Attempting to find resource elsewhere."
)
pass
except RemoteLoaderError as e:
_saved_exception = e
log.error(f"Failed to download resource: {e}. Attempting " f"to find it elsewhere.")
pass
if name in self.downloadable_fits:
try:
return self.download_fit(name)
except requests.RequestException as e:
_saved_exception = e
log.error(
f"There was a problem with the connection: {e}. "
f"Attempting to find resource elsewhere."
)
pass
except RemoteLoaderError as e:
_saved_exception = e
log.error(f"Failed to download resource: {e}. Attempting " f"to find it elsewhere.")
pass
if name in self.remote_nnpdf_pdfs:
return download_and_extract(self.remote_nnpdf_pdfs[name], lhaindex.get_lha_datapath())
elif _saved_exception:
raise LoadFailedError(
f"{_saved_exception}. The resource could not " f"be found elsewhere."
) from _saved_exception
else:
raise PDFNotFound("PDF '%s' is neither an uploaded fit nor an " "LHAPDF set." % name)
def download_theoryID(self, thid):
thid = str(thid)
remote = self.remote_theories
if thid not in remote:
raise TheoryNotFound("Theory %s not available." % thid)
download_and_extract(remote[thid], self._theories_path, target_name=f"theory_{thid}")
def download_eko(self, thid):
"""Download the EKO for a given theory ID"""
thid = str(thid)
remote = self.remote_ekos
if thid not in remote:
raise EkoNotFound(f"EKO for TheoryID {thid} is not available in the remote server")
# Check that we have the theory we need
theory = self.check_theoryID(thid)
target_path = theory.path / "eko.tar"
download_file(remote[thid], target_path, delete_on_failure=True)
def download_vp_output_file(self, filename, **kwargs):
try:
root_url = self.nnprofile['reports_root_url']
except KeyError as e:
raise LoadFailedError("Key 'reports_root_url' not found in nnprofile") from e
try:
url = root_url + filename
except Exception as e:
raise LoadFailedError(e) from e
try:
filename = pathlib.Path(filename)
download_file(url, self._vp_cache() / filename, make_parents=True)
except requests.HTTPError as e:
if e.response.status_code == requests.codes.not_found:
raise RemoteLoaderError(
f"Resource {filename} could not " f"be found on the validphys " f"server {url}"
) from e
elif e.response.status_code == requests.codes.unauthorized:
log.error(
"Could not access the validphys reports page "
"because the authentication is not provided. "
"Please, update your ~/.netrc file to contain the "
"following:\n\n"
f"machine {urls.urlsplit(root_url).netloc}\n"
f" login nnpdf\n"
f" password <PASSWORD>\n"
)
raise
class FallbackLoader(Loader, RemoteLoader):
"""A loader that first tries to find resources locally
(calling Loader.check_*) and if it fails, it tries to download them
(calling RemoteLoader.download_*)."""
def make_checker(self, resource):
# We are intercepting the check_* methods here
orig = super().__getattribute__('check_' + resource)
download = getattr(self, 'download_' + resource)
@functools.wraps(orig)
def f(*args, **kwargs):
try:
return orig(*args, **kwargs)
except LoadFailedError as e:
saved_exception = e
log.info(
"Could not find a resource "
f"({resource}): {saved_exception}. "
f"Attempting to download it."
)
try:
download(*args, **kwargs)
except RemoteLoaderError as e:
log.error(f"Failed to download resource: {e}")
raise e
except LoadFailedError as e:
log.error(f"Resource not in the remote repository: {e}")
raise saved_exception
except requests.RequestException as e:
log.error(f"There was a problem with the connection: {e}")
raise saved_exception from e
except Exception as e:
# Simply raise these for now so we can find and fix them
raise e
else:
return orig(*args, **kwargs)
return f
def __getattribute__(self, attr):
token = 'check_'
if attr.startswith(token):
resname = attr[len(token) :]
if hasattr(RemoteLoader, 'download_' + resname):
return super().__getattribute__('make_checker')(resname)
return super().__getattribute__(attr)