
Use version 1 when recovering from state
tdas committed Jul 11, 2018
1 parent dbd08ca commit 05e3acf
Showing 19 changed files with 93 additions and 5 deletions.
@@ -725,7 +725,8 @@ trait ComplexTypeMergingExpression extends Expression {
"The collection of input data types must not be empty.")
require(
areInputTypesForMergingEqual,
"All input types must be the same except nullable, containsNull, valueContainsNull flags.")
"All input types must be the same except nullable, containsNull, valueContainsNull flags." +
s" The input types found are\n\t${inputTypesForMerging.mkString("\n\t")}")
inputTypesForMerging.reduceLeft(TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(_, _).get)
}
}
@@ -22,7 +22,8 @@ import org.json4s.jackson.Serialization

import org.apache.spark.internal.Logging
import org.apache.spark.sql.RuntimeConfig
import org.apache.spark.sql.internal.SQLConf._
import org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper
import org.apache.spark.sql.internal.SQLConf.{FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION, _}

/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
@@ -87,7 +88,8 @@ case class OffsetSeqMetadata(
object OffsetSeqMetadata extends Logging {
private implicit val format = Serialization.formats(NoTypeHints)
private val relevantSQLConfs = Seq(
SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY)
SHUFFLE_PARTITIONS, STATE_STORE_PROVIDER_CLASS, STREAMING_MULTIPLE_WATERMARK_POLICY,
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)

/**
* Default values of relevant configurations that are used for backward compatibility.
@@ -100,7 +102,9 @@ object OffsetSeqMetadata extends Logging {
* with a specific default value for ensuring same behavior of the query as before.
*/
private val relevantSQLConfDefaultValues = Map[String, String](
STREAMING_MULTIPLE_WATERMARK_POLICY.key -> MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
STREAMING_MULTIPLE_WATERMARK_POLICY.key -> MultipleWatermarkPolicy.DEFAULT_POLICY_NAME,
FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION.key ->
FlatMapGroupsWithStateExecHelper.legacyVersion.toString
)

def apply(json: String): OffsetSeqMetadata = Serialization.read[OffsetSeqMetadata](json)
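
For context on the FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION entries added above: when a streaming query restarts from a checkpoint whose offset log does not record one of the relevant confs (because the checkpoint was written by an older Spark release), recovery should fall back to the backward-compatible default rather than to the current session default, so the restarted query keeps its old behavior. A minimal sketch of that fallback, reusing relevantSQLConfs and relevantSQLConfDefaultValues from this object; the method name and plumbing are illustrative, not the actual Spark code:

// Illustrative sketch only: apply confs recovered from the offset log to the session,
// falling back to the backward-compatibility defaults (e.g. state format version 1)
// when the checkpoint predates the conf and therefore did not record it.
def applyRecoveredConf(recoveredConf: Map[String, String], sessionConf: RuntimeConfig): Unit = {
  relevantSQLConfs.map(_.key).foreach { key =>
    recoveredConf.get(key).orElse(relevantSQLConfDefaultValues.get(key)) match {
      case Some(value) => sessionConf.set(key, value) // checkpointed value, or legacy default
      case None => // neither recorded nor defaulted: keep the session's current setting
    }
  }
}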
@@ -28,6 +28,7 @@ import org.apache.spark.sql.types._
object FlatMapGroupsWithStateExecHelper {

val supportedVersions = Seq(1, 2)
val legacyVersion = 1

/**
* Class to capture deserialized state and timestamp return by the state manager.
@@ -217,7 +218,8 @@ object FlatMapGroupsWithStateExecHelper {
val deserExpr = stateEncoder.resolveAndBind().deserializer.transformUp {
case BoundReference(ordinal, _, _) => GetStructField(boundRefToNestedState, ordinal)
}
CaseWhen(Seq(IsNull(boundRefToNestedState) -> Literal(null)), elseValue = deserExpr)
val nullLiteral = Literal(null, deserExpr.dataType)
CaseWhen(Seq(IsNull(boundRefToNestedState) -> nullLiteral), elseValue = deserExpr)
}
}
}
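
The typed null literal introduced just above ties back to the requirement whose error message is improved in the first hunk of this commit: expressions such as CaseWhen merge the data types of their branches and require them to differ only in null flags, while a bare Literal(null) is typed as NullType and therefore does not match the struct-typed deserializer expression in the else branch. Supplying the deserializer's data type makes the branch types agree. A rough standalone illustration, with a made-up state schema:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val stateSchema = StructType(Seq(StructField("count", LongType)))  // hypothetical state schema
val untyped = Literal(null)               // dataType is NullType, mismatching a struct-typed branch
val typed   = Literal(null, stateSchema)  // dataType is stateSchema, so CaseWhen branches merge cleanly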
@@ -0,0 +1,2 @@
v1
{}
@@ -0,0 +1,2 @@
v1
{}
@@ -0,0 +1 @@
{"id":"04d960cd-d38f-4ce6-b8d0-ebcf84c9dccc"}
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1531292029003,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
0
@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
1
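
The two three-line files above are offset log entries: the first line carries the log format version, the second the serialized OffsetSeqMetadata for the batch, and the remaining lines hold one committed offset per source (here, the single MemoryStream used by the test). The metadata line can be parsed back with the apply method shown earlier in this diff; the JSON below is abbreviated for readability:

// Parse an offset-log metadata line back into OffsetSeqMetadata (conf map shortened here).
val metadataJson =
  """{"batchWatermarkMs":5000,"batchTimestampMs":1531292030005,"conf":{"spark.sql.shuffle.partitions":"5"}}"""
val metadata = OffsetSeqMetadata(metadataJson)  // uses Serialization.read under the hood
assert(metadata.batchWatermarkMs == 5000)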
10 binary files not shown.
@@ -17,9 +17,11 @@

package org.apache.spark.sql.streaming

import java.io.File
import java.sql.Date
import java.util.concurrent.ConcurrentHashMap

import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterAll
import org.scalatest.exceptions.TestFailedException

@@ -36,6 +38,7 @@ import org.apache.spark.sql.execution.streaming.state.{FlatMapGroupsWithStateExe
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{DataType, IntegerType}
import org.apache.spark.util.Utils

/** Class to check custom state types */
case class RunningCount(count: Long)
@@ -855,6 +858,73 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
)
}


test("flatMapGroupsWithState - recovery from checkpoint uses state format version 1") {
// Function to maintain the max event time as state and set the timeout timestamp based on the
// current max event time seen. It returns the max event time in the state, or -1 if the state
// was removed by timeout.
val stateFunc = (key: String, values: Iterator[(String, Long)], state: GroupState[Long]) => {
assertCanGetProcessingTime { state.getCurrentProcessingTimeMs() >= 0 }
assertCanGetWatermark { state.getCurrentWatermarkMs() >= -1 }

val timeoutDelaySec = 5
if (state.hasTimedOut) {
state.remove()
Iterator((key, -1))
} else {
val valuesSeq = values.toSeq
val maxEventTimeSec = math.max(valuesSeq.map(_._2).max, state.getOption.getOrElse(0L))
val timeoutTimestampSec = maxEventTimeSec + timeoutDelaySec
state.update(maxEventTimeSec)
state.setTimeoutTimestamp(timeoutTimestampSec * 1000)
Iterator((key, maxEventTimeSec.toInt))
}
}
val inputData = MemoryStream[(String, Int)]
val result =
inputData.toDS
.select($"_1".as("key"), $"_2".cast("timestamp").as("eventTime"))
.withWatermark("eventTime", "10 seconds")
.as[(String, Long)]
.groupByKey(_._1)
.flatMapGroupsWithState(Update, EventTimeTimeout)(stateFunc)

val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-2.3.1-flatMapGroupsWithState-state-format-1/").toURI

val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)

inputData.addData(("a", 11), ("a", 13), ("a", 15))
inputData.addData(("a", 4))

testStream(result, Update)(
StartStream(checkpointLocation = checkpointDir.getAbsolutePath),
/*
Note: The checkpoint was generated using the following input in Spark version 2.3.1
AddData(inputData, ("a", 11), ("a", 13), ("a", 15)),
// Max event time = 15. Timeout timestamp for "a" = 15 + 5 = 20. Watermark = 15 - 10 = 5.
CheckNewAnswer(("a", 15)), // Output = max event time of a
AddData(inputData, ("a", 4)), // Add data older than watermark for "a"
CheckNewAnswer(), // No output as data should get filtered by watermark
*/

AddData(inputData, ("a", 10)), // Add data newer than watermark for "a"
CheckNewAnswer(("a", 15)), // Max event time is still the same
// Timeout timestamp for "a" is still 20 as max event time for "a" is still 15.
// Watermark is still 5 as max event time for all data is still 15.

AddData(inputData, ("b", 31)), // Add data newer than watermark for "b", not "a"
// Watermark = 31 - 10 = 21, so "a" should be timed out as timeout timestamp for "a" is 20.
CheckNewAnswer(("a", -1), ("b", 31)) // State for "a" should timeout and emit -1
)
}


test("mapGroupsWithState - streaming") {
// Function to maintain running count up to 2, and then remove the count
// Returns the data and the count (-1 if count reached beyond 2 and state was just removed)