Skip to content

Commit

Permalink
KE-42965 Kylin Storage V3 fix delta read broadcast error
Browse files Browse the repository at this point in the history
  • Loading branch information
7mming7 committed Jun 25, 2024
1 parent cc783fe commit c971f3c
Showing 1 changed file with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.commons.collections4.map.ReferenceMap
import org.apache.spark.SparkConf
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.internal.Logging
import org.apache.spark.util.SerializableConfiguration

private[spark] class BroadcastManager(
val isDriver: Boolean, conf: SparkConf) extends Logging {
Expand Down Expand Up @@ -70,31 +71,35 @@ private[spark] class BroadcastManager(
/**
 * Unbroadcasts every broadcast id recorded in `cachedBroadcast` for the given
 * execution and then drops that execution's entry from the map.
 *
 * Does nothing when no entry exists for `executionId`.
 *
 * @param executionId key under which `newBroadcast` recorded the broadcast ids
 */
def cleanBroadCast(executionId: String): Unit = {
  if (cachedBroadcast.containsKey(executionId)) {
    cachedBroadcast.get(executionId)
      .foreach(broadcastId => {
        logDebug(s"Clean broad cast $broadcastId")
        // NOTE(review): the two flags are presumably removeFromDriver = true and
        // blocking = false, as in upstream Spark's unbroadcast -- confirm.
        unbroadcast(broadcastId, true, false)
      })
    // Forget the execution only after all of its broadcasts were released.
    cachedBroadcast.remove(executionId)
  }
}

/**
 * Allocates the next broadcast id, optionally records it against `executionId`
 * so that `cleanBroadCast` can release it later, and creates the broadcast
 * through `broadcastFactory`.
 *
 * An id is recorded only when `executionId` is non-null, `cleanQueryBroadcast`
 * is enabled, and the value is neither a `PythonBroadcast` nor a
 * `SerializableConfiguration`.
 *
 * @param value_      the value to broadcast
 * @param isLocal     forwarded unchanged to `broadcastFactory.newBroadcast`
 * @param executionId execution the broadcast belongs to; `null` disables tracking
 * @return the broadcast created by `broadcastFactory`
 */
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, executionId: String): Broadcast[T] = {
  val bid = nextBroadcastId.getAndIncrement()

  value_ match {
    case pb: PythonBroadcast =>
      // SPARK-28486: attach this new broadcast variable's id to the PythonBroadcast,
      // so that underlying data file of PythonBroadcast could be mapped to the
      // BroadcastBlockId according to this id. Please see the specific usage of the
      // id in PythonBroadcast.readObject().
      // NOTE(review): Python broadcasts are not tracked for per-execution cleanup
      // either -- confirm that this is intended.
      pb.setBroadcastId(bid)

    case _: SerializableConfiguration =>
      // Deliberately not tracked, so cleanBroadCast never unbroadcasts it.
      // NOTE(review): presumably the broadcast configuration is still needed after
      // the owning execution has been cleaned (KE-42965, delta read) -- confirm.

    case _ =>
      if (executionId != null && cleanQueryBroadcast) {
        if (cachedBroadcast.containsKey(executionId)) {
          cachedBroadcast.get(executionId) += bid
        } else {
          // First broadcast seen for this execution: start its id list.
          val list = new scala.collection.mutable.ListBuffer[Long]
          list += bid
          cachedBroadcast.put(executionId, list)
        }
      }
  }
  broadcastFactory.newBroadcast[T](value_, isLocal, bid)
}
Expand Down

0 comments on commit c971f3c

Please sign in to comment.