diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml
index 354b51f0871..c57255bacb3 100644
--- a/.github/workflows/complete.yml
+++ b/.github/workflows/complete.yml
@@ -12,7 +12,6 @@ jobs:
GITHUB_PR_SHA: ${{ github.event.pull_request.head.sha }}
REGISTRY: gcr.io/kf-feast
MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2020-08-19.tar
- DOCKER_BUILDKIT: '1'
steps:
- uses: actions/checkout@v2
- uses: GoogleCloudPlatform/github-actions/setup-gcloud@master
diff --git a/.scalafmt.conf b/.scalafmt.conf
new file mode 100644
index 00000000000..f3c72b8ceb5
--- /dev/null
+++ b/.scalafmt.conf
@@ -0,0 +1,2 @@
+align.preset = more
+maxColumn = 100
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index fd3faf5622f..0bcb5776716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,7 @@
common
job-controller
common-test
+ spark/ingestion
@@ -85,6 +86,27 @@
6.1.2.Final
1.6.6
+
+
+ ${maven.multiModuleProjectDirectory}
+
false
feast.common.auth.providers.http.client
@@ -528,26 +550,7 @@
-
-
-
+ ${license.content}
1.7
@@ -558,6 +561,15 @@
+
+
+ ${license.content}
+
+
+ 2.7.2
+ ${parent.basedir}/.scalafmt.conf
+
+
diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml
new file mode 100644
index 00000000000..a044ec9d603
--- /dev/null
+++ b/spark/ingestion/pom.xml
@@ -0,0 +1,299 @@
+
+
+
+ 4.0.0
+
+
+ dev.feast
+ feast-parent
+ ${revision}
+ ../..
+
+
+ Feast Spark Ingestion
+ feast-ingestion-spark
+
+
+ 2.12
+ ${scala.version}.12
+ 2.4.7
+ 4.4.0
+ 3.3.0
+ 0.7-SNAPSHOT
+
+
+
+
+
+ dev.feast
+ datatypes-java
+ ${project.version}
+
+
+ *
+ *
+
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.12.2
+
+
+
+ org.scala-lang
+ scala-library
+ ${scala.fullVersion}
+
+
+
+ org.scala-lang.modules
+ scala-collection-compat_${scala.version}
+ 2.2.0
+
+
+
+ org.apache.spark
+ spark-core_${scala.version}
+ ${spark.version}
+ provided
+
+
+
+ org.apache.spark
+ spark-streaming_${scala.version}
+ ${spark.version}
+ provided
+
+
+
+ org.apache.spark
+ spark-sql_${scala.version}
+ ${spark.version}
+ provided
+
+
+
+ org.codehaus.janino
+ janino
+ 3.0.16
+
+
+
+ org.apache.spark
+ spark-sql-kafka-0-10_${scala.version}
+ ${spark.version}
+ provided
+
+
+
+ com.github.scopt
+ scopt_${scala.version}
+ 3.7.1
+
+
+
+ com.google.cloud.spark
+ spark-bigquery_${scala.version}
+ 0.17.2
+ provided
+
+
+
+ joda-time
+ joda-time
+ 2.10.6
+
+
+
+ com.redislabs
+ spark-redis_${scala.version}
+ 2.5.0
+
+
+
+ org.apache.arrow
+ arrow-vector
+ 0.16.0
+
+
+
+ io.netty
+ netty-all
+ 4.1.52.Final
+
+
+
+ org.json4s
+ json4s-ext_${scala.version}
+ 3.7.0-M6
+
+
+
+ org.scalatest
+ scalatest_${scala.version}
+ 3.2.2
+ test
+
+
+
+ org.scalacheck
+ scalacheck_${scala.version}
+ 1.14.3
+ test
+
+
+
+ com.dimafeng
+ testcontainers-scala-scalatest_${scala.version}
+ 0.38.3
+ test
+
+
+
+
+
+
+ src/main/scala
+ src/test/scala
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ scala-compile-first
+ process-resources
+
+ add-source
+ compile
+
+
+
+ scala-test-compile
+ process-test-resources
+
+ testCompile
+
+
+
+
+ ${scala.fullVersion}
+
+
+
+ org.scalatest
+ scalatest-maven-plugin
+ 2.0.0
+
+ ${project.build.directory}/surefire-reports
+ .
+ TestSuiteReport.txt
+
+
+
+ test
+ integration-test
+
+ test
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+
+ jar-with-dependencies
+
+
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+
+ compile
+
+ compile
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 3.2.4
+
+
+ package
+
+ shade
+
+
+
+
+ com.google.protobuf
+ com.google.protobuf.vendor
+
+
+
+
+ *:*
+
+ META-INF/*.SF
+ META-INF/*.DSA
+ META-INF/*.RSA
+
+
+
+
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+ ${scala-maven-plugin.version}
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ ${maven-assembly-plugin.version}
+
+
+
+
+
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
new file mode 100644
index 00000000000..34c667b7e6d
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala
@@ -0,0 +1,69 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+
+trait BasePipeline {
+ def createSparkSession(jobConfig: IngestionJobConfig): SparkSession = {
+ // workaround for issue with arrow & netty
+ // see https://github.com/apache/arrow/tree/master/java#java-properties
+ System.setProperty("io.netty.tryReflectionSetAccessible", "true")
+
+ val conf = new SparkConf()
+ conf
+ .setAppName(s"${jobConfig.mode} IngestionJob for ${jobConfig.featureTable.name}")
+ .setMaster("local")
+
+ jobConfig.store match {
+ case RedisConfig(host, port) =>
+ conf
+ .set("spark.redis.host", host)
+ .set("spark.redis.port", port.toString)
+ }
+
+ jobConfig.metrics match {
+ case Some(c: StatsDConfig) =>
+ conf
+ .set(
+ "spark.metrics.conf.*.source.redis.class",
+ "org.apache.spark.metrics.source.RedisSinkMetricSource"
+ )
+ .set(
+ "spark.metrics.conf.*.source.redis.labels",
+ s"feature_table=${jobConfig.featureTable.name}"
+ )
+ .set(
+ "spark.metrics.conf.*.sink.statsd.class",
+ "org.apache.spark.metrics.sink.StatsdSinkWithTags"
+ )
+ .set("spark.metrics.conf.*.sink.statsd.host", c.host)
+ .set("spark.metrics.conf.*.sink.statsd.port", c.port.toString)
+ .set("spark.metrics.conf.*.sink.statsd.period", "30")
+ .set("spark.metrics.conf.*.sink.statsd.unit", "seconds")
+ .set("spark.metrics.namespace", jobConfig.mode.toString)
+ }
+
+ SparkSession
+ .builder()
+ .config(conf)
+ .getOrCreate()
+ }
+
+ def createPipeline(sparkSession: SparkSession, config: IngestionJobConfig): Unit
+}
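
A minimal sketch of how a concrete pipeline can reuse `createSparkSession`; the object name, table name, and endpoints below are illustrative, not defaults defined by this patch.

```scala
// Illustrative only: a concrete pipeline reusing BasePipeline's Spark/StatsD wiring.
import org.apache.spark.sql.SparkSession
import feast.ingestion._

object ExamplePipeline extends BasePipeline {
  override def createPipeline(sparkSession: SparkSession, config: IngestionJobConfig): Unit = {
    // read -> project -> validate -> store would go here
  }

  def run(): Unit = {
    val config = IngestionJobConfig(
      mode = Modes.Offline,
      featureTable = FeatureTable("driver_features", "default", entities = Seq.empty, features = Seq.empty),
      store = RedisConfig("localhost", 6379),
      metrics = Some(StatsDConfig("localhost", 9125))
    )
    // createSparkSession translates the store and metrics config into SparkConf settings
    val spark = createSparkSession(config)
    createPipeline(spark, config)
  }
}
```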
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
new file mode 100644
index 00000000000..ef463c1c11b
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala
@@ -0,0 +1,104 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import feast.ingestion.sources.bq.BigQueryReader
+import feast.ingestion.sources.file.FileReader
+import feast.ingestion.validation.RowValidator
+import org.apache.spark.sql.{Column, SparkSession}
+import org.apache.spark.sql.functions.col
+
+/**
+ * Batch Ingestion Flow:
+ * 1. Read from source (BQ | File)
+ * 2. Map source columns to FeatureTable's schema
+ * 3. Validate
+ * 4. Store valid rows in Redis
+ * 5. Store invalid rows in Parquet format at the `deadletter` destination
+ */
+object BatchPipeline extends BasePipeline {
+ override def createPipeline(sparkSession: SparkSession, config: IngestionJobConfig): Unit = {
+ val featureTable = config.featureTable
+ val projection =
+ inputProjection(config.source, featureTable.features, featureTable.entities)
+ val validator = new RowValidator(featureTable)
+
+ val input = config.source match {
+ case source: BQSource =>
+ BigQueryReader.createBatchSource(
+ sparkSession.sqlContext,
+ source,
+ config.startTime,
+ config.endTime
+ )
+ case source: FileSource =>
+ FileReader.createBatchSource(
+ sparkSession.sqlContext,
+ source,
+ config.startTime,
+ config.endTime
+ )
+ }
+
+ val projected = input.select(projection: _*).cache()
+
+ val validRows = projected
+ .filter(validator.checkAll)
+
+ validRows.write
+ .format("feast.ingestion.stores.redis")
+ .option("entity_columns", featureTable.entities.map(_.name).mkString(","))
+ .option("namespace", featureTable.name)
+ .option("project_name", featureTable.project)
+ .option("timestamp_column", config.source.timestampColumn)
+ .save()
+
+ config.deadLetterPath match {
+ case Some(path) =>
+ projected
+ .filter(!validator.checkAll)
+ .write
+ .format("parquet")
+ .save(path)
+ case _ => None
+ }
+
+ }
+
+ /**
+ * Builds the column projection from the custom mapping, falling back to feature/entity names.
+ */
+ private def inputProjection(
+ source: Source,
+ features: Seq[Field],
+ entities: Seq[Field]
+ ): Array[Column] = {
+ val featureColumns = features
+ .filter(f => !source.mapping.contains(f.name))
+ .map(f => (f.name, f.name)) ++ source.mapping
+
+ val timestampColumn = Seq((source.timestampColumn, source.timestampColumn))
+ val entitiesColumns =
+ entities
+ .filter(e => !source.mapping.contains(e.name))
+ .map(e => (e.name, e.name))
+
+ (featureColumns ++ entitiesColumns ++ timestampColumn).map { case (alias, source) =>
+ col(source).alias(alias)
+ }.toArray
+ }
+}
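
For readers of `inputProjection`: a small sketch of what the projection resolves to under a hypothetical mapping (the column names below are examples only, not part of this change).

```scala
// With source.mapping = Map("feature1" -> "f1_raw"), features feature1/feature2,
// entity customer and timestampColumn "event_timestamp", the projection built above
// is equivalent to:
import org.apache.spark.sql.functions.col

val projection = Array(
  col("feature2").alias("feature2"),              // unmapped feature falls back to its own name
  col("f1_raw").alias("feature1"),                // remapped via source.mapping
  col("customer").alias("customer"),              // entity column, no mapping
  col("event_timestamp").alias("event_timestamp") // source timestamp column
)
// input.select(projection: _*) then feeds RowValidator and the Redis writer.
```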
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala
new file mode 100644
index 00000000000..14e26039829
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala
@@ -0,0 +1,88 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import org.joda.time.DateTime
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods.{parse => parseJSON}
+import org.json4s.ext.JavaEnumNameSerializer
+
+object IngestionJob {
+ import Modes._
+ implicit val modesRead: scopt.Read[Modes.Value] = scopt.Read.reads(Modes withName _.capitalize)
+ implicit val formats: Formats = DefaultFormats +
+ new JavaEnumNameSerializer[feast.proto.types.ValueProto.ValueType.Enum]()
+
+ val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJob") {
+ // ToDo: read version from Manifest
+ head("feast.ingestion.IngestionJob", "0.8-SNAPSHOT")
+
+ opt[Modes]("mode")
+ .action((x, c) => c.copy(mode = x))
+ .required()
+ .text("Mode to operate ingestion job (offline or online)")
+
+ opt[String](name = "source")
+ .action((x, c) =>
+ parseJSON(x).extract[Sources] match {
+ case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get)
+ case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get)
+ case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get)
+ }
+ )
+ .required()
+ .text("JSON-encoded source object (e.g. {\"kafka\":{\"bootstrapServers\":...}}")
+
+ opt[String](name = "feature-table")
+ .action((x, c) => c.copy(featureTable = parseJSON(x).extract[FeatureTable]))
+ .required()
+ .text("JSON-encoded FeatureTableSpec object")
+
+ opt[String](name = "start")
+ .action((x, c) => c.copy(startTime = DateTime.parse(x)))
+ .text("Start timestamp for offline ingestion")
+
+ opt[String](name = "end")
+ .action((x, c) => c.copy(endTime = DateTime.parse(x)))
+ .text("End timestamp for offline ingestion")
+
+ opt[String](name = "redis")
+ .action((x, c) => c.copy(store = parseJSON(x).extract[RedisConfig]))
+
+ opt[String](name = "statsd")
+ .action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig])))
+
+ opt[String](name = "deadletter-path")
+ .action((x, c) => c.copy(deadLetterPath = Some(x)))
+
+ }
+
+ def main(args: Array[String]): Unit = {
+ parser.parse(args, IngestionJobConfig()) match {
+ case Some(config) =>
+ config.mode match {
+ case Modes.Offline =>
+ val sparkSession = BatchPipeline.createSparkSession(config)
+ BatchPipeline.createPipeline(sparkSession, config)
+ }
+ case None =>
+ println("Parameters can't be parsed")
+ }
+ }
+
+}
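
A hedged example of arguments accepted by the parser above; the path, JSON payloads, and timestamps are illustrative values, not taken from this patch.

```scala
// Illustrative only: drives the scopt parser defined above with example values.
val args = Array(
  "--mode", "offline",
  "--source",
  """{"file": {"path": "/tmp/features/", "mapping": {}, "timestampColumn": "event_timestamp"}}""",
  "--feature-table",
  """{"name": "test-fs", "project": "default",
    | "entities": [{"name": "customer", "type": "STRING"}],
    | "features": [{"name": "feature1", "type": "INT32"}]}""".stripMargin,
  "--start", "2020-08-01T00:00:00",
  "--end", "2020-09-01T00:00:00",
  "--redis", """{"host": "localhost", "port": 6379}"""
)
IngestionJob.main(args) // parses into IngestionJobConfig and runs BatchPipeline in Offline mode
```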
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala
new file mode 100644
index 00000000000..880f00f37ff
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala
@@ -0,0 +1,90 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import feast.ingestion.Modes.Modes
+import org.joda.time.DateTime
+
+object Modes extends Enumeration {
+ type Modes = Value
+ val Offline, Online = Value
+}
+
+abstract class StoreConfig
+
+case class RedisConfig(host: String, port: Int) extends StoreConfig
+
+abstract class MetricConfig
+
+case class StatsDConfig(host: String, port: Int) extends MetricConfig
+
+abstract class Source {
+ def mapping: Map[String, String]
+
+ def timestampColumn: String
+}
+
+abstract class BatchSource extends Source
+
+abstract class StreamingSource extends Source
+
+case class FileSource(
+ path: String,
+ override val mapping: Map[String, String],
+ override val timestampColumn: String
+) extends BatchSource
+
+case class BQSource(
+ project: String,
+ dataset: String,
+ table: String,
+ override val mapping: Map[String, String],
+ override val timestampColumn: String
+) extends BatchSource
+
+case class KafkaSource(
+ bootstrapServers: String,
+ topic: String,
+ override val mapping: Map[String, String],
+ override val timestampColumn: String
+) extends StreamingSource
+
+case class Sources(
+ file: Option[FileSource] = None,
+ bq: Option[BQSource] = None,
+ kafka: Option[KafkaSource] = None
+)
+
+case class Field(name: String, `type`: feast.proto.types.ValueProto.ValueType.Enum)
+
+case class FeatureTable(
+ name: String,
+ project: String,
+ entities: Seq[Field],
+ features: Seq[Field]
+)
+
+case class IngestionJobConfig(
+ mode: Modes = Modes.Offline,
+ featureTable: FeatureTable = null,
+ source: Source = null,
+ startTime: DateTime = DateTime.now(),
+ endTime: DateTime = DateTime.now(),
+ store: StoreConfig = RedisConfig("localhost", 6379),
+ metrics: Option[MetricConfig] = Some(StatsDConfig("localhost", 9125)),
+ deadLetterPath: Option[String] = None
+)
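
A minimal sketch of how a `--source` payload maps onto the `Sources` wrapper above, assuming json4s formats equivalent to those in `IngestionJob`; the project/dataset/table names are made up.

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse
import feast.ingestion.{BQSource, Sources}

implicit val formats: DefaultFormats.type = DefaultFormats

val json =
  """{"bq": {"project": "my-project", "dataset": "my_dataset", "table": "events",
    |        "mapping": {"feature1": "f1_raw"}, "timestampColumn": "event_timestamp"}}""".stripMargin

parse(json).extract[Sources] match {
  case Sources(_, Some(bq: BQSource), _) => println(s"BQ source: ${bq.project}.${bq.dataset}.${bq.table}")
  case _                                 => println("exactly one of file/bq/kafka is expected")
}
```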
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala
new file mode 100644
index 00000000000..66b48dd444e
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala
@@ -0,0 +1,201 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.metrics
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @see
+ * StatsD metric types
+ */
+object StatsdMetricType {
+ val COUNTER = "c"
+ val GAUGE = "g"
+ val TIMER = "ms"
+ val Set = "s"
+}
+
+/**
+ * This is a fork of Spark's internal StatsdReporter implementation.
+ * The main difference from the Spark version is support for tags embedded in metric names.
+ *
+ * When we extend a metric name (by appending a suffix) we must keep the tag part at the end of the name.
+ * Labels are separated from the name by # (see the tagging extensions of the prometheus statsd exporter:
+ * https://github.com/prometheus/statsd_exporter#tagging-extensions)
+ * E.g.,
+ *
+ * metric_name#tag=value + count -> metric_name.count#tag=value
+ */
+class StatsdReporterWithTags(
+ registry: MetricRegistry,
+ host: String = "127.0.0.1",
+ port: Int = 8125,
+ prefix: String = "",
+ filter: MetricFilter = MetricFilter.ALL,
+ rateUnit: TimeUnit = TimeUnit.SECONDS,
+ durationUnit: TimeUnit = TimeUnit.MILLISECONDS
+) extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit)
+ with Logging {
+
+ import StatsdMetricType._
+
+ private val address = new InetSocketAddress(host, port)
+ private val whitespace = "[\\s]+".r
+
+ override def report(
+ gauges: SortedMap[String, Gauge[_]],
+ counters: SortedMap[String, Counter],
+ histograms: SortedMap[String, Histogram],
+ meters: SortedMap[String, Meter],
+ timers: SortedMap[String, Timer]
+ ): Unit =
+ Try(new DatagramSocket) match {
+ case Failure(ioe: IOException) =>
+ logWarning(
+ "StatsD datagram socket construction failed",
+ NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe)
+ )
+ case Failure(e) => logWarning("StatsD datagram socket construction failed", e)
+ case Success(s) =>
+ implicit val socket = s
+ val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
+ val localPort = socket.getLocalPort
+ Try {
+ gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue))
+ counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue))
+ histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue))
+ meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue))
+ timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue))
+ } recover {
+ case ioe: IOException =>
+ logDebug(
+ s"Unable to send packets to StatsD",
+ NetUtils.wrapException(
+ address.getHostString,
+ address.getPort,
+ localAddress,
+ localPort,
+ ioe
+ )
+ )
+ case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e)
+ }
+ Try(socket.close()) recover {
+ case ioe: IOException =>
+ logDebug(
+ "Error when close socket to StatsD",
+ NetUtils.wrapException(
+ address.getHostString,
+ address.getPort,
+ localAddress,
+ localPort,
+ ioe
+ )
+ )
+ case e: Throwable => logDebug("Error when close socket to StatsD", e)
+ }
+ }
+
+ private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit =
+ formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE))
+
+ private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit =
+ send(fullName(name), format(counter.getCount), COUNTER)
+
+ private def reportHistogram(name: String, histogram: Histogram)(implicit
+ socket: DatagramSocket
+ ): Unit = {
+ val snapshot = histogram.getSnapshot
+ send(fullName(name, "count"), format(histogram.getCount), GAUGE)
+ send(fullName(name, "max"), format(snapshot.getMax), TIMER)
+ send(fullName(name, "mean"), format(snapshot.getMean), TIMER)
+ send(fullName(name, "min"), format(snapshot.getMin), TIMER)
+ send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER)
+ send(fullName(name, "p50"), format(snapshot.getMedian), TIMER)
+ send(fullName(name, "p75"), format(snapshot.get75thPercentile), TIMER)
+ send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER)
+ send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER)
+ send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER)
+ send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER)
+ }
+
+ private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = {
+ send(fullName(name, "count"), format(meter.getCount), GAUGE)
+ send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER)
+ send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER)
+ send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER)
+ send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER)
+ }
+
+ private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = {
+ val snapshot = timer.getSnapshot
+ send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER)
+ send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER)
+ send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER)
+ send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER)
+ send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER)
+ send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER)
+ send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER)
+ send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER)
+ send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER)
+ send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER)
+
+ reportMetered(name, timer)
+ }
+
+ private def send(name: String, value: String, metricType: String)(implicit
+ socket: DatagramSocket
+ ): Unit = {
+ val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+ val packet = new DatagramPacket(bytes, bytes.length, address)
+ socket.send(packet)
+ }
+
+ private val nameWithTag = """(\S+)#(\S+)""".r
+
+ private def fullName(name: String, suffixes: String*): String = name match {
+ case nameWithTag(name, tags) =>
+ MetricRegistry.name(prefix, name +: suffixes: _*) ++ "#" ++ tags
+ case _ =>
+ MetricRegistry.name(prefix, name +: suffixes: _*)
+ }
+
+ private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-")
+
+ private def format(v: Any): String = formatAny(v).getOrElse("")
+
+ private def formatAny(v: Any): Option[String] =
+ v match {
+ case f: Float => Some("%2.2f".format(f))
+ case d: Double => Some("%2.2f".format(d))
+ case b: BigDecimal => Some("%2.2f".format(b))
+ case n: Number => Some(n.toString)
+ case _ => None
+ }
+}
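
A standalone sketch of the tag-preserving naming rule documented above; it re-implements the private `fullName` helper purely for illustration.

```scala
import com.codahale.metrics.MetricRegistry

val nameWithTag = """(\S+)#(\S+)""".r

def fullName(prefix: String, name: String, suffixes: String*): String = name match {
  case nameWithTag(base, tags) => MetricRegistry.name(prefix, base +: suffixes: _*) ++ "#" ++ tags
  case _                       => MetricRegistry.name(prefix, name +: suffixes: _*)
}

// "metric_name#tag=value" + "count" -> "metric_name.count#tag=value"
fullName("", "feast_ingestion_feature_row_lag_ms#feature_table=test-fs", "count")
// == "feast_ingestion_feature_row_lag_ms.count#feature_table=test-fs"
```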
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala
new file mode 100644
index 00000000000..5dc5a7fddd5
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala
@@ -0,0 +1,39 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.sources.bq
+
+import java.sql.Timestamp
+
+import feast.ingestion.BQSource
+import org.joda.time.DateTime
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.functions.col
+
+object BigQueryReader {
+ def createBatchSource(
+ sqlContext: SQLContext,
+ source: BQSource,
+ start: DateTime,
+ end: DateTime
+ ): DataFrame = {
+ sqlContext.read
+ .format("bigquery")
+ .load(s"${source.project}.${source.dataset}.${source.table}")
+ .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis))
+ .filter(col(source.timestampColumn) < new Timestamp(end.getMillis))
+ }
+}
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
new file mode 100644
index 00000000000..e099a87aa3e
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala
@@ -0,0 +1,38 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.sources.file
+
+import java.sql.Timestamp
+
+import feast.ingestion.FileSource
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.joda.time.DateTime
+
+object FileReader {
+ def createBatchSource(
+ sqlContext: SQLContext,
+ source: FileSource,
+ start: DateTime,
+ end: DateTime
+ ): DataFrame = {
+ sqlContext.read
+ .parquet(source.path)
+ .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis))
+ .filter(col(source.timestampColumn) < new Timestamp(end.getMillis))
+ }
+}
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala
new file mode 100644
index 00000000000..ec09a0bed1a
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.stores.redis
+
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
+
+/**
+ * Entrypoint to Redis storage. Implements only `CreatableRelationProvider`, since it is only possible to write to Redis.
+ * Here we parse the configuration from Spark parameters & pass a `SparkRedisConfig` to `RedisSinkRelation`.
+ */
+class RedisRelationProvider extends CreatableRelationProvider {
+ override def createRelation(
+ sqlContext: SQLContext,
+ mode: SaveMode,
+ parameters: Map[String, String],
+ data: DataFrame
+ ): BaseRelation = {
+ val config = SparkRedisConfig.parse(parameters)
+ val relation = new RedisSinkRelation(sqlContext, config)
+
+ relation.insert(data, overwrite = false)
+
+ relation
+ }
+}
+
+class DefaultSource extends RedisRelationProvider
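
Because Spark resolves a data source by its package name, any DataFrame can be written through this provider. A short usage sketch follows; the options mirror those set by `BatchPipeline`, and the DataFrame and column names are assumed for illustration.

```scala
import org.apache.spark.sql.DataFrame

def writeToRedis(df: DataFrame): Unit =
  df.write
    .format("feast.ingestion.stores.redis")        // resolves to DefaultSource above
    .option("entity_columns", "customer")          // illustrative column/table names
    .option("namespace", "test-fs")
    .option("project_name", "default")
    .option("timestamp_column", "event_timestamp")
    .save()
```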
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala
new file mode 100644
index 00000000000..f1f3f8a5db8
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala
@@ -0,0 +1,97 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.stores.redis
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+import redis.clients.jedis.{Pipeline, Response}
+
+import scala.jdk.CollectionConverters._
+import com.google.protobuf.Timestamp
+import feast.ingestion.utils.TypeConversion
+import scala.util.hashing.MurmurHash3
+
+/**
+ * Uses the Redis hash type as the storage layout. Every feature is stored as a separate entry in the hash.
+ * Additionally, a `timestamp` entry is stored per FeatureTable to track the update time.
+ *
+ * Hash field names are murmur3 hashes of `featureTableName:featureName`.
+ * Values are serialized with protobuf (`ValueProto`).
+ */
+class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Serializable {
+ def encodeRow(
+ keyColumns: Array[String],
+ timestampField: String,
+ value: Row
+ ): Map[Array[Byte], Array[Byte]] = {
+ val fields = value.schema.fields.map(_.name)
+ val types = value.schema.fields.map(f => (f.name, f.dataType)).toMap
+ val kvMap = value.getValuesMap[Any](fields)
+
+ val values = kvMap
+ .filter { case (_, v) =>
+ // don't store null values
+ v != null
+ }
+ .filter { case (k, _) =>
+ // don't store entities & timestamp
+ !keyColumns.contains(k) && k != config.timestampColumn
+ }
+ .map { case (k, v) =>
+ encodeKey(k) -> encodeValue(v, types(k))
+ }
+
+ val timestamp = Seq(
+ (
+ timestampField.getBytes,
+ encodeValue(value.getAs[Timestamp](config.timestampColumn), TimestampType)
+ )
+ )
+
+ values ++ timestamp
+ }
+
+ def encodeValue(value: Any, `type`: DataType): Array[Byte] = {
+ TypeConversion.sqlTypeToProtoValue(value, `type`).toByteArray
+ }
+
+ def encodeKey(key: String): Array[Byte] = {
+ val fullFeatureReference = s"${config.namespace}:$key"
+ MurmurHash3.stringHash(fullFeatureReference).toHexString.getBytes
+ }
+
+ def save(
+ pipeline: Pipeline,
+ key: String,
+ value: Map[Array[Byte], Array[Byte]],
+ ttl: Int
+ ): Unit = {
+ pipeline.hset(key.getBytes, value.asJava)
+ if (ttl > 0) {
+ pipeline.expire(key, ttl)
+ }
+ }
+
+ def getTimestamp(
+ pipeline: Pipeline,
+ key: String,
+ timestampField: String
+ ): Response[Array[Byte]] = {
+ pipeline.hget(key.getBytes(), timestampField.getBytes)
+ }
+
+}
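
A small sketch of the field-name hashing used by `encodeKey`: a reader must apply the same murmur3 transformation to a `featureTable:feature` reference to locate a value (the names below are examples).

```scala
import scala.util.hashing.MurmurHash3

// full feature reference = "<featureTableName>:<featureName>"
val field = MurmurHash3.stringHash("test-fs:feature1").toHexString
// `field` is the hash-field name under which the serialized ValueProto is stored;
// the integration test's encodeFeatureKey helper applies the identical transformation.
```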
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala
new file mode 100644
index 00000000000..1be24737b03
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala
@@ -0,0 +1,41 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.stores.redis
+
+import org.apache.spark.sql.Row
+import redis.clients.jedis.{Pipeline, Response}
+
+trait Persistence {
+ def encodeRow(
+ keyColumns: Array[String],
+ timestampField: String,
+ value: Row
+ ): Map[Array[Byte], Array[Byte]]
+
+ def save(
+ pipeline: Pipeline,
+ key: String,
+ value: Map[Array[Byte], Array[Byte]],
+ ttl: Int
+ ): Unit
+
+ def getTimestamp(
+ pipeline: Pipeline,
+ key: String,
+ timestampField: String
+ ): Response[Array[Byte]]
+}
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
new file mode 100644
index 00000000000..0c228beedc4
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala
@@ -0,0 +1,138 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.stores.redis
+
+import com.google.protobuf.Timestamp
+import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisEndpoint}
+import com.redislabs.provider.redis.rdd.Keys.groupKeysByNode
+import com.redislabs.provider.redis.util.PipelineUtils.{foreachWithPipeline, mapWithPipeline}
+import org.apache.spark.SparkEnv
+import org.apache.spark.metrics.source.RedisSinkMetricSource
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+/**
+ * High-level writer to Redis. Relies on a `Persistence` implementation for the actual storage layout.
+ * Here we define the general flow:
+ *
+ * 1. Deduplicate rows within one batch (group by key and keep only the latest row, by timestamp)
+ * 2. Read the last-stored timestamp from Redis
+ * 3. Check whether the current timestamp is more recent than the stored one
+ * 4. Save to storage only if it is
+ */
+class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig)
+ extends BaseRelation
+ with InsertableRelation
+ with Serializable {
+ private implicit val redisConfig: RedisConfig = {
+ new RedisConfig(
+ new RedisEndpoint(sqlContext.sparkContext.getConf)
+ )
+ }
+
+ private implicit val readWriteConfig: ReadWriteConfig = {
+ ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf)
+ }
+
+ override def schema: StructType = ???
+
+ val persistence: Persistence = new HashTypePersistence(config)
+
+ override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+ // repartition for deduplication
+ val dataToStore =
+ if (config.repartitionByEntity)
+ data.repartition(config.entityColumns.map(col): _*)
+ else data
+
+ dataToStore.foreachPartition { partition: Iterator[Row] =>
+ // grouped iterator to only allocate memory for a portion of rows
+ partition.grouped(config.iteratorGroupingSize).foreach { batch =>
+ // group by key and keep only latest row per each key
+ val rowsWithKey: Map[String, Row] =
+ compactRowsToLatestTimestamp(batch.map(row => dataKeyId(row) -> row)).toMap
+
+ groupKeysByNode(redisConfig.hosts, rowsWithKey.keysIterator).foreach { case (node, keys) =>
+ val conn = node.connect()
+ // retrieve latest stored timestamp per key
+ val timestamps = mapWithPipeline(conn, keys) { (pipeline, key) =>
+ persistence.getTimestamp(pipeline, key, timestampField)
+ }
+
+ val timestampByKey = timestamps
+ .map(_.asInstanceOf[Array[Byte]])
+ .map(
+ Option(_)
+ .map(Timestamp.parseFrom)
+ .map(t => new java.sql.Timestamp(t.getSeconds * 1000))
+ )
+ .zip(rowsWithKey.keys)
+ .map(_.swap)
+ .toMap
+
+ foreachWithPipeline(conn, keys) { (pipeline, key) =>
+ val row = rowsWithKey(key)
+
+ timestampByKey(key) match {
+ case Some(t) if !t.before(row.getAs[java.sql.Timestamp](config.timestampColumn)) => ()
+ case _ =>
+ if (metricSource.nonEmpty) {
+ val lag = System.currentTimeMillis() - row
+ .getAs[java.sql.Timestamp](config.timestampColumn)
+ .getTime
+
+ metricSource.get.METRIC_TOTAL_ROWS_INSERTED.inc()
+ metricSource.get.METRIC_ROWS_LAG.update(lag)
+ }
+
+ val encodedRow = persistence.encodeRow(config.entityColumns, timestampField, row)
+ persistence.save(pipeline, key, encodedRow, ttl = 0)
+ }
+ }
+ conn.close()
+ }
+ }
+ }
+ }
+
+ private def compactRowsToLatestTimestamp(rows: Seq[(String, Row)]) = rows
+ .groupBy(_._1)
+ .values
+ .map(_.maxBy(_._2.getAs[java.sql.Timestamp](config.timestampColumn).getTime))
+
+ /**
+ * The key is built from the entity column values, prefixed with the project name and the entity column names.
+ */
+ private def dataKeyId(row: Row): String = {
+ val sortedEntities = config.entityColumns.sorted
+ val entityKey = sortedEntities.map(row.getAs[Any]).map(_.toString).mkString(":")
+ val entityPrefix = sortedEntities.mkString("_")
+ s"${config.projectName}_${entityPrefix}:$entityKey"
+ }
+
+ private def timestampField: String = {
+ s"${config.timestampPrefix}:${config.namespace}"
+ }
+
+ private lazy val metricSource: Option[RedisSinkMetricSource] =
+ SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match {
+ case Seq(head) => Some(head.asInstanceOf[RedisSinkMetricSource])
+ case _ => None
+ }
+}
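
A standalone sketch of the key layout produced by `dataKeyId`, re-implemented here for illustration only; the project and entity values are hypothetical.

```scala
def dataKeyId(projectName: String, entityColumns: Array[String], row: Map[String, Any]): String = {
  val sortedEntities = entityColumns.sorted
  val entityKey      = sortedEntities.map(row(_).toString).mkString(":")
  val entityPrefix   = sortedEntities.mkString("_")
  s"${projectName}_$entityPrefix:$entityKey"
}

dataKeyId("default", Array("customer"), Map("customer" -> "c-42"))
// == "default_customer:c-42"
dataKeyId("default", Array("merchant", "customer"), Map("customer" -> "c-42", "merchant" -> "m-7"))
// == "default_customer_merchant:c-42:m-7"
```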
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala
new file mode 100644
index 00000000000..389607ce99e
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala
@@ -0,0 +1,44 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.stores.redis
+
+case class SparkRedisConfig(
+ namespace: String,
+ projectName: String,
+ entityColumns: Array[String],
+ timestampColumn: String,
+ iteratorGroupingSize: Int = 1000,
+ timestampPrefix: String = "_ts",
+ repartitionByEntity: Boolean = true
+)
+
+object SparkRedisConfig {
+ val NAMESPACE = "namespace"
+ val ENTITY_COLUMNS = "entity_columns"
+ val TS_COLUMN = "timestamp_column"
+ val ENTITY_REPARTITION = "entity_repartition"
+ val PROJECT_NAME = "project_name"
+
+ def parse(parameters: Map[String, String]): SparkRedisConfig =
+ SparkRedisConfig(
+ namespace = parameters.getOrElse(NAMESPACE, ""),
+ projectName = parameters.getOrElse(PROJECT_NAME, "default"),
+ entityColumns = parameters.getOrElse(ENTITY_COLUMNS, "").split(","),
+ timestampColumn = parameters.getOrElse(TS_COLUMN, "event_timestamp"),
+ repartitionByEntity = parameters.getOrElse(ENTITY_REPARTITION, "true") == "true"
+ )
+}
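
For reference, a parameter map equivalent to the options set on the writer side; the values are illustrative.

```scala
import feast.ingestion.stores.redis.SparkRedisConfig

val config = SparkRedisConfig.parse(
  Map(
    "entity_columns"   -> "customer",
    "namespace"        -> "test-fs",
    "project_name"     -> "default",
    "timestamp_column" -> "event_timestamp"
  )
)
// repartitionByEntity defaults to true, iteratorGroupingSize to 1000, timestampPrefix to "_ts"
```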
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala b/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala
new file mode 100644
index 00000000000..0846b95ce95
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala
@@ -0,0 +1,121 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.utils
+
+import java.sql
+
+import com.google.protobuf.{ByteString, Message, Timestamp}
+import feast.proto.types.ValueProto
+import org.apache.spark.sql.types._
+
+import scala.collection.JavaConverters._
+
+object TypeConversion {
+ def sqlTypeToProtoValue(value: Any, `type`: DataType): Message = {
+ (`type` match {
+ case IntegerType => ValueProto.Value.newBuilder().setInt32Val(value.asInstanceOf[Int])
+ case LongType => ValueProto.Value.newBuilder().setInt64Val(value.asInstanceOf[Long])
+ case StringType => ValueProto.Value.newBuilder().setStringVal(value.asInstanceOf[String])
+ case DoubleType => ValueProto.Value.newBuilder().setDoubleVal(value.asInstanceOf[Double])
+ case FloatType => ValueProto.Value.newBuilder().setFloatVal(value.asInstanceOf[Float])
+ case StringType => ValueProto.Value.newBuilder().setStringVal(value.asInstanceOf[String])
+ case ArrayType(t: IntegerType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setInt32ListVal(
+ ValueProto.Int32List.newBuilder
+ .addAllVal(value.asInstanceOf[Array[Integer]].toSeq.asJava)
+ )
+ case ArrayType(t: LongType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setInt64ListVal(
+ ValueProto.Int64List.newBuilder
+ .addAllVal(value.asInstanceOf[Array[Long]].toSeq.map(java.lang.Long.valueOf).asJava)
+ )
+ case ArrayType(t: BooleanType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setBoolListVal(
+ ValueProto.BoolList.newBuilder.addAllVal(
+ value.asInstanceOf[Array[Boolean]].toSeq.map(java.lang.Boolean.valueOf).asJava
+ )
+ )
+ case ArrayType(t: FloatType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setFloatListVal(
+ ValueProto.FloatList.newBuilder
+ .addAllVal(value.asInstanceOf[Array[Float]].toSeq.map(java.lang.Float.valueOf).asJava)
+ )
+ case ArrayType(t: DoubleType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setDoubleListVal(
+ ValueProto.DoubleList.newBuilder.addAllVal(
+ value.asInstanceOf[Array[Double]].toSeq.map(java.lang.Double.valueOf).asJava
+ )
+ )
+ case ArrayType(t: ByteType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setBytesVal(ByteString.copyFrom(value.asInstanceOf[Array[Byte]]))
+ case ArrayType(t: StringType, _) =>
+ ValueProto.Value
+ .newBuilder()
+ .setStringListVal(
+ ValueProto.StringList.newBuilder
+ .addAllVal(value.asInstanceOf[Array[String]].toSeq.asJava)
+ )
+ case TimestampType =>
+ Timestamp
+ .newBuilder()
+ .setSeconds(value.asInstanceOf[java.sql.Timestamp].getTime / 1000)
+ }).build
+ }
+
+ class AsScala[A](op: => A) {
+ def asScala: A = op
+ }
+
+ implicit def protoValueAsScala(v: ValueProto.Value): AsScala[Any] = new AsScala[Any](
+ v.getValCase match {
+ case ValueProto.Value.ValCase.INT32_VAL => v.getInt32Val
+ case ValueProto.Value.ValCase.INT64_VAL => v.getInt64Val
+ case ValueProto.Value.ValCase.FLOAT_VAL => v.getFloatVal
+ case ValueProto.Value.ValCase.BOOL_VAL => v.getBoolVal
+ case ValueProto.Value.ValCase.DOUBLE_VAL => v.getDoubleVal
+ case ValueProto.Value.ValCase.STRING_VAL => v.getStringVal
+ case ValueProto.Value.ValCase.BYTES_VAL => v.getBytesVal
+ case ValueProto.Value.ValCase.INT32_LIST_VAL => v.getInt32ListVal.getValList
+ case ValueProto.Value.ValCase.INT64_LIST_VAL => v.getInt64ListVal.getValList
+ case ValueProto.Value.ValCase.FLOAT_LIST_VAL => v.getFloatListVal.getValList
+ case ValueProto.Value.ValCase.DOUBLE_LIST_VAL => v.getDoubleListVal.getValList
+ case ValueProto.Value.ValCase.BOOL_LIST_VAL => v.getBoolListVal.getValList
+ case ValueProto.Value.ValCase.STRING_LIST_VAL => v.getStringListVal.getValList
+ case ValueProto.Value.ValCase.BYTES_LIST_VAL => v.getBytesListVal.getValList
+ case ValueProto.Value.ValCase.VAL_NOT_SET =>
+ throw new RuntimeException(s"$v not a ValueProto")
+ }
+ )
+
+ implicit def timestampAsScala(t: Timestamp): AsScala[java.sql.Timestamp] =
+ new AsScala[java.sql.Timestamp](
+ new sql.Timestamp(t.getSeconds * 1000)
+ )
+
+}
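
A hedged round-trip through the conversions above; it assumes the Feast datatypes are on the classpath, and the literal values are arbitrary.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.types.{IntegerType, TimestampType}
import feast.ingestion.utils.TypeConversion._
import feast.proto.types.ValueProto

val asProto = sqlTypeToProtoValue(42, IntegerType)
// asProto is a ValueProto.Value with int32_val = 42

val tsProto = sqlTypeToProtoValue(Timestamp.valueOf("2020-08-01 00:00:00"), TimestampType)
// tsProto is a google.protobuf.Timestamp truncated to second precision

val back = ValueProto.Value.parseFrom(asProto.toByteArray).asScala // 42: Any
```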
diff --git a/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala
new file mode 100644
index 00000000000..e2ea12ef0a2
--- /dev/null
+++ b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala
@@ -0,0 +1,32 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.validation
+
+import feast.ingestion.FeatureTable
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.col
+
+class RowValidator(featureTable: FeatureTable) extends Serializable {
+ def allEntitiesPresent: Column =
+ featureTable.entities.map(e => col(e.name).isNotNull).reduce(_.&&(_))
+
+ def atLeastOneFeatureNotNull: Column =
+ featureTable.features.map(f => col(f.name).isNotNull).reduce(_.||(_))
+
+ def checkAll: Column =
+ allEntitiesPresent && atLeastOneFeatureNotNull
+}
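
A minimal usage sketch mirroring how `BatchPipeline` splits valid rows from dead-letter rows; the DataFrame and the FeatureTable definition are assumed for illustration.

```scala
import org.apache.spark.sql.DataFrame
import feast.ingestion.{FeatureTable, Field}
import feast.ingestion.validation.RowValidator
import feast.proto.types.ValueProto.ValueType

val validator = new RowValidator(
  FeatureTable(
    name     = "test-fs",
    project  = "default",
    entities = Seq(Field("customer", ValueType.Enum.STRING)),
    features = Seq(Field("feature1", ValueType.Enum.INT32))
  )
)

def split(df: DataFrame): (DataFrame, DataFrame) =
  (df.filter(validator.checkAll), df.filter(!validator.checkAll))
```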
diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala
new file mode 100644
index 00000000000..5c70da2b86c
--- /dev/null
+++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala
@@ -0,0 +1,60 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+import feast.ingestion.metrics.StatsdReporterWithTags
+import org.apache.spark.SecurityManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] class StatsdSinkWithTags(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager
+) extends Sink
+ with Logging {
+ import StatsdSink._
+
+ val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
+ val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt
+
+ val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
+ val pollUnit =
+ TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)
+
+ val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val reporter = new StatsdReporterWithTags(registry, host, port, prefix)
+
+ override def start(): Unit = {
+ reporter.start(pollPeriod, pollUnit)
+ logInfo(s"StatsdSink started with prefix: '$prefix'")
+ }
+
+ override def stop(): Unit = {
+ reporter.stop()
+ logInfo("StatsdSink stopped.")
+ }
+
+ override def report(): Unit = reporter.report()
+}
diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala
new file mode 100644
index 00000000000..77c9218a7ec
--- /dev/null
+++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala
@@ -0,0 +1,47 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics.source
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.spark.{SparkConf, SparkEnv}
+
+class RedisSinkMetricSource extends Source {
+ override val sourceName: String = RedisSinkMetricSource.sourceName
+
+ override val metricRegistry: MetricRegistry = new MetricRegistry
+
+ private val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true))
+
+ private val metricLabels = sparkConfig.get("spark.metrics.conf.*.source.redis.labels")
+
+ private def nameWithLabels(name: String) =
+ if (metricLabels.isEmpty) {
+ name
+ } else {
+ s"$name#$metricLabels"
+ }
+
+ val METRIC_TOTAL_ROWS_INSERTED =
+ metricRegistry.counter(nameWithLabels("feast_ingestion_feature_row_ingested_count"))
+
+ val METRIC_ROWS_LAG =
+ metricRegistry.histogram(nameWithLabels("feast_ingestion_feature_row_lag_ms"))
+}
+
+object RedisSinkMetricSource {
+ val sourceName = "redis_sink"
+}
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
new file mode 100644
index 00000000000..8887026a843
--- /dev/null
+++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala
@@ -0,0 +1,305 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import java.nio.file.{Files, Paths}
+
+import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
+import com.google.protobuf.Timestamp
+import feast.ingestion.utils.TypeConversion._
+import feast.proto.types.ValueProto
+import feast.proto.types.ValueProto.ValueType
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions._
+import org.joda.time.{DateTime, Seconds}
+import org.scalacheck._
+import org.scalatest._
+import matchers.should.Matchers._
+import org.scalatest.matchers.Matcher
+
+import redis.clients.jedis.Jedis
+
+import scala.jdk.CollectionConverters._
+import scala.reflect.runtime.universe.TypeTag
+import scala.util.hashing.MurmurHash3
+
+case class Row(customer: String, feature1: Int, feature2: Float, eventTimestamp: java.sql.Timestamp)
+
+class BatchPipelineIT extends UnitSpec with ForAllTestContainer {
+
+ override val container = GenericContainer("redis:6.0.8", exposedPorts = Seq(6379))
+
+ trait SparkContext {
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("Testing")
+ .set("spark.redis.host", container.host)
+ .set("spark.default.parallelism", "8")
+ .set("spark.redis.port", container.mappedPort(6379).toString)
+
+ val sparkSession = SparkSession
+ .builder()
+ .config(sparkConf)
+ .getOrCreate()
+ }
+
+ trait DataHelper {
+ self: SparkContext =>
+
+ def generateTempPath(last: String) =
+ Paths.get(Files.createTempDirectory("test-dir").toString, last).toString
+
+ def storeAsParquet[T <: Product: TypeTag](rows: Seq[T]): String = {
+ import self.sparkSession.implicits._
+
+ val tempPath = generateTempPath("rows")
+
+ sparkSession
+ .createDataset(rows)
+ .withColumn("date", to_date($"eventTimestamp"))
+ .write
+ .partitionBy("date")
+ .save(tempPath)
+
+ tempPath
+ }
+ }
+
+ trait Scope extends SparkContext with DataHelper {
+ val jedis = new Jedis("localhost", container.mappedPort(6379))
+ jedis.flushAll()
+
+ def rowGenerator(start: DateTime, end: DateTime, customerGen: Option[Gen[String]] = None) =
+ for {
+ customer <- customerGen.getOrElse(Gen.asciiPrintableStr)
+ feature1 <- Gen.choose(0, 100)
+ feature2 <- Gen.choose[Float](0, 1)
+ eventTimestamp <- Gen
+ .choose(0, Seconds.secondsBetween(start, end).getSeconds)
+ .map(start.withMillisOfSecond(0).plusSeconds)
+ } yield Row(customer, feature1, feature2, new java.sql.Timestamp(eventTimestamp.getMillis))
+
+ def generateDistinctRows(gen: Gen[Row], N: Int) =
+ Gen.listOfN(N, gen).sample.get.groupBy(_.customer).map(_._2.head).toSeq
+
+ val config = IngestionJobConfig(
+ featureTable = FeatureTable(
+ name = "test-fs",
+ project = "default",
+ entities = Seq(Field("customer", ValueType.Enum.STRING)),
+ features = Seq(
+ Field("feature1", ValueType.Enum.INT32),
+ Field("feature2", ValueType.Enum.FLOAT)
+ )
+ ),
+ startTime = DateTime.parse("2020-08-01"),
+ endTime = DateTime.parse("2020-09-01")
+ )
+
+ def encodeEntityKey(row: Row, featureTable: FeatureTable): Array[Byte] = {
+ val entityPrefix = featureTable.entities.map(_.name).mkString("_")
+ s"${featureTable.project}_${entityPrefix}:${row.customer}".getBytes
+ }
+
+ def encodeFeatureKey(featureTable: FeatureTable)(feature: String): String = {
+ val fullReference = s"${featureTable.name}:$feature"
+ MurmurHash3.stringHash(fullReference).toHexString
+ }
+
+ def beStoredRow(mappedRow: Map[String, Any]) = {
+ val m: Matcher[Map[String, Any]] = contain.allElementsOf(mappedRow).matcher
+
+ m compose {
+ (_: Map[Array[Byte], Array[Byte]])
+ .map { case (k, v) =>
+ (
+ new String(k),
+ if (new String(k).startsWith("_ts"))
+ Timestamp.parseFrom(v).asScala
+ else
+ ValueProto.Value.parseFrom(v).asScala
+ )
+ }
+ }
+ }
+ }
+
+ "Parquet source file" should "be ingested in redis" in new Scope {
+ val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01"))
+ val rows = generateDistinctRows(gen, 10000)
+ val tempPath = storeAsParquet(rows)
+ val configWithOfflineSource = config.copy(
+ source = FileSource(tempPath, Map.empty, "eventTimestamp")
+ )
+
+ BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)
+
+ val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable)
+
+ rows.foreach(r => {
+ val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap
+ storedValues should beStoredRow(
+ Map(
+ featureKeyEncoder("feature1") -> r.feature1,
+ featureKeyEncoder("feature2") -> r.feature2,
+ "_ts:test-fs" -> r.eventTimestamp
+ )
+ )
+ })
+ }
+
+  "Ingested rows" should "be compacted by timestamp column before storing" in new Scope {
+ val entities = (0 to 10000).map(_.toString)
+
+ val genLatest = rowGenerator(
+ DateTime.parse("2020-08-15"),
+ DateTime.parse("2020-09-01"),
+ Some(Gen.oneOf(entities))
+ )
+ val latest = generateDistinctRows(genLatest, 10000)
+
+ val genOld = rowGenerator(
+ DateTime.parse("2020-08-01"),
+ DateTime.parse("2020-08-14"),
+ Some(Gen.oneOf(entities))
+ )
+ val old = generateDistinctRows(genOld, 10000)
+
+ val tempPath = storeAsParquet(latest ++ old)
+ val configWithOfflineSource =
+ config.copy(source = FileSource(tempPath, Map.empty, "eventTimestamp"))
+
+ BatchPipeline.createPipeline(sparkSession, configWithOfflineSource)
+
+ val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable)
+
+ latest.foreach(r => {
+ val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap
+ storedValues should beStoredRow(
+ Map(
+ featureKeyEncoder("feature1") -> r.feature1,
+ featureKeyEncoder("feature2") -> r.feature2,
+ "_ts:test-fs" -> r.eventTimestamp
+ )
+ )
+ })
+ }
+
+ "Old rows in ingestion" should "not overwrite more recent rows from storage" in new Scope {
+ val entities = (0 to 10000).map(_.toString)
+
+ val genLatest = rowGenerator(
+ DateTime.parse("2020-08-15"),
+ DateTime.parse("2020-09-01"),
+ Some(Gen.oneOf(entities))
+ )
+ val latest = generateDistinctRows(genLatest, 10000)
+
+ val tempPath1 = storeAsParquet(latest)
+ val config1 = config.copy(source = FileSource(tempPath1, Map.empty, "eventTimestamp"))
+
+ BatchPipeline.createPipeline(sparkSession, config1)
+
+ val genOld = rowGenerator(
+ DateTime.parse("2020-08-01"),
+ DateTime.parse("2020-08-14"),
+ Some(Gen.oneOf(entities))
+ )
+ val old = generateDistinctRows(genOld, 10000)
+
+ val tempPath2 = storeAsParquet(old)
+ val config2 = config.copy(source = FileSource(tempPath2, Map.empty, "eventTimestamp"))
+
+ BatchPipeline.createPipeline(sparkSession, config2)
+
+ val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable)
+
+ latest.foreach(r => {
+ val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap
+ storedValues should beStoredRow(
+ Map(
+ featureKeyEncoder("feature1") -> r.feature1,
+ featureKeyEncoder("feature2") -> r.feature2,
+ "_ts:test-fs" -> r.eventTimestamp
+ )
+ )
+ })
+ }
+
+  "Invalid rows" should "not be ingested but be stored to the deadletter path instead" in new Scope {
+ val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01"))
+ val rows = generateDistinctRows(gen, 100)
+
+ val rowsWithNullEntity = rows.map(_.copy(customer = null))
+
+ val tempPath = storeAsParquet(rowsWithNullEntity)
+ val deadletterConfig = config.copy(
+ source = FileSource(tempPath, Map.empty, "eventTimestamp"),
+ deadLetterPath = Some(generateTempPath("deadletters"))
+ )
+
+ BatchPipeline.createPipeline(sparkSession, deadletterConfig)
+
+ jedis.keys("*").toArray should be(empty)
+
+ sparkSession.read
+ .parquet(deadletterConfig.deadLetterPath.get)
+ .count() should be(rows.length)
+ }
+
+ "Columns from source" should "be mapped according to configuration" in new Scope {
+ val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01"))
+ val rows = generateDistinctRows(gen, 100)
+
+ val tempPath = storeAsParquet(rows)
+
+ val configWithMapping = config.copy(
+ featureTable = config.featureTable.copy(
+ entities = Seq(Field("entity", ValueType.Enum.STRING)),
+ features = Seq(
+ Field("new_feature1", ValueType.Enum.INT32),
+ Field("feature2", ValueType.Enum.FLOAT)
+ )
+ ),
+ source = FileSource(
+ tempPath,
+ Map(
+ "entity" -> "customer",
+ "new_feature1" -> "feature1"
+ ),
+ "eventTimestamp"
+ )
+ )
+
+ BatchPipeline.createPipeline(sparkSession, configWithMapping)
+
+ val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable)
+
+ rows.foreach(r => {
+ val storedValues =
+ jedis.hgetAll(encodeEntityKey(r, configWithMapping.featureTable)).asScala.toMap
+ storedValues should beStoredRow(
+ Map(
+ featureKeyEncoder("new_feature1") -> r.feature1,
+ featureKeyEncoder("feature2") -> r.feature2,
+ "_ts:test-fs" -> r.eventTimestamp
+ )
+ )
+ })
+ }
+}
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala
new file mode 100644
index 00000000000..25b53cb57c2
--- /dev/null
+++ b/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala
@@ -0,0 +1,28 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion
+
+import org.scalatest._
+import matchers._
+import org.scalatest.flatspec.AnyFlatSpec
+
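+// Common base for specs: FlatSpec style plus should-matchers and the usual ScalaTest mix-ins.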
+abstract class UnitSpec
+ extends AnyFlatSpec
+ with should.Matchers
+ with OptionValues
+ with Inside
+ with Inspectors
diff --git a/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala
new file mode 100644
index 00000000000..1ae61724ed1
--- /dev/null
+++ b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala
@@ -0,0 +1,107 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.ingestion.metrics
+
+import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException}
+import java.util
+import java.util.Collections
+
+import com.codahale.metrics.{Gauge, Histogram, MetricRegistry, UniformReservoir}
+import feast.ingestion.UnitSpec
+
+import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+
+class StatsReporterSpec extends UnitSpec {
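+  // Minimal UDP server that collects statsd packets until a receive times out.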
+ class SimpleServer {
+ val socket = new DatagramSocket()
+ socket.setSoTimeout(100)
+
+ def port: Int = socket.getLocalPort
+
+ def receive: Array[String] = {
+ val messages: ArrayBuffer[String] = ArrayBuffer()
+ var finished = false
+
+ do {
+ val buf = new Array[Byte](65535)
+ val p = new DatagramPacket(buf, buf.length)
+        try {
+          socket.receive(p)
+          // Record the packet only when receive succeeded; a timeout ends the loop.
+          messages += new String(p.getData, 0, p.getLength)
+        } catch {
+          case _: SocketTimeoutException =>
+            finished = true
+        }
+ } while (!finished)
+
+ messages.toArray
+ }
+ }
+
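+  // Per-test fixture: a fresh UDP listener and a reporter pointed at it, plus helpers
+  // for building gauges and histograms.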
+ trait Scope {
+ val server = new SimpleServer
+ val reporter = new StatsdReporterWithTags(
+ new MetricRegistry,
+ "127.0.0.1",
+ server.port
+ )
+
+ def gauge[A](v: A): Gauge[A] = new Gauge[A] {
+ override def getValue: A = v
+ }
+
+ def histogram(values: Seq[Int]): Histogram = {
+ val hist = new Histogram(new UniformReservoir)
+ values.foreach(hist.update)
+ hist
+ }
+ }
+
+ "Statsd reporter" should "send simple gauge unmodified" in new Scope {
+ reporter.report(
+ gauges = new util.TreeMap(
+ Map(
+ "test" -> gauge(0)
+ ).asJava
+ ),
+ counters = Collections.emptySortedMap(),
+ histograms = Collections.emptySortedMap(),
+ meters = Collections.emptySortedMap(),
+ timers = Collections.emptySortedMap()
+ )
+
+ server.receive should contain("test:0|g")
+ }
+
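+  // Metric names may carry a "#tag=value" suffix; it should be re-appended after the
+  // percentile suffix (e.g. "test.p95#fs=name").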
+ "Statsd reporter" should "keep tags part in the name's end" in new Scope {
+ reporter.report(
+ gauges = Collections.emptySortedMap(),
+ counters = Collections.emptySortedMap(),
+ histograms = new util.TreeMap(
+ Map(
+          "test#fs=name" -> histogram(1 to 100)
+ ).asJava
+ ),
+ meters = Collections.emptySortedMap(),
+ timers = Collections.emptySortedMap()
+ )
+
+ server.receive should contain("test.p95#fs=name:95.95|ms")
+ }
+}