diff --git a/hls4ml/model/profiling.py b/hls4ml/model/profiling.py index 6b52426ccd..be76afc588 100644 --- a/hls4ml/model/profiling.py +++ b/hls4ml/model/profiling.py @@ -343,21 +343,22 @@ def activations_keras(model, X, fmt='longform', plot='boxplot'): # return summary statistics for matplotlib.axes.Axes.bxp # or histogram bin edges and heights data = [] - - for layer in model.layers: - print(f" {layer.name}") - if not isinstance(layer, keras.layers.InputLayer): - y = _get_output(layer, X, model.input).flatten() - y = abs(y[y != 0]) - if len(y) == 0: - print(f'Activations for {layer.name} are only zeros, ignoring.') - continue - if fmt == 'longform': - data['x'].extend(y.tolist()) - data['weight'].extend([layer.name for i in range(len(y))]) - elif fmt == 'summary': - data.append(array_to_summary(y, fmt=plot)) - data[-1]['weight'] = layer.name + outputs = _get_outputs( + [layer for layer in model.layers if not isinstance(layer, keras.layers.InputLayer)], X, model.input + ) + for layer_name, y in outputs.items(): + print(f" {layer_name}") + y = y.flatten() + y = abs(y[y != 0]) + if len(y) == 0: + print(f'Activations for {layer_name} are only zeros, ignoring.') + continue + if fmt == 'longform': + data['x'].extend(y.tolist()) + data['weight'].extend([layer_name for i in range(len(y))]) + elif fmt == 'summary': + data.append(array_to_summary(y, fmt=plot)) + data[-1]['weight'] = layer_name if fmt == 'longform': data = pandas.DataFrame(data) @@ -544,10 +545,10 @@ def _is_ignored_layer(layer): return False -def _get_output(layer, X, model_input): - """Get output of partial model""" - partial_model = keras.models.Model(inputs=model_input, outputs=layer.output) - y = partial_model.predict(X) +def _get_outputs(layers, X, model_input): + """Get outputs of intermediate layers""" + partial_models = keras.models.Model(inputs=model_input, outputs=[layer.output for layer in layers]) + y = partial_models.predict(X) return y @@ -562,37 +563,30 @@ def get_ymodel_keras(keras_model, X): Returns: dict: A dictionary in the form {"layer_name": ouput array of layer}. """ - ymodel = {} - + traced_layers = [] + layer_names = [] for layer in keras_model.layers: - print(f"Processing {layer.name} in Keras model...") - if not _is_ignored_layer(layer): - # If the layer has activation integrated then separate them - # Note that if the layer is a standalone activation layer then skip this - if hasattr(layer, 'activation') and not ( - isinstance(layer, keras.layers.Activation) or isinstance(layer, qkeras.qlayers.QActivation) - ): - if layer.activation: - if layer.activation.__class__.__name__ == "linear": - ymodel[layer.name] = _get_output(layer, X, keras_model.input) - - else: - temp_activation = layer.activation - layer.activation = None - # Get output for layer without activation - ymodel[layer.name] = _get_output(layer, X, keras_model.input) - - # Add the activation back - layer.activation = temp_activation - # Get ouput for activation - ymodel[layer.name + f"_{temp_activation.__class__.__name__}"] = _get_output( - layer, X, keras_model.input - ) - else: - ymodel[layer.name] = _get_output(layer, X, keras_model.input) - else: - ymodel[layer.name] = _get_output(layer, X, keras_model.input) + if _is_ignored_layer(layer): + continue + # If the layer has activation integrated then separate them + # Note that if the layer is a standalone activation layer then skip this + name = layer.name + if ( + hasattr(layer, "activation") + and layer.activation.__name__ != "linear" + and not isinstance(layer, (keras.layers.Activation, qkeras.qlayers.QActivation)) + ): + tmp_activation = layer.activation + layer.activation = None + ymodel.update({layer.name: _get_outputs([layer], X, keras_model.input)}) + layer.activation = tmp_activation + name = layer.name + f"_{tmp_activation.__name__}" + traced_layers.append(layer) + layer_names.append(name) + outputs = _get_outputs(traced_layers, X, keras_model.input) + for name, output in zip(layer_names, outputs): + ymodel[name] = output print("Done taking outputs for Keras model.") return ymodel diff --git a/test/pytest/test_trace.py b/test/pytest/test_trace.py index b0e7e3542f..14e218fd1c 100644 --- a/test/pytest/test_trace.py +++ b/test/pytest/test_trace.py @@ -12,7 +12,8 @@ @pytest.mark.parametrize('backend', ['Vivado', 'Vitis', 'Quartus']) -def test_trace(backend): +@pytest.mark.parametrize('activation', ['relu', None]) +def test_trace(backend, activation): '''Test the tracing feature with a simple Keras model.''' model = tf.keras.models.Sequential() model.add( @@ -20,6 +21,7 @@ def test_trace(backend): 2, input_shape=(1,), name='Dense', + activation=activation, use_bias=True, kernel_initializer=tf.keras.initializers.RandomUniform(minval=1, maxval=10), bias_initializer='zeros', @@ -48,6 +50,7 @@ def test_trace(backend): hls_model.compile() hls4ml_pred, hls4ml_trace = hls_model.trace(X_input) keras_trace = hls4ml.model.profiling.get_ymodel_keras(model, X_input) - - np.testing.assert_allclose(hls4ml_trace['Dense'], keras_trace['Dense'], rtol=1e-2, atol=0.01) + assert keras_trace.keys() == hls4ml_trace.keys() + for key in hls4ml_trace.keys(): + np.testing.assert_allclose(hls4ml_trace[key], keras_trace[key], rtol=1e-2, atol=0.01) np.testing.assert_allclose(hls4ml_pred, keras_prediction, rtol=1e-2, atol=0.01)