from collections import defaultdict
import dataclasses
import json
import logging
import pathlib
import sys
from ekobox import apply, genpdf, info_file
import numpy as np
import eko
from eko import basis_rotation, runner
from eko.interpolation import XGrid
from eko.io import manipulate
from validphys.utils import yaml_safe
from . import eko_utils, utils
_logger = logging.getLogger(__name__)
LOG_FILE = "evolven3fit.log"
LOGGING_SETTINGS = {
"formatter": logging.Formatter("%(asctime)s %(name)s/%(levelname)s: %(message)s"),
"level": logging.DEBUG,
}
[docs]def evolve_fit(
fit_folder, q_fin, q_points, op_card_dict, theory_card_dict, force, eko_path, dump_eko=None
):
"""
Evolves all the fitted replica in fit_folder/nnfit
Parameters
----------
fit_folder: str or pathlib.Path
path to the folder containing the fit
q_fin: float
final point of the q_grid
q_points: int
number of points in the q_grid
op_card_dict: dict
user settings for the op_card
theory_card_dict: dict
user settings for the t_card
force: bool
whether to force the evolution to be done again
eko_path: str or pathlib.Path
path where the eko is stored (if None the eko will be
recomputed)
dump_eko: str or pathlib.Path
path where the eko is dumped (necessary only if the eko is computed)
"""
log_file = pathlib.Path(fit_folder) / LOG_FILE
if log_file.exists():
if force:
log_file.unlink()
else:
raise FileExistsError(
f"Log file already exists: {log_file}. Has evolven3fit already been run?"
)
log_file = logging.FileHandler(log_file)
stdout_log = logging.StreamHandler(sys.stdout)
for log in [log_file, stdout_log]:
log.setFormatter(LOGGING_SETTINGS["formatter"])
# The log file will get everything
log_file.setLevel(LOGGING_SETTINGS["level"])
# While the terminal only up to info
stdout_log.setLevel(logging.INFO)
for logger in (_logger, *[logging.getLogger("eko")]):
logger.handlers = []
logger.setLevel(LOGGING_SETTINGS["level"])
logger.addHandler(log_file)
logger.addHandler(stdout_log)
usr_path = pathlib.Path(fit_folder)
initial_PDFs_dict = load_fit(usr_path)
x_grid = np.array(initial_PDFs_dict[list(initial_PDFs_dict.keys())[0]]["xgrid"]).astype(float)
theoryID = utils.get_theoryID_from_runcard(usr_path)
if eko_path is not None:
eko_path = pathlib.Path(eko_path)
_logger.info(f"Loading eko from : {eko_path}")
if eko_path is None or not eko_path.exists():
if dump_eko is not None:
_logger.warning(f"Trying to construct the eko at {dump_eko}")
theory, op = eko_utils.construct_eko_cards(
theoryID, q_fin, q_points, x_grid, op_card_dict, theory_card_dict
)
runner.solve(theory, op, dump_eko)
eko_path = dump_eko
else:
raise ValueError(f"dump_eko not provided and {eko_path=} not found")
# Open the EKO in read-only mode, if it needs to be manipulated keep it in memory
with eko.EKO.read(eko_path) as eko_op:
# Read the cards directly from the eko to make sure they are consistent
theory = eko_op.theory_card
op = eko_op.operator_card
# And dump them to the log
_logger.debug(f"Theory card: {json.dumps(theory.raw)}")
_logger.debug(f"Operator card: {json.dumps(op.raw)}")
eko_original_xgrid = eko_op.xgrid
if XGrid(x_grid) != eko_original_xgrid:
# If the xgrid of the eko is not directly usable, construct a copy in memory
# by replacing the internal inventory of operators in a readonly copy
new_xgrid = XGrid(x_grid)
new_metadata = dataclasses.replace(eko_op.metadata, xgrid=new_xgrid)
new_operators = {}
for target_key in eko_op.operators:
elem = eko_op[target_key.ep]
if eko_op.metadata.version == "0.13.4":
# For eko 0.13.4 xgrid is the internal interpolation so we need to check
# whether the rotation is truly needed
# <in practice> this means checking whether the operator shape matches the grid
oplen = elem.operator.shape[-1]
if oplen != len(eko_original_xgrid):
# The operator and its xgrid have different shape
# either prepare an identity, or this EKO is not supported
if oplen != len(x_grid):
raise ValueError(
f"The operator at {eko_path} is not usable, version not supported"
)
eko_original_xgrid = XGrid(x_grid)
new_operators[target_key] = manipulate.xgrid_reshape(
elem,
eko_original_xgrid,
op.configs.interpolation_polynomial_degree,
targetgrid=XGrid(x_grid),
inputgrid=XGrid(x_grid),
)
new_inventory = dataclasses.replace(eko_op.operators, cache=new_operators)
eko_op = dataclasses.replace(eko_op, metadata=new_metadata, operators=new_inventory)
# Modify the info file with the fit-specific info
info = info_file.build(theory, op, 1, info_update={})
info["NumMembers"] = "REPLACE_NREP"
info["ErrorType"] = "replicas"
info["XMin"] = float(x_grid[0])
info["XMax"] = float(x_grid[-1])
# Save the PIDs in the info file in the same order as in the evolution
info["Flavors"] = basis_rotation.flavor_basis_pids
info.setdefault("NumFlavors", 5)
dump_info_file(usr_path, info)
# Read the information from all the sorted replicas into what eko wants
n_replicas = len(initial_PDFs_dict)
all_replicas = []
for rep_idx in range(1, n_replicas + 1):
# swap photon position to match eko.basis_rotation.flavor_basis_pids
pdfgrid = np.array(initial_PDFs_dict[f"replica_{rep_idx}"]["pdfgrid"])
pdfgrid = np.append(pdfgrid[:, -1].reshape(x_grid.size, 1), pdfgrid[:, :-1], axis=1)
# and divide by x
all_replicas.append(pdfgrid.T / x_grid)
# output is {(Q2, nf): (replica, flavour, x)}
all_evolved, _ = apply.apply_grids(eko_op, np.array(all_replicas))
# Now, replica by replica, break into nf blocks
targetgrid = eko_op.xgrid.tolist()
by_nf = defaultdict(list)
for q2, nf in sorted(eko_op.evolgrid, key=lambda ep: ep[1]):
by_nf[nf].append(q2)
q2block_per_nf = {nf: sorted(q2s) for nf, q2s in by_nf.items()}
for replica in range(n_replicas):
blocks = []
for nf, q2grid in q2block_per_nf.items():
def pdf_xq2(pid, x, Q2):
x_idx = targetgrid.index(x)
pid_idx = info["Flavors"].index(pid)
return x * all_evolved[(Q2, nf)][replica][pid_idx][x_idx]
block = genpdf.generate_block(
pdf_xq2,
xgrid=targetgrid,
sorted_q2grid=q2grid,
pids=basis_rotation.flavor_basis_pids,
)
blocks.append(block)
dump_evolved_replica(blocks, usr_path, replica + 1)
# remove folder:
# The function dump_evolved_replica uses a temporary folder
# We need then to remove it after fixing the position of those replica files
(usr_path / "nnfit" / usr_path.stem).rmdir()
[docs]def load_fit(usr_path):
"""
Loads all the replica pdfs at fitting scale in usr_path and return the exportgrids
Parameters
----------
usr_path: pathlib.Path
path to the folder containing the fit
Returns
-------
: dict
exportgrids info
"""
nnfitpath = usr_path / "nnfit"
pdf_dict = {}
for yaml_file in nnfitpath.glob(f"replica_*/{usr_path.name}.exportgrid"):
data = yaml_safe.load(yaml_file.read_text(encoding="UTF-8"))
pdf_dict[yaml_file.parent.stem] = data
return pdf_dict
[docs]def dump_evolved_replica(evolved_blocks, usr_path, replica_num):
"""
Dump the evolved replica given by evolved_block as the replica num "replica_num" in
the folder usr_path/nnfit/replica_<replica_num>/usr_path.stem.dat
Parameters
----------
evolved_block: list(numpy.array)
list of blocks of an evolved PDF
usr_path: pathlib.Path
path of the fit folder
replica_num: int
replica number
"""
path_where_dump = usr_path / "nnfit" / usr_path.stem
# create folder to dump the evolved replica if it does not exist
path_where_dump.mkdir(exist_ok=True)
to_write_in_head = f"PdfType: replica\nFromMCReplica: {replica_num}\n"
genpdf.export.dump_blocks(
path_where_dump, replica_num, evolved_blocks, pdf_type=to_write_in_head
)
# fixing_replica_path
utils.fix_replica_path(usr_path, replica_num)
[docs]def dump_info_file(usr_path, info):
"""
Dump the info file given by info in the folder usr_path/nnfit/usr_path.stem.info.
Parameters
----------
usr_path: pathlib.Path
path of the fit folder
info: dict
info of the fit
"""
path_where_dump = usr_path / "nnfit" / usr_path.stem
genpdf.export.dump_info(path_where_dump, info)
utils.fix_info_path(usr_path)