diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh
index 97d5d27b5cd..7709758345d 100755
--- a/.prow/scripts/test-end-to-end.sh
+++ b/.prow/scripts/test-end-to-end.sh
@@ -67,24 +67,24 @@ tail -n10 /var/log/kafka.log
kafkacat -b localhost:9092 -L
if [[ ${SKIP_BUILD_JARS} != "true" ]]; then
- echo "
- ============================================================
- Building jars for Feast
- ============================================================
- "
- .prow/scripts/download-maven-cache.sh \
- --archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
- --output-dir /root/
- # Build jars for Feast
- mvn --quiet --batch-mode --define skipTests=true clean package
- ls -lh core/target/*jar
- ls -lh serving/target/*jar
- else
- echo "[DEBUG] Skipping building jars"
- fi
+echo "
+Building jars for Feast
+.prow/scripts/download-maven-cache.sh \
+ --archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
+ --output-dir /root/
+# Build jars for Feast
+mvn --quiet --batch-mode --define skipTests=true clean package
+ls -lh core/target/*jar
+ls -lh serving/target/*jar
+ echo "[DEBUG] Skipping building jars"
echo "
diff --git a/ingestion/pom.xml b/ingestion/pom.xml
index c829674a64d..001da1a1453 100644
--- a/ingestion/pom.xml
+++ b/ingestion/pom.xml
@@ -248,5 +248,12 @@
+ org.apache.commons
+ commons-math3
+ 3.6.1
diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
index 6afdd80dd72..c1bdcd5fd17 100644
--- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
+++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java
@@ -26,6 +26,7 @@
/** Options passed to Beam to influence the job's execution environment */
public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions {
"JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format."
@@ -83,4 +84,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
int getStatsdPort();
void setStatsdPort(int StatsdPort);
+ @Description(
+ "Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features"
+ + "and writing the aggregated value to StatsD. Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn"
+ + "for details on the metric names and types.")
+ @Default.Integer(30)
+ int getWindowSizeInSecForFeatureValueMetric();
+ void setWindowSizeInSecForFeatureValueMetric(int seconds);
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java
new file mode 100644
index 00000000000..8574d2414c3
--- /dev/null
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFn.java
@@ -0,0 +1,311 @@
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.transform.metrics;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_NAME_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_PROJECT_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_SET_VERSION_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.FEATURE_TAG_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.INGESTION_JOB_NAME_KEY;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.METRIC_PREFIX;
+import static feast.ingestion.transform.metrics.WriteRowMetricsDoFn.STORE_TAG_KEY;
+import com.google.auto.value.AutoValue;
+import com.timgroup.statsd.NonBlockingStatsDClient;
+import com.timgroup.statsd.StatsDClient;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.Value;
+import java.util.ArrayList;
+import java.util.DoubleSummaryStatistics;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.apache.commons.math3.stat.descriptive.rank.Percentile;
+import org.slf4j.Logger;
+ * WriteFeatureValueMetricsDoFn accepts key value of FeatureSetRef(str) to FeatureRow(List) and
+ * writes a histogram of the numerical values of each feature to StatsD.
+ *
+ *
The histogram of the numerical values is represented as the following in StatsD:
+ *
+ *
+ * - gauge of feature_value_min
+ *
- gauge of feature_value_max
+ *
- gauge of feature_value_mean
+ *
- gauge of feature_value_percentile_50
+ *
- gauge of feature_value_percentile_90
+ *
- gauge of feature_value_percentile_95
+ *
+ *
+ * StatsD timing/histogram metric type is not used since it does not support negative values.
+ */
+public abstract class WriteFeatureValueMetricsDoFn
+ extends DoFn>, Void> {
+ abstract String getStoreName();
+ abstract String getStatsdHost();
+ abstract int getStatsdPort();
+ static Builder newBuilder() {
+ return new AutoValue_WriteFeatureValueMetricsDoFn.Builder();
+ }
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setStoreName(String storeName);
+ abstract Builder setStatsdHost(String statsdHost);
+ abstract Builder setStatsdPort(int statsdPort);
+ abstract WriteFeatureValueMetricsDoFn build();
+ }
+ private static final Logger log =
+ org.slf4j.LoggerFactory.getLogger(WriteFeatureValueMetricsDoFn.class);
+ private StatsDClient statsDClient;
+ public static String GAUGE_NAME_FEATURE_VALUE_MIN = "feature_value_min";
+ public static String GAUGE_NAME_FEATURE_VALUE_MAX = "feature_value_max";
+ public static String GAUGE_NAME_FEATURE_VALUE_MEAN = "feature_value_mean";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50 = "feature_value_percentile_50";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90 = "feature_value_percentile_90";
+ public static String GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95 = "feature_value_percentile_95";
+ @Setup
+ public void setup() {
+ // Note that exception may be thrown during StatsD client instantiation but no exception
+ // will be thrown when sending metrics (mimicking the UDP protocol behaviour).
+ // https://jar-download.com/artifacts/com.datadoghq/java-dogstatsd-client/2.1.1/documentation
+ // https://github.com/DataDog/java-dogstatsd-client#unix-domain-socket-support
+ try {
+ statsDClient = new NonBlockingStatsDClient(METRIC_PREFIX, getStatsdHost(), getStatsdPort());
+ } catch (Exception e) {
+ log.error("StatsD client cannot be started: " + e.getMessage());
+ }
+ }
+ @Teardown
+ public void tearDown() {
+ if (statsDClient != null) {
+ statsDClient.close();
+ }
+ }
+ @ProcessElement
+ public void processElement(
+ ProcessContext context,
+ @Element KV> featureSetRefToFeatureRows) {
+ if (statsDClient == null) {
+ return;
+ }
+ String featureSetRef = featureSetRefToFeatureRows.getKey();
+ if (featureSetRef == null) {
+ return;
+ }
+ String[] colonSplits = featureSetRef.split(":");
+ if (colonSplits.length != 2) {
+ log.error(
+ "Skip writing feature value metrics because the feature set reference '{}' does not"
+ + "follow the required format /:",
+ featureSetRef);
+ return;
+ }
+ String[] slashSplits = colonSplits[0].split("/");
+ if (slashSplits.length != 2) {
+ log.error(
+ "Skip writing feature value metrics because the feature set reference '{}' does not"
+ + "follow the required format /:",
+ featureSetRef);
+ return;
+ }
+ String projectName = slashSplits[0];
+ String featureSetName = slashSplits[1];
+ String version = colonSplits[1];
+ Map featureNameToStats = new HashMap<>();
+ Map> featureNameToValues = new HashMap<>();
+ for (FeatureRow featureRow : featureSetRefToFeatureRows.getValue()) {
+ for (Field field : featureRow.getFieldsList()) {
+ updateStats(featureNameToStats, featureNameToValues, field);
+ }
+ }
+ for (Entry entry : featureNameToStats.entrySet()) {
+ String featureName = entry.getKey();
+ DoubleSummaryStatistics stats = entry.getValue();
+ String[] tags = {
+ STORE_TAG_KEY + ":" + getStoreName(),
+ FEATURE_SET_PROJECT_TAG_KEY + ":" + projectName,
+ FEATURE_SET_NAME_TAG_KEY + ":" + featureSetName,
+ FEATURE_SET_VERSION_TAG_KEY + ":" + version,
+ FEATURE_TAG_KEY + ":" + featureName,
+ INGESTION_JOB_NAME_KEY + ":" + context.getPipelineOptions().getJobName()
+ };
+ // stats can return non finite values when there is no element
+ // or there is an element that is not a number. Metric should only be sent for finite values.
+ if (Double.isFinite(stats.getMin())) {
+ if (stats.getMin() < 0) {
+ // StatsD gauge will asssign a delta instead of the actual value, if there is a sign in
+ // the value. E.g. if the value is negative, a delta will be assigned. For this reason,
+ // the gauge value is set to zero beforehand.
+ // https://github.com/statsd/statsd/blob/master/docs/metric_types.md#gauges
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MIN, stats.getMin(), tags);
+ }
+ if (Double.isFinite(stats.getMax())) {
+ if (stats.getMax() < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MAX, stats.getMax(), tags);
+ }
+ if (Double.isFinite(stats.getAverage())) {
+ if (stats.getAverage() < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_MEAN, stats.getAverage(), tags);
+ }
+ // For percentile calculation, Percentile class from commons-math3 from Apache is used.
+ // Percentile requires double[], hence the conversion below.
+ if (!featureNameToValues.containsKey(featureName)) {
+ continue;
+ }
+ List valueList = featureNameToValues.get(featureName);
+ if (valueList == null || valueList.size() < 1) {
+ continue;
+ }
+ double[] values = new double[valueList.size()];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = valueList.get(i);
+ }
+ double p50 = new Percentile().evaluate(values, 50);
+ if (p50 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_50, p50, tags);
+ double p90 = new Percentile().evaluate(values, 90);
+ if (p90 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_90, p90, tags);
+ double p95 = new Percentile().evaluate(values, 95);
+ if (p95 < 0) {
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, 0, tags);
+ }
+ statsDClient.gauge(GAUGE_NAME_FEATURE_VALUE_PERCENTILE_95, p95, tags);
+ }
+ }
+ // Update stats and values array for the feature represented by the field.
+ // If the field contains non-numerical or non-boolean value, the stats and values array
+ // won't get updated because we are only concerned with numerical value in metrics data.
+ // For boolean value, true and false are treated as numerical value of 1 of 0 respectively.
+ private void updateStats(
+ Map featureNameToStats,
+ Map> featureNameToValues,
+ Field field) {
+ if (featureNameToStats == null || featureNameToValues == null || field == null) {
+ return;
+ }
+ String featureName = field.getName();
+ if (!featureNameToStats.containsKey(featureName)) {
+ featureNameToStats.put(featureName, new DoubleSummaryStatistics());
+ }
+ if (!featureNameToValues.containsKey(featureName)) {
+ featureNameToValues.put(featureName, new ArrayList<>());
+ }
+ Value value = field.getValue();
+ DoubleSummaryStatistics stats = featureNameToStats.get(featureName);
+ List values = featureNameToValues.get(featureName);
+ switch (value.getValCase()) {
+ case INT32_VAL:
+ stats.accept(value.getInt32Val());
+ values.add(((double) value.getInt32Val()));
+ break;
+ case INT64_VAL:
+ stats.accept(value.getInt64Val());
+ values.add((double) value.getInt64Val());
+ break;
+ case DOUBLE_VAL:
+ stats.accept(value.getDoubleVal());
+ values.add(value.getDoubleVal());
+ break;
+ case FLOAT_VAL:
+ stats.accept(value.getFloatVal());
+ values.add((double) value.getFloatVal());
+ break;
+ case BOOL_VAL:
+ stats.accept(value.getBoolVal() ? 1 : 0);
+ values.add(value.getBoolVal() ? 1d : 0d);
+ break;
+ case INT32_LIST_VAL:
+ for (Integer val : value.getInt32ListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ case INT64_LIST_VAL:
+ for (Long val : value.getInt64ListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ for (Double val : value.getDoubleListVal().getValList()) {
+ stats.accept(val);
+ values.add(val);
+ }
+ break;
+ for (Float val : value.getFloatListVal().getValList()) {
+ stats.accept(val);
+ values.add(((double) val));
+ }
+ break;
+ for (Boolean val : value.getBoolListVal().getValList()) {
+ stats.accept(val ? 1 : 0);
+ values.add(val ? 1d : 0d);
+ }
+ break;
+ case BYTES_VAL:
+ case STRING_VAL:
+ case VAL_NOT_SET:
+ default:
+ }
+ }
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
index 43f314aa861..10322ac812f 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteMetricsTransform.java
@@ -21,11 +21,16 @@
import feast.ingestion.values.FailedElement;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
public abstract class WriteMetricsTransform extends PTransform {
@@ -79,6 +84,42 @@ public PDone expand(PCollectionTuple input) {
+ // 1. Apply a fixed window
+ // 2. Group feature row by feature set reference
+ // 3. Calculate min, max, mean, percentiles of numerical values of features in the window
+ // and
+ // 4. Send the aggregate value to StatsD metric collector.
+ //
+ // NOTE: window is applied here so the metric collector will not be overwhelmed with
+ // metrics data. And for metric data, only statistic of the values are usually required
+ // vs the actual values.
+ input
+ .get(getSuccessTag())
+ .apply(
+ "FixedWindow",
+ Window.into(
+ FixedWindows.of(
+ Duration.standardSeconds(
+ options.getWindowSizeInSecForFeatureValueMetric()))))
+ .apply(
+ "ConvertTo_FeatureSetRefToFeatureRow",
+ ParDo.of(
+ new DoFn>() {
+ @ProcessElement
+ public void processElement(ProcessContext c, @Element FeatureRow featureRow) {
+ c.output(KV.of(featureRow.getFeatureSet(), featureRow));
+ }
+ }))
+ .apply("GroupByFeatureSetRef", GroupByKey.create())
+ .apply(
+ "WriteFeatureValueMetrics",
+ ParDo.of(
+ WriteFeatureValueMetricsDoFn.newBuilder()
+ .setStatsdHost(options.getStatsdHost())
+ .setStatsdPort(options.getStatsdPort())
+ .setStoreName(getStoreName())
+ .build()));
return PDone.in(input.getPipeline());
case "none":
diff --git a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
index db2d1acd6d8..2cd1ee94ecc 100644
--- a/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
+++ b/ingestion/src/main/java/feast/ingestion/transform/metrics/WriteRowMetricsDoFn.java
@@ -31,13 +31,13 @@ public abstract class WriteRowMetricsDoFn extends DoFn {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(WriteRowMetricsDoFn.class);
- private final String METRIC_PREFIX = "feast_ingestion";
- private final String STORE_TAG_KEY = "feast_store";
- private final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
- private final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
- private final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
- private final String FEATURE_TAG_KEY = "feast_feature_name";
- private final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
+ public static final String METRIC_PREFIX = "feast_ingestion";
+ public static final String STORE_TAG_KEY = "feast_store";
+ public static final String FEATURE_SET_PROJECT_TAG_KEY = "feast_project_name";
+ public static final String FEATURE_SET_NAME_TAG_KEY = "feast_featureSet_name";
+ public static final String FEATURE_SET_VERSION_TAG_KEY = "feast_featureSet_version";
+ public static final String FEATURE_TAG_KEY = "feast_feature_name";
+ public static final String INGESTION_JOB_NAME_KEY = "ingestion_job_name";
public abstract String getStoreName();
diff --git a/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
new file mode 100644
index 00000000000..8f0adf40168
--- /dev/null
+++ b/ingestion/src/test/java/feast/ingestion/transform/metrics/WriteFeatureValueMetricsDoFnTest.java
@@ -0,0 +1,315 @@
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.transform.metrics;
+import static org.junit.Assert.fail;
+import com.google.protobuf.ByteString;
+import feast.types.FeatureRowProto.FeatureRow;
+import feast.types.FeatureRowProto.FeatureRow.Builder;
+import feast.types.FieldProto.Field;
+import feast.types.ValueProto.BoolList;
+import feast.types.ValueProto.BytesList;
+import feast.types.ValueProto.DoubleList;
+import feast.types.ValueProto.FloatList;
+import feast.types.ValueProto.Int32List;
+import feast.types.ValueProto.Int64List;
+import feast.types.ValueProto.StringList;
+import feast.types.ValueProto.Value;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Rule;
+import org.junit.Test;
+public class WriteFeatureValueMetricsDoFnTest {
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ private static final int STATSD_SERVER_PORT = 17254;
+ private final DummyStatsDServer statsDServer = new DummyStatsDServer(STATSD_SERVER_PORT);
+ @Test
+ public void shouldSendCorrectStatsDMetrics() throws IOException, InterruptedException {
+ PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
+ pipelineOptions.setJobName("job");
+ Map> input =
+ readTestInput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input");
+ List expectedLines =
+ readTestOutput("feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output");
+ pipeline
+ .apply(Create.of(input))
+ .apply(
+ ParDo.of(
+ WriteFeatureValueMetricsDoFn.newBuilder()
+ .setStatsdHost("localhost")
+ .setStatsdPort(STATSD_SERVER_PORT)
+ .setStoreName("store")
+ .build()));
+ pipeline.run(pipelineOptions).waitUntilFinish();
+ // Wait until StatsD has finished processed all messages, 3 sec is a reasonable duration
+ // based on empirical testing.
+ Thread.sleep(3000);
+ List actualLines = statsDServer.messagesReceived();
+ for (String expected : expectedLines) {
+ boolean matched = false;
+ for (String actual : actualLines) {
+ if (actual.equals(expected)) {
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ System.out.println("Print actual metrics output for debugging:");
+ for (String line : actualLines) {
+ System.out.println(line);
+ }
+ fail(String.format("Expected StatsD metric not found:\n%s", expected));
+ }
+ }
+ }
+ // Test utility method to read expected StatsD metrics output from a text file.
+ @SuppressWarnings("SameParameterValue")
+ private List readTestOutput(String path) throws IOException {
+ URL url = Thread.currentThread().getContextClassLoader().getResource(path);
+ if (url == null) {
+ throw new IllegalArgumentException(
+ "cannot read test data, path contains null url. Path: " + path);
+ }
+ List lines = new ArrayList<>();
+ try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ if (line.trim().length() > 1) {
+ lines.add(line);
+ }
+ line = reader.readLine();
+ }
+ }
+ return lines;
+ }
+ // Test utility method to create test feature row data from a text file.
+ @SuppressWarnings("SameParameterValue")
+ private Map> readTestInput(String path) throws IOException {
+ Map> data = new HashMap<>();
+ URL url = Thread.currentThread().getContextClassLoader().getResource(path);
+ if (url == null) {
+ throw new IllegalArgumentException(
+ "cannot read test data, path contains null url. Path: " + path);
+ }
+ List lines = new ArrayList<>();
+ try (BufferedReader reader = Files.newBufferedReader(Paths.get(url.getPath()))) {
+ String line = reader.readLine();
+ while (line != null) {
+ lines.add(line);
+ line = reader.readLine();
+ }
+ }
+ List colNames = new ArrayList<>();
+ for (String line : lines) {
+ if (line.strip().length() < 1) {
+ continue;
+ }
+ String[] splits = line.split(",");
+ colNames.addAll(Arrays.asList(splits));
+ if (line.startsWith("featuresetref")) {
+ // Header line
+ colNames.addAll(Arrays.asList(splits).subList(1, splits.length));
+ continue;
+ }
+ Builder featureRowBuilder = FeatureRow.newBuilder();
+ for (int i = 0; i < splits.length; i++) {
+ String colVal = splits[i].strip();
+ if (i == 0) {
+ featureRowBuilder.setFeatureSet(colVal);
+ continue;
+ }
+ String colName = colNames.get(i);
+ Field.Builder fieldBuilder = Field.newBuilder().setName(colName);
+ if (!colVal.isEmpty()) {
+ switch (colName) {
+ case "int32":
+ fieldBuilder.setValue(Value.newBuilder().setInt32Val((Integer.parseInt(colVal))));
+ break;
+ case "int64":
+ fieldBuilder.setValue(Value.newBuilder().setInt64Val((Long.parseLong(colVal))));
+ break;
+ case "double":
+ fieldBuilder.setValue(Value.newBuilder().setDoubleVal((Double.parseDouble(colVal))));
+ break;
+ case "float":
+ fieldBuilder.setValue(Value.newBuilder().setFloatVal((Float.parseFloat(colVal))));
+ break;
+ case "bool":
+ fieldBuilder.setValue(Value.newBuilder().setBoolVal((Boolean.parseBoolean(colVal))));
+ break;
+ case "int32list":
+ List int32List = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ int32List.add(Integer.parseInt(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setInt32ListVal(Int32List.newBuilder().addAllVal(int32List)));
+ break;
+ case "int64list":
+ List int64list = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ int64list.add(Long.parseLong(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setInt64ListVal(Int64List.newBuilder().addAllVal(int64list)));
+ break;
+ case "doublelist":
+ List doubleList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ doubleList.add(Double.parseDouble(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder()
+ .setDoubleListVal(DoubleList.newBuilder().addAllVal(doubleList)));
+ break;
+ case "floatlist":
+ List floatList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ floatList.add(Float.parseFloat(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setFloatListVal(FloatList.newBuilder().addAllVal(floatList)));
+ break;
+ case "boollist":
+ List boolList = new ArrayList<>();
+ for (String val : colVal.split("\\|")) {
+ boolList.add(Boolean.parseBoolean(val));
+ }
+ fieldBuilder.setValue(
+ Value.newBuilder().setBoolListVal(BoolList.newBuilder().addAllVal(boolList)));
+ break;
+ case "bytes":
+ fieldBuilder.setValue(
+ Value.newBuilder().setBytesVal(ByteString.copyFromUtf8("Dummy")));
+ break;
+ case "byteslist":
+ fieldBuilder.setValue(
+ Value.newBuilder().setBytesListVal(BytesList.getDefaultInstance()));
+ break;
+ case "string":
+ fieldBuilder.setValue(Value.newBuilder().setStringVal("Dummy"));
+ break;
+ case "stringlist":
+ fieldBuilder.setValue(
+ Value.newBuilder().setStringListVal(StringList.getDefaultInstance()));
+ break;
+ }
+ }
+ featureRowBuilder.addFields(fieldBuilder);
+ }
+ if (!data.containsKey(featureRowBuilder.getFeatureSet())) {
+ data.put(featureRowBuilder.getFeatureSet(), new ArrayList<>());
+ }
+ List featureRowsByFeatureSetRef = data.get(featureRowBuilder.getFeatureSet());
+ featureRowsByFeatureSetRef.add(featureRowBuilder.build());
+ }
+ // Convert List to Iterable to match the function signature in
+ // WriteFeatureValueMetricsDoFn
+ Map> dataWithIterable = new HashMap<>();
+ for (Entry> entrySet : data.entrySet()) {
+ String key = entrySet.getKey();
+ Iterable value = entrySet.getValue();
+ dataWithIterable.put(key, value);
+ }
+ return dataWithIterable;
+ }
+ // Modified version of
+ // https://github.com/tim-group/java-statsd-client/blob/master/src/test/java/com/timgroup/statsd/NonBlockingStatsDClientTest.java
+ @SuppressWarnings("CatchMayIgnoreException")
+ private static final class DummyStatsDServer {
+ private final List messagesReceived = new ArrayList();
+ private final DatagramSocket server;
+ public DummyStatsDServer(int port) {
+ try {
+ server = new DatagramSocket(port);
+ } catch (SocketException e) {
+ throw new IllegalStateException(e);
+ }
+ new Thread(
+ () -> {
+ try {
+ while (true) {
+ final DatagramPacket packet = new DatagramPacket(new byte[65535], 65535);
+ server.receive(packet);
+ messagesReceived.add(
+ new String(packet.getData(), StandardCharsets.UTF_8).trim() + "\n");
+ Thread.sleep(50);
+ }
+ } catch (Exception e) {
+ }
+ })
+ .start();
+ }
+ public void stop() {
+ server.close();
+ }
+ public void waitForMessage() {
+ while (messagesReceived.isEmpty()) {
+ try {
+ Thread.sleep(50L);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ public List messagesReceived() {
+ List out = new ArrayList<>();
+ for (String msg : messagesReceived) {
+ String[] lines = msg.split("\n");
+ out.addAll(Arrays.asList(lines));
+ }
+ return out;
+ }
+ }
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README
new file mode 100644
index 00000000000..3c8759d1702
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.README
@@ -0,0 +1,9 @@
+WriteFeatureValueMetricsDoFnTest.input file contains data that can be read by test utility
+into map of FeatureSetRef -> [FeatureRow]. In the first row, the cell value corresponds to the
+field name in the FeatureRow. This should not be changed as the test utility derives the value
+type from this name. Empty value in the cell is a value that is not set. For list type, the values
+of different element is separated by the '|' character.
+WriteFeatureValueMetricsDoFnTest.output file contains lines of expected StatsD metrics that should
+be sent when WriteFeatureValueMetricsDoFn runs. It can be checked against the actual outputted
+StatsD metrics to test for correctness.
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input
new file mode 100644
index 00000000000..d2985711cee
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.input
@@ -0,0 +1,4 @@
\ No newline at end of file
diff --git a/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output
new file mode 100644
index 00000000000..63bc7bbfa4e
--- /dev/null
+++ b/ingestion/src/test/resources/feast/ingestion/transform/WriteFeatureValueMetricsDoFnTest.output
@@ -0,0 +1,66 @@
\ No newline at end of file