Skip to content

Commit a87eee2

Browse files
committed
Extend WriteMetricsTransform to write feature value stats to StatsD
1 parent aec7979 commit a87eee2

File tree

10 files changed

+787
-28
lines changed

10 files changed

+787
-28
lines changed

.prow/scripts/test-end-to-end.sh

+18-18
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,24 @@ tail -n10 /var/log/kafka.log
6767
kafkacat -b localhost:9092 -L
6868

6969
if [[ ${SKIP_BUILD_JARS} != "true" ]]; then
70-
echo "
71-
============================================================
72-
Building jars for Feast
73-
============================================================
74-
"
75-
76-
.prow/scripts/download-maven-cache.sh \
77-
--archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
78-
--output-dir /root/
79-
80-
# Build jars for Feast
81-
mvn --quiet --batch-mode --define skipTests=true clean package
82-
83-
ls -lh core/target/*jar
84-
ls -lh serving/target/*jar
85-
else
86-
echo "[DEBUG] Skipping building jars"
87-
fi
70+
echo "
71+
============================================================
72+
Building jars for Feast
73+
============================================================
74+
"
75+
76+
.prow/scripts/download-maven-cache.sh \
77+
--archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
78+
--output-dir /root/
79+
80+
# Build jars for Feast
81+
mvn --quiet --batch-mode --define skipTests=true clean package
82+
83+
ls -lh core/target/*jar
84+
ls -lh serving/target/*jar
85+
else
86+
echo "[DEBUG] Skipping building jars"
87+
fi
8888

8989
echo "
9090
============================================================

ingestion/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -248,5 +248,12 @@
248248
<version>2.8.1</version>
249249
</dependency>
250250

251+
<!-- For calculation of percentiles in feature values -->
252+
<dependency>
253+
<groupId>org.apache.commons</groupId>
254+
<artifactId>commons-math3</artifactId>
255+
<version>3.6.1</version>
256+
</dependency>
257+
251258
</dependencies>
252259
</project>

ingestion/src/main/java/feast/ingestion/options/ImportOptions.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
import org.apache.beam.sdk.options.PipelineOptions;
2525
import org.apache.beam.sdk.options.Validation.Required;
2626

27-
/** Options passed to Beam to influence the job's execution environment */
27+
/**
28+
* Options passed to Beam to influence the job's execution environment
29+
*/
2830
public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions {
31+
2932
@Required
3033
@Description(
3134
"JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format."
@@ -60,7 +63,7 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
6063

6164
/**
6265
* @param deadLetterTableSpec (Optional) BigQuery table for storing elements that failed to be
63-
* processed. Table spec must follow this format PROJECT_ID:DATASET_ID.PROJECT_ID
66+
* processed. Table spec must follow this format PROJECT_ID:DATASET_ID.PROJECT_ID
6467
*/
6568
void setDeadLetterTableSpec(String deadLetterTableSpec);
6669

@@ -83,4 +86,13 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions,
8386
int getStatsdPort();
8487

8588
void setStatsdPort(int StatsdPort);
89+
90+
@Description(
91+
"Fixed window size in seconds (default 30) to apply before aggregation of numerical value of features"
92+
+ "and writing the aggregated value to StatsD. Refer to feast.ingestion.transform.metrics.WriteFeatureValueMetricsDoFn"
93+
+ "for details on the metric names and types.")
94+
@Default.Integer(30)
95+
int getWindowSizeInSecForFeatureValueMetric();
96+
97+
void setWindowSizeInSecForFeatureValueMetric(int seconds);
8698
}

0 commit comments

Comments
 (0)