This module contains example pipelines that use the Beam RunInference API.
Some examples are also used in our benchmarks.
You must have the latest (possibly unreleased) version of apache-beam installed from the Beam repo in order to run these pipelines, because some examples rely on the latest features that are actively in development. To install Beam, run the following from the sdks/python directory:
pip install -e .[gcp]
The following installation requirement is for the Tensorflow model handler examples.
The RunInference API supports the Tensorflow framework. To use Tensorflow locally, first install tensorflow.
pip install tensorflow==2.12.0
The following installation requirements are for the files used in these examples.
The RunInference API supports the PyTorch framework. To use PyTorch locally, first install torch.
pip install torch==1.10.0
If you are using pretrained models from Pytorch's torchvision.models subpackage, you also need to install torchvision.
pip install torchvision
If you are using pretrained models from Hugging Face's transformers package, you also need to install transformers.
pip install transformers
For installation of the torch dependency on a distributed runner such as Dataflow, refer to the PyPI dependency instructions.
The following installation requirement is for the Hugging Face model handler examples.
The RunInference API supports loading models from the Hugging Face Hub. To use it, first install transformers.
pip install transformers==4.30.0
Additional dependencies for PyTorch and TensorFlow may need to be installed separately:
pip install tensorflow==2.12.0
pip install torch==1.10.0
The RunInference API supports the TensorRT SDK for high-performance deep learning inference with NVIDIA GPUs. To use TensorRT locally, we suggest an environment with TensorRT >= 8.0.1. Install TensorRT as per the TensorRT Install Guide. Make sure that the Python bindings for TensorRT are also installed correctly; they are available by installing the python3-libnvinfer and python3-libnvinfer-dev packages from your TensorRT download.
If you would like to use Docker, you can use an NGC image like:
docker pull nvcr.io/nvidia/tensorrt:22.04-py3
as an existing container base to build a custom Apache Beam container.
The RunInference API supports ONNX runtime for accelerated inference. To use ONNX, we suggest installing the following dependencies:
pip install onnxruntime==1.13.1
The onnxruntime dependency is sufficient if you already have a model in ONNX format. This library also supports conversion from PyTorch models to ONNX. If you need to convert TensorFlow models into ONNX, please install:
pip install tf2onnx==1.13.0
If you need to convert sklearn models into ONNX, please install:
pip install skl2onnx
For more information, see the About Beam ML and the RunInference transform documentation.
pytorch_image_classification.py contains an implementation for a RunInference pipeline that performs image classification using the mobilenet_v2 architecture.
The pipeline reads the images, performs basic preprocessing, passes the images to the PyTorch implementation of RunInference, and then writes the predictions to a text file.
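At its core, the pipeline wires a PyTorch model handler into RunInference. The following is a minimal sketch of that wiring, not the full example script; the preprocessing helper and file names are hypothetical placeholders.
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from torchvision import models

# Placeholder paths; substitute the state dict and input file you create below.
model_handler = PytorchModelHandlerTensor(
    state_dict_path='mobilenet_v2.pth',
    model_class=models.mobilenet_v2,
    model_params={'num_classes': 1000})

with beam.Pipeline() as p:
  _ = (
      p
      | beam.io.ReadFromText('IMAGE_FILE_NAMES.txt')
      | beam.Map(read_and_preprocess_image)  # hypothetical helper returning a torch.Tensor
      | RunInference(model_handler)
      | beam.Map(lambda result: result.inference.argmax().item())
      | beam.io.WriteToText('predictions.csv'))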
To use this transform, you need a dataset and model for image classification.
- Create a directory named IMAGES_DIR. Create or download images and put them in this directory. The directory is not required if image names in the input file IMAGE_FILE_NAMES.txt you create in step 2 have absolute paths. One popular dataset is from ImageNet. Follow their instructions to download the images.
- Create a file named IMAGE_FILE_NAMES.txt that contains the absolute paths of each of the images in IMAGES_DIR that you want to use to run image classification. The path to the file can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example:
/absolute/path/to/image1.jpg
/absolute/path/to/image2.jpg
- Download the mobilenet_v2 model from Pytorch's repository of pretrained models. This model requires the torchvision library. To download this model, run the following commands from a Python shell:
import torch
from torchvision.models import mobilenet_v2
model = mobilenet_v2(pretrained=True)
torch.save(model.state_dict(), 'mobilenet_v2.pth') # You can replace mobilenet_v2.pth with your preferred file name for your model state dictionary.
To run the image classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.pytorch_image_classification \
--input IMAGE_FILE_NAMES \
--images_dir IMAGES_DIR \
--output OUTPUT \
--model_state_dict_path MODEL_STATE_DICT
images_dir is only needed if your IMAGE_FILE_NAMES.txt file contains relative paths (they will be relative from IMAGES_DIR).
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.pytorch_image_classification \
--input IMAGE_FILE_NAMES.txt \
--output predictions.csv \
--model_state_dict_path mobilenet_v2.pth
This writes the output to predictions.csv with contents like:
/absolute/path/to/image1.jpg;1
/absolute/path/to/image2.jpg;333
...
Each image path is paired with a value representing the ImageNet class that returned the highest confidence score out of ImageNet's 1000 classes.
pytorch_image_segmentation.py contains an implementation for a RunInference pipeline that performs image segmentation using the maskrcnn_resnet50_fpn architecture.
The pipeline reads images, performs basic preprocessing, passes the images to the PyTorch implementation of RunInference, and then writes predictions to a text file.
To use this transform, you need a dataset and model for image segmentation.
- Create a directory named IMAGES_DIR. Create or download images and put them in this directory. The directory is not required if image names in the input file IMAGE_FILE_NAMES.txt you create in step 2 have absolute paths. A popular dataset is from Coco. Follow their instructions to download the images.
- Create a file named IMAGE_FILE_NAMES.txt that contains the absolute paths of each of the images in IMAGES_DIR that you want to use to run image segmentation. The path to the file can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example:
/absolute/path/to/image1.jpg
/absolute/path/to/image2.jpg
- Download the maskrcnn_resnet50_fpn model from Pytorch's repository of pretrained models. This model requires the torchvision library. To download this model, run the following commands from a Python shell:
import torch
from torchvision.models.detection import maskrcnn_resnet50_fpn
model = maskrcnn_resnet50_fpn(pretrained=True)
torch.save(model.state_dict(), 'maskrcnn_resnet50_fpn.pth') # You can replace maskrcnn_resnet50_fpn.pth with your preferred file name for your model state dictionary.
- Note the path to the OUTPUT file. This file is used by the pipeline to write the predictions.
To run the image segmentation pipeline locally, use the following command:
python -m apache_beam.examples.inference.pytorch_image_segmentation \
--input IMAGE_FILE_NAMES \
--images_dir IMAGES_DIR \
--output OUTPUT \
--model_state_dict_path MODEL_STATE_DICT
images_dir is only needed if your IMAGE_FILE_NAMES.txt file contains relative paths (they will be relative from IMAGES_DIR).
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.pytorch_image_segmentation \
--input IMAGE_FILE_NAMES.txt \
--output predictions.csv \
--model_state_dict_path maskrcnn_resnet50_fpn.pth
This writes the output to predictions.csv with contents like:
/absolute/path/to/image1.jpg;['parking meter', 'bottle', 'person', 'traffic light', 'traffic light', 'traffic light']
/absolute/path/to/image2.jpg;['bottle', 'person', 'person']
...
Each line has data separated by a semicolon ";". The first item is the file name. The second item is a list of predicted instances.
pytorch_model_per_key_image_segmentation.py contains an implementation for a RunInference pipeline that performs image segmentation using multiple different trained models based on the maskrcnn_resnet50_fpn architecture.
The pipeline reads images, performs basic preprocessing, passes the images to the PyTorch implementation of RunInference, and then writes predictions to a text file.
To use this transform, you need a dataset and model for image segmentation. If you've already done the previous example (image segmentation with pytorch_image_segmentation.py), you can reuse the results from some of those setup steps.
- Create a directory named IMAGES_DIR. Create or download images and put them in this directory. The directory is not required if image names in the input file IMAGE_FILE_NAMES.txt you create in step 2 have absolute paths. A popular dataset is from Coco. Follow their instructions to download the images.
- Create a file named IMAGE_FILE_NAMES.txt that contains the absolute paths of each of the images in IMAGES_DIR that you want to use to run image segmentation. The path to the file can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example:
/absolute/path/to/image1.jpg
/absolute/path/to/image2.jpg
- Download the maskrcnn_resnet50_fpn and maskrcnn_resnet50_fpn_v2 models from Pytorch's repository of pretrained models. These models require the torchvision library. To download these models, run the following commands from a Python shell:
import torch
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
model = maskrcnn_resnet50_fpn(pretrained=True)
torch.save(model.state_dict(), 'maskrcnn_resnet50_fpn.pth') # You can replace maskrcnn_resnet50_fpn.pth with your preferred file name for your model state dictionary.
model = maskrcnn_resnet50_fpn_v2(pretrained=True)
torch.save(model.state_dict(), 'maskrcnn_resnet50_fpn_v2.pth') # You can replace maskrcnn_resnet50_fpn_v2.pth with your preferred file name for your model state dictionary.
- Note a path to an OUTPUT file that can be used by the pipeline to write the predictions.
To run the image segmentation pipeline locally, use the following command:
python -m apache_beam.examples.inference.pytorch_model_per_key_image_segmentation \
--input IMAGE_FILE_NAMES \
--images_dir IMAGES_DIR \
--output OUTPUT \
--model_state_dict_paths MODEL_STATE_DICT1,MODEL_STATE_DICT2
images_dir is only needed if your IMAGE_FILE_NAMES.txt file contains relative paths (they will be relative from IMAGES_DIR).
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.pytorch_model_per_key_image_segmentation \
--input IMAGE_FILE_NAMES.txt \
--output predictions.csv \
--model_state_dict_paths 'maskrcnn_resnet50_fpn.pth,maskrcnn_resnet50_fpn_v2.pth'
This writes the output to predictions.csv with contents like:
/Users/dannymccormick/Downloads/images/datasets_coco_raw-data_val2017_000000000139.jpg --- v1 predictions: ['chair', 'tv','potted plant'] --- v2 predictions: ['motorcycle', 'frisbee', 'couch']
...
Each image has 2 pieces of associated data, v1 predictions and v2 predictions, corresponding to the version of the model that was used for segmentation.
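To route keyed examples to different models, RunInference can wrap per-key handlers. The following is a rough sketch of the idea, assuming your Beam version provides KeyModelMapping in apache_beam.ml.inference.base; the example script's actual wiring may differ.
from apache_beam.ml.inference.base import KeyedModelHandler, KeyModelMapping, RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerTensor
from torchvision.models.detection import maskrcnn_resnet50_fpn, maskrcnn_resnet50_fpn_v2

# One handler per model version; the state dict paths come from the download step above.
v1_handler = PytorchModelHandlerTensor(
    state_dict_path='maskrcnn_resnet50_fpn.pth',
    model_class=maskrcnn_resnet50_fpn,
    model_params={})
v2_handler = PytorchModelHandlerTensor(
    state_dict_path='maskrcnn_resnet50_fpn_v2.pth',
    model_class=maskrcnn_resnet50_fpn_v2,
    model_params={})

# Elements shaped like ('v1', image_tensor) or ('v2', image_tensor) are served by their own model.
per_key_handler = KeyedModelHandler([
    KeyModelMapping(['v1'], v1_handler),
    KeyModelMapping(['v2'], v2_handler),
])
# ... | RunInference(per_key_handler) | ...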
tensorrt_object_detection.py contains an implementation for a RunInference pipeline that performs object detection using Tensorflow Object Detection's SSD MobileNet v2 320x320 architecture.
The pipeline reads the images, performs basic preprocessing, passes them to the TensorRT implementation of RunInference, and then writes the predictions to a text file.
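Internally, the pipeline builds a TensorRT engine handler and hands it to RunInference. Below is a minimal sketch, assuming the engine was built with batch size 1 (see the TRT_ENGINE prerequisite below); the engine path is a placeholder.
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorrt_inference import TensorRTEngineHandlerNumPy

# Placeholder engine path; min/max batch size must match how the engine was built.
engine_handler = TensorRTEngineHandlerNumPy(
    min_batch_size=1,
    max_batch_size=1,
    engine_path='ssd_mobilenet_v2_320x320_coco17_tpu-8.trt')
# ... | RunInference(engine_handler) | ...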
You will need to create or download images, and place them into your IMAGES_DIR directory. A popular dataset for this task is the COCO dataset. The COCO validation dataset can be obtained here.
- Required: A path to a file called IMAGE_FILE_NAMES that contains the absolute paths of each of the images in IMAGES_DIR on which you want to run object detection. Paths can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example:
/absolute/path/to/000000000139.jpg
/absolute/path/to/000000289594.jpg
- Required: A path to a file called TRT_ENGINE that contains the pre-built TensorRT engine from the SSD MobileNet v2 320x320 model. You will need to follow instructions on how to download and convert this SSD model into a TensorRT engine. At the Create ONNX Graph step, keep the batch size at 1. Once you are done with the Build TensorRT Engine step, you can use the resulting engine as the TRT_ENGINE input. In addition, make sure that the environment you use for TensorRT engine creation is the same environment you use to run TensorRT inference. This applies not only to the TensorRT version, but also to the specific GPU used. Read more about it here.
- Required: A path to a file called OUTPUT, to which the pipeline will write the predictions.
- Optional: IMAGES_DIR, which is the path to the directory where images are stored. Not required if image names in the input file IMAGE_FILE_NAMES have absolute paths.
To run the object detection pipeline locally, use the following command:
python -m apache_beam.examples.inference.tensorrt_object_detection \
--input IMAGE_FILE_NAMES \
--images_dir IMAGES_DIR \
--output OUTPUT \
--engine_path TRT_ENGINE
For example:
python -m apache_beam.examples.inference.tensorrt_object_detection \
--input image_file_names.txt \
--output predictions.csv \
--engine_path ssd_mobilenet_v2_320x320_coco17_tpu-8.trt
This writes the output to predictions.csv with contents like:
/absolute/path/to/000000000139.jpg;[{'ymin': '217.31875205039978' 'xmin': '295.93122482299805' 'ymax': '315.90323209762573' 'xmax': '357.8959655761719' 'score': '0.72342616' 'class': 'chair'} {'ymin': '166.81788557767868'.....
/absolute/path/to/000000289594.jpg;[{'ymin': '227.25109100341797' 'xmin': '331.7402381300926' 'ymax': '476.88533782958984' 'xmax': '402.2928895354271' 'score': '0.77217317' 'class': 'person'} {'ymin': '231.8712615966797' 'xmin': '292.8590789437294'.....
...
Each line has data separated by a semicolon ";". The first item is the file name. The second item is a list of dictionaries, where each dictionary corresponds with a single detection. A detection contains: box coordinates (ymin, xmin, ymax, xmax); score; and class.
pytorch_language_modeling.py contains an implementation for a RunInference pipeline that performs masked language modeling (that is, decoding a masked token in a sentence) using the BertForMaskedLM architecture from Hugging Face.
The pipeline reads sentences, performs basic preprocessing to convert the last word into a [MASK] token, passes the masked sentence to the PyTorch implementation of RunInference, and then writes the predictions to a text file.
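The model handler the pipeline builds looks roughly like the sketch below; the state dict path is a placeholder from the download step, and the actual script additionally handles tokenization and keying of sentences.
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.pytorch_inference import PytorchModelHandlerKeyedTensor
from transformers import BertConfig, BertForMaskedLM

# The tokenizer produces a dict of tensors per sentence, so the keyed-tensor handler is used.
model_handler = PytorchModelHandlerKeyedTensor(
    state_dict_path='BertForMaskedLM.pth',  # placeholder; created in step 1 below
    model_class=BertForMaskedLM,
    model_params={'config': BertConfig.from_pretrained('bert-base-uncased')})
# Sentences are keyed by their original text so predictions can be matched back:
# ... | RunInference(KeyedModelHandler(model_handler)) | ...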
To use this transform, you need a dataset and model for language modeling.
- Download the BertForMaskedLM model from Hugging Face's repository of pretrained models. You must already have transformers installed; then, from a Python shell, run:
import torch
from transformers import BertForMaskedLM
model = BertForMaskedLM.from_pretrained('bert-base-uncased', return_dict=True)
torch.save(model.state_dict(), 'BertForMaskedLM.pth') # You can replace BertForMaskedLM.pth with your preferred file name for your model state dictionary.
- (Optional) Create a file named SENTENCES.txt that contains sentences to feed into the model. The content of the file should be similar to the following example:
The capital of France is Paris .
He looked up and saw the sun and stars .
...
To run the language modeling pipeline locally, use the following command:
python -m apache_beam.examples.inference.pytorch_language_modeling \
--input SENTENCES \
--output OUTPUT \
--model_state_dict_path MODEL_STATE_DICT
The input argument is optional. If none is provided, the pipeline runs with some example sentences.
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.pytorch_language_modeling \
--input SENTENCES.txt \
--output predictions.csv \
--model_state_dict_path BertForMaskedLM.pth
Or, using the default example sentences:
python -m apache_beam.examples.inference.pytorch_language_modeling \
--output predictions.csv \
--model_state_dict_path BertForMaskedLM.pth
This writes the output to predictions.csv with contents like:
The capital of France is Paris .;paris
He looked up and saw the sun and stars .;moon
...
Each line has data separated by a semicolon ";". The first item is the input sentence. The model masks the last word and tries to predict it; the second item is the word that the model predicts for the mask.
sklearn_mnist_classification.py contains an implementation for a RunInference pipeline that performs image classification on handwritten digits from the MNIST database.
The pipeline reads rows of pixels corresponding to a digit, performs basic preprocessing, passes the pixels to the Scikit-learn implementation of RunInference, and then writes the predictions to a text file.
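A minimal sketch of the scikit-learn handler this pipeline uses follows; the pickle path is a placeholder for the MODEL_PATH you create below.
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.sklearn_inference import ModelFileType, SklearnModelHandlerNumpy

# Placeholder pickle path; see MODEL_PATH in the steps below.
model_handler = SklearnModelHandlerNumpy(
    model_uri='mnist_model_svm.pickle',
    model_file_type=ModelFileType.PICKLE)
# Each CSV row is parsed into a (label, numpy array of pixels) pair before inference:
# ... | RunInference(KeyedModelHandler(model_handler)) | ...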
To use this transform, you need a dataset and model for MNIST digit classification.
- Create a file named INPUT.csv that contains labels and pixels to feed into the model. Each row should have comma-separated elements. The first element is the label. All other elements are pixel values. The csv should not have column headers. The content of the file should be similar to the following example:
1,0,0,0...
0,0,0,0...
1,0,0,0...
4,0,0,0...
...
- Create a file named MODEL_PATH that contains the pickled file of a scikit-learn model trained on MNIST data. Please refer to this scikit-learn model persistence documentation on how to serialize models.
- Update sklearn_examples_requirements.txt to match the version of sklearn used to train the model. Sklearn doesn't guarantee model compatibility between versions.
To run the MNIST classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.sklearn_mnist_classification \
--input INPUT \
--output OUTPUT \
--model_path MODEL_PATH
For example:
python -m apache_beam.examples.inference.sklearn_mnist_classification \
--input INPUT.csv \
--output predictions.txt \
--model_path mnist_model_svm.pickle
This writes the output to predictions.txt with contents like:
1,1
4,9
7,1
0,0
...
Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit.
Data for this example can be found at: https://www.kaggle.com/datasets/nishiodens/japan-real-estate-transaction-prices
Prebuilt sklearn pipelines are hosted at: https://storage.cloud.google.com/apache-beam-ml/models/japanese_housing/
Note: This example uses more than one model. Since not all features in an example are populated, a different model will be chosen based on available data.
For example, an example without distance to the nearest station will use a model that doesn't rely on that data.
To run locally, use the following command:
python -m apache_beam.examples.inference.sklearn_japanese_housing_regression \
--input_file INPUT \
--output OUTPUT \
--model_path MODEL_PATH
For example:
python -m apache_beam.examples.inference.sklearn_japanese_housing_regression \
--input_file housing_examples.csv \
--output predictions.txt \
--model_path https://storage.cloud.google.com/apache-beam-ml/models/japanese_housing/
This writes the output to predictions.txt with contents like:
True Price 40000000.0, Predicted Price 34645912.039208
True Price 34000000.0, Predicted Price 28648634.135857
True Price 31000000.0, Predicted Price 25654277.256461
...
onnx_sentiment_classification.py contains an implementation for a RunInference pipeline that performs sentiment classification on movie reviews.
The pipeline reads lines of text files corresponding to movie reviews, performs basic preprocessing, passes the reviews to the ONNX version of RoBERTa via RunInference, and then writes the predictions (0 for negative, 1 for positive) to a text file.
We assume you already have a trained model in onnx format. In our example, we use RoBERTa from https://github.com/SeldonIO/seldon-models/blob/master/pytorch/moviesentiment_roberta/pytorch-roberta-onnx.ipynb.
For input data, you can generate your own movie reviews (separated by line breaks) or use IMDB reviews online (https://ai.stanford.edu/~amaas/data/sentiment/).
The output will be a text file, with a binary label (0 for negative, 1 for positive) appended to the review, separated by a semicolon.
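A minimal sketch of how such a pipeline can be assembled with the ONNX model handler; the input path, tokenization helper, and model file name here are hypothetical.
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.onnx_inference import OnnxModelHandlerNumpy

# Hypothetical path to the exported RoBERTa ONNX file.
model_handler = OnnxModelHandlerNumpy(model_uri='roberta_sentiment.onnx')

with beam.Pipeline() as p:
  _ = (
      p
      | beam.io.ReadFromText('reviews.txt')   # hypothetical input file
      | beam.Map(tokenize_review)             # hypothetical helper producing numpy inputs
      | RunInference(model_handler)
      | beam.Map(lambda r: f'{r.example};{r.inference}')
      | beam.io.WriteToText('predictions.txt'))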
To run locally, you can use the following command:
python -m apache_beam.examples.inference.onnx_sentiment_classification \
--input_file [input file path] \
--output [output file path] \
--model_uri [path to onnx model]
This writes the output to the output file path with contents like:
A comedy-drama of nearly epic proportions rooted in a sincere performance by the title character undergoing midlife crisis .;1
tensorflow_mnist_classification.py contains an implementation for a RunInference pipeline that performs image classification on handwritten digits from the MNIST database.
The pipeline reads rows of pixels corresponding to a digit, performs basic preprocessing (converts the input shape to 28x28), passes the pixels to the trained Tensorflow model with RunInference, and then writes the predictions to a text file.
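A minimal sketch of the TensorFlow handler used here, assuming MODEL_DIR holds a SavedModel (placeholder path):
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerNumpy

# MODEL_DIR is a placeholder for the SavedModel directory created below.
model_handler = TFModelHandlerNumpy(model_uri='MODEL_DIR')
# Each CSV row is parsed into a (label, 28x28 numpy array) pair before inference:
# ... | RunInference(KeyedModelHandler(model_handler)) | ...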
To use this transform, you need a dataset and model for MNIST digit classification.
- Create a file named INPUT.csv that contains labels and pixels to feed into the model. Each row should have comma-separated elements. The first element is the label. All other elements are pixel values. The csv should not have column headers. The content of the file should be similar to the following example:
1,0,0,0...
0,0,0,0...
1,0,0,0...
4,0,0,0...
...
- Save the trained tensorflow model to a directory MODEL_DIR.
To run the MNIST classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.tensorflow_mnist_classification \
--input INPUT \
--output OUTPUT \
--model_path MODEL_DIR
For example:
python -m apache_beam.examples.inference.tensorflow_mnist_classification \
--input INPUT.csv \
--output predictions.txt \
--model_path MODEL_DIR
This writes the output to predictions.txt with contents like:
1,1
4,4
0,0
7,7
3,3
5,5
...
Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit.
tensorflow_imagenet_segmentation.py contains an implementation for a RunInference pipeline that performs image segmentation using the mobilenet_v2 architecture from TensorFlow Hub.
The pipeline reads images, performs basic preprocessing, passes the images to the Tensorflow implementation of RunInference, and then writes predictions to a text file.
To use this transform, you need a dataset and model for image segmentation.
- Create a directory named IMAGE_DIR. Create or download images and put them in this directory. We will use the example image from TensorFlow.
- Create a file named IMAGE_FILE_NAMES.txt that contains the names of each of the images in IMAGE_DIR that you want to use to run image segmentation. For example:
grace_hopper.jpg
- A tensorflow MODEL_PATH; we will use the mobilenet model.
- Note the path to the OUTPUT file. This file is used by the pipeline to write the predictions.
- Install TensorflowHub:
pip install tensorflow_hub
To run the image segmentation pipeline locally, use the following command:
python -m apache_beam.examples.inference.tensorflow_imagenet_segmentation \
--input IMAGE_FILE_NAMES \
--image_dir IMAGE_DIR \
--output OUTPUT \
--model_path MODEL_PATH
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.tensorflow_imagenet_segmentation \
--input IMAGE_FILE_NAMES.txt \
--image_dir "https://storage.googleapis.com/download.tensorflow.org/example_images/" \
--output predictions.txt \
--model_path "https://tfhub.dev/google/tf2-preview/mobilenet_v2/classification/4"
This writes the output to predictions.txt with contents like:
background
...
Each line contains a predicted label.
tensorflow_mnist_with_weights.py contains an implementation for a RunInference pipeline that performs image classification on handwritten digits from the MNIST database.
The pipeline reads rows of pixels corresponding to a digit, performs basic preprocessing (converts the input shape to 28x28), passes the pixels to the trained Tensorflow model with RunInference, and then writes the predictions to a text file.
The model is loaded from the saved model weights. This can be done by passing a function which creates the model and setting the model type as ModelType.SAVED_WEIGHTS to the TFModelHandler. The path to weights saved using model.save_weights(path) should be passed to the model_path argument.
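A minimal sketch of what that looks like; the layer stack in get_model is hypothetical and must recreate whatever architecture the weights were saved from.
import tensorflow as tf
from apache_beam.ml.inference.tensorflow_inference import ModelType, TFModelHandlerNumpy

def get_model():
  # Hypothetical architecture; it must match the model whose weights were saved.
  inputs = tf.keras.layers.Input(shape=(28, 28, 1))
  x = tf.keras.layers.Flatten()(inputs)
  x = tf.keras.layers.Dense(128, activation='relu')(x)
  outputs = tf.keras.layers.Dense(10, activation='softmax')(x)
  return tf.keras.Model(inputs=inputs, outputs=outputs)

model_handler = TFModelHandlerNumpy(
    model_uri='SAVED_WEIGHTS_DIR',  # the path previously passed to model.save_weights(...)
    model_type=ModelType.SAVED_WEIGHTS,
    create_model_fn=get_model)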
To use this transform, you need a dataset and model for MNIST digit classification.
- Create a file named INPUT.csv that contains labels and pixels to feed into the model. Each row should have comma-separated elements. The first element is the label. All other elements are pixel values. The csv should not have column headers. The content of the file should be similar to the following example:
1,0,0,0...
0,0,0,0...
1,0,0,0...
4,0,0,0...
...
- Save the weights of the trained tensorflow model to a directory SAVED_WEIGHTS_DIR.
To run the MNIST classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.tensorflow_mnist_with_weights \
--input INPUT \
--output OUTPUT \
--model_path SAVED_WEIGHTS_DIR
For example:
python -m apache_beam.examples.inference.tensorflow_mnist_with_weights \
--input INPUT.csv \
--output predictions.txt \
--model_path SAVED_WEIGHTS_DIR
This writes the output to predictions.txt with contents like:
1,1
4,4
0,0
7,7
3,3
5,5
...
Each line has data separated by a comma ",". The first item is the actual label of the digit. The second item is the predicted label of the digit.
xgboost_iris_classification.py contains an implementation for a RunInference pipeline that performs classification on tabular data from the Iris Dataset.
The pipeline reads rows that contain the features of a given iris. The features are Sepal Length, Sepal Width, Petal Length and Petal Width. The pipeline passes those features to the XGBoost implementation of RunInference, which writes the iris type predictions to a text file.
To use this transform, you need to have sklearn installed. The dataset is loaded using sklearn. The _train_model function can be used to train a simple classifier. The function outputs its configuration in a file that can be loaded by the XGBoostModelHandler.
The following function allows you to train a simple classifier using the sklearn Iris dataset. The trained model will be saved in the location passed as a parameter and can then later be loaded in a pipeline using the XGBoostModelHandler.
import xgboost
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
def _train_model(model_state_output_path: str = '/tmp/model.json', seed=999):
  """Function to train an XGBoost Classifier using the sklearn Iris dataset"""
  dataset = load_iris()
  x_train, _, y_train, _ = train_test_split(
      dataset['data'], dataset['target'], test_size=.2, random_state=seed)
  booster = xgboost.XGBClassifier(
      n_estimators=2, max_depth=2, learning_rate=1, objective='binary:logistic')
  booster.fit(x_train, y_train)
  booster.save_model(model_state_output_path)
  return booster
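Once the model is saved, a handler for it can be constructed roughly like this sketch (the JSON path is the _train_model output above):
import xgboost
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.xgboost_inference import XGBoostModelHandlerNumpy

# model.json is the file written by _train_model above.
model_handler = XGBoostModelHandlerNumpy(
    model_class=xgboost.XGBClassifier,
    model_state='/tmp/model.json')
# Batches of feature rows are keyed (for example by batch number) before inference:
# ... | RunInference(KeyedModelHandler(model_handler)) | ...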
To run locally, use the following command:
python -m apache_beam.examples.inference.xgboost_iris_classification \
--input_type INPUT_TYPE \
--output OUTPUT_FILE \
--model_state MODEL_STATE_JSON \
[--no_split|--split]
For example:
python -m apache_beam.examples.inference.xgboost_iris_classification \
--input_type numpy \
--output predictions.txt \
--model_state model_state.json \
--split
This writes the output to predictions.txt. Each line contains the batch number and a list with all outputted class labels. There are 3 possible values for class labels: 0, 1, and 2. When each batch contains a single element, the output looks like this:
0,[1]
1,[2]
2,[1]
3,[0]
...
When all elements are in a single batch the output looks like this:
0,[1 1 1 0 0 0 0 1 2 0 0 2 0 2 1 2 2 2 2 0 0 0 0 2 2 0 2 2 2 1]
milk_quality_prediction_windowing.py contains an implementation of a windowing pipeline making use of the RunInference transform. An XGBoost classifier predicts the quality of milk based on measurements of pH, temperature, taste, odor, fat, turbidity and color. The model labels a measurement as bad, medium or good. The model is trained on the Kaggle Milk Quality Prediction dataset.
The preprocess_data function loads the Kaggle dataset from a csv file and splits it into a training set with accompanying labels as well as a test set. In a typical machine learning setting, we would use the training set and the labels to train the model, and the test set to calculate various metrics such as recall and precision. We will use the test set data in a test streaming pipeline to showcase the windowing capabilities.
The train_model function allows you to train a simple XGBoost classifier using the Kaggle Milk Quality Prediction dataset. The trained model will be saved in JSON format at the location passed as a parameter and can then later be used for inference by loading it via the XGBoostModelHandler.
python -m apache_beam.examples.inference.milk_quality_prediction_windowing \
--dataset \
<DATASET> \
--pipeline_input_data \
<INPUT_DATA> \
--training_set \
<TRAINING_SET> \
--labels \
<LABELS> \
--model_state \
<MODEL_STATE>
Where <DATASET> is the path to a csv file containing the Kaggle Milk Quality Prediction dataset, <INPUT_DATA> is a filepath to save the data that will be used as input for the streaming pipeline (test set), <TRAINING_SET> is a filepath to store the training set in csv format, <LABELS> is a filepath to store the csv containing the labels used to train the model, and <MODEL_STATE> is the path to the JSON file containing the trained model.
<INPUT_DATA>, <TRAINING_SET>, and <LABELS> will all be parsed from <DATASET> and saved before pipeline execution.
Using the test set, we simulate a streaming pipeline that receives a new measurement of the milk quality parameters every minute. A sliding window keeps track of the measurements of the last 30 minutes, and a new window starts every 5 minutes. The model predicts the quality of each measurement. After 30 minutes the results are aggregated in a tuple containing the number of measurements that were predicted as bad, medium and high quality samples. The output of each window looks as follows:
MilkQualityAggregation(bad_quality_measurements=10, medium_quality_measurements=13, high_quality_measurements=6)
MilkQualityAggregation(bad_quality_measurements=9, medium_quality_measurements=11, high_quality_measurements=4)
MilkQualityAggregation(bad_quality_measurements=8, medium_quality_measurements=7, high_quality_measurements=4)
MilkQualityAggregation(bad_quality_measurements=6, medium_quality_measurements=4, high_quality_measurements=4)
MilkQualityAggregation(bad_quality_measurements=3, medium_quality_measurements=3, high_quality_measurements=3)
MilkQualityAggregation(bad_quality_measurements=1, medium_quality_measurements=2, high_quality_measurements=1)
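The sliding-window behaviour described above corresponds roughly to the following sketch; the measurements PCollection, the model handler, and the aggregation combiner are placeholders for what the example script builds.
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.transforms import window

# measurements: a timestamped PCollection of milk-quality feature rows (placeholder).
# model_handler: an XGBoost model handler loaded from <MODEL_STATE> (placeholder).
windowed_predictions = (
    measurements
    | beam.WindowInto(window.SlidingWindows(size=30 * 60, period=5 * 60))
    | RunInference(model_handler)
    | beam.CombineGlobally(aggregate_quality_counts).without_defaults())  # placeholder combiner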
huggingface_language_modeling.py contains an implementation for a RunInference pipeline that performs masked language modeling (that is, decoding a masked token in a sentence) using the AutoModelForMaskedLM architecture from Hugging Face.
The pipeline reads sentences, performs basic preprocessing to convert the last word into a <mask> token, passes the masked sentence to the Hugging Face implementation of RunInference, and then writes the predictions to a text file.
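A rough sketch of the Hugging Face model handler this pipeline builds, assuming HuggingFaceModelHandlerKeyedTensor accepts the checkpoint, model class, and framework arguments shown here; the actual script also handles tokenization.
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.huggingface_inference import HuggingFaceModelHandlerKeyedTensor
from transformers import AutoModelForMaskedLM

# The model_uri is the Hugging Face Hub checkpoint passed via --model_name.
model_handler = HuggingFaceModelHandlerKeyedTensor(
    model_uri='stevhliu/my_awesome_eli5_mlm_model',
    model_class=AutoModelForMaskedLM,
    framework='pt')
# Tokenized sentences (dicts of tensors) are keyed by the original text:
# ... | RunInference(KeyedModelHandler(model_handler)) | ...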
To use this transform, you need a dataset and model for language modeling.
- Choose a checkpoint to load from the Hugging Face Hub, e.g. MaskedLanguageModel.
- (Optional) Create a file named SENTENCES.txt that contains sentences to feed into the model. The content of the file should be similar to the following example:
The capital of France is Paris .
He looked up and saw the sun and stars .
...
To run the language modeling pipeline locally, use the following command:
python -m apache_beam.examples.inference.huggingface_language_modeling \
--input SENTENCES \
--output OUTPUT \
--model_name REPOSITORY_ID
The input argument is optional. If none is provided, the pipeline runs with some example sentences.
For example, if you've followed the naming conventions recommended above:
python -m apache_beam.examples.inference.huggingface_language_modeling \
--input SENTENCES.txt \
--output predictions.csv \
--model_name "stevhliu/my_awesome_eli5_mlm_model"
Or, using the default example sentences:
python -m apache_beam.examples.inference.huggingface_language_modeling \
--output predictions.csv \
--model_name "stevhliu/my_awesome_eli5_mlm_model"
This writes the output to predictions.csv with contents like:
The capital of France is Paris .;paris
He looked up and saw the sun and stars .;moon
...
Each line has data separated by a semicolon ";". The first item is the input sentence. The model masks the last word and tries to predict it; the second item is the word that the model predicts for the mask.
vertex_ai_image_classification.py contains an implementation for a RunInference pipeline that performs image classification using a model hosted on Vertex AI (based on https://cloud.google.com/vertex-ai/docs/tutorials/image-recognition-custom).
The pipeline reads image urls, performs basic preprocessing to convert them into a list of floats, passes the floats to the Vertex AI implementation of RunInference, and then writes the predictions to a text file.
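A minimal sketch of the Vertex AI handler the pipeline constructs; the endpoint, project, and region values are placeholders for the flags shown below.
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON

# Placeholder endpoint details; these correspond to the command-line flags below.
model_handler = VertexAIModelHandlerJSON(
    endpoint_id='ENDPOINT_ID',
    project='GCP_PROJECT',
    location='GCP_REGION')
# Each image is decoded into a list of floats before being sent to the endpoint:
# ... | RunInference(model_handler) | ...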
To use this transform, you need a dataset and model hosted on Vertex AI for image classification.
- Train a model by following the tutorial at https://cloud.google.com/vertex-ai/docs/tutorials/image-recognition-custom
- Create a file named IMAGE_FILE_NAMES.txt that contains the absolute paths of each of the images in IMAGES_DIR that you want to use to run image classification. The path to the file can be different types of URIs such as your local file system, an AWS S3 bucket, or a GCP Cloud Storage bucket. For example:
/absolute/path/to/image1.jpg
/absolute/path/to/image2.jpg
To run the image classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.vertex_ai_image_classification \
--endpoint_id '<endpoint of trained model>' \
--endpoint_project '<gcp project>' \
--endpoint_region '<gcp region>' \
--input 'path/to/IMAGE_FILE_NAMES.txt' \
--output 'path/to/output/file.txt'
This writes the output to the output file with contents like:
path/to/my/image: tulips (90)
path/to/my/image2: dandelions (78)
...
Each line represents a prediction of the flower type along with the confidence in that prediction.
vertex_ai_llm_text_classification.py contains an implementation for a RunInference pipeline that performs text classification using a large language model hosted on Vertex AI.
The pipeline reads text prompts, passes them to the Vertex AI implementation of RunInference, and then writes the predictions to a text file.
To use this transform, you need a model hosted on Vertex AI for text classification. You can get this by tuning the text-bison model following the instructions here - https://cloud.google.com/vertex-ai/docs/generative-ai/models/tune-models#create_a_model_tuning_job
To run the text classification pipeline locally, use the following command:
python -m apache_beam.examples.inference.vertex_ai_llm_text_classification \
--endpoint_id '<endpoint of trained LLM>' \
--endpoint_project '<gcp project>' \
--endpoint_region '<gcp region>'
This writes the output to the output file with contents like:
('What is 5+2?', PredictionResult(example={'prompt': 'What is 5+2?'}, inference={'content': '7', 'citationMetadata': {'citations': []}, 'safetyAttributes': {'blocked': False, 'scores': [], 'categories': []}}, model_id='6795590989097467904'))
...
Each line represents a tuple containing the example, a PredictionResult object with the response from the model in the inference field, and the endpoint id representing the model id.
vllm_text_completion.py contains an implementation for a RunInference pipeline that performs text completion using a local vLLM server.
The pipeline reads in a set of text prompts or past messages, uses RunInference to spin up a local inference server and perform inference, and then writes the predictions to a text file.
To use this transform, you can use any LLM supported by vLLM.
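A minimal sketch of the handler involved, assuming your Beam version ships VLLMCompletionsModelHandler in apache_beam.ml.inference.vllm_inference with a model_name argument; check your installed version.
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.vllm_inference import VLLMCompletionsModelHandler

# The handler starts a local vLLM server on the worker and forwards prompts to it.
model_handler = VLLMCompletionsModelHandler(model_name='facebook/opt-125m')
# ... | RunInference(model_handler) | ...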
To run the text completion pipeline locally using the Facebook opt 125M model, use the following command.
python -m apache_beam.examples.inference.vllm_text_completion \
--model "facebook/opt-125m" \
--output 'path/to/output/file.txt' \
<... additional pipeline arguments to configure runner if not running in GPU environment ...>
You will either need to run this locally with a GPU accelerator or remotely on a runner that supports acceleration. For example, you could run this on Dataflow with a GPU with the following command:
python -m apache_beam.examples.inference.vllm_text_completion \
--model "facebook/opt-125m" \
--output 'gs://path/to/output/file.txt' \
--runner dataflow \
--project <gcp project> \
--region us-central1 \
--temp_location <temp gcs location> \
--worker_harness_container_image "gcr.io/apache-beam-testing/beam-ml/vllm:latest" \
--machine_type "n1-standard-4" \
--dataflow_service_options "worker_accelerator=type:nvidia-tesla-t4;count:1;install-nvidia-driver:5xx" \
--staging_location <temp gcs location>
Make sure to enable the 5xx driver since vLLM only works with 5xx drivers, not 4xx.
This writes the output to the output file location with contents like:
'Hello, my name is', PredictionResult(example={'prompt': 'Hello, my name is'}, inference=Completion(id='cmpl-5f5113a317c949309582b1966511ffc4', choices=[CompletionChoice(finish_reason='length', index=0, logprobs=None, text=' Joel, my dad is Anton Harriman and my wife is Lydia. ', stop_reason=None)], created=1714064548, model='facebook/opt-125m', object='text_completion', system_fingerprint=None, usage=CompletionUsage(completion_tokens=16, prompt_tokens=6, total_tokens=22))})
Each line represents a tuple containing the example and a PredictionResult object with the response from the model in the inference field.
You can also choose to run with chat examples. Doing this requires 2 steps:
- Upload a chat_template to a filestore which is accessible from your job's environment (e.g. a public Google Cloud Storage bucket). You can copy this sample template to get started. You can skip this step if using a model other than facebook/opt-125m and you know your model provides a chat template.
- Add the --chat true and --chat_template <gs://path/to/your/file> parameters:
python -m apache_beam.examples.inference.vllm_text_completion \
--model "facebook/opt-125m" \
--output 'gs://path/to/output/file.txt' \
--chat true \
--chat_template gs://path/to/your/file \
<... additional pipeline arguments to configure runner if not running in GPU environment ...>
This will configure the pipeline to run against a sequence of previous messages instead of a single text completion prompt. For example, it might run against:
[
OpenAIChatMessage(role='user', content='What is an example of a type of penguin?'),
OpenAIChatMessage(role='system', content='An emperor penguin is a type of penguin.'),
OpenAIChatMessage(role='user', content='Tell me about them')
],
and produce the following result in your output file location:
An emperor penguin is an adorable creature that lives in Antarctica.