diff --git a/.github/workflows/complete.yml b/.github/workflows/complete.yml index 354b51f0871..c57255bacb3 100644 --- a/.github/workflows/complete.yml +++ b/.github/workflows/complete.yml @@ -12,7 +12,6 @@ jobs: GITHUB_PR_SHA: ${{ github.event.pull_request.head.sha }} REGISTRY: gcr.io/kf-feast MAVEN_CACHE: gs://feast-templocation-kf-feast/.m2.2020-08-19.tar - DOCKER_BUILDKIT: '1' steps: - uses: actions/checkout@v2 - uses: GoogleCloudPlatform/github-actions/setup-gcloud@master diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 00000000000..f3c72b8ceb5 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,2 @@ +align.preset = more +maxColumn = 100 \ No newline at end of file diff --git a/pom.xml b/pom.xml index fd3faf5622f..0bcb5776716 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ common job-controller common-test + spark/ingestion @@ -85,6 +86,27 @@ 6.1.2.Final 1.6.6 + + + ${maven.multiModuleProjectDirectory} + false feast.common.auth.providers.http.client @@ -528,26 +550,7 @@ - - - + ${license.content} 1.7 @@ -558,6 +561,15 @@ + + + ${license.content} + + + 2.7.2 + ${parent.basedir}/.scalafmt.conf + + diff --git a/spark/ingestion/pom.xml b/spark/ingestion/pom.xml new file mode 100644 index 00000000000..a044ec9d603 --- /dev/null +++ b/spark/ingestion/pom.xml @@ -0,0 +1,299 @@ + + + + 4.0.0 + + + dev.feast + feast-parent + ${revision} + ../.. + + + Feast Spark Ingestion + feast-ingestion-spark + + + 2.12 + ${scala.version}.12 + 2.4.7 + 4.4.0 + 3.3.0 + 0.7-SNAPSHOT + + + + + + dev.feast + datatypes-java + ${project.version} + + + * + * + + + + + + com.google.protobuf + protobuf-java + 3.12.2 + + + + org.scala-lang + scala-library + ${scala.fullVersion} + + + + org.scala-lang.modules + scala-collection-compat_${scala.version} + 2.2.0 + + + + org.apache.spark + spark-core_${scala.version} + ${spark.version} + provided + + + + org.apache.spark + spark-streaming_${scala.version} + ${spark.version} + provided + + + + org.apache.spark + spark-sql_${scala.version} + ${spark.version} + provided + + + + org.codehaus.janino + janino + 3.0.16 + + + + org.apache.spark + spark-sql-kafka-0-10_${scala.version} + ${spark.version} + provided + + + + com.github.scopt + scopt_${scala.version} + 3.7.1 + + + + com.google.cloud.spark + spark-bigquery_${scala.version} + 0.17.2 + provided + + + + joda-time + joda-time + 2.10.6 + + + + com.redislabs + spark-redis_${scala.version} + 2.5.0 + + + + org.apache.arrow + arrow-vector + 0.16.0 + + + + io.netty + netty-all + 4.1.52.Final + + + + org.json4s + json4s-ext_${scala.version} + 3.7.0-M6 + + + + org.scalatest + scalatest_${scala.version} + 3.2.2 + test + + + + org.scalacheck + scalacheck_${scala.version} + 1.14.3 + test + + + + com.dimafeng + testcontainers-scala-scalatest_${scala.version} + 0.38.3 + test + + + + + + + src/main/scala + src/test/scala + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + ${scala.fullVersion} + + + + org.scalatest + scalatest-maven-plugin + 2.0.0 + + ${project.build.directory}/surefire-reports + . 
+ TestSuiteReport.txt + + + + test + integration-test + + test + + + + + + org.apache.maven.plugins + maven-assembly-plugin + + + jar-with-dependencies + + + + + make-assembly + package + + single + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + com.google.protobuf + com.google.protobuf.vendor + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + org.apache.maven.plugins + maven-assembly-plugin + ${maven-assembly-plugin.version} + + + + + diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala new file mode 100644 index 00000000000..34c667b7e6d --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession + +trait BasePipeline { + def createSparkSession(jobConfig: IngestionJobConfig): SparkSession = { + // workaround for issue with arrow & netty + // see https://github.com/apache/arrow/tree/master/java#java-properties + System.setProperty("io.netty.tryReflectionSetAccessible", "true") + + val conf = new SparkConf() + conf + .setAppName(s"${jobConfig.mode} IngestionJob for ${jobConfig.featureTable.name}") + .setMaster("local") + + jobConfig.store match { + case RedisConfig(host, port) => + conf + .set("spark.redis.host", host) + .set("spark.redis.port", port.toString) + } + + jobConfig.metrics match { + case Some(c: StatsDConfig) => + conf + .set( + "spark.metrics.conf.*.source.redis.class", + "org.apache.spark.metrics.source.RedisSinkMetricSource" + ) + .set( + "spark.metrics.conf.*.source.redis.labels", + s"feature_table=${jobConfig.featureTable.name}" + ) + .set( + "spark.metrics.conf.*.sink.statsd.class", + "org.apache.spark.metrics.sink.StatsdSinkWithTags" + ) + .set("spark.metrics.conf.*.sink.statsd.host", c.host) + .set("spark.metrics.conf.*.sink.statsd.port", c.port.toString) + .set("spark.metrics.conf.*.sink.statsd.period", "30") + .set("spark.metrics.conf.*.sink.statsd.unit", "seconds") + .set("spark.metrics.namespace", jobConfig.mode.toString) + } + + SparkSession + .builder() + .config(conf) + .getOrCreate() + } + + def createPipeline(sparkSession: SparkSession, config: IngestionJobConfig): Unit +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala new file mode 100644 index 00000000000..ef463c1c11b --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 
2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion + +import feast.ingestion.sources.bq.BigQueryReader +import feast.ingestion.sources.file.FileReader +import feast.ingestion.validation.RowValidator +import org.apache.spark.sql.{Column, SparkSession} +import org.apache.spark.sql.functions.col + +/** + * Batch Ingestion Flow: + * 1. Read from source (BQ | File) + * 2. Map source columns to FeatureTable's schema + * 3. Validate + * 4. Store valid rows in redis + * 5. Store invalid rows in parquet format at `deadletter` destination + */ +object BatchPipeline extends BasePipeline { + override def createPipeline(sparkSession: SparkSession, config: IngestionJobConfig): Unit = { + val featureTable = config.featureTable + val projection = + inputProjection(config.source, featureTable.features, featureTable.entities) + val validator = new RowValidator(featureTable) + + val input = config.source match { + case source: BQSource => + BigQueryReader.createBatchSource( + sparkSession.sqlContext, + source, + config.startTime, + config.endTime + ) + case source: FileSource => + FileReader.createBatchSource( + sparkSession.sqlContext, + source, + config.startTime, + config.endTime + ) + } + + val projected = input.select(projection: _*).cache() + + val validRows = projected + .filter(validator.checkAll) + + validRows.write + .format("feast.ingestion.stores.redis") + .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) + .option("namespace", featureTable.name) + .option("project_name", featureTable.project) + .option("timestamp_column", config.source.timestampColumn) + .save() + + config.deadLetterPath match { + case Some(path) => + projected + .filter(!validator.checkAll) + .write + .format("parquet") + .save(path) + case _ => None + } + + } + + /** + * Build column projection using custom mapping with fallback to feature|entity names. 
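+   * Each selected source column is aliased to its feature/entity name; the source's timestamp column is always appended to the projection.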
+ */ + private def inputProjection( + source: Source, + features: Seq[Field], + entities: Seq[Field] + ): Array[Column] = { + val featureColumns = features + .filter(f => !source.mapping.contains(f.name)) + .map(f => (f.name, f.name)) ++ source.mapping + + val timestampColumn = Seq((source.timestampColumn, source.timestampColumn)) + val entitiesColumns = + entities + .filter(e => !source.mapping.contains(e.name)) + .map(e => (e.name, e.name)) + + (featureColumns ++ entitiesColumns ++ timestampColumn).map { case (alias, source) => + col(source).alias(alias) + }.toArray + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala new file mode 100644 index 00000000000..14e26039829 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion + +import org.joda.time.DateTime + +import org.json4s._ +import org.json4s.jackson.JsonMethods.{parse => parseJSON} +import org.json4s.ext.JavaEnumNameSerializer + +object IngestionJob { + import Modes._ + implicit val modesRead: scopt.Read[Modes.Value] = scopt.Read.reads(Modes withName _.capitalize) + implicit val formats: Formats = DefaultFormats + + new JavaEnumNameSerializer[feast.proto.types.ValueProto.ValueType.Enum]() + + val parser = new scopt.OptionParser[IngestionJobConfig]("IngestionJon") { + // ToDo: read version from Manifest + head("feast.ingestion.IngestionJob", "0.8-SNAPSHOT") + + opt[Modes]("mode") + .action((x, c) => c.copy(mode = x)) + .required() + .text("Mode to operate ingestion job (offline or online)") + + opt[String](name = "source") + .action((x, c) => + parseJSON(x).extract[Sources] match { + case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get) + case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get) + case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get) + } + ) + .required() + .text("JSON-encoded source object (e.g. 
{\"kafka\":{\"bootstrapServers\":...}}") + + opt[String](name = "feature-table") + .action((x, c) => c.copy(featureTable = parseJSON(x).extract[FeatureTable])) + .required() + .text("JSON-encoded FeatureTableSpec object") + + opt[String](name = "start") + .action((x, c) => c.copy(startTime = DateTime.parse(x))) + .text("Start timestamp for offline ingestion") + + opt[String](name = "end") + .action((x, c) => c.copy(endTime = DateTime.parse(x))) + .text("End timestamp for offline ingestion") + + opt[String](name = "redis") + .action((x, c) => c.copy(store = parseJSON(x).extract[RedisConfig])) + + opt[String](name = "statsd") + .action((x, c) => c.copy(metrics = Some(parseJSON(x).extract[StatsDConfig]))) + + opt[String](name = "deadletter-path") + .action((x, c) => c.copy(deadLetterPath = Some(x))) + + } + + def main(args: Array[String]): Unit = { + parser.parse(args, IngestionJobConfig()) match { + case Some(config) => + config.mode match { + case Modes.Offline => + val sparkSession = BatchPipeline.createSparkSession(config) + BatchPipeline.createPipeline(sparkSession, config) + } + case None => + println("Parameters can't be parsed") + } + } + +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala new file mode 100644 index 00000000000..880f00f37ff --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -0,0 +1,90 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion + +import feast.ingestion.Modes.Modes +import org.joda.time.DateTime + +object Modes extends Enumeration { + type Modes = Value + val Offline, Online = Value +} + +abstract class StoreConfig + +case class RedisConfig(host: String, port: Int) extends StoreConfig + +abstract class MetricConfig + +case class StatsDConfig(host: String, port: Int) extends MetricConfig + +abstract class Source { + def mapping: Map[String, String] + + def timestampColumn: String +} + +abstract class BatchSource extends Source + +abstract class StreamingSource extends Source + +case class FileSource( + path: String, + override val mapping: Map[String, String], + override val timestampColumn: String +) extends BatchSource + +case class BQSource( + project: String, + dataset: String, + table: String, + override val mapping: Map[String, String], + override val timestampColumn: String +) extends BatchSource + +case class KafkaSource( + bootstrapServers: String, + topic: String, + override val mapping: Map[String, String], + override val timestampColumn: String +) extends StreamingSource + +case class Sources( + file: Option[FileSource] = None, + bq: Option[BQSource] = None, + kafka: Option[KafkaSource] = None +) + +case class Field(name: String, `type`: feast.proto.types.ValueProto.ValueType.Enum) + +case class FeatureTable( + name: String, + project: String, + entities: Seq[Field], + features: Seq[Field] +) + +case class IngestionJobConfig( + mode: Modes = Modes.Offline, + featureTable: FeatureTable = null, + source: Source = null, + startTime: DateTime = DateTime.now(), + endTime: DateTime = DateTime.now(), + store: StoreConfig = RedisConfig("localhost", 6379), + metrics: Option[MetricConfig] = Some(StatsDConfig("localhost", 9125)), + deadLetterPath: Option[String] = None +) diff --git a/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala new file mode 100644 index 00000000000..66b48dd444e --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/metrics/StatsdReporterWithTags.scala @@ -0,0 +1,201 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.metrics + +import java.io.IOException +import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.SortedMap +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +import com.codahale.metrics._ +import org.apache.hadoop.net.NetUtils + +import org.apache.spark.internal.Logging + +/** + * @see + * StatsD metric types + */ +object StatsdMetricType { + val COUNTER = "c" + val GAUGE = "g" + val TIMER = "ms" + val Set = "s" +} + +/** + * This is fork of internal spark implementation of StatsdReporter. 
+ * The main difference between this implementation & spark is support of tags inside metric names. + * + * When we extend metric name (by adding some suffix) we must keep tags part in the end of the name. + * Labels are separated from name by # (see tagging extensions of prometheus exporter + * https://github.com/prometheus/statsd_exporter#tagging-extensions) + * E.g., + * + * metric_name#tag=value + count -> metric_name.count#tag=value + */ +class StatsdReporterWithTags( + registry: MetricRegistry, + host: String = "127.0.0.1", + port: Int = 8125, + prefix: String = "", + filter: MetricFilter = MetricFilter.ALL, + rateUnit: TimeUnit = TimeUnit.SECONDS, + durationUnit: TimeUnit = TimeUnit.MILLISECONDS +) extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit) + with Logging { + + import StatsdMetricType._ + + private val address = new InetSocketAddress(host, port) + private val whitespace = "[\\s]+".r + + override def report( + gauges: SortedMap[String, Gauge[_]], + counters: SortedMap[String, Counter], + histograms: SortedMap[String, Histogram], + meters: SortedMap[String, Meter], + timers: SortedMap[String, Timer] + ): Unit = + Try(new DatagramSocket) match { + case Failure(ioe: IOException) => + logWarning( + "StatsD datagram socket construction failed", + NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe) + ) + case Failure(e) => logWarning("StatsD datagram socket construction failed", e) + case Success(s) => + implicit val socket = s + val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null) + val localPort = socket.getLocalPort + Try { + gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue)) + counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue)) + histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue)) + meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue)) + timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue)) + } recover { + case ioe: IOException => + logDebug( + s"Unable to send packets to StatsD", + NetUtils.wrapException( + address.getHostString, + address.getPort, + localAddress, + localPort, + ioe + ) + ) + case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e) + } + Try(socket.close()) recover { + case ioe: IOException => + logDebug( + "Error when close socket to StatsD", + NetUtils.wrapException( + address.getHostString, + address.getPort, + localAddress, + localPort, + ioe + ) + ) + case e: Throwable => logDebug("Error when close socket to StatsD", e) + } + } + + private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit = + formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE)) + + private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit = + send(fullName(name), format(counter.getCount), COUNTER) + + private def reportHistogram(name: String, histogram: Histogram)(implicit + socket: DatagramSocket + ): Unit = { + val snapshot = histogram.getSnapshot + send(fullName(name, "count"), format(histogram.getCount), GAUGE) + send(fullName(name, "max"), format(snapshot.getMax), TIMER) + send(fullName(name, "mean"), format(snapshot.getMean), TIMER) + send(fullName(name, "min"), format(snapshot.getMin), TIMER) + send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER) + send(fullName(name, "p50"), format(snapshot.getMedian), TIMER) + send(fullName(name, "p75"), 
format(snapshot.get75thPercentile), TIMER) + send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER) + send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER) + send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER) + send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER) + } + + private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = { + send(fullName(name, "count"), format(meter.getCount), GAUGE) + send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER) + send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER) + send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER) + send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER) + } + + private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = { + val snapshot = timer.getSnapshot + send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER) + send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER) + send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER) + send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER) + send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER) + send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER) + send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER) + send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER) + send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER) + send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER) + + reportMetered(name, timer) + } + + private def send(name: String, value: String, metricType: String)(implicit + socket: DatagramSocket + ): Unit = { + val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8) + val packet = new DatagramPacket(bytes, bytes.length, address) + socket.send(packet) + } + + private val nameWithTag = """(\S+)#(\S+)""".r + + private def fullName(name: String, suffixes: String*): String = name match { + case nameWithTag(name, tags) => + MetricRegistry.name(prefix, name +: suffixes: _*) ++ "#" ++ tags + case _ => + MetricRegistry.name(prefix, name +: suffixes: _*) + } + + private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-") + + private def format(v: Any): String = formatAny(v).getOrElse("") + + private def formatAny(v: Any): Option[String] = + v match { + case f: Float => Some("%2.2f".format(f)) + case d: Double => Some("%2.2f".format(d)) + case b: BigDecimal => Some("%2.2f".format(b)) + case n: Number => Some(n.toString) + case _ => None + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala new file mode 100644 index 00000000000..5dc5a7fddd5 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.sources.bq + +import java.sql.Timestamp + +import feast.ingestion.BQSource +import org.joda.time.DateTime +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.functions.col + +object BigQueryReader { + def createBatchSource( + sqlContext: SQLContext, + source: BQSource, + start: DateTime, + end: DateTime + ): DataFrame = { + sqlContext.read + .format("bigquery") + .load(s"${source.project}.${source.dataset}.${source.table}") + .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis)) + .filter(col(source.timestampColumn) < new Timestamp(end.getMillis)) + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala new file mode 100644 index 00000000000..e099a87aa3e --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.sources.file + +import java.sql.Timestamp + +import feast.ingestion.FileSource +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.joda.time.DateTime + +object FileReader { + def createBatchSource( + sqlContext: SQLContext, + source: FileSource, + start: DateTime, + end: DateTime + ): DataFrame = { + sqlContext.read + .parquet(source.path) + .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis)) + .filter(col(source.timestampColumn) < new Timestamp(end.getMillis)) + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala new file mode 100644 index 00000000000..ec09a0bed1a --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/DefaultSource.scala @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.stores.redis + +import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} +import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider} + +/** + * Entrypoint to Redis Storage. Implements only `CreatableRelationProvider` since it's only possible write to Redis. + * Here we parse configuration from spark parameters & provide SparkRedisConfig to `RedisSinkRelation` + */ +class RedisRelationProvider extends CreatableRelationProvider { + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame + ): BaseRelation = { + val config = SparkRedisConfig.parse(parameters) + val relation = new RedisSinkRelation(sqlContext, config) + + relation.insert(data, overwrite = false) + + relation + } +} + +class DefaultSource extends RedisRelationProvider diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala new file mode 100644 index 00000000000..f1f3f8a5db8 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/HashTypePersistence.scala @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.stores.redis + +import org.apache.spark.sql.Row +import org.apache.spark.sql.types._ +import redis.clients.jedis.{Pipeline, Response} + +import scala.jdk.CollectionConverters._ +import com.google.protobuf.Timestamp +import feast.ingestion.utils.TypeConversion +import scala.util.hashing.MurmurHash3 + +/** + * Use Redis hash type as storage layout. Every feature is stored as separate entry in Hash. + * Also additional `timestamp` column is stored per FeatureTable to track update time. + * + * Keys are hashed as murmur3(`featureTableName` : `featureName`). + * Values are serialized with protobuf (`ValueProto`). 
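+ * E.g. with the default config a stored entry has fields murmur3hex("<featureTableName>:<featureName>") -> serialized ValueProto, plus "_ts:<featureTableName>" -> serialized Timestamp.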
+ */ +class HashTypePersistence(config: SparkRedisConfig) extends Persistence with Serializable { + def encodeRow( + keyColumns: Array[String], + timestampField: String, + value: Row + ): Map[Array[Byte], Array[Byte]] = { + val fields = value.schema.fields.map(_.name) + val types = value.schema.fields.map(f => (f.name, f.dataType)).toMap + val kvMap = value.getValuesMap[Any](fields) + + val values = kvMap + .filter { case (_, v) => + // don't store null values + v != null + } + .filter { case (k, _) => + // don't store entities & timestamp + !keyColumns.contains(k) && k != config.timestampColumn + } + .map { case (k, v) => + encodeKey(k) -> encodeValue(v, types(k)) + } + + val timestamp = Seq( + ( + timestampField.getBytes, + encodeValue(value.getAs[Timestamp](config.timestampColumn), TimestampType) + ) + ) + + values ++ timestamp + } + + def encodeValue(value: Any, `type`: DataType): Array[Byte] = { + TypeConversion.sqlTypeToProtoValue(value, `type`).toByteArray + } + + def encodeKey(key: String): Array[Byte] = { + val fullFeatureReference = s"${config.namespace}:$key" + MurmurHash3.stringHash(fullFeatureReference).toHexString.getBytes + } + + def save( + pipeline: Pipeline, + key: String, + value: Map[Array[Byte], Array[Byte]], + ttl: Int + ): Unit = { + pipeline.hset(key.getBytes, value.asJava) + if (ttl > 0) { + pipeline.expire(key, ttl) + } + } + + def getTimestamp( + pipeline: Pipeline, + key: String, + timestampField: String + ): Response[Array[Byte]] = { + pipeline.hget(key.getBytes(), timestampField.getBytes) + } + +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala new file mode 100644 index 00000000000..1be24737b03 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/Persistence.scala @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.stores.redis + +import org.apache.spark.sql.Row +import redis.clients.jedis.{Pipeline, Response} + +trait Persistence { + def encodeRow( + keyColumns: Array[String], + timestampField: String, + value: Row + ): Map[Array[Byte], Array[Byte]] + + def save( + pipeline: Pipeline, + key: String, + value: Map[Array[Byte], Array[Byte]], + ttl: Int + ): Unit + + def getTimestamp( + pipeline: Pipeline, + key: String, + timestampField: String + ): Response[Array[Byte]] +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala new file mode 100644 index 00000000000..0c228beedc4 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/RedisSinkRelation.scala @@ -0,0 +1,138 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.stores.redis + +import com.google.protobuf.Timestamp +import com.redislabs.provider.redis.{ReadWriteConfig, RedisConfig, RedisEndpoint} +import com.redislabs.provider.redis.rdd.Keys.groupKeysByNode +import com.redislabs.provider.redis.util.PipelineUtils.{foreachWithPipeline, mapWithPipeline} +import org.apache.spark.SparkEnv +import org.apache.spark.metrics.source.RedisSinkMetricSource +import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +/** + * High-level writer to Redis. Relies on `Persistence` implementation for actual storage layout. + * Here we define general flow: + * + * 1. Deduplicate rows within one batch (group by key and get only latest (by timestamp)) + * 2. Read last-stored timestamp from Redis + * 3. Check if current timestamp is more recent than already saved one + * 4. Save to storage if it's the case + */ +class RedisSinkRelation(override val sqlContext: SQLContext, config: SparkRedisConfig) + extends BaseRelation + with InsertableRelation + with Serializable { + private implicit val redisConfig: RedisConfig = { + new RedisConfig( + new RedisEndpoint(sqlContext.sparkContext.getConf) + ) + } + + private implicit val readWriteConfig: ReadWriteConfig = { + ReadWriteConfig.fromSparkConf(sqlContext.sparkContext.getConf) + } + + override def schema: StructType = ??? 
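+  // schema is left unimplemented (???): this relation is only used for writes via insert(), so Spark does not request a read schema here.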
+ + val persistence: Persistence = new HashTypePersistence(config) + + override def insert(data: DataFrame, overwrite: Boolean): Unit = { + // repartition for deduplication + val dataToStore = + if (config.repartitionByEntity) + data.repartition(config.entityColumns.map(col): _*) + else data + + dataToStore.foreachPartition { partition: Iterator[Row] => + // grouped iterator to only allocate memory for a portion of rows + partition.grouped(config.iteratorGroupingSize).foreach { batch => + // group by key and keep only latest row per each key + val rowsWithKey: Map[String, Row] = + compactRowsToLatestTimestamp(batch.map(row => dataKeyId(row) -> row)).toMap + + groupKeysByNode(redisConfig.hosts, rowsWithKey.keysIterator).foreach { case (node, keys) => + val conn = node.connect() + // retrieve latest stored timestamp per key + val timestamps = mapWithPipeline(conn, keys) { (pipeline, key) => + persistence.getTimestamp(pipeline, key, timestampField) + } + + val timestampByKey = timestamps + .map(_.asInstanceOf[Array[Byte]]) + .map( + Option(_) + .map(Timestamp.parseFrom) + .map(t => new java.sql.Timestamp(t.getSeconds * 1000)) + ) + .zip(rowsWithKey.keys) + .map(_.swap) + .toMap + + foreachWithPipeline(conn, keys) { (pipeline, key) => + val row = rowsWithKey(key) + + timestampByKey(key) match { + case Some(t) if !t.before(row.getAs[java.sql.Timestamp](config.timestampColumn)) => () + case _ => + if (metricSource.nonEmpty) { + val lag = System.currentTimeMillis() - row + .getAs[java.sql.Timestamp](config.timestampColumn) + .getTime + + metricSource.get.METRIC_TOTAL_ROWS_INSERTED.inc() + metricSource.get.METRIC_ROWS_LAG.update(lag) + } + + val encodedRow = persistence.encodeRow(config.entityColumns, timestampField, row) + persistence.save(pipeline, key, encodedRow, ttl = 0) + } + } + conn.close() + } + } + } + } + + private def compactRowsToLatestTimestamp(rows: Seq[(String, Row)]) = rows + .groupBy(_._1) + .values + .map(_.maxBy(_._2.getAs[java.sql.Timestamp](config.timestampColumn).getTime)) + + /** + * Key is built from entities columns values with prefix of entities columns names. + */ + private def dataKeyId(row: Row): String = { + val sortedEntities = config.entityColumns.sorted + val entityKey = sortedEntities.map(row.getAs[Any]).map(_.toString).mkString(":") + val entityPrefix = sortedEntities.mkString("_") + s"${config.projectName}_${entityPrefix}:$entityKey" + } + + private def timestampField: String = { + s"${config.timestampPrefix}:${config.namespace}" + } + + private lazy val metricSource: Option[RedisSinkMetricSource] = + SparkEnv.get.metricsSystem.getSourcesByName(RedisSinkMetricSource.sourceName) match { + case Seq(head) => Some(head.asInstanceOf[RedisSinkMetricSource]) + case _ => None + } +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala new file mode 100644 index 00000000000..389607ce99e --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/stores/redis/SparkRedisConfig.scala @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.stores.redis + +case class SparkRedisConfig( + namespace: String, + projectName: String, + entityColumns: Array[String], + timestampColumn: String, + iteratorGroupingSize: Int = 1000, + timestampPrefix: String = "_ts", + repartitionByEntity: Boolean = true +) + +object SparkRedisConfig { + val NAMESPACE = "namespace" + val ENTITY_COLUMNS = "entity_columns" + val TS_COLUMN = "timestamp_column" + val ENTITY_REPARTITION = "entity_repartition" + val PROJECT_NAME = "project_name" + + def parse(parameters: Map[String, String]): SparkRedisConfig = + SparkRedisConfig( + namespace = parameters.getOrElse(NAMESPACE, ""), + projectName = parameters.getOrElse(PROJECT_NAME, "default"), + entityColumns = parameters.getOrElse(ENTITY_COLUMNS, "").split(","), + timestampColumn = parameters.getOrElse(TS_COLUMN, "event_timestamp"), + repartitionByEntity = parameters.getOrElse(ENTITY_REPARTITION, "true") == "true" + ) +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala b/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala new file mode 100644 index 00000000000..0846b95ce95 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/utils/TypeConversion.scala @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.utils + +import java.sql + +import com.google.protobuf.{ByteString, Message, Timestamp} +import feast.proto.types.ValueProto +import org.apache.spark.sql.types._ + +import scala.collection.JavaConverters._ + +object TypeConversion { + def sqlTypeToProtoValue(value: Any, `type`: DataType): Message = { + (`type` match { + case IntegerType => ValueProto.Value.newBuilder().setInt32Val(value.asInstanceOf[Int]) + case LongType => ValueProto.Value.newBuilder().setInt64Val(value.asInstanceOf[Long]) + case StringType => ValueProto.Value.newBuilder().setStringVal(value.asInstanceOf[String]) + case DoubleType => ValueProto.Value.newBuilder().setDoubleVal(value.asInstanceOf[Double]) + case FloatType => ValueProto.Value.newBuilder().setFloatVal(value.asInstanceOf[Float]) + case StringType => ValueProto.Value.newBuilder().setStringVal(value.asInstanceOf[String]) + case ArrayType(t: IntegerType, _) => + ValueProto.Value + .newBuilder() + .setInt32ListVal( + ValueProto.Int32List.newBuilder + .addAllVal(value.asInstanceOf[Array[Integer]].toSeq.asJava) + ) + case ArrayType(t: LongType, _) => + ValueProto.Value + .newBuilder() + .setInt64ListVal( + ValueProto.Int64List.newBuilder + .addAllVal(value.asInstanceOf[Array[Long]].toSeq.map(java.lang.Long.valueOf).asJava) + ) + case ArrayType(t: BooleanType, _) => + ValueProto.Value + .newBuilder() + .setBoolListVal( + ValueProto.BoolList.newBuilder.addAllVal( + value.asInstanceOf[Array[Boolean]].toSeq.map(java.lang.Boolean.valueOf).asJava + ) + ) + case ArrayType(t: FloatType, _) => + ValueProto.Value + .newBuilder() + .setFloatListVal( + ValueProto.FloatList.newBuilder + .addAllVal(value.asInstanceOf[Array[Float]].toSeq.map(java.lang.Float.valueOf).asJava) + ) + case ArrayType(t: DoubleType, _) => + ValueProto.Value + .newBuilder() + .setDoubleListVal( + ValueProto.DoubleList.newBuilder.addAllVal( + value.asInstanceOf[Array[Double]].toSeq.map(java.lang.Double.valueOf).asJava + ) + ) + case ArrayType(t: ByteType, _) => + ValueProto.Value + .newBuilder() + .setBytesVal(ByteString.copyFrom(value.asInstanceOf[Array[Byte]])) + case ArrayType(t: StringType, _) => + ValueProto.Value + .newBuilder() + .setStringListVal( + ValueProto.StringList.newBuilder + .addAllVal(value.asInstanceOf[Array[String]].toSeq.asJava) + ) + case TimestampType => + Timestamp + .newBuilder() + .setSeconds(value.asInstanceOf[java.sql.Timestamp].getTime / 1000) + }).build + } + + class AsScala[A](op: => A) { + def asScala: A = op + } + + implicit def protoValueAsScala(v: ValueProto.Value): AsScala[Any] = new AsScala[Any]( + v.getValCase match { + case ValueProto.Value.ValCase.INT32_VAL => v.getInt32Val + case ValueProto.Value.ValCase.INT64_VAL => v.getInt64Val + case ValueProto.Value.ValCase.FLOAT_VAL => v.getFloatVal + case ValueProto.Value.ValCase.BOOL_VAL => v.getBoolVal + case ValueProto.Value.ValCase.DOUBLE_VAL => v.getDoubleVal + case ValueProto.Value.ValCase.STRING_VAL => v.getStringVal + case ValueProto.Value.ValCase.BYTES_VAL => v.getBytesVal + case ValueProto.Value.ValCase.INT32_LIST_VAL => v.getInt32ListVal.getValList + case ValueProto.Value.ValCase.INT64_LIST_VAL => v.getInt64ListVal.getValList + case ValueProto.Value.ValCase.FLOAT_LIST_VAL => v.getFloatListVal.getValList + case ValueProto.Value.ValCase.DOUBLE_LIST_VAL => v.getDoubleListVal.getValList + case ValueProto.Value.ValCase.BOOL_LIST_VAL => v.getBoolListVal.getValList + case ValueProto.Value.ValCase.STRING_LIST_VAL => v.getStringListVal.getValList + case ValueProto.Value.ValCase.BYTES_LIST_VAL 
=> v.getBytesListVal.getValList + case ValueProto.Value.ValCase.VAL_NOT_SET => + throw new RuntimeException(s"$v not a ValueProto") + } + ) + + implicit def timestampAsScala(t: Timestamp): AsScala[java.sql.Timestamp] = + new AsScala[java.sql.Timestamp]( + new sql.Timestamp(t.getSeconds * 1000) + ) + +} diff --git a/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala new file mode 100644 index 00000000000..e2ea12ef0a2 --- /dev/null +++ b/spark/ingestion/src/main/scala/feast/ingestion/validation/RowValidator.scala @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.validation + +import feast.ingestion.FeatureTable +import org.apache.spark.sql.Column +import org.apache.spark.sql.functions.col + +class RowValidator(featureTable: FeatureTable) extends Serializable { + def allEntitiesPresent: Column = + featureTable.entities.map(e => col(e.name).isNotNull).reduce(_.&&(_)) + + def atLeastOneFeatureNotNull: Column = + featureTable.features.map(f => col(f.name).isNotNull).reduce(_.||(_)) + + def checkAll: Column = + allEntitiesPresent && atLeastOneFeatureNotNull +} diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala new file mode 100644 index 00000000000..5c70da2b86c --- /dev/null +++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/sink/StatsdSinkWithTags.scala @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.MetricRegistry +import feast.ingestion.metrics.StatsdReporterWithTags +import org.apache.spark.SecurityManager +import org.apache.spark.internal.Logging +import org.apache.spark.metrics.MetricsSystem + +private[spark] class StatsdSinkWithTags( + val property: Properties, + val registry: MetricRegistry, + securityMgr: SecurityManager +) extends Sink + with Logging { + import StatsdSink._ + + val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST) + val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt + + val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt + val pollUnit = + TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase) + + val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val reporter = new StatsdReporterWithTags(registry, host, port, prefix) + + override def start(): Unit = { + reporter.start(pollPeriod, pollUnit) + logInfo(s"StatsdSink started with prefix: '$prefix'") + } + + override def stop(): Unit = { + reporter.stop() + logInfo("StatsdSink stopped.") + } + + override def report(): Unit = reporter.report() +} diff --git a/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala new file mode 100644 index 00000000000..77c9218a7ec --- /dev/null +++ b/spark/ingestion/src/main/scala/org/apache/spark/metrics/source/RedisSinkMetricSource.scala @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.metrics.source + +import com.codahale.metrics.MetricRegistry +import org.apache.spark.{SparkConf, SparkEnv} + +class RedisSinkMetricSource extends Source { + override val sourceName: String = RedisSinkMetricSource.sourceName + + override val metricRegistry: MetricRegistry = new MetricRegistry + + private val sparkConfig = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf(true)) + + private val metricLabels = sparkConfig.get("spark.metrics.conf.*.source.redis.labels") + + private def nameWithLabels(name: String) = + if (metricLabels.isEmpty) { + name + } else { + s"$name#$metricLabels" + } + + val METRIC_TOTAL_ROWS_INSERTED = + metricRegistry.counter(nameWithLabels("feast_ingestion_feature_row_ingested_count")) + + val METRIC_ROWS_LAG = + metricRegistry.histogram(nameWithLabels("feast_ingestion_feature_row_lag_ms")) +} + +object RedisSinkMetricSource { + val sourceName = "redis_sink" +} diff --git a/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala new file mode 100644 index 00000000000..8887026a843 --- /dev/null +++ b/spark/ingestion/src/test/scala/feast/ingestion/BatchPipelineIT.scala @@ -0,0 +1,305 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion + +import java.nio.file.{Files, Paths} + +import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} +import com.google.protobuf.Timestamp +import feast.ingestion.utils.TypeConversion._ +import feast.proto.types.ValueProto +import feast.proto.types.ValueProto.ValueType +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions._ +import org.joda.time.{DateTime, Seconds} +import org.scalacheck._ +import org.scalatest._ +import matchers.should.Matchers._ +import org.scalatest.matchers.Matcher + +import redis.clients.jedis.Jedis + +import scala.jdk.CollectionConverters._ +import scala.reflect.runtime.universe.TypeTag +import scala.util.hashing.MurmurHash3 + +case class Row(customer: String, feature1: Int, feature2: Float, eventTimestamp: java.sql.Timestamp) + +class BatchPipelineIT extends UnitSpec with ForAllTestContainer { + + override val container = GenericContainer("redis:6.0.8", exposedPorts = Seq(6379)) + + trait SparkContext { + val sparkConf = new SparkConf() + .setMaster("local[4]") + .setAppName("Testing") + .set("spark.redis.host", container.host) + .set("spark.default.parallelism", "8") + .set("spark.redis.port", container.mappedPort(6379).toString) + + val sparkSession = SparkSession + .builder() + .config(sparkConf) + .getOrCreate() + } + + trait DataHelper { + self: SparkContext => + + def generateTempPath(last: String) = + Paths.get(Files.createTempDirectory("test-dir").toString, last).toString + + def storeAsParquet[T <: Product: TypeTag](rows: Seq[T]): String = { + import self.sparkSession.implicits._ + + val tempPath = generateTempPath("rows") + + sparkSession + .createDataset(rows) + .withColumn("date", to_date($"eventTimestamp")) + .write + .partitionBy("date") + .save(tempPath) + + tempPath + } + } + + trait Scope extends SparkContext with DataHelper { + val jedis = new Jedis("localhost", container.mappedPort(6379)) + jedis.flushAll() + + def rowGenerator(start: DateTime, end: DateTime, customerGen: Option[Gen[String]] = None) = + for { + customer <- customerGen.getOrElse(Gen.asciiPrintableStr) + feature1 <- Gen.choose(0, 100) + feature2 <- Gen.choose[Float](0, 1) + eventTimestamp <- Gen + .choose(0, Seconds.secondsBetween(start, end).getSeconds) + .map(start.withMillisOfSecond(0).plusSeconds) + } yield Row(customer, feature1, feature2, new java.sql.Timestamp(eventTimestamp.getMillis)) + + def generateDistinctRows(gen: Gen[Row], N: Int) = + Gen.listOfN(N, gen).sample.get.groupBy(_.customer).map(_._2.head).toSeq + + val config = IngestionJobConfig( + featureTable = FeatureTable( + name = "test-fs", + project = "default", + entities = Seq(Field("customer", ValueType.Enum.STRING)), + features = Seq( + Field("feature1", ValueType.Enum.INT32), + Field("feature2", ValueType.Enum.FLOAT) + ) + ), + startTime = DateTime.parse("2020-08-01"), + endTime = DateTime.parse("2020-09-01") + ) + + def encodeEntityKey(row: Row, featureTable: FeatureTable): Array[Byte] = { + val entityPrefix = featureTable.entities.map(_.name).mkString("_") + s"${featureTable.project}_${entityPrefix}:${row.customer}".getBytes + } + + def encodeFeatureKey(featureTable: FeatureTable)(feature: String): String = { + val fullReference = s"${featureTable.name}:$feature" + MurmurHash3.stringHash(fullReference).toHexString + } + + def beStoredRow(mappedRow: Map[String, Any]) = { + val m: Matcher[Map[String, Any]] = contain.allElementsOf(mappedRow).matcher + + m compose { + (_: Map[Array[Byte], Array[Byte]]) + 
.map { case (k, v) => + ( + new String(k), + if (new String(k).startsWith("_ts")) + Timestamp.parseFrom(v).asScala + else + ValueProto.Value.parseFrom(v).asScala + ) + } + } + } + } + + "Parquet source file" should "be ingested in redis" in new Scope { + val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01")) + val rows = generateDistinctRows(gen, 10000) + val tempPath = storeAsParquet(rows) + val configWithOfflineSource = config.copy( + source = FileSource(tempPath, Map.empty, "eventTimestamp") + ) + + BatchPipeline.createPipeline(sparkSession, configWithOfflineSource) + + val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable) + + rows.foreach(r => { + val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap + storedValues should beStoredRow( + Map( + featureKeyEncoder("feature1") -> r.feature1, + featureKeyEncoder("feature2") -> r.feature2, + "_ts:test-fs" -> r.eventTimestamp + ) + ) + }) + } + + "Ingested rows" should "be compacted before storing by timestamp column" in new Scope { + val entities = (0 to 10000).map(_.toString) + + val genLatest = rowGenerator( + DateTime.parse("2020-08-15"), + DateTime.parse("2020-09-01"), + Some(Gen.oneOf(entities)) + ) + val latest = generateDistinctRows(genLatest, 10000) + + val genOld = rowGenerator( + DateTime.parse("2020-08-01"), + DateTime.parse("2020-08-14"), + Some(Gen.oneOf(entities)) + ) + val old = generateDistinctRows(genOld, 10000) + + val tempPath = storeAsParquet(latest ++ old) + val configWithOfflineSource = + config.copy(source = FileSource(tempPath, Map.empty, "eventTimestamp")) + + BatchPipeline.createPipeline(sparkSession, configWithOfflineSource) + + val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable) + + latest.foreach(r => { + val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap + storedValues should beStoredRow( + Map( + featureKeyEncoder("feature1") -> r.feature1, + featureKeyEncoder("feature2") -> r.feature2, + "_ts:test-fs" -> r.eventTimestamp + ) + ) + }) + } + + "Old rows in ingestion" should "not overwrite more recent rows from storage" in new Scope { + val entities = (0 to 10000).map(_.toString) + + val genLatest = rowGenerator( + DateTime.parse("2020-08-15"), + DateTime.parse("2020-09-01"), + Some(Gen.oneOf(entities)) + ) + val latest = generateDistinctRows(genLatest, 10000) + + val tempPath1 = storeAsParquet(latest) + val config1 = config.copy(source = FileSource(tempPath1, Map.empty, "eventTimestamp")) + + BatchPipeline.createPipeline(sparkSession, config1) + + val genOld = rowGenerator( + DateTime.parse("2020-08-01"), + DateTime.parse("2020-08-14"), + Some(Gen.oneOf(entities)) + ) + val old = generateDistinctRows(genOld, 10000) + + val tempPath2 = storeAsParquet(old) + val config2 = config.copy(source = FileSource(tempPath2, Map.empty, "eventTimestamp")) + + BatchPipeline.createPipeline(sparkSession, config2) + + val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable) + + latest.foreach(r => { + val storedValues = jedis.hgetAll(encodeEntityKey(r, config.featureTable)).asScala.toMap + storedValues should beStoredRow( + Map( + featureKeyEncoder("feature1") -> r.feature1, + featureKeyEncoder("feature2") -> r.feature2, + "_ts:test-fs" -> r.eventTimestamp + ) + ) + }) + } + + "Invalid rows" should "not be ingested and stored to deadletter instead" in new Scope { + val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01")) + val rows 
= generateDistinctRows(gen, 100) + + val rowsWithNullEntity = rows.map(_.copy(customer = null)) + + val tempPath = storeAsParquet(rowsWithNullEntity) + val deadletterConfig = config.copy( + source = FileSource(tempPath, Map.empty, "eventTimestamp"), + deadLetterPath = Some(generateTempPath("deadletters")) + ) + + BatchPipeline.createPipeline(sparkSession, deadletterConfig) + + jedis.keys("*").toArray should be(empty) + + sparkSession.read + .parquet(deadletterConfig.deadLetterPath.get) + .count() should be(rows.length) + } + + "Columns from source" should "be mapped according to configuration" in new Scope { + val gen = rowGenerator(DateTime.parse("2020-08-01"), DateTime.parse("2020-09-01")) + val rows = generateDistinctRows(gen, 100) + + val tempPath = storeAsParquet(rows) + + val configWithMapping = config.copy( + featureTable = config.featureTable.copy( + entities = Seq(Field("entity", ValueType.Enum.STRING)), + features = Seq( + Field("new_feature1", ValueType.Enum.INT32), + Field("feature2", ValueType.Enum.FLOAT) + ) + ), + source = FileSource( + tempPath, + Map( + "entity" -> "customer", + "new_feature1" -> "feature1" + ), + "eventTimestamp" + ) + ) + + BatchPipeline.createPipeline(sparkSession, configWithMapping) + + val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable) + + rows.foreach(r => { + val storedValues = + jedis.hgetAll(encodeEntityKey(r, configWithMapping.featureTable)).asScala.toMap + storedValues should beStoredRow( + Map( + featureKeyEncoder("new_feature1") -> r.feature1, + featureKeyEncoder("feature2") -> r.feature2, + "_ts:test-fs" -> r.eventTimestamp + ) + ) + }) + } +} diff --git a/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala new file mode 100644 index 00000000000..25b53cb57c2 --- /dev/null +++ b/spark/ingestion/src/test/scala/feast/ingestion/UnitSpec.scala @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion + +import org.scalatest._ +import matchers._ +import org.scalatest.flatspec.AnyFlatSpec + +abstract class UnitSpec + extends AnyFlatSpec + with should.Matchers + with OptionValues + with Inside + with Inspectors diff --git a/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala new file mode 100644 index 00000000000..1ae61724ed1 --- /dev/null +++ b/spark/ingestion/src/test/scala/feast/ingestion/metrics/StatsReporterSpec.scala @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.metrics + +import java.net.{DatagramPacket, DatagramSocket, SocketTimeoutException} +import java.util +import java.util.Collections + +import com.codahale.metrics.{Gauge, Histogram, MetricRegistry, UniformReservoir} +import feast.ingestion.UnitSpec + +import scala.collection.mutable.ArrayBuffer +import scala.jdk.CollectionConverters._ + +class StatsReporterSpec extends UnitSpec { + class SimpleServer { + val socket = new DatagramSocket() + socket.setSoTimeout(100) + + def port: Int = socket.getLocalPort + + // Collect datagrams until the socket read times out; only packets that were actually received are recorded, so a timeout never appends an empty buffer. + def receive: Array[String] = { + val messages: ArrayBuffer[String] = ArrayBuffer() + var finished = false + + do { + val buf = new Array[Byte](65535) + val p = new DatagramPacket(buf, buf.length) + try { + socket.receive(p) + messages += new String(p.getData, 0, p.getLength) + } catch { + case _: SocketTimeoutException => + finished = true + } + } while (!finished) + + messages.toArray + } + } + + trait Scope { + val server = new SimpleServer + val reporter = new StatsdReporterWithTags( + new MetricRegistry, + "127.0.0.1", + server.port + ) + + def gauge[A](v: A): Gauge[A] = new Gauge[A] { + override def getValue: A = v + } + + def histogram(values: Seq[Int]): Histogram = { + val hist = new Histogram(new UniformReservoir) + values.foreach(hist.update) + hist + } + } + + "Statsd reporter" should "send simple gauge unmodified" in new Scope { + reporter.report( + gauges = new util.TreeMap( + Map( + "test" -> gauge(0) + ).asJava + ), + counters = Collections.emptySortedMap(), + histograms = Collections.emptySortedMap(), + meters = Collections.emptySortedMap(), + timers = Collections.emptySortedMap() + ) + + server.receive should contain("test:0|g") + } + + "Statsd reporter" should "keep the tags part at the end of the name" in new Scope { + reporter.report( + gauges = Collections.emptySortedMap(), + counters = Collections.emptySortedMap(), + histograms = new util.TreeMap( + Map( + "test#fs=name" -> histogram(1 to 100) + ).asJava + ), + meters = Collections.emptySortedMap(), + timers = Collections.emptySortedMap() + ) + + server.receive should contain("test.p95#fs=name:95.95|ms") + } +}
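The last expectation above pins down the naming convention the new StatsdReporterWithTags is expected to follow: statsd suffixes such as .p95 are inserted before the #-delimited tag block rather than appended after it. A minimal sketch of that name handling, using a hypothetical TagAwareName helper (this is not the reporter's actual implementation, which is added elsewhere in this PR):

object TagAwareName {
  // Insert a statsd suffix (e.g. ".p95") before the "#"-delimited tag block,
  // so "test#fs=name" becomes "test.p95#fs=name" rather than "test#fs=name.p95".
  def withSuffix(metricName: String, suffix: String): String =
    metricName.split("#", 2) match {
      case Array(name, tags) => s"$name$suffix#$tags" // tags stay at the very end
      case _                 => s"$metricName$suffix" // no tag block present
    }
}

// TagAwareName.withSuffix("test#fs=name", ".p95") == "test.p95#fs=name"
// TagAwareName.withSuffix("test", ".p95")         == "test.p95"

The expected 95.95 is the interpolated 95th percentile that a Codahale UniformReservoir snapshot yields for the sample 1 to 100, and |ms is the statsd metric type used for histogram percentiles in the expected datagram.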