Source code for n3fit.model_gen

"""
Library of functions which generate the models used by n3fit to determine PDF.

It contains functions to generate:

1) Observables
    The main function is ``observable_generator`` which takes the input theory
    and generates the path from the PDF result to the computation of the
    training and validation losses / chi2

2) PDFs
    The main function is ``generate_pdf_model``, which takes a list of settings
    defining the replica-dependent architecture of each of the models that form
    the ensemble as well as ensemble-wide options such as the flavour basis,
    sum rule definition or theoretical settings, and generates a PDF model
    which takes an array of (x) as input and outputs the value of the PDF
    for each replica, for each x for each flavour.
"""

from dataclasses import asdict, dataclass, field
from typing import Callable

import numpy as np

from n3fit.backends import (
    NN_LAYER_ALL_REPLICAS,
    NN_PREFIX,
    PREPROCESSING_LAYER_ALL_REPLICAS,
    Input,
    Lambda,
    MetaLayer,
    MetaModel,
    base_layer_selector,
)
from n3fit.backends import operations as op
from n3fit.layers import (
    DIS,
    DY,
    AddPhoton,
    FkRotation,
    FlavourToEvolution,
    Mask,
    ObsRotation,
    Preprocessing,
    losses,
)
from n3fit.layers.observable import is_unique
from n3fit.msr import generate_msr_model_and_grid
from validphys.photon.compute import Photon

from n3fit.backends import regularizer_selector  # isort: skip isort and black don't agree


[docs]@dataclass class ObservableWrapper: """Wraps many observables into an experimental layer once the PDF model is prepared It can take normal datasets or Lagrange-multiplier-like datasets (such as positivity or integrability) """ # IDEALLY: # In principle this is something that could be automatically provided by validphyts # __but__ it requires backend (i.e., tensorflow) information # but maybe it can be constructed in such a way that the backend is lazyly imported # and make this part of the experiment spec name: str observables: list trvl_mask_layer: Mask dataset_xsizes: list invcovmat: np.array = None covmat: np.array = None multiplier: float = 1.0 integrability: bool = False positivity: bool = False data: np.array = None rotation: ObsRotation = None # only used for diagonal covmat def _generate_loss(self, mask=None): """Generates the corresponding loss function depending on the values the wrapper was initialized with""" if self.invcovmat is not None: covmat_matrix = self.covmat invcovmat_matrix = self.invcovmat loss = losses.LossInvcovmat( invcovmat_matrix, self.data, mask, covmat=covmat_matrix, name=self.name ) elif self.positivity: loss = losses.LossPositivity(name=self.name, c=self.multiplier) elif self.integrability: loss = losses.LossIntegrability(name=self.name, c=self.multiplier) return loss def _generate_experimental_layer(self, pdf): """Generate the experimental layer by feeding to each observable its PDF. In the most general case, each observable might need a PDF evaluated on a different xgrid, the input PDF is evaluated in all points that the experiment needs and needs to be split """ if len(self.dataset_xsizes) > 1: splitting_layer = op.tensor_splitter( pdf.shape, self.dataset_xsizes, axis=2, name=f"{self.name}_split" ) sp_pdf = splitting_layer(pdf) output_layers = [obs(p) for obs, p in zip(self.observables, sp_pdf)] else: output_layers = [obs(pdf) for obs in self.observables] # Finally concatenate all observables (so that experiments are one single entity) ret = op.concatenate(output_layers, axis=-1) # rotate the predictions to the diagonal basis if rotation is provided. if self.rotation is not None: ret = self.rotation(ret) if self.trvl_mask_layer is not None: ret = self.trvl_mask_layer(ret) return ret def __call__(self, pdf_layer, mask=None): loss_f = self._generate_loss(mask) experiment_prediction = self._generate_experimental_layer(pdf_layer) return loss_f(experiment_prediction)
[docs]def observable_generator( spec_dict, boundary_condition=None, training_mask_array=None, validation_mask_array=None, training_data=None, validation_data=None, invcovmat_tr=None, invcovmat_vl=None, positivity_initial=1.0, integrability=False, n_replicas=1, ): # pylint: disable=too-many-locals """ This function generates the observable models for each experiment. These are models which takes as input a PDF tensor (1 x size_of_xgrid x flavours) and outputs the result of the observable for each contained dataset (n_points,). In summary the model has the following structure: Observable layers, corresponding to commondata datasets and made of any number of fktables (and an operation on them). An observable contains an fktable, which is loaded by the convolution layer (be it hadronic or DIS) and a inv covmat which loaded by the loss. This function also outputs three "output objects" (which are functions that generate layers) that use the training and validation mask to create a training_output, validation_output and experimental_output If the dataset is a positivity dataset acts in consequence. The output is a dictionary (`layer_info`), each one of the three output functions have a signature: `def out_tr(pdf_layer, dataset_out=None)` The `pdf_layer` must be a layer of shape (1, size_of_xgrid, flavours) `datasets_out` is the list of dataset to be masked to 0 when generating the layer Parameters ---------- spec_dict: dict a dictionary-like object containing the information of the experiment boundary_condition: dict dictionary containing the instance of the a PDF set to be used as a Boundary Condition. training_mask_array: np.ndarray training mask per replica validation_mask_array: np.ndarray validation mask per replica, when not given ~training_mask_array will be used while in general the validation is a negation of the training, in special cases such as 1-point datasets, these are accepted by both masks and then removed by the loss n_replicas: int number of replicas fitted simultaneously positivity_initial: float set the positivity lagrange multiplier for epoch 1 integrability: bool switch on/off the integrability constraints Returns ------ layer_info: dict a dictionary with: - `inputs`: input layer - `output`: output layer (unmasked) - `output_tr`: output layer (training) - `output_vl`: output layer (validation) - `experiment_xsize`: int (size of the output array) """ spec_name = spec_dict["name"] dataset_xsizes = [] model_inputs = [] model_observables = [] # The first step is to compute the observable for each of the datasets for dataset in spec_dict["datasets"]: # Get the generic information of the dataset dataset_name = dataset.name # Look at what kind of layer do we need for this dataset if dataset.hadronic: Obs_Layer = DY else: Obs_Layer = DIS # Set the operation (if any) to be applied to the fktables of this dataset operation_name = dataset.operation # Now generate the observable layer, which takes the following information: # operation name # dataset name # list of validphys.coredata.FKTableData objects # these will then be used to check how many different pdf inputs are needed # (and convolutions if given the case) obs_layer = Obs_Layer( dataset.fktables_data, dataset.fktables(), dataset_name, boundary_condition, operation_name, n_replicas=n_replicas, name=f"dat_{dataset_name}", ) # If the observable layer found that all input grids are equal, the splitting will be None # otherwise the different xgrids need to be stored separately # Note: for pineappl grids, obs_layer_tr.splitting should always be None if obs_layer.splitting is None: xgrid = dataset.fktables_data[0].xgrid model_inputs.append(xgrid) dataset_xsizes.append(len(xgrid)) else: xgrids = [i.xgrid for i in dataset.fktables_data] model_inputs += xgrids dataset_xsizes.append(sum([len(i) for i in xgrids])) model_observables.append(obs_layer) # Check whether all xgrids of all observables in this experiment are equal # if so, simplify the model input if is_unique(model_inputs): model_inputs = model_inputs[0:1] dataset_xsizes = dataset_xsizes[0:1] # Reshape all inputs arrays to be (1, nx) model_inputs = np.concatenate(model_inputs).reshape(1, -1) # Make the mask layers... if training_mask_array is None: tr_mask_layer = None if validation_mask_array is None: vl_mask_layer = None else: vl_mask_layer = Mask(validation_mask_array, name=f"vlmask_{spec_name}") else: tr_mask_layer = Mask(training_mask_array, name=f"trmask_{spec_name}") if validation_mask_array is None: vl_mask_layer = Mask(~training_mask_array, name=f"vlmask_{spec_name}") else: vl_mask_layer = Mask(validation_mask_array, name=f"vlmask_{spec_name}") # get rotation matrix to diagonal basis if spec_dict.get("data_transformation") is not None: obsrot = ObsRotation(spec_dict.get("data_transformation")) else: obsrot = None if spec_dict["positivity"]: out_positivity = ObservableWrapper( spec_name, model_observables, tr_mask_layer, dataset_xsizes, multiplier=positivity_initial, positivity=not integrability, integrability=integrability, ) layer_info = { "inputs": model_inputs, "output_tr": out_positivity, "experiment_xsize": sum(dataset_xsizes), } # For positivity we end here return layer_info out_tr = ObservableWrapper( spec_name, model_observables, tr_mask_layer, dataset_xsizes, invcovmat=invcovmat_tr, data=training_data, rotation=obsrot, ) out_vl = ObservableWrapper( f"{spec_name}_val", model_observables, vl_mask_layer, dataset_xsizes, invcovmat=invcovmat_vl, data=validation_data, rotation=obsrot, ) # experimental data has already been rotated if diagonal basis is requested out_exp = ObservableWrapper( f"{spec_name}_exp", model_observables, None, dataset_xsizes, invcovmat=spec_dict["invcovmat_true"], covmat=spec_dict["covmat"], data=spec_dict["expdata_true"], rotation=None, ) layer_info = { "inputs": model_inputs, "output": out_exp, "output_tr": out_tr, "output_vl": out_vl, "experiment_xsize": sum(dataset_xsizes), } return layer_info
[docs]@dataclass class ReplicaSettings: """Dataclass which holds all necessary replica-dependent information of a PDF. Parameters ---------- seed: int seed for the initialization of the neural network nodes: list[int] nodes of each of the layers, starting at the first hidden layer activations: list[str] list of activation functions, should be of equal length as nodes architecture: str select the architecture of the neural network used for the replica, e.g. ``dense`` or ``dense_per_flavour`` initializer: str initializer to be used for this replica dropout: float rate of dropout for each layer regularizer: str name of the regularizer to use for this replica (if any) regularizer_args: dict options to pass down to the regularizer (if any) """ seed: int nodes: list[int] activations: list[str] architecture: str = "dense" initializer: str = "glorot_normal" dropout_rate: float = 0.0 regularizer: str = None regularizer_args: dict = field(default_factory=dict) def __post_init__(self): """Apply checks to the input, and expand hyperopt callables""" # Expansions if callable(self.activations): # Hyperopt might pass down a function to generate the list of activations # depending on the number of layers self.activations = self.activations(len(self.nodes)) if self.regularizer_args is None: self.regularizer_args = dict() # Checks if len(self.nodes) != len(self.activations): raise ValueError( f"nodes and activations do not match ({self.nodes} vs {self.activations}" ) if self.regularizer_args and self.regularizer is None: raise ValueError( "Regularizer arguments have been provided but no regularizer is selected" )
[docs]def generate_pdf_model( replicas_settings: list[ReplicaSettings], flav_info: dict = None, fitbasis: str = "NN31IC", out: int = 14, impose_sumrule: str = None, scaler: Callable = None, photons: Photon = None, ): """ Generation of the full PDF model which will be used to determine the full PDF. The full PDF model can have any number of replicas, which can be trained in parallel, the limitations of the determination means that there are certain traits that all replicas must share, while others are fre per-PDF. In its most general form, the output of this function is a :py:class:`n3fit.backend.MetaModel` with the following architecture: <input layer> in the standard PDF fit this includes only the (x) grid of the NN [ list of a separate architecture per replica ] which can be, but is not necessary, equal for all replicas [ <preprocessing factors> ] postprocessing of the network output by a variation x^{alpha}*(1-x)^{beta} <normalization> physical sum rules, requires an integral over the PDF <rotation to FK-basis> regardless of the physical basis in which the PDF and preprocessing factors are applied the output is rotated to the 14-flavour general basis used in FkTables following PineaAPPL's convention [<output layer>] 14 flavours per value of x per replica note that, depending on the fit basis (and fitting scale) the output of the PDF will contain repeated values This function defines how the PDFs will be generated. In the case of identical PDF models (``identical_models = True``, default) the same settings will be used for all replicas. Otherwise, the sampling routines will be used. Parameters: ----------- replica_settings: list[ReplicaSettings] list of ReplicaSettings objects which must contain the following information nodes: list(int) list of the number of nodes per layer of the PDF NN activation: list list of activation functions to apply to each layer initializer_name: str selects the initializer of the weights of the NN. Default: glorot_normal layer_type: str selects the type of architecture of the NN. Default: dense seed: int the initialization seed for the NN dropout: float rate of dropout layer by layer regularizer: str name of the regularizer to use for the NN regularizer_args: dict options to pass down to the regularizer (if any) flav_info: dict dictionary containing the information about each PDF (basis dictionary in the runcard) to be used by Preprocessing fitbasis: str fitbasis used during the fit. Default: NN31IC out: int number of output flavours of the model (default 14) impose_sumrule: str whether to impose sumrules on the output pdf and which one to impose (All, MSR, VSR, TSR) scaler: callable Function to apply to the input. If given the input to the model will be a (1, None, 2) tensor where dim [:,:,0] is scaled When None, instead turn the x point into a (x, log(x)) pair photons: :py:class:`validphys.photon.compute.Photon` If given, gives the AddPhoton layer a function to compute a photon which will be added at the index 0 of the 14-size FK basis This same function will also be used to compute the MSR component for the photon Returns ------- pdf_model: MetaModel pdf model, with `single_replica_generator` attached as an attribute """ shared_config = { "flav_info": flav_info, "fitbasis": fitbasis, "output_size": out, "impose_sumrule": impose_sumrule, "scaler": scaler, "photons": photons, } pdf_model = _pdfNN_layer_generator(replicas_settings, **shared_config) # Note that the photons are passed unchanged to the single replica generator # computing the photon requires running fiatlux which takes 30' per replica # and so at the moment parallel photons are disabled with a check in checks.py # In order to enable it `single_replica_generator` must take the index of the replica # to select the appropiate photon as all of them will be computed and fixed before the fit def single_replica_generator(replica_idx=0): """Generate one single replica from the entire batch. The select index is relative to the batch, not the entire PDF determination. This function is necessary to separate all the different models after training. """ settings = replicas_settings[replica_idx] # TODO: # In principle we want to recover the initial replica exactly, # however, for the regression tests to pass # _in the polarized case and only in the polarized case_ this line is necessary # it most likely has to do with numerical precision, but panicking might be in order settings.seed = 0 return _pdfNN_layer_generator([settings], **shared_config, replica_axis=False) pdf_model.single_replica_generator = single_replica_generator return pdf_model
def _pdfNN_layer_generator( replicas_settings: list[ReplicaSettings], flav_info: dict = None, fitbasis: str = "NN31IC", output_size: int = 14, impose_sumrule: str = None, scaler: Callable = None, photons: Photon = None, replica_axis: bool = True, ): """ Generates the PDF model which takes as input a point in x (from 0 to 1) and outputs a basis of 14 PDFs. It generates the preprocessing of the x into a set (x, log(x)), the arbitrary NN to fit the form of the PDF and the preprocessing factor. The funtional form of the output of this function is of: f_{i}(x) = R_{ji} N(x)_{j} * x^{1-alpha_{j}} * (1-x)^{beta_{j}} Where i goes from 1 to 14 while j goes from 1 to the size of the basis. R_{ji} is the rotation from the fitting basis to the physical basis needed for the convolution with the fktables. `layer_type` defines the architecture of the Neural Network, currently the following two options are implemented: - `dense` it will generate a densely connected networks where all nodes of layer n are connected with all nodes of layer n+1 (and n-1), the output layer is of the size of the last item in the `nodes` list (usually 8) - `dense_per_flavour` similar to `dense` but the nodes are disconnected in a per-flavour basis This means at the end of the PDF (and before the preprocessing) we have 8 (or whatever number) nodes just as before, but from the input to the output the nodes are disconnected This is a complicated function comprised of several sections: 1. As initialization a number of checks are carried outs to ensure the number of layers and the number of activation functions match up. 2. The neural network NN(x) is constructed depending on the requested layer_type 3. Break the input into (x, log(x)) and passes down to the NN. The output of the NN in the previous step is a list of independent layers. A function is constructed that joins all those layers. The function takes a tensor as the input and applies all layers for NN in order. 4. Create a preprocessing factor layer (that takes as input the same tensor x as the NN) and multiply it to the NN. We have now: N(x)_{j} * x^{1-alpha_{j}} * (1-x)^{beta_{j}} 5. Create a rotation layer (that takes as input the previous step) and applies it so that we get f(x)_{i} Finally we output the final answer as well as the list of all generating functions in the model for easy usage within `n3fit`. Example ------- >>> import numpy as np >>> from n3fit.vpinterface import N3PDF >>> from n3fit.model_gen import _pdfNN_layer_generator, ReplicaSettings >>> from validphys.pdfgrids import xplotting_grid >>> rp = [ReplicaSettings(nodes = [8], activations=["linear"], seed=i) for i in [1,2]] >>> fake_fl = [{'fl' : i, 'largex' : [0,1], 'smallx': [1,2]} for i in ['u', 'ubar', 'd', 'dbar', 'c', 'g', 's', 'sbar']] >>> fake_x = np.linspace(1e-3,0.8,3).reshape(1,-1,1) >>> pdf_model = _pdfNN_layer_generator(rp, flav_info=fake_fl, fitbasis='FLAVOUR', impose_sumrule=False) >>> pdf_model(fake_x).shape TensorShape([1, 2, 3, 14]) # 1 batch, 2 replicas, 3 x points, 14 flavours Parameters ---------- replicas_settings: list(:py:class:`ReplicaSettings`) list of ``ReplicaSettings`` objects holding the settings of each of the replicas flav_info: dict dictionary containing the information about each PDF (basis dictionary in the runcard) to be used by Preprocessing fitbasis: str fitbasis used during the fit. Default: NN31IC output_size: int number of output flavours of the model (default 14) impose_sumrule: str whether to impose sumrules on the output pdf and which one to impose (All, MSR, VSR, TSR) scaler: callable Function to apply to the input. If given the input to the model will be a (1, None, 2) tensor where dim [:,:,0] is scaled When None, instead turn the x point into a (x, log(x)) pair photon: :py:class:`validphys.photon.compute.Photon` If given, gives the AddPhoton layer a function to compute a photon which will be added at the index 0 of the 14-size FK basis This same function will also be used to compute the MSR component for the photon replica_axis: bool Whether there is an explicit replica axis. True even with one replica for the main model, used internally for the single replica models. Returns ------- pdf_model: n3fit.backends.MetaModel a model f(x) = y where x is a tensor (1, xgrid, 1) and y a tensor (1, replicas, xgrid, out) """ all_seed = [i.seed for i in replicas_settings] num_replicas = len(replicas_settings) if impose_sumrule is None: impose_sumrule = "All" ## Process the input data (x grid) # There a currently two options: # 1. Append log(x) to the input # 2. Scale the input do_nothing = lambda x: x model_input = {} if scaler is None: # add log(x) use_feature_scaling = subtract_one = False # The PDF itself receives only x pdf_input_dimensions = 1 # But the NN will see (x, log(x)) nn_input_dimensions = 2 pdf_input = Input(shape=(None, pdf_input_dimensions), batch_size=1, name="pdf_input") process_input = Lambda(lambda x: op.concatenate([x, op.op_log(x)], axis=-1), name="x_logx") extract_original = do_nothing extract_nn_input = do_nothing else: use_feature_scaling = subtract_one = True # The NN will only receive x nn_input_dimensions = 1 # But the PDF itself will receive both (x, scaler(x)) pdf_input_dimensions = 2 pdf_input = Input(shape=(None, pdf_input_dimensions), batch_size=1, name="scaledx_x") process_input = do_nothing extract_nn_input = Lambda(lambda x: op.op_gather_keep_dims(x, 0, axis=-1), name="x_scaled") extract_original = Lambda(lambda x: op.op_gather_keep_dims(x, 1, axis=-1), name="pdf_input") if subtract_one: # TODO: make it its own option, even though now it only activates in the scaler if above input_x_eq_1 = [1.0] if use_feature_scaling: input_x_eq_1 = scaler(input_x_eq_1)[0] # the layer that subtracts 1 from the NN output subtract_one_layer = Lambda(op.op_subtract, name="subtract_one") layer_x_eq_1 = op.numpy_to_input(np.array(input_x_eq_1).reshape(1, 1), name="x_eq_1") model_input["layer_x_eq_1"] = layer_x_eq_1 model_input["pdf_input"] = pdf_input ## Create the actual NeuralNetwork PDF # loop over the settings for all replicas and generate a list of NN per replica # which will be then stack together and built into a single (input -> output) MetaModel # all PDFs _must_ share the same input layer x_input = Input(shape=(None, nn_input_dimensions), batch_size=1, name="NN_input") list_of_nn_pdfs = [] for i, replica_settings in enumerate(replicas_settings): rep_pdf = _generate_nn(x_input, i, **asdict(replica_settings)) # And build them all with the same input layer list_of_nn_pdfs.append(rep_pdf(x_input)) # Stack all replicas together as one single object nn_pdfs = Lambda(lambda nns: op.stack(nns, axis=1), name="stack_replicas")(list_of_nn_pdfs) nn_replicas = MetaModel({'NN_input': x_input}, nn_pdfs, name=NN_LAYER_ALL_REPLICAS) ## Preprocessing factors: # the layer that multiplies the NN output by the preprocessing factor # This includes # - x^{a}(1-x)^{b} # - NN(x) - N(1.0) apply_preprocessing_factor = Lambda(op.op_multiply, name="prefactor_times_NN") compute_preprocessing_factor = Preprocessing( flav_info=flav_info, name=PREPROCESSING_LAYER_ALL_REPLICAS, replica_seeds=all_seed, large_x=not subtract_one, ) # The NN subtracted by NN(1), if applicable, otherwise do nothing def nn_subtracted(x): NNs_x = nn_replicas(x) if subtract_one: x_eq_1_processed = process_input(layer_x_eq_1) NNs_x_1 = nn_replicas(x_eq_1_processed) NNs_x = subtract_one_layer([NNs_x, NNs_x_1]) return NNs_x ## Unnormalized PDF # updf_r(x) = FkRotation( NN_r(input(x)) * preprocessing_layer_r(x) ) # with _r: replica index # input: whatever processing is applied to the input # The preprocessing_layer and weights is specific to each replica # The final PDF will be in the 14 flavours evolution basis used in the FkTables # Basis rotation basis_rotation = FlavourToEvolution( flav_info=flav_info, fitbasis=fitbasis, name="pdf_evolution_basis" ) # Evolution layer layer_evln = FkRotation(output_dim=output_size, name="pdf_FK_basis") def compute_unnormalized_pdf(x): # Preprocess the input grid x_nn_input = extract_nn_input(x) x_processed = process_input(x_nn_input) x_original = extract_original(x) # Compute the neural network output NNs_x = nn_subtracted(x_processed) # Compute the preprocessing factor preprocessing_factors_x = compute_preprocessing_factor(x_original) # Apply the preprocessing factor pref_NNs_x = apply_preprocessing_factor([preprocessing_factors_x, NNs_x]) # Transform to FK basis, this is the full evolution basis # Rotate to the 9f evolution basis first before expanding up to 14f # TODO: make these two steps into one if not basis_rotation.is_identity(): pref_NNs_x = basis_rotation(pref_NNs_x) PDFs_unnormalized = layer_evln(pref_NNs_x) return PDFs_unnormalized PDFs_unnormalized = compute_unnormalized_pdf(pdf_input) ## Normalization and sum rules, produces normalized PDF # pdf_r(x) = updf_r(x) * Normalization(updf_r(integration_xgrid)) # The normalization layer is shared across replicas (but evaluated at each replica separately) # if impose_sumrule: sumrule_layer, integrator_input = generate_msr_model_and_grid( fitbasis=fitbasis, mode=impose_sumrule, scaler=scaler, replica_seeds=all_seed ) model_input["xgrid_integration"] = integrator_input # We need a second unnormalized PDF evaluated on the integrated grid PDFs_integration_grid = compute_unnormalized_pdf(integrator_input) # Photon contribution to the sum rule if photons: # add batch and flavor dimensions ph_tensor = op.numpy_to_tensor(photons.integral) photon_integrals = op.batchit(op.batchit(ph_tensor)) else: photon_integrals = op.numpy_to_tensor(np.zeros((1, num_replicas, 1))) PDFs = sumrule_layer( { "pdf_x": PDFs_unnormalized, "pdf_xgrid_integration": PDFs_integration_grid, "xgrid_integration": integrator_input, # The photon is treated separately, need to get its integrals to normalize the pdf "photon_integral": photon_integrals, } ) else: PDFs = PDFs_unnormalized sumrule_layer = lambda x: x ## Include the photon in the PDF for QED-enabled fits # (by default the entry corresponding to the photon is set to 0) if photons: layer_photon = AddPhoton(photons=photons, name="add_photon") PDFs = layer_photon(PDFs) # Return a PDF without a replica axis, to extract single replicas from an ensemble if not replica_axis: PDFs = Lambda(lambda pdfs: pdfs[:, 0], name="remove_replica_axis")(PDFs) return MetaModel(model_input, PDFs, name="PDFs", scaler=scaler) # TODO: is there a way of keeping sincronized the input of this function and ReplicaSettings # beyond a test of it? In principle we might want to have the arguments explicitly here... def _generate_nn( input_layer: Input, replica_idx: int = 0, seed: int = None, nodes: list[int] = None, activations: list[str] = None, architecture: str = "dense", initializer: str = None, dropout_rate: float = 0.0, regularizer: str = None, regularizer_args: dict = field(default_factory=dict), ) -> MetaModel: """ Create a Neural Network according to the input settings Parameters ---------- input_layer: :py:class:`n3fit.backends.Input` input layer of the replica replica_idx: int Index of the replica used to name the PDF All other arguments follow exactly the documentation of ``ReplicaSettings``. See :py:class:`n3fit.model_gen.ReplicaSettings` Returns ------- nn_pdf: MetaModel A single PDF NN model """ reg = regularizer_selector(regularizer, **regularizer_args) *hidden_layers, n_flavours = nodes # Preparatory step: prepare a ``layer_generator`` function to iteratively create all layers # TODO: create a factory of layers instead of an ugly function # this layer generator takes the index of the layer (useful for seeding) # the output nodes of the layer # and the activation function if architecture == "dense_per_flavour": # Reset the last node in the list to be 1, we will then # repeat it n-times nodes = hidden_layers + [1] def layer_generator(i_layer, nodes_out, activation): """Generate the ``i_layer``-th dense_per_flavour layer for all replicas.""" l_seed = int(seed + i_layer * n_flavours) initializers = [ MetaLayer.select_initializer(initializer, seed=l_seed + b) for b in range(n_flavours) ] layer = base_layer_selector( architecture, kernel_initializer=initializers, units=int(nodes_out), activation=activation, basis_size=n_flavours, ) return layer elif architecture == "dense": def layer_generator(i_layer, nodes_out, activation): kini = MetaLayer.select_initializer(initializer, seed=int(seed + i_layer)) return base_layer_selector( architecture, kernel_initializer=kini, units=nodes_out, activation=activation, regularizer=reg, ) else: raise ValueError(f"{architecture=} not recognized during model generation") # Use the previous layer generator to generate all layers previous_layer = input_layer for layer_idx, (nodes_out, activation) in enumerate(zip(nodes, activations)): layer = layer_generator(layer_idx, nodes_out, activation) # Apply the layer to the output of the previous one previous_layer = layer(previous_layer) # Add dropout if any to the second to last layer if dropout_rate > 0 and layer_idx == (len(hidden_layers) - 2): dropout_l = base_layer_selector("dropout", rate=dropout_rate) previous_layer = dropout_l(previous_layer) # In a dense-per-flavour, concatenate the last layer if architecture == "dense_per_flavour": concat = base_layer_selector("concatenate") previous_layer = concat(previous_layer) # Return the PDF model return MetaModel({"NN_input": input_layer}, previous_layer, name=f"{NN_PREFIX}_{replica_idx}")