Skip to content

Commit ba17484

Browse files
authored
Merge ac764ec into 78fce3b
2 parents 78fce3b + ac764ec commit ba17484

File tree

6 files changed

+169
-13
lines changed

6 files changed

+169
-13
lines changed

changelog/8560.improvement.md

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implement a new interface `run_inference` inside `RasaModel` which performs batch inference through TensorFlow models.

rasa/core/policies/ted_policy.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -680,11 +680,11 @@ def predict_action_probabilities(
680680
tracker, domain, interpreter
681681
)
682682
model_data = self._create_model_data(tracker_state_features)
683-
output = self.model.rasa_predict(model_data)
683+
outputs = self.model.run_inference(model_data)
684684

685685
# take the last prediction in the sequence
686-
similarities = output["similarities"][:, -1, :]
687-
confidences = output["action_scores"][:, -1, :]
686+
similarities = outputs["similarities"][:, -1, :]
687+
confidences = outputs["action_scores"][:, -1, :]
688688
# take correct prediction from batch
689689
confidence, is_e2e_prediction = self._pick_confidence(
690690
confidences, similarities, domain
@@ -698,14 +698,14 @@ def predict_action_probabilities(
698698
)
699699

700700
optional_events = self._create_optional_event_for_entities(
701-
output, is_e2e_prediction, interpreter, tracker
701+
outputs, is_e2e_prediction, interpreter, tracker
702702
)
703703

704704
return self._prediction(
705705
confidence.tolist(),
706706
is_end_to_end_prediction=is_e2e_prediction,
707707
optional_events=optional_events,
708-
diagnostic_data=output.get(DIAGNOSTIC_DATA),
708+
diagnostic_data=outputs.get(DIAGNOSTIC_DATA),
709709
)
710710

711711
def _create_optional_event_for_entities(

rasa/nlu/classifiers/diet_classifier.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -875,7 +875,7 @@ def _predict(
875875

876876
# create session data from message and convert it into a batch of 1
877877
model_data = self._create_model_data([message], training=False)
878-
return self.model.rasa_predict(model_data)
878+
return self.model.run_inference(model_data)
879879

880880
def _predict_label(
881881
self, predict_out: Optional[Dict[Text, tf.Tensor]]

rasa/utils/tensorflow/models.py

+51-5
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
CONSTRAIN_SIMILARITIES,
3636
MODEL_CONFIDENCE,
3737
)
38+
import rasa.utils.train_utils
3839
from rasa.utils.tensorflow import layers
3940
from rasa.utils.tensorflow import rasa_layers
4041
from rasa.utils.tensorflow.temp_keras_modules import TmpKerasModel
@@ -47,6 +48,25 @@
4748
logger = logging.getLogger(__name__)
4849

4950

51+
def _merge_batch_outputs(
52+
all_outputs: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
53+
batch_output: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
54+
) -> Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]]:
55+
if not all_outputs:
56+
return batch_output
57+
for key, val in batch_output.items():
58+
if isinstance(val, np.ndarray):
59+
all_outputs[key] = np.concatenate(
60+
[all_outputs[key], batch_output[key]], axis=0
61+
)
62+
63+
elif isinstance(val, dict):
64+
# recurse and merge the inner dict first
65+
all_outputs[key] = _merge_batch_outputs(all_outputs[key], val)
66+
67+
return all_outputs
68+
69+
5070
# noinspection PyMethodOverriding
5171
class RasaModel(TmpKerasModel):
5272
"""Abstract custom Keras model.
@@ -231,12 +251,12 @@ def _dynamic_signature(
231251
return [element_spec]
232252

233253
def rasa_predict(
234-
self, model_data: RasaModelData
254+
self, batch_in: Tuple[np.ndarray]
235255
) -> Dict[Text, Union[np.ndarray, Dict[Text, Any]]]:
236256
"""Custom prediction method that builds tf graph on the first call.
237257
238258
Args:
239-
model_data: The model data to use for prediction.
259+
batch_in: Prepared batch ready for input to predict_step method of model.
240260
241261
Return:
242262
Prediction output, including diagnostic data.
@@ -248,8 +268,6 @@ def rasa_predict(
248268
self.prepare_for_predict()
249269
self.prepared_for_prediction = True
250270

251-
batch_in = RasaBatchDataGenerator.prepare_batch(model_data.data)
252-
253271
if self._run_eagerly:
254272
outputs = tf_utils.to_numpy_or_python_type(self.predict_step(batch_in))
255273
outputs[DIAGNOSTIC_DATA] = self._empty_lists_to_none_in_dict(
@@ -268,6 +286,34 @@ def rasa_predict(
268286
)
269287
return outputs
270288

289+
def run_inference(
    self, model_data: RasaModelData, batch_size: Union[int, List[int]] = 1
) -> Dict[Text, Union[np.ndarray, Dict[Text, Any]]]:
    """Implements bulk inferencing through the model.

    The data is split into batches, every batch is passed to `rasa_predict`
    and the per-batch outputs are merged into a single dictionary.

    Args:
        model_data: Input data to be fed to the model.
        batch_size: Size of batches that the generator should create.

    Returns:
        Model outputs corresponding to the inputs fed. An empty dictionary
        if the data generator does not yield any batch.
    """
    outputs = {}
    # Shuffling is switched off; presumably so that the merged outputs line up
    # with the order of the inputs (verify against the data generator).
    data_generator, _ = rasa.utils.train_utils.create_data_generators(
        model_data=model_data, batch_sizes=batch_size, epochs=1, shuffle=False,
    )
    # A plain `for` loop ends when the generator runs out of batches, while a
    # `StopIteration` escaping from the prediction itself is not mistaken for
    # the end of the data.
    for batch in data_generator:
        # Only want x, since y is always None out of our data generators
        batch_in = batch[0]
        batch_out = self.rasa_predict(batch_in)
        outputs = _merge_batch_outputs(outputs, batch_out)
    return outputs
316+
271317
@staticmethod
272318
def _empty_lists_to_none_in_dict(input_dict: Dict[Text, Any]) -> Dict[Text, Any]:
273319
"""Recursively replaces empty list or np array with None in a dictionary."""
@@ -339,7 +385,7 @@ def load(
339385
# predict on one data example to speed up prediction during inference
340386
# the first prediction always takes a bit longer to trace tf function
341387
if not finetune_mode and predict_data_example:
342-
model.rasa_predict(predict_data_example)
388+
model.run_inference(predict_data_example)
343389

344390
logger.debug("Finished loading the model.")
345391
return model

rasa/utils/train_utils.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,7 @@ def create_data_generators(
382382
batch_strategy: Text = SEQUENCE,
383383
eval_num_examples: int = 0,
384384
random_seed: Optional[int] = None,
385+
shuffle: bool = True,
385386
) -> Tuple[RasaBatchDataGenerator, Optional[RasaBatchDataGenerator]]:
386387
"""Create data generators for train and optional validation data.
387388
@@ -392,6 +393,7 @@ def create_data_generators(
392393
batch_strategy: The batch strategy to use.
393394
eval_num_examples: Number of examples to use for validation data.
394395
random_seed: The random seed.
396+
shuffle: Whether to shuffle data inside the data generator
395397
396398
Returns:
397399
The training data generator and optional validation data generator.
@@ -406,15 +408,15 @@ def create_data_generators(
406408
batch_size=batch_sizes,
407409
epochs=epochs,
408410
batch_strategy=batch_strategy,
409-
shuffle=True,
411+
shuffle=shuffle,
410412
)
411413

412414
data_generator = RasaBatchDataGenerator(
413415
model_data,
414416
batch_size=batch_sizes,
415417
epochs=epochs,
416418
batch_strategy=batch_strategy,
417-
shuffle=True,
419+
shuffle=shuffle,
418420
)
419421

420422
return data_generator, validation_data_generator

tests/utils/tensorflow/test_models.py

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import pytest
2+
from typing import Dict, Text, Union
3+
import numpy as np
4+
import tensorflow as tf
5+
6+
from rasa.utils.tensorflow.models import _merge_batch_outputs, RasaModel
7+
from rasa.utils.tensorflow.model_data import RasaModelData
8+
from rasa.shared.constants import DIAGNOSTIC_DATA
9+
from rasa.utils.tensorflow.model_data import FeatureArray
10+
11+
12+
@pytest.mark.parametrize(
    "existing_outputs, new_batch_outputs, expected_output",
    [
        (
            {"a": np.array([1, 2]), "b": np.array([3, 1])},
            {"a": np.array([5, 6]), "b": np.array([2, 4])},
            {"a": np.array([1, 2, 5, 6]), "b": np.array([3, 1, 2, 4])},
        ),
        (
            {},
            {"a": np.array([5, 6]), "b": np.array([2, 4])},
            {"a": np.array([5, 6]), "b": np.array([2, 4])},
        ),
        (
            {"a": np.array([1, 2]), "b": {"c": np.array([3, 1])}},
            {"a": np.array([5, 6]), "b": {"c": np.array([2, 4])}},
            {"a": np.array([1, 2, 5, 6]), "b": {"c": np.array([3, 1, 2, 4])}},
        ),
    ],
)
def test_merging_batch_outputs(
    existing_outputs: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
    new_batch_outputs: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
    expected_output: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
):
    """Checks that merging a new batch output yields the expected dictionary."""

    def _assert_same_content(
        actual: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
        expected: Dict[Text, Union[np.ndarray, Dict[Text, np.ndarray]]],
    ):
        # Both dictionaries must expose exactly the same keys.
        assert expected.keys() == actual.keys()
        for key, actual_value in actual.items():
            expected_value = expected[key]
            assert type(actual_value) is type(expected_value)

            if isinstance(expected_value, dict):
                # Nested dictionaries are compared level by level.
                _assert_same_content(actual_value, expected_value)
            elif isinstance(expected_value, np.ndarray):
                assert np.array_equal(actual_value, expected_value)

    merged_output = _merge_batch_outputs(existing_outputs, new_batch_outputs)
    _assert_same_content(merged_output, expected_output)
57+
58+
59+
@pytest.mark.parametrize(
    "batch_size, number_of_data_points, expected_number_of_batch_iterations",
    [(2, 3, 2), (1, 3, 3), (5, 3, 1),],
)
def test_batch_inference(
    batch_size: int,
    number_of_data_points: int,
    expected_number_of_batch_iterations: int,
):
    """Checks the shapes of the outputs that `run_inference` merges together."""
    model = RasaModel()

    def batch_predict(batch_in: np.ndarray):
        # Echo the first input back and attach one fixed diagnostic row.
        return {
            "dummy_output": batch_in[0],
            DIAGNOSTIC_DATA: tf.constant(np.array([[1, 2]]), dtype=tf.int32),
        }

    # Replace the model's batch prediction with the stub above so that only
    # the run_inference interface is exercised.
    model.batch_predict = batch_predict

    # Dummy model data holding `number_of_data_points` two-dimensional rows.
    sentence_features = FeatureArray(
        np.random.rand(number_of_data_points, 2), number_of_dimensions=2,
    )
    model_data = RasaModelData(
        label_key="label",
        label_sub_key="ids",
        data={"text": {"sentence": [sentence_features]}},
    )

    result = model.run_inference(model_data, batch_size=batch_size)

    # Every data point sent as input must show up in the echoed output.
    echoed_rows = result["dummy_output"].shape[0]
    assert echoed_rows == number_of_data_points

    # The stub emits one diagnostic row per call, so the merged diagnostic
    # data must hold one row for every batch that was passed to the model.
    diagnostic_shape = result[DIAGNOSTIC_DATA].shape
    assert diagnostic_shape == (expected_number_of_batch_iterations, 2)

0 commit comments

Comments
 (0)