Commit

removing the columnFamilyAccumulator

ericm-db committed Jun 3, 2024
1 parent 37526de commit 67acea8

Showing 3 changed files with 8 additions and 66 deletions.
@@ -85,7 +85,7 @@ class StatefulProcessorHandleImpl(
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None,
     metrics: Map[String, SQLMetric] = Map.empty,
-    existingColFamilies: Map[String, ColumnFamilyAccumulator] = Map.empty)
+    existingColFamilies: Map[String, ColumnFamilySchemaV1] = Map.empty)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._

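With this change the handle receives plain ColumnFamilySchemaV1 values, which are Serializable, instead of driver-registered accumulators. A minimal, self-contained sketch of the resulting shape (both classes below are simplified, hypothetical stand-ins, not the real Spark types):

// Simplified stand-ins; the real ColumnFamilySchemaV1 and
// StatefulProcessorHandleImpl carry key/value schemas, metrics, etc.
final case class ColumnFamilySchemaV1(columnFamilyName: String)

class StatefulProcessorHandleImpl(
    existingColFamilies: Map[String, ColumnFamilySchemaV1] = Map.empty) {
  // The handle answers schema questions from a plain map,
  // with no accumulator lifecycle to manage.
  def schemaFor(name: String): Option[ColumnFamilySchemaV1] =
    existingColFamilies.get(name)
}

object HandleSketch extends App {
  val handle = new StatefulProcessorHandleImpl(
    Map("countState" -> ColumnFamilySchemaV1("countState")))
  assert(handle.schemaFor("countState").isDefined)
}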
@@ -93,19 +93,19 @@ case class TransformWithStateExec(
"operatorPropsFromExecutor"
)

private lazy val colFamilyAccumulators: Map[String, ColumnFamilyAccumulator] =
private lazy val colFamilySchemas: Map[String, ColumnFamilySchemaV1] =
initializeColFamilyAccumulators()

private def initializeColFamilyAccumulators(): Map[String, ColumnFamilyAccumulator] = {
private def initializeColFamilyAccumulators(): Map[String, ColumnFamilySchemaV1] = {
val stateCheckpointPath = new Path(stateInfo.get.checkpointLocation,
getStateInfo.operatorId.toString)
val hadoopConf = session.sqlContext.sessionState.newHadoopConf()

val reader = new SchemaV3Reader(stateCheckpointPath, hadoopConf)

reader.read.map { colFamilyMetadata =>
val acc = ColumnFamilyAccumulator.create(colFamilyMetadata, sparkContext)
colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1].columnFamilyName -> acc
val schemaV1 = colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1]
schemaV1.columnFamilyName -> schemaV1
}.toMap
}
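Initialization now reduces to casting each schema read from the checkpoint to ColumnFamilySchemaV1 and keying it by column family name; the ColumnFamilyAccumulator.create and SparkContext registration steps disappear. A runnable sketch of that map-building step, where readSchemas is a hypothetical stand-in for SchemaV3Reader.read:

object SchemaMapSketch extends App {
  sealed trait ColumnFamilySchema { def columnFamilyName: String }
  final case class ColumnFamilySchemaV1(columnFamilyName: String)
    extends ColumnFamilySchema

  // Hypothetical stand-in for SchemaV3Reader.read, which returns the
  // column family schemas persisted under the state checkpoint path.
  def readSchemas(): List[ColumnFamilySchema] =
    List(ColumnFamilySchemaV1("countState"), ColumnFamilySchemaV1("listState"))

  // Same shape as the new code path: cast each entry to the V1 schema
  // and key it by its column family name.
  val colFamilySchemas: Map[String, ColumnFamilySchemaV1] =
    readSchemas().map { colFamilyMetadata =>
      val schemaV1 = colFamilyMetadata.asInstanceOf[ColumnFamilySchemaV1]
      schemaV1.columnFamilyName -> schemaV1
    }.toMap

  println(colFamilySchemas.keys.mkString(", "))
}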

@@ -430,7 +430,7 @@ case class TransformWithStateExec(

   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
-    colFamilyAccumulators
+    colFamilySchemas

     validateTimeMode()
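The bare colFamilySchemas reference follows the same idiom as the metrics line above it: touching the lazy val in doExecute forces it to be evaluated on the driver before task closures are serialized. A small sketch of the idiom, under that assumption:

object LazyInitSketch extends App {
  lazy val schemas: Map[String, String] = {
    println("built once, at the forcing reference") // simulates driver-side init
    Map("countState" -> "schema-json")
  }

  schemas // force lazy init here, before any closure capturing it is shipped

  val task = () => schemas.size // in Spark, this would run on executors
  println(task())
}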

@@ -549,7 +549,7 @@
       CompletionIterator[InternalRow, Iterator[InternalRow]] = {
     val processorHandle = new StatefulProcessorHandleImpl(
       store, getStateInfo.queryRunId, keyEncoder, timeMode,
-      isStreaming, batchTimestampMs, metrics, colFamilyAccumulators)
+      isStreaming, batchTimestampMs, metrics, colFamilySchemas)
     assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
     statefulProcessor.setHandle(processorHandle)
     statefulProcessor.init(outputMode, timeMode)
@@ -26,10 +26,9 @@ import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods
 import org.json4s.jackson.JsonMethods.{compact, render}

-import org.apache.spark.SparkContext
 import org.apache.spark.sql.execution.streaming.{CheckpointFileManager, MetadataVersionUtil}
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{AccumulatorV2, Utils}
+import org.apache.spark.util.Utils

 sealed trait ColumnFamilySchema extends Serializable {
   def jsonValue: JsonAST.JObject
@@ -56,63 +55,6 @@ case class ColumnFamilySchemaV1(
   }
 }
-
-class ColumnFamilyAccumulator(
-    columnFamilyMetadata: ColumnFamilySchema) extends
-  AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] {
-
-  private var _value: ColumnFamilySchema = columnFamilyMetadata
-  /**
-   * Returns if this accumulator is zero value or not. e.g. for a counter accumulator, 0 is zero
-   * value; for a list accumulator, Nil is zero value.
-   */
-  override def isZero: Boolean = _value == null
-
-  /**
-   * Creates a new copy of this accumulator.
-   */
-  override def copy(): AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema] = {
-    new ColumnFamilyAccumulator(_value)
-  }
-
-  /**
-   * Resets this accumulator, which is zero value. i.e. call `isZero` must
-   * return true.
-   */
-  override def reset(): Unit = {
-    _value = null
-  }
-
-  /**
-   * Takes the inputs and accumulates.
-   */
-  override def add(v: ColumnFamilySchema): Unit = {
-    _value = v
-  }
-
-  /**
-   * Merges another same-type accumulator into this one and update its state, i.e. this should be
-   * merge-in-place.
-   */
-  override def merge(other: AccumulatorV2[ColumnFamilySchema, ColumnFamilySchema]): Unit = {
-    _value = other.value
-  }
-
-  /**
-   * Defines the current value of this accumulator
-   */
-  override def value: ColumnFamilySchema = _value
-}
-
-object ColumnFamilyAccumulator {
-  def create(
-      columnFamilyMetadata: ColumnFamilySchema,
-      sparkContext: SparkContext): ColumnFamilyAccumulator = {
-    val acc = new ColumnFamilyAccumulator(columnFamilyMetadata)
-    acc.register(sparkContext)
-    acc
-  }
-}

 object ColumnFamilySchemaV1 {
   def fromJson(json: List[Map[String, Any]]): List[ColumnFamilySchema] = {
     assert(json.isInstanceOf[List[_]])
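The removed class shows why it was expendable: add and merge both overwrite _value, so the accumulator had last-write-wins semantics and never aggregated anything across tasks. Because ColumnFamilySchema is already Serializable, a plain map threaded through the handle constructor (as in the first file) carries the same information without the registration lifecycle. A tiny sketch of the semantics that were dropped:

// Last-write-wins holder: behaviorally what the removed AccumulatorV2
// subclass implemented. Nothing here needs Spark's accumulator protocol.
final class LastWriteWins[T](private var v: T) {
  def add(x: T): Unit = v = x                            // overwrite, as add() did
  def merge(other: LastWriteWins[T]): Unit = v = other.v // overwrite again
  def value: T = v
}

object LastWriteWinsSketch extends App {
  val holder = new LastWriteWins[String]("a")
  holder.add("b")
  holder.merge(new LastWriteWins("c"))
  println(holder.value) // prints "c": only the final write survives
}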