diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index 66f3b803c0d47..9a109eedd5124 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -21,6 +21,7 @@ package org.apache.spark.internal * All structured logging keys should be defined here for standardization. */ object LogKey extends Enumeration { + val ACCUMULATOR_ID = Value val APP_DESC = Value val APP_ID = Value val APP_STATE = Value @@ -28,45 +29,67 @@ object LogKey extends Enumeration { val BLOCK_MANAGER_ID = Value val BROADCAST_ID = Value val BUCKET = Value + val BYTECODE_SIZE = Value val CATEGORICAL_FEATURES = Value val CLASS_LOADER = Value val CLASS_NAME = Value val COMMAND = Value val COMMAND_OUTPUT = Value + val COMPONENT = Value val CONFIG = Value val CONFIG2 = Value val CONTAINER_ID = Value val COUNT = Value val DRIVER_ID = Value + val END_POINT = Value val ERROR = Value + val EVENT_LOOP = Value val EVENT_QUEUE = Value val EXECUTOR_ID = Value - val EXECUTOR_STATE_CHANGED = Value + val EXECUTOR_STATE = Value val EXIT_CODE = Value + val FAILURES = Value val HOST = Value val JOB_ID = Value + val JOIN_CONDITION = Value val LEARNING_RATE = Value val LINE = Value val LINE_NUM = Value + val LISTENER = Value + val LOG_TYPE = Value val MASTER_URL = Value val MAX_ATTEMPTS = Value val MAX_CATEGORIES = Value val MAX_EXECUTOR_FAILURES = Value val MAX_SIZE = Value + val MERGE_DIR_NAME = Value + val METHOD_NAME = Value val MIN_SIZE = Value val NUM_ITERATIONS = Value + val OBJECT_ID = Value val OLD_BLOCK_MANAGER_ID = Value val OPTIMIZER_CLASS_NAME = Value val PARTITION_ID = Value val PATH = Value + val PATHS = Value val POD_ID = Value + val PORT = Value + val QUERY_PLAN = Value val RANGE = Value + val RDD_ID = Value val REASON = Value + val REDUCE_ID = Value val REMOTE_ADDRESS = Value val RETRY_COUNT = Value val RPC_ADDRESS = Value + val RULE_BATCH_NAME = Value + val RULE_NAME = Value + val RULE_NUMBER_OF_RUNS = Value + val SHUFFLE_BLOCK_INFO = Value val SHUFFLE_ID = Value + val SHUFFLE_MERGE_ID = Value val SIZE = Value + val SLEEP_TIME_SECONDS = Value val STAGE_ID = Value val SUBMISSION_ID = Value val SUBSAMPLING_RATE = Value @@ -75,9 +98,16 @@ object LogKey extends Enumeration { val TASK_NAME = Value val TASK_SET_NAME = Value val TASK_STATE = Value + val THREAD = Value + val THREAD_NAME = Value val TID = Value val TIMEOUT = Value + val TIME_UNITS = Value + val URI = Value + val USER_NAME = Value + val WATERMARK_CONSTRAINT = Value val WORKER_URL = Value + val XSD_PATH = Value type LogKey = Value } diff --git a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala index 2132e166eacf7..4768dbcdbc23d 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala @@ -49,6 +49,8 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String resultMap.putAll(mdc.context) MessageWithContext(message + mdc.message, resultMap) } + + def stripMargin: MessageWithContext = copy(message = message.stripMargin) } /** diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index a1871cb231cfb..c16a84c13187b 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ 
b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled import scala.jdk.CollectionConverters._ import org.apache.spark.broadcast.Broadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, LISTENER, RDD_ID, SHUFFLE_ID} import org.apache.spark.internal.config._ import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} import org.apache.spark.scheduler.SparkListener @@ -226,7 +227,7 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.rddCleaned(rddId)) logDebug("Cleaned RDD " + rddId) } catch { - case e: Exception => logError("Error cleaning RDD " + rddId, e) + case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e) } } @@ -245,7 +246,7 @@ private[spark] class ContextCleaner( logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)") } } catch { - case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) + case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e) } } @@ -257,7 +258,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.broadcastCleaned(broadcastId)) logDebug(s"Cleaned broadcast $broadcastId") } catch { - case e: Exception => logError("Error cleaning broadcast " + broadcastId, e) + case e: Exception => + logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, broadcastId)}", e) } } @@ -269,7 +271,8 @@ private[spark] class ContextCleaner( listeners.asScala.foreach(_.accumCleaned(accId)) logDebug("Cleaned accumulator " + accId) } catch { - case e: Exception => logError("Error cleaning accumulator " + accId, e) + case e: Exception => + logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, accId)}", e) } } @@ -285,7 +288,8 @@ private[spark] class ContextCleaner( logDebug("Cleaned rdd checkpoint data " + rddId) } catch { - case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e) + case e: Exception => + logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, rddId)}", e) } } @@ -295,7 +299,8 @@ private[spark] class ContextCleaner( sc.listenerBus.removeListener(listener) logDebug(s"Cleaned Spark listener $listener") } catch { - case e: Exception => logError(s"Error cleaning Spark listener $listener", e) + case e: Exception => + logError(log"Error cleaning Spark listener ${MDC(LISTENER, listener)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index faa8504df3651..48569eb713793 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster( .getOrElse(Seq.empty[BlockManagerId])) } } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 7595488cecee2..9d908cd8713ce 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import org.apache.spark.errors.SparkCoreErrors import 
org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource} import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Tests._ import org.apache.spark.internal.config.UI._ @@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting thread dump from executor $executorId", e) + logError( + log"Exception getting thread dump from executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", + e) None } } @@ -778,7 +781,9 @@ class SparkContext(config: SparkConf) extends Logging { } } catch { case e: Exception => - logError(s"Exception getting heap histogram from executor $executorId", e) + logError( + log"Exception getting heap histogram from " + + log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e) None } } @@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(env.rpcEnv.fileServer.addJar(file)) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } @@ -2161,7 +2166,7 @@ class SparkContext(config: SparkConf) extends Logging { Seq(path) } catch { case NonFatal(e) => - logError(s"Failed to add $path to Spark environment", e) + logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e) Nil } } else { diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala index a3c36de151554..e433cc10ae731 100644 --- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala +++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ import org.apache.spark.executor.TaskMetrics -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source @@ -246,7 +247,7 @@ private[spark] class TaskContextImpl( } } listenerExceptions += e - logError(s"Error in $name", e) + logError(log"Error in ${MDC(LISTENER, name)}", e) } } if (listenerExceptions.nonEmpty) { diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 3f7a3ea70a7e7..1a05c8f35b7fb 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.api.r.SerDe._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID} import org.apache.spark.internal.config.R._ import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.util.ArrayImplicits._ @@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend) writeObject(dos, null, server.jvmObjectTracker) } catch { case e: Exception => - 
logError(s"Removing $objId failed", e) + logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) writeString(dos, s"Removing $objId failed: ${e.getMessage}") } @@ -192,7 +193,7 @@ private[r] class RBackendHandler(server: RBackend) } } catch { case e: Exception => - logError(s"$methodName on $objId failed", e) + logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, objId)} failed", e) writeInt(dos, -1) // Writing the error message of the cause for the exception. This will be returned // to user in the R process. diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 1eec3e82f1b79..6cf240f12a1ca 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS} +import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} @@ -61,7 +61,7 @@ private class ClientEndpoint( t => t match { case ie: InterruptedException => // Exit normally case e: Throwable => - logError(e.getMessage, e) + logError(log"${MDC(ERROR, e.getMessage)}", e) System.exit(SparkExitCode.UNCAUGHT_EXCEPTION) }) diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala index 509049550ad4f..d317d6449f293 100644 --- a/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneResourceUtils.scala @@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats} import org.json4s.jackson.JsonMethods.{compact, render} import org.apache.spark.SparkException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.COMPONENT import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement} import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils @@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends Logging { writeResourceAllocationJson(allocations, tmpFile) } catch { case NonFatal(e) => - val errMsg = s"Exception threw while preparing resource file for $compShortName" + val errMsg = + log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}" logError(errMsg, e) - throw new SparkException(errMsg, e) + throw new SparkException(errMsg.message, e) } val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir) tmpFile.renameTo(resourcesFile) diff --git a/core/src/main/scala/org/apache/spark/deploy/Utils.scala b/core/src/main/scala/org/apache/spark/deploy/Utils.scala index 9eb5a0042e515..4d2546cb808c0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Utils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Utils.scala @@ -22,7 +22,8 @@ import java.io.File import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.JettyUtils.createServletHandler import org.apache.spark.ui.WebUI import org.apache.spark.util.Utils.{getFileLength, offsetBytes} @@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging { (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e0128e35b761a..98cbd7b3eba82 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ @@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case e: AccessControlException => logWarning(s"Insufficient permission while compacting log for $rootPath", e) case e: Exception => - logError(s"Exception while compacting log for $rootPath", e) + logError(log"Exception while compacting log for ${MDC(PATH, rootPath)}", e) } finally { endProcessing(rootPath) } @@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) case _: AccessControlException => logInfo(s"No permission to delete $log, ignoring.") case ioe: IOException => - logError(s"IOException in cleaning $log", ioe) + logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe) } } deleted diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index e2ba221fb00cd..0659c26fd15b6 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -38,7 +38,7 @@ import org.apache.spark.deploy.StandaloneResourceUtils._ import org.apache.spark.deploy.master.{DriverState, Master} import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, EXECUTOR_STATE_CHANGED, MASTER_URL, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config.Tests.IS_TESTING import org.apache.spark.internal.config.UI._ import org.apache.spark.internal.config.Worker._ @@ -549,7 +549,7 @@ private[deploy] class Worker( }(cleanupThreadExecutor) cleanupFuture.failed.foreach(e => - logError("App dir cleanup failed: " + e.getMessage, e) + logError(log"App dir cleanup failed: ${MDC(ERROR, e.getMessage)}", e) )(cleanupThreadExecutor) } catch { case _: RejectedExecutionException if cleanupThreadExecutor.isShutdown => @@ -638,7 +638,9 @@ private[deploy] class Worker( 
addResourcesUsed(resources_) } catch { case e: Exception => - logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) + logError( + log"Failed to launch executor ${MDC(APP_ID, appId)}/${MDC(EXECUTOR_ID, execId)} " + + log"for ${MDC(APP_DESC, appDesc.name)}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId @@ -749,7 +751,7 @@ private[deploy] class Worker( Utils.deleteRecursively(new File(dir)) } }(cleanupThreadExecutor).failed.foreach(e => - logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) + logError(log"Clean up app dir ${MDC(PATHS, dirList)} failed", e) )(cleanupThreadExecutor) } } catch { @@ -794,8 +796,10 @@ private[deploy] class Worker( case Failure(t) => val failures = executorStateSyncFailureAttempts.getOrElse(fullId, 0) + 1 if (failures < executorStateSyncMaxAttempts) { - logError(s"Failed to send $newState to Master $masterRef, " + - s"will retry ($failures/$executorStateSyncMaxAttempts).", t) + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)}" + + log" to Master ${MDC(MASTER_URL, masterRef)}, will retry " + + log"(${MDC(FAILURES, failures)}/" + + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)}).", t) executorStateSyncFailureAttempts(fullId) = failures // If the failure is not caused by TimeoutException, wait for a while before retry in // case the connection is temporarily unavailable. @@ -808,7 +812,7 @@ private[deploy] class Worker( } self.send(newState) } else { - logError(log"Failed to send ${MDC(EXECUTOR_STATE_CHANGED, newState)} " + + logError(log"Failed to send ${MDC(EXECUTOR_STATE, newState)} " + log"to Master ${MDC(MASTER_URL, masterRef)} for " + log"${MDC(MAX_ATTEMPTS, executorStateSyncMaxAttempts)} times. 
Giving up.") System.exit(1) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala index c4a15095ec40c..006a388e98b5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -23,7 +23,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.ui.{UIUtils, WebUIPage} import org.apache.spark.util.Utils import org.apache.spark.util.logging.RollingFileAppender @@ -174,7 +175,8 @@ private[ui] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") with (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " + + log"directory ${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 67d0c37c3eddf..a7657cd78cd9b 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -41,7 +41,7 @@ import org.slf4j.MDC import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC => LogMDC} -import org.apache.spark.internal.LogKey.{ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} +import org.apache.spark.internal.LogKey.{CLASS_NAME, ERROR, MAX_ATTEMPTS, TASK_ID, TASK_NAME, TIMEOUT} import org.apache.spark.internal.config._ import org.apache.spark.internal.plugin.PluginContainer import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager} @@ -661,9 +661,10 @@ private[spark] class Executor( // uh-oh. it appears the user code has caught the fetch-failure without throwing any // other exceptions. Its *possible* this is what the user meant to do (though highly // unlikely). So we will log an error and keep going. - logError(s"$taskName completed successfully though internally it encountered " + - s"unrecoverable fetch failures! Most likely this means user code is incorrectly " + - s"swallowing Spark's internal ${classOf[FetchFailedException]}", fetchFailure) + logError(log"${LogMDC(TASK_NAME, taskName)} completed successfully though internally " + + log"it encountered unrecoverable fetch failures! Most likely this means user code " + + log"is incorrectly swallowing Spark's internal " + + log"${LogMDC(CLASS_NAME, classOf[FetchFailedException])}", fetchFailure) } val taskFinishNs = System.nanoTime() val taskFinishCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) { @@ -802,7 +803,7 @@ private[spark] class Executor( // Attempt to exit cleanly by informing the driver of our failure. // If anything goes wrong (or this was a fatal exception), we will delegate to // the default uncaught exception handler, which will terminate the Executor. - logError(s"Exception in $taskName", t) + logError(log"Exception in ${LogMDC(TASK_NAME, taskName)}", t) // SPARK-20904: Do not report failure to driver if if happened during shut down. 
Because // libraries may set up shutdown hooks that race with running tasks during shutdown, diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala index b9f4486b66fa6..c7047ddd278b2 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorClassLoader.scala @@ -30,7 +30,7 @@ import org.apache.xbean.asm9.Opcodes._ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, LogKey, MDC} import org.apache.spark.util.ParentClassLoader /** @@ -183,7 +183,8 @@ class ExecutorClassLoader( None case e: Exception => // Something bad happened while checking if the class exists - logError(s"Failed to check existence of class $name on REPL class server at $uri", e) + logError(log"Failed to check existence of class ${MDC(LogKey.CLASS_NAME, name)} " + + log"on REPL class server at ${MDC(LogKey.URI, uri)}", e) if (userClassPathFirst) { // Allow to try to load from "parentLoader" None diff --git a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala index 20239980eee5c..95ea814042d35 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala @@ -33,7 +33,7 @@ import org.apache.hadoop.mapreduce.task.{TaskAttemptContextImpl => NewTaskAttemp import org.apache.spark.{SerializableWritable, SparkConf, SparkException, TaskContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID +import org.apache.spark.internal.LogKey.{JOB_ID, TASK_ATTEMPT_ID} import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.util.{SerializableConfiguration, SerializableJobConf, Utils} @@ -104,7 +104,7 @@ object SparkHadoopWriter extends Logging { logInfo(s"Write Job ${jobContext.getJobID} committed. Elapsed time: $duration ms.") } catch { case cause: Throwable => - logError(s"Aborting job ${jobContext.getJobID}.", cause) + logError(log"Aborting job ${MDC(JOB_ID, jobContext.getJobID)}.", cause) committer.abortJob(jobContext) throw new SparkException("Job aborted.", cause) } diff --git a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala index b059e82df23b5..c68999f34079d 100644 --- a/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala @@ -24,7 +24,8 @@ import org.apache.hadoop.mapreduce.{OutputCommitter => MapReduceOutputCommitter} import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.executor.CommitDeniedException -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.TASK_ATTEMPT_ID import org.apache.spark.util.Utils object SparkHadoopMapRedUtil extends Logging { @@ -52,7 +53,9 @@ object SparkHadoopMapRedUtil extends Logging { logInfo(s"$mrTaskAttemptID: Committed. 
Elapsed time: $timeCost ms.") } catch { case cause: IOException => - logError(s"Error committing the output of task: $mrTaskAttemptID", cause) + logError( + log"Error committing the output of task: ${MDC(TASK_ATTEMPT_ID, mrTaskAttemptID)}", + cause) committer.abortTask(mrTaskContext) throw cause } diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index 195c5b0f47f57..12df40c3476a0 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -25,7 +25,8 @@ import scala.jdk.CollectionConverters._ import scala.util.matching.Regex import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.METRICS_CONF import org.apache.spark.util.Utils @@ -140,7 +141,7 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { } catch { case e: Exception => val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME) - logError(s"Error loading configuration file $file", e) + logError(log"Error loading configuration file ${MDC(PATH, file)}", e) } finally { if (is != null) { is.close() diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index 05e0b6b9c4ef0..555083bb65d24 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -188,7 +188,8 @@ private[spark] class MetricsSystem private ( val source = Utils.classForName[Source](classPath).getConstructor().newInstance() registerSource(source) } catch { - case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e) + case e: Exception => + logError(log"Source class ${MDC(CLASS_NAME, classPath)} cannot be instantiated", e) } } } diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 086df62313249..127bdf6d91812 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -34,7 +34,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Partition, TaskContext} import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.LogKey.{COMMAND, ERROR} +import org.apache.spark.internal.LogKey.{COMMAND, ERROR, PATH} import org.apache.spark.internal.MDC import org.apache.spark.util.Utils @@ -107,8 +107,9 @@ private[spark] class PipedRDD[T: ClassTag]( pb.directory(taskDirFile) workInTaskDirectory = true } catch { - case e: Exception => logError("Unable to setup task working directory: " + e.getMessage + - " (" + taskDirectory + ")", e) + case e: Exception => + logError(log"Unable to setup task working directory: ${MDC(ERROR, e.getMessage)}" + + log" (${MDC(PATH, taskDirectory)})", e) } } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala index 472401b23fe8e..b503c5a0f8089 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala @@ -22,7 +22,8 @@ import javax.annotation.concurrent.GuardedBy import scala.util.control.NonFatal import org.apache.spark.SparkException -import 
org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.END_POINT import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, ThreadSafeRpcEndpoint} @@ -206,7 +207,8 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint) // Should reduce the number of active threads before throw the error. numActiveThreads -= 1 } - logError(s"An error happened while processing message in the inbox for $endpointName", fatal) + logError(log"An error happened while processing message in the inbox for" + + log" ${MDC(END_POINT, endpointName)}", fatal) throw fatal } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala index df7cd0b44c900..2d94ed5d05e1c 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala @@ -22,7 +22,8 @@ import java.util.concurrent._ import scala.util.control.NonFatal import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.ERROR import org.apache.spark.internal.config.EXECUTOR_ID import org.apache.spark.internal.config.Network._ import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcEndpoint} @@ -74,7 +75,7 @@ private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Loggin } inbox.process(dispatcher) } catch { - case NonFatal(e) => logError(e.getMessage, e) + case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e) } } } catch { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 7ee8dc7ec0c8a..41cbd795b7e5e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -37,7 +37,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{JOB_ID, STAGE_ID} +import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, CLASS_NAME, JOB_ID, PARTITION_ID, STAGE_ID, TASK_ID} import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED} import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY import org.apache.spark.network.shuffle.{BlockStoreClient, MergeFinalizerListener} @@ -1749,8 +1749,8 @@ private[spark] class DAGScheduler( case None => "Unknown class" } logError( - s"Failed to update accumulator $id ($accumClassName) for task ${task.partitionId}", - e) + log"Failed to update accumulator ${MDC(ACCUMULATOR_ID, id)} (${MDC(CLASS_NAME, accumClassName)}) " + + log"for task ${MDC(PARTITION_ID, task.partitionId)}", e) } } } @@ -1763,7 +1763,9 @@ private[spark] class DAGScheduler( } catch { case NonFatal(e) => val taskId = event.taskInfo.taskId - logError(s"Error when attempting to reconstruct metrics for task $taskId", e) + logError( + log"Error when attempting to reconstruct metrics for task ${MDC(TASK_ID, taskId)}", + e) null } } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index a3b8f1206b9d4..24c25d2377948 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -25,7 +25,7 @@ import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{LINE, LINE_NUM} +import org.apache.spark.internal.LogKey.{LINE, LINE_NUM, PATH} import org.apache.spark.scheduler.ReplayListenerBus._ import org.apache.spark.util.JsonProtocol @@ -125,7 +125,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { case ioe: IOException => throw ioe case e: Exception => - logError(s"Exception parsing Spark event log: $sourceName", e) + logError(log"Exception parsing Spark event log: ${MDC(PATH, sourceName)}", e) logError(log"Malformed line #${MDC(LINE_NUM, lineNumber)}: ${MDC(LINE, currentLine)}\n") false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index a30744da9ee98..7e61dad3c141b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -26,7 +26,8 @@ import scala.xml.{Node, XML} import org.apache.hadoop.fs.Path import org.apache.spark.SparkContext -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.{SCHEDULER_ALLOCATION_FILE, SCHEDULER_MODE} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -99,10 +100,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) } } catch { case NonFatal(t) => - val defaultMessage = "Error while building the fair scheduler pools" - val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" } - .getOrElse(defaultMessage) - logError(message, t) + if (fileData.isDefined) { + val fileName = fileData.get._2 + logError(log"Error while building the fair scheduler pools from ${MDC(PATH, fileName)}", + t) + } else { + logError("Error while building the fair scheduler pools", t) + } throw t } finally { fileData.foreach { case (is, fileName) => is.close() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index dc06567784558..1418901e3442c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -534,9 +534,9 @@ private[spark] class TaskSetManager( // If the task cannot be serialized, then there's no point to re-attempt the task, // as it will always fail. So just abort the whole task-set. case NonFatal(e) => - val msg = s"Failed to serialize task $taskId, not attempting to retry it." + val msg = log"Failed to serialize task ${MDC(TASK_ID, taskId)}, not attempting to retry it." 
logError(msg, e) - abort(s"$msg Exception during serialization: $e") + abort(s"${msg.message} Exception during serialization: $e") throw SparkCoreErrors.failToSerializeTaskError(e) } if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024 && diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 613080813d8e4..5a0b2ba3735c5 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -39,7 +39,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.Kryo._ import org.apache.spark.internal.io.FileCommitProtocol._ import org.apache.spark.network.util.ByteUnit @@ -739,7 +740,7 @@ private object JavaIterableWrapperSerializer extends Logging { private val underlyingMethodOpt = { try Some(wrapperClass.getDeclaredMethod("underlying")) catch { case e: Exception => - logError("Failed to find the underlying field in " + wrapperClass, e) + logError(log"Failed to find the underlying field in ${MDC(CLASS_NAME, wrapperClass)}", e) None } } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index d50b8f935d561..109a9a2e3eb94 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -25,7 +25,8 @@ import scala.collection.mutable.HashMap import scala.jdk.CollectionConverters._ import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR import org.apache.spark.status.AppStatusUtils.getQuantilesValue import org.apache.spark.status.api.v1 @@ -864,7 +865,7 @@ private[spark] object AppStatusStore extends Logging { Some(localDir) } catch { case e: IOException => - logError(s"Failed to create spark ui store path in $rootDir.", e) + logError(log"Failed to create spark ui store path in ${MDC(PATH, rootDir)}.", e) None } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e68239f260d9d..9aa100d9ff36e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -42,7 +42,7 @@ import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.DataReadMethod import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.BLOCK_ID +import org.apache.spark.internal.LogKey.{BLOCK_ID, COUNT, SLEEP_TIME_SECONDS} import org.apache.spark.internal.config.{Network, RDD_CACHE_VISIBILITY_TRACKING_ENABLED, Tests} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.metrics.source.Source @@ -626,8 +626,9 @@ private[spark] class BlockManager( return } catch { case e: Exception if i < MAX_ATTEMPTS => - logError(s"Failed to connect to external shuffle server, will retry ${MAX_ATTEMPTS - i}" - + 
s" more times after waiting $SLEEP_TIME_SECS seconds...", e) + logError(log"Failed to connect to external shuffle server, will retry " + + log"${MDC(COUNT, MAX_ATTEMPTS - i)} more times after waiting " + + log"${MDC(SLEEP_TIME_SECONDS, SLEEP_TIME_SECS)} seconds...", e) Thread.sleep(SLEEP_TIME_SECS * 1000L) case NonFatal(e) => throw SparkCoreErrors.unableToRegisterWithExternalShuffleServerError(e) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index 686003e2c51dc..5b4ecef233f8f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -26,8 +26,8 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.errors.SparkCoreErrors -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.SHUFFLE_BLOCK_INFO import org.apache.spark.shuffle.ShuffleBlockInfo import org.apache.spark.storage.BlockManagerMessages.ReplicateBlock import org.apache.spark.util.{ThreadUtils, Utils} @@ -152,11 +152,13 @@ private[storage] class BlockManagerDecommissioner( isTargetDecommissioned = true keepRunning = false } else { - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } case e: Exception => - logError(s"Error occurred during migrating $shuffleBlockInfo", e) + logError(log"Error occurred during migrating " + + log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e) keepRunning = false } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index ac453d0f743c3..5bb4e096c029c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -32,7 +32,7 @@ import com.google.common.cache.CacheBuilder import org.apache.spark.{MapOutputTrackerMaster, SparkConf, SparkContext, SparkEnv} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.{config, Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, OLD_BLOCK_MANAGER_ID} +import org.apache.spark.internal.LogKey.{BLOCK_MANAGER_ID, EXECUTOR_ID, OLD_BLOCK_MANAGER_ID} import org.apache.spark.internal.config.RDD_CACHE_VISIBILITY_TRACKING_ENABLED import org.apache.spark.network.shuffle.{ExternalBlockStoreClient, RemoteBlockPushResolver} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} @@ -328,7 +328,8 @@ class BlockManagerMasterEndpoint( // care about the return result of removing blocks. That way we avoid breaking // down the whole application. 
case NonFatal(e) => - logError(s"Cannot determine whether executor $executorId is alive or not.", e) + logError(log"Cannot determine whether executor " + + log"${MDC(EXECUTOR_ID, executorId)} is alive or not.", e) false } if (!isAlive) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala index 5cc08714d41c1..1fccbd16ced5b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerStorageEndpoint.scala @@ -20,7 +20,8 @@ package org.apache.spark.storage import scala.concurrent.{ExecutionContext, ExecutionContextExecutorService, Future} import org.apache.spark.{MapOutputTracker, SparkEnv} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC, MessageWithContext} +import org.apache.spark.internal.LogKey.{BLOCK_ID, BROADCAST_ID, RDD_ID, SHUFFLE_ID} import org.apache.spark.rpc.{IsolatedThreadSafeRpcEndpoint, RpcCallContext, RpcEnv} import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.{ThreadUtils, Utils} @@ -44,18 +45,18 @@ class BlockManagerStorageEndpoint( // Operations that involve removing blocks may be slow and should be done asynchronously override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => - doAsync[Boolean]("removing block " + blockId, context) { + doAsync[Boolean](log"removing block ${MDC(BLOCK_ID, blockId)}", context) { blockManager.removeBlock(blockId) true } case RemoveRdd(rddId) => - doAsync[Int]("removing RDD " + rddId, context) { + doAsync[Int](log"removing RDD ${MDC(RDD_ID, rddId)}", context) { blockManager.removeRdd(rddId) } case RemoveShuffle(shuffleId) => - doAsync[Boolean]("removing shuffle " + shuffleId, context) { + doAsync[Boolean](log"removing shuffle ${MDC(SHUFFLE_ID, shuffleId)}", context) { if (mapOutputTracker != null) { mapOutputTracker.unregisterShuffle(shuffleId) } @@ -66,7 +67,7 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.decommissionSelf()) case RemoveBroadcast(broadcastId, _) => - doAsync[Int]("removing broadcast " + broadcastId, context) { + doAsync[Int](log"removing broadcast ${MDC(BROADCAST_ID, broadcastId)}", context) { blockManager.removeBroadcast(broadcastId, tellMaster = true) } @@ -96,18 +97,20 @@ class BlockManagerStorageEndpoint( context.reply(blockManager.blockInfoManager.tryMarkBlockAsVisible(blockId)) } - private def doAsync[T](actionMessage: String, context: RpcCallContext)(body: => T): Unit = { + private def doAsync[T]( + actionMessage: MessageWithContext, + context: RpcCallContext)(body: => T): Unit = { val future = Future { - logDebug(actionMessage) + logDebug(actionMessage.message) body } future.foreach { response => - logDebug(s"Done $actionMessage, response is $response") + logDebug(s"Done ${actionMessage.message}, response is $response") context.reply(response) logDebug(s"Sent response: $response to ${context.senderAddress}") } future.failed.foreach { t => - logError(s"Error in $actionMessage", t) + logError(log"Error in " + actionMessage, t) context.sendFailure(t) } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 7446a55fc7c37..4c0b5f4a14f64 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -30,7 +30,8 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.{MERGE_DIR_NAME, PATH} import org.apache.spark.network.shuffle.ExecutorDiskUtils import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY @@ -255,7 +256,8 @@ private[spark] class DiskBlockManager( Some(localDir) } catch { case e: IOException => - logError(s"Failed to create local dir in $rootDir. Ignoring this directory.", e) + logError( + log"Failed to create local dir in ${MDC(PATH, rootDir)}. Ignoring this directory.", e) None } } @@ -292,7 +294,8 @@ private[spark] class DiskBlockManager( } catch { case e: IOException => logError( - s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e) + log"Failed to create ${MDC(MERGE_DIR_NAME, mergeDirName)} dir in " + + log"${MDC(PATH, rootDir)}. Ignoring this directory.", e) } } } @@ -370,7 +373,7 @@ private[spark] class DiskBlockManager( } } catch { case e: Exception => - logError(s"Exception while deleting local spark dir: $localDir", e) + logError(log"Exception while deleting local spark dir: ${MDC(PATH, localDir)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala index 80e268081fa7f..0b6e33ff5fb37 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala @@ -271,7 +271,8 @@ private[spark] class DiskBlockObjectWriter( logError(log"Exception occurred while reverting partial writes to file " + log"${MDC(PATH, file)}, ${MDC(ERROR, ce.getMessage)}") case e: Exception => - logError("Uncaught exception while reverting partial writes to file " + file, e) + logError( + log"Uncaught exception while reverting partial writes to file ${MDC(PATH, file)}", e) } finally { if (truncateStream != null) { truncateStream.close() diff --git a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala index 9b6048e90c9a6..31958af84e54b 100644 --- a/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala +++ b/core/src/main/scala/org/apache/spark/storage/PushBasedFetchHelper.scala @@ -28,7 +28,8 @@ import org.roaringbitmap.RoaringBitmap import org.apache.spark.MapOutputTracker import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{HOST, PORT, REDUCE_ID, SHUFFLE_ID, SHUFFLE_MERGE_ID} import org.apache.spark.network.shuffle.{BlockStoreClient, MergedBlockMeta, MergedBlocksMetaListener} import org.apache.spark.shuffle.ShuffleReadMetricsReporter import org.apache.spark.storage.BlockManagerId.SHUFFLE_MERGER_IDENTIFIER @@ -170,9 +171,10 @@ private class PushBasedFetchHelper( reduceId, sizeMap((shuffleId, reduceId)), meta.readChunkBitmaps(), address)) } catch { case exception: Exception => - logError(s"Failed to parse the meta of push-merged block for ($shuffleId, " + - 
s"$shuffleMergeId, $reduceId) from" + - s" ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to parse the meta of push-merged block for (" + + log"${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(SHUFFLE_MERGE_ID, shuffleMergeId)}, " + + log"${MDC(REDUCE_ID, reduceId)}) from ${MDC(HOST, req.address.host)}" + + log":${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) @@ -181,8 +183,9 @@ private class PushBasedFetchHelper( override def onFailure(shuffleId: Int, shuffleMergeId: Int, reduceId: Int, exception: Throwable): Unit = { - logError(s"Failed to get the meta of push-merged block for ($shuffleId, $reduceId) " + - s"from ${req.address.host}:${req.address.port}", exception) + logError(log"Failed to get the meta of push-merged block for " + + log"(${MDC(SHUFFLE_ID, shuffleId)}, ${MDC(REDUCE_ID, reduceId)}) " + + log"from ${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", exception) iterator.addToResultsQueue( PushMergedRemoteMetaFailedFetchResult(shuffleId, shuffleMergeId, reduceId, address)) } diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index 916cb83d379e0..d22ce3dbed772 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -37,7 +37,7 @@ import org.apache.spark.{MapOutputTracker, SparkException, TaskContext} import org.apache.spark.MapOutputTracker.SHUFFLE_PUSH_MAP_ID import org.apache.spark.errors.SparkCoreErrors import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, MAX_ATTEMPTS} +import org.apache.spark.internal.LogKey.{BLOCK_ID, ERROR, HOST, MAX_ATTEMPTS, PORT} import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper} @@ -314,7 +314,8 @@ final class ShuffleBlockFetcherIterator( override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { ShuffleBlockFetcherIterator.this.synchronized { - logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) + logError(log"Failed to get block(s) from " + + log"${MDC(HOST, req.address.host)}:${MDC(PORT, req.address.port)}", e) e match { // SPARK-27991: Catch the Netty OOM and set the flag `isNettyOOMOnShuffle` (shared among // tasks) to true as early as possible. 
The pending fetch requests won't be sent diff --git a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala index 83a8fd628cd78..3102115159994 100644 --- a/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/DriverLogPage.scala @@ -21,7 +21,8 @@ import scala.xml.{Node, Unparsed} import jakarta.servlet.http.HttpServletRequest import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH} import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.util.Utils import org.apache.spark.util.logging.DriverLogger.DRIVER_LOG_FILE @@ -136,7 +137,8 @@ private[ui] class DriverLogPage( (logText, startIndex, endIndex, totalLength) } catch { case e: Exception => - logError(s"Error getting $logType logs from directory $logDirectory", e) + logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from directory " + + log"${MDC(PATH, logDirectory)}", e) ("Error getting logs due to exception: " + e.getMessage, 0, 0, 0) } } diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 099e47abf408a..ddf451c16f3a2 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -23,7 +23,8 @@ import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletRespons import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{SecurityManager, SparkConf, SparkContext} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config.DRIVER_LOG_LOCAL_DIR import org.apache.spark.internal.config.UI._ import org.apache.spark.scheduler._ @@ -155,7 +156,7 @@ private[spark] class SparkUI private ( serverInfo = Some(server) } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2c937e71f64b9..baeed322e8ad3 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -29,7 +29,8 @@ import org.eclipse.jetty.servlet.{FilterHolder, FilterMapping, ServletContextHan import org.json4s.JsonAST.{JNothing, JValue} import org.apache.spark.{SecurityManager, SparkConf, SSLOptions} -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.CLASS_NAME import org.apache.spark.internal.config._ import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -158,7 +159,7 @@ private[spark] abstract class WebUI( logInfo(s"Bound $className to $hostName, and started at $webUrl") } catch { case e: Exception => - logError(s"Failed to bind $className", e) + logError(log"Failed to bind ${MDC(CLASS_NAME, className)}", e) System.exit(1) } } diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index 5125adc9f7ca6..eaa9ef517294e 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ 
-22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.control.NonFatal -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.EVENT_LOOP /** * An event loop to receive events from the caller and process all events in the event thread. It @@ -52,13 +53,13 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { try { onError(e) } catch { - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty - case NonFatal(e) => logError("Unexpected error in " + name, e) + case NonFatal(e) => logError(log"Unexpected error in ${MDC(EVENT_LOOP, name)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index f1daa76f3116c..814201d8c959c 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -26,7 +26,8 @@ import scala.util.control.NonFatal import com.codahale.metrics.Timer import org.apache.spark.SparkEnv -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.LISTENER import org.apache.spark.scheduler.EventLoggingListener import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate @@ -122,10 +123,11 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { } } catch { case ie: InterruptedException => - logError(s"Interrupted while posting to ${listenerName}. Removing that listener.", ie) + logError(log"Interrupted while posting to " + + log"${MDC(LISTENER, listenerName)}. 
Removing that listener.", ie) removeListenerOnError(listener) case NonFatal(e) if !isIgnorableException(e) => - logError(s"Listener ${listenerName} threw an exception", e) + logError(log"Listener ${MDC(LISTENER, listenerName)} threw an exception", e) } finally { if (maybeTimerContext != null) { val elapsed = maybeTimerContext.stop() diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index c6cad94401689..b9dece19f2651 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -26,7 +26,8 @@ import scala.util.Try import org.apache.hadoop.fs.FileSystem import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS @@ -68,7 +69,8 @@ private[spark] object ShutdownHookManager extends Logging { logInfo("Deleting directory " + dirPath) Utils.deleteRecursively(new File(dirPath)) } catch { - case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e) + case e: Exception => + logError(log"Exception while deleting Spark temp dir: ${MDC(PATH, dirPath)}", e) } } } diff --git a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala index b24129eb36971..74f1474f9cf78 100644 --- a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala +++ b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala @@ -17,7 +17,8 @@ package org.apache.spark.util -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{Logging, MDC} +import org.apache.spark.internal.LogKey.THREAD /** * The default uncaught exception handler for Spark daemons. It terminates the whole process for @@ -36,11 +37,14 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: override def uncaughtException(thread: Thread, exception: Throwable): Unit = { try { + val mdc = MDC(THREAD, thread) // Make it explicit that uncaught exceptions are thrown when container is shutting down. // It will help users when they analyze the executor logs - val inShutdownMsg = if (ShutdownHookManager.inShutdown()) "[Container in shutdown] " else "" - val errMsg = "Uncaught exception in thread " - logError(inShutdownMsg + errMsg + thread, exception) + if (ShutdownHookManager.inShutdown()) { + logError(log"[Container in shutdown] Uncaught exception in thread $mdc", exception) + } else { + logError(log"Uncaught exception in thread $mdc", exception) + } // We may have been called from a shutdown hook. If so, we must not call System.exit(). // (If we do, we will deadlock.) 
@@ -61,7 +65,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: } catch { case oom: OutOfMemoryError => try { - logError(s"Uncaught OutOfMemoryError in thread $thread, process halted.", oom) + logError( + log"Uncaught OutOfMemoryError in thread ${MDC(THREAD, thread)}, process halted.", + oom) } catch { // absorb any exception/error since we're halting the process case _: Throwable => @@ -69,7 +75,9 @@ private[spark] class SparkUncaughtExceptionHandler(val exitOnUncaughtException: Runtime.getRuntime.halt(SparkExitCode.OOM) case t: Throwable => try { - logError(s"Another uncaught exception in thread $thread, process halted.", t) + logError( + log"Another uncaught exception in thread ${MDC(THREAD, thread)}, process halted.", + t) } catch { case _: Throwable => } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d7e174f5497c4..7022506e5508e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -69,7 +69,7 @@ import org.slf4j.Logger import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.{Logging, MDC} -import org.apache.spark.internal.LogKey.{COMMAND, COMMAND_OUTPUT, EXIT_CODE, PATH} +import org.apache.spark.internal.LogKey._ import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Streaming._ import org.apache.spark.internal.config.Tests.IS_TESTING @@ -1276,11 +1276,13 @@ private[spark] object Utils case t: Throwable => val currentThreadName = Thread.currentThread().getName if (sc != null) { - logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t) + logError(log"uncaught error in thread ${MDC(THREAD_NAME, currentThreadName)}, " + + log"stopping SparkContext", t) sc.stopInNewThread() } if (!NonFatal(t)) { - logError(s"throw uncaught fatal error in thread $currentThreadName", t) + logError( + log"throw uncaught fatal error in thread ${MDC(THREAD_NAME, currentThreadName)}", t) throw t } } @@ -1292,7 +1294,8 @@ private[spark] object Utils block } catch { case NonFatal(t) => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) } } @@ -1469,7 +1472,7 @@ private[spark] object Utils fileSize } catch { case e: Throwable => - logError(s"Cannot get file length of ${file}", e) + logError(log"Cannot get file length of ${MDC(PATH, file)}", e) throw e } finally { if (gzInputStream != null) { @@ -1847,7 +1850,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) throw t } } @@ -1861,7 +1865,8 @@ private[spark] object Utils case ct: ControlThrowable => throw ct case t: Throwable => - logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + logError( + log"Uncaught exception in thread ${MDC(THREAD_NAME, Thread.currentThread().getName)}", t) scala.util.Failure(t) } } @@ -2348,7 +2353,8 @@ private[spark] object Utils val currentUserGroups = groupMappingServiceProvider.getGroups(username) return currentUserGroups } catch { - case e: Exception => logError(s"Error getting groups for user=$username", e) + case e: Exception => + logError(log"Error getting 
groups for user=${MDC(USER_NAME, username)}", e) } } EMPTY_USER_GROUPS diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala index 2243239dce6fd..1dadf15da40fa 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala @@ -20,7 +20,8 @@ package org.apache.spark.util.logging import java.io.{File, FileOutputStream, InputStream, IOException} import org.apache.spark.SparkConf -import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.{config, Logging, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.{IntParam, Utils} /** @@ -90,7 +91,7 @@ private[spark] class FileAppender( } } catch { case e: Exception => - logError(s"Error writing stream to file $file", e) + logError(log"Error writing stream to file ${MDC(PATH, file)}", e) } } diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala index e374c41b91405..f8f144f6e3885 100644 --- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala +++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala @@ -24,7 +24,8 @@ import com.google.common.io.Files import org.apache.commons.io.IOUtils import org.apache.spark.SparkConf -import org.apache.spark.internal.config +import org.apache.spark.internal.{config, MDC} +import org.apache.spark.internal.LogKey.PATH import org.apache.spark.util.ArrayImplicits._ /** @@ -77,7 +78,7 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError(s"Error rolling over $activeFile", e) + logError(log"Error rolling over ${MDC(PATH, activeFile)}", e) } } @@ -156,7 +157,8 @@ private[spark] class RollingFileAppender( } } catch { case e: Exception => - logError("Error cleaning logs in directory " + activeFile.getParentFile.getAbsolutePath, e) + val path = activeFile.getParentFile.getAbsolutePath + logError(log"Error cleaning logs in directory ${MDC(PATH, path)}", e) } } } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java new file mode 100644 index 0000000000000..561d66092d641 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/functions/Reducer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.connector.catalog.functions; + +import org.apache.spark.annotation.Evolving; + +/** + * A 'reducer' for output of user-defined functions. + * + * @see ReducibleFunction + * + * A user-defined function f_source(x) is 'reducible' on another user-defined function + * f_target(x) if + *
+ * Examples: + *
+ * For example, to return a reducer for reducing f_source = bucket(4, x) on f_target = bucket(2, x) + *
+ * Example of reducing f_source = days(x) on f_target = hours(x) + *
+ * A [[Reducer]] exists for a transform expression function if it is + * 'reducible' on the other expression function. + *
+ * @return reducer function or None if not reducible on the other transform expression + */ + def reducers(other: TransformExpression): Option[Reducer[_, _]] = { + (function, other.function) match { + case(e1: ReducibleFunction[_, _], e2: ReducibleFunction[_, _]) => + reducer(e1, numBucketsOpt, e2, other.numBucketsOpt) + case _ => None + } + } + + // Return a Reducer for a reducible function on another reducible function + private def reducer( + thisFunction: ReducibleFunction[_, _], + thisNumBucketsOpt: Option[Int], + otherFunction: ReducibleFunction[_, _], + otherNumBucketsOpt: Option[Int]): Option[Reducer[_, _]] = { + val res = (thisNumBucketsOpt, otherNumBucketsOpt) match { + case (Some(numBuckets), Some(otherNumBuckets)) => + thisFunction.reducer(numBuckets, otherFunction, otherNumBuckets) + case _ => thisFunction.reducer(otherFunction) + } + Option(res) + } + override def dataType: DataType = function.resultType() override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index dfe07a443a230..63a000f50489d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -33,6 +33,8 @@ import org.codehaus.janino.util.ClassFile import org.apache.spark.{SparkException, SparkIllegalArgumentException, TaskContext, TaskKilledException} import org.apache.spark.executor.InputMetrics import org.apache.spark.internal.Logging +import org.apache.spark.internal.LogKey._ +import org.apache.spark.internal.MDC import org.apache.spark.metrics.source.CodegenMetrics import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.HashableWeakReference @@ -1593,7 +1595,8 @@ object CodeGenerator extends Logging { if (byteCodeSize > DEFAULT_JVM_HUGE_METHOD_LIMIT) { logInfo("Generated method too long to be JIT compiled: " + - s"${cf.getThisClassName}.${method.getName} is $byteCodeSize bytes") + log"${MDC(CLASS_NAME, cf.getThisClassName)}.${MDC(METHOD_NAME, method.getName)} " + + log"is ${MDC(BYTECODE_SIZE, byteCodeSize)} bytes") } byteCodeSize @@ -1638,7 +1641,7 @@ object CodeGenerator extends Logging { val timeMs: Double = duration.toDouble / NANOS_PER_MILLIS CodegenMetrics.METRIC_SOURCE_CODE_SIZE.update(code.body.length) CodegenMetrics.METRIC_COMPILATION_TIME.update(timeMs.toLong) - logInfo(s"Code generated in $timeMs ms") + logInfo(log"Code generated in ${MDC(TIME_UNITS, timeMs)} ms") _compileTime.add(duration) result } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 122cba5b74f8d..0e31595f919d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.catalyst.optimizer import scala.collection.mutable import org.apache.spark.SparkException +import org.apache.spark.internal.LogKey._ +import org.apache.spark.internal.MDC import org.apache.spark.sql.catalyst.SQLConfHelper import org.apache.spark.sql.catalyst.analysis._ import 
org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} @@ -455,7 +457,8 @@ abstract class Optimizer(catalogManager: CatalogManager) val filteredRules = batch.rules.filter { rule => val exclude = excludedRules.contains(rule.ruleName) if (exclude) { - logInfo(s"Optimization rule '${rule.ruleName}' is excluded from the optimizer.") + logInfo(log"Optimization rule '${MDC(RULE_NAME, rule.ruleName)}' " + + log"is excluded from the optimizer.") } !exclude } @@ -464,8 +467,8 @@ abstract class Optimizer(catalogManager: CatalogManager) } else if (filteredRules.nonEmpty) { Some(Batch(batch.name, batch.strategy, filteredRules: _*)) } else { - logInfo(s"Optimization batch '${batch.name}' is excluded from the optimizer " + - s"as all enclosed rules have been excluded.") + logInfo(log"Optimization batch '${MDC(RULE_BATCH_NAME, batch.name)}' " + + log"is excluded from the optimizer as all enclosed rules have been excluded.") None } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index c98a2a92a3abb..2364130f79e4c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -24,6 +24,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper +import org.apache.spark.sql.connector.catalog.functions.Reducer import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, IntegerType} @@ -833,10 +834,42 @@ case class KeyGroupedShuffleSpec( (left, right) match { case (_: LeafExpression, _: LeafExpression) => true case (left: TransformExpression, right: TransformExpression) => - left.isSameFunction(right) + if (SQLConf.get.v2BucketingPushPartValuesEnabled && + !SQLConf.get.v2BucketingPartiallyClusteredDistributionEnabled && + SQLConf.get.v2BucketingAllowCompatibleTransforms) { + left.isCompatible(right) + } else { + left.isSameFunction(right) + } case _ => false } + /** + * Return a set of [[Reducer]] for the partition expressions of this shuffle spec, + * on the partition expressions of another shuffle spec. + *
+ * A [[Reducer]] exists for a partition expression function of this shuffle spec if it is + * 'reducible' on the corresponding partition expression function of the other shuffle spec. + *
+ * If a value is returned, there is one entry per partition expression. + * A None entry indicates that the particular partition expression is not reducible + * on the corresponding expression of the other shuffle spec. + *
+ * Returning None for the whole result indicates that none of the partition expressions can be
+ * reduced on the corresponding expression of the other shuffle spec.
+ *
+ * @param other other key-grouped shuffle spec
+ */
+ def reducers(other: KeyGroupedShuffleSpec): Option[Seq[Option[Reducer[_, _]]]] = {
+ val results = partitioning.expressions.zip(other.partitioning.expressions).map {
+ case (e1: TransformExpression, e2: TransformExpression) => e1.reducers(e2)
+ case (_, _) => None
+ }
+
+ // Optimization: return None when none of the partition expressions are reducible
+ if (results.forall(p => p.isEmpty)) None else Some(results)
+ }
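As a rough, self-contained illustration of the contract documented above (plain Scala, with Reducer[_, _] simplified to a bare Int => Int; these are not Spark's actual types), each position of the returned sequence says whether that partition expression is reducible onto the other spec's expression at the same position, and the whole result collapses to None when no position is reducible:

    object ReducersContractSketch {
      // Hypothetical stand-in for Reducer[_, _]: a function from one bucket id to another.
      type Red = Int => Int

      // Mirrors the shape of KeyGroupedShuffleSpec.reducers: Some only when at least one
      // position is reducible, so callers can skip the extra re-grouping pass otherwise.
      def combine(perPosition: Seq[Option[Red]]): Option[Seq[Option[Red]]] =
        if (perPosition.forall(_.isEmpty)) None else Some(perPosition)

      def main(args: Array[String]): Unit = {
        // e.g. bucket(4, store_id) vs bucket(2, store_id) is reducible (modulo); dept_id is not
        println(combine(Seq(Some((b: Int) => b % 2), None)).isDefined) // true
        println(combine(Seq(None, None)))                              // None
      }
    }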
+
override def canCreatePartitioning: Boolean = SQLConf.get.v2BucketingShuffleEnabled &&
// Only support partition expressions that are AttributeReference for now
partitioning.expressions.forall(_.isInstanceOf[AttributeReference])
@@ -846,6 +879,21 @@ case class KeyGroupedShuffleSpec(
}
}
+object KeyGroupedShuffleSpec {
+ def reducePartitionValue(
+ row: InternalRow,
+ expressions: Seq[Expression],
+ reducers: Seq[Option[Reducer[_, _]]]):
+ InternalRowComparableWrapper = {
+ val partitionVals = row.toSeq(expressions.map(_.dataType))
+ val reducedRow = partitionVals.zip(reducers).map{
+ case (v, Some(reducer: Reducer[Any, Any])) => reducer.reduce(v)
+ case (v, _) => v
+ }.toArray
+ InternalRowComparableWrapper(new GenericInternalRow(reducedRow), expressions)
+ }
+}
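A minimal runnable sketch of the reduction idea behind reducePartitionValue, using plain Ints in place of InternalRow and assuming (for illustration only) that a bucket id from bucket(4, col) can be mapped onto bucket(2, col) by taking it modulo 2:

    object ReducePartitionValueSketch {
      // Mirrors the shape of the new Reducer interface: map one partition value to another.
      trait Reducer[I, O] { def reduce(arg: I): O }

      // Hypothetical reducer from a bucket(4, col) id to a bucket(2, col) id.
      val bucket4To2: Reducer[Int, Int] = new Reducer[Int, Int] {
        override def reduce(arg: Int): Int = arg % 2
      }

      def main(args: Array[String]): Unit = {
        // One partition value over two expressions; only the first position is reducible.
        val partitionVals: Seq[Int] = Seq(3, 7)
        val reducers: Seq[Option[Reducer[Int, Int]]] = Seq(Some(bucket4To2), None)
        val reduced = partitionVals.zip(reducers).map {
          case (v, Some(r)) => r.reduce(v)
          case (v, None)    => v
        }
        println(reduced) // List(1, 7): bucket id 3 of 4 collapses onto bucket id 1 of 2
      }
    }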
+
case class ShuffleSpecCollection(specs: Seq[ShuffleSpec]) extends ShuffleSpec {
override def isCompatibleWith(other: ShuffleSpec): Boolean = {
specs.exists(_.isCompatibleWith(other))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index d5cd5a90e3382..ff2d3fd631127 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -18,7 +18,9 @@
package org.apache.spark.sql.catalyst.rules
import org.apache.spark.SparkException
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, MessageWithContext}
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.QueryPlanningTracker
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_SECOND
@@ -56,10 +58,10 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
def logRule(ruleName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
if (!newPlan.fastEquals(oldPlan)) {
if (logRules.isEmpty || logRules.get.contains(ruleName)) {
- def message(): String = {
- s"""
- |=== Applying Rule $ruleName ===
- |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+ def message(): MessageWithContext = {
+ log"""
+ |=== Applying Rule ${MDC(RULE_NAME, ruleName)} ===
+ |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
""".stripMargin
}
@@ -70,14 +72,14 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
def logBatch(batchName: String, oldPlan: TreeType, newPlan: TreeType): Unit = {
if (logBatches.isEmpty || logBatches.get.contains(batchName)) {
- def message(): String = {
+ def message(): MessageWithContext = {
if (!oldPlan.fastEquals(newPlan)) {
- s"""
- |=== Result of Batch $batchName ===
- |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
+ log"""
+ |=== Result of Batch ${MDC(RULE_BATCH_NAME, batchName)} ===
+ |${MDC(QUERY_PLAN, sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n"))}
""".stripMargin
} else {
- s"Batch $batchName has no effect."
+ log"Batch ${MDC(RULE_BATCH_NAME, batchName)} has no effect."
}
}
@@ -88,26 +90,26 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging {
def logMetrics(metrics: QueryExecutionMetrics): Unit = {
val totalTime = metrics.time / NANOS_PER_SECOND.toDouble
val totalTimeEffective = metrics.timeEffective / NANOS_PER_SECOND.toDouble
- val message =
- s"""
+ val message: MessageWithContext =
+ log"""
|=== Metrics of Executed Rules ===
- |Total number of runs: ${metrics.numRuns}
- |Total time: $totalTime seconds
- |Total number of effective runs: ${metrics.numEffectiveRuns}
- |Total time of effective runs: $totalTimeEffective seconds
+ |Total number of runs: ${MDC(RULE_NUMBER_OF_RUNS, metrics.numRuns)}
+ |Total time: ${MDC(TIME_UNITS, totalTime)} seconds
+ |Total number of effective runs: ${MDC(RULE_NUMBER_OF_RUNS, metrics.numEffectiveRuns)}
+ |Total time of effective runs: ${MDC(TIME_UNITS, totalTimeEffective)} seconds
""".stripMargin
logBasedOnLevel(message)
}
- private def logBasedOnLevel(f: => String): Unit = {
+ private def logBasedOnLevel(f: => MessageWithContext): Unit = {
logLevel match {
- case "TRACE" => logTrace(f)
- case "DEBUG" => logDebug(f)
+ case "TRACE" => logTrace(f.message)
+ case "DEBUG" => logDebug(f.message)
case "INFO" => logInfo(f)
case "WARN" => logWarning(f)
case "ERROR" => logError(f)
- case _ => logTrace(f)
+ case _ => logTrace(f.message)
}
}
}
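The logging pattern used throughout this change pairs the log"..." interpolator with MDC values keyed by LogKey entries, yielding a MessageWithContext instead of a plain string. A minimal sketch of that usage from inside the Spark codebase (the class and method below are hypothetical; only the logging call mirrors the diff):

    package org.apache.spark.sql.catalyst.rules

    import org.apache.spark.internal.{Logging, MDC}
    import org.apache.spark.internal.LogKey.RULE_NAME

    class RuleAuditor extends Logging {
      def audit(ruleName: String): Unit = {
        // The MDC value travels as structured context alongside the rendered message text.
        logInfo(log"Optimization rule '${MDC(RULE_NAME, ruleName)}' was applied.")
      }
    }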
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
index 51140d0101939..3d93c4e8742ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/ValidatorUtil.scala
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFiles
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKey._
+import org.apache.spark.internal.MDC
/**
* Utilities for working with XSD validation.
@@ -56,7 +58,8 @@ object ValidatorUtil extends Logging {
case e: Throwable =>
// Handle case where it was added with sc.addFile
// When they are added via sc.addFile, they are always downloaded to local file system
- logInfo(s"$xsdPath was not found, falling back to look up files added by Spark")
+ logInfo(log"${MDC(XSD_PATH, xsdPath)} was not found, " +
+ log"falling back to look up files added by Spark")
val f = new File(SparkFiles.get(xsdPath.toString))
if (f.exists()) {
new FileInputStream(f)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9f07722528e88..73cb4fba8637d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1558,6 +1558,18 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS =
+ buildConf("spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled")
+ .doc("Whether to allow storage-partition join in the case where the partition transforms " +
+ "are compatible but not identical. This config requires both " +
+ s"${V2_BUCKETING_ENABLED.key} and ${V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key} to be " +
+ s"enabled and ${V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
+ "to be disabled."
+ )
+ .version("4.0.0")
+ .booleanConf
+ .createWithDefault(false)
+
val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
.doc("The maximum number of buckets allowed.")
.version("2.4.0")
@@ -5323,6 +5335,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
def v2BucketingAllowJoinKeysSubsetOfPartitionKeys: Boolean =
getConf(SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS)
+ def v2BucketingAllowCompatibleTransforms: Boolean =
+ getConf(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS)
+
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)
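Per the documentation of the new config above, the compatible-transform path only applies when V2 bucketing and partition-value push-down are enabled and partially clustered distribution is disabled. A sketch of a session set up that way, e.g. in spark-shell (keys referenced through SQLConf rather than hard-coded; illustrative, not a required recipe):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    val spark = SparkSession.builder().appName("spj-compatible-transforms").getOrCreate()
    spark.conf.set(SQLConf.V2_BUCKETING_ENABLED.key, "true")
    spark.conf.set(SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key, "true")
    spark.conf.set(SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key, "false")
    spark.conf.set(SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key, "true")
    // With these set, a join on columns bucketed with compatible (e.g. 4 vs 2 buckets)
    // transforms can participate in storage-partitioned join without a shuffle.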
diff --git a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
index d3b3aafc21e57..0317e61163752 100644
--- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
+++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-jdk21-results.txt
@@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 10 1 1.1 894.7 1.0X
-RocksDB (trackTotalNumberOfRows: true) 41 42 2 0.2 4064.6 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 15 1 0.7 1466.8 0.6X
+In-memory 9 10 1 1.1 936.2 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4068.9 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1500.4 0.6X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 10 0 1.1 893.1 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3959.7 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1510.8 0.6X
+In-memory 9 11 1 1.1 929.8 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3955.7 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1497.3 0.6X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 9 0 1.1 872.0 1.0X
-RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3887.2 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1532.3 0.6X
+In-memory 9 10 1 1.1 907.5 1.0X
+RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3886.5 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1497.2 0.6X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 10 1 1.1 874.5 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3967.1 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1526.2 0.6X
+In-memory 9 10 1 1.1 904.0 1.0X
+RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3859.8 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1497.2 0.6X
+
+
+================================================================================================
+merge rows
+================================================================================================
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 519 533 7 0.0 51916.6 1.0X
+RocksDB (trackTotalNumberOfRows: false) 171 177 3 0.1 17083.9 3.0X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 506 521 7 0.0 50644.0 1.0X
+RocksDB (trackTotalNumberOfRows: false) 170 176 3 0.1 17022.0 3.0X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 493 508 6 0.0 49319.3 1.0X
+RocksDB (trackTotalNumberOfRows: false) 169 175 3 0.1 16897.6 2.9X
+
+OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 495 508 6 0.0 49462.5 1.0X
+RocksDB (trackTotalNumberOfRows: false) 169 175 3 0.1 16896.6 2.9X
================================================================================================
@@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 0 1 0 20.9 47.9 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3956.8 0.0X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.6 1541.9 0.0X
+In-memory 0 1 0 26.3 38.0 1.0X
+RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3942.0 0.0X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1529.2 0.0X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 10 1 1.3 773.4 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4024.1 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1537.8 0.5X
+In-memory 8 9 1 1.3 790.4 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4036.7 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1536.9 0.5X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 10 1 1.2 817.7 1.0X
-RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4111.7 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.6 1540.5 0.5X
+In-memory 8 10 1 1.2 847.0 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4099.8 0.2X
+RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1563.3 0.5X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 10 1 1.2 820.0 1.0X
-RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4133.0 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1526.2 0.5X
+In-memory 9 10 1 1.2 859.4 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4118.9 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1507.8 0.6X
================================================================================================
@@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 9 0 1.2 805.5 1.0X
-RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3888.6 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1538.2 0.5X
+In-memory 8 9 1 1.2 831.0 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 40 1 0.3 3956.6 0.2X
+RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1571.3 0.5X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 5000 rows (maxTimestampToEvictInMillis: 4999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 8 0 1.3 754.7 1.0X
-RocksDB (trackTotalNumberOfRows: true) 21 22 0 0.5 2091.7 0.4X
-RocksDB (trackTotalNumberOfRows: false) 9 9 0 1.1 916.1 0.8X
+In-memory 8 8 1 1.3 787.6 1.0X
+RocksDB (trackTotalNumberOfRows: true) 21 22 0 0.5 2112.6 0.4X
+RocksDB (trackTotalNumberOfRows: false) 9 9 0 1.1 932.9 0.8X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 7 8 1 1.4 692.8 1.0X
-RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.5 654.6 1.1X
-RocksDB (trackTotalNumberOfRows: false) 4 4 0 2.4 423.8 1.6X
+In-memory 7 8 0 1.4 715.7 1.0X
+RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.5 676.3 1.1X
+RocksDB (trackTotalNumberOfRows: false) 4 5 0 2.3 442.3 1.6X
OpenJDK 64-Bit Server VM 21.0.2+13-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 0 0 0 24.2 41.2 1.0X
-RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.4 290.1 0.1X
-RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.4 290.6 0.1X
-
-
+In-memory 0 0 0 23.8 41.9 1.0X
+RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.2 309.5 0.1X
+RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.2 309.9 0.1X
diff --git a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
index 86d3d44003316..d2aa646d5ec1d 100644
--- a/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
+++ b/sql/core/benchmarks/StateStoreBasicOperationsBenchmark-results.txt
@@ -6,33 +6,66 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 10 1 1.1 907.3 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 42 2 0.2 4048.4 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1508.4 0.6X
+In-memory 10 12 1 1.0 960.1 1.0X
+RocksDB (trackTotalNumberOfRows: true) 42 43 2 0.2 4173.9 0.2X
+RocksDB (trackTotalNumberOfRows: false) 16 16 1 0.6 1551.6 0.6X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 10 1 1.1 901.9 1.0X
-RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3922.0 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1513.5 0.6X
+In-memory 10 12 1 1.0 970.1 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4095.8 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 17 1 0.6 1544.6 0.6X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 9 0 1.1 880.2 1.0X
-RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3829.7 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1506.0 0.6X
+In-memory 9 11 1 1.1 933.3 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3966.2 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.6 1540.2 0.6X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
putting 10000 rows (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
---------------------------------------------------------------------------------------------------------------------------------
-In-memory 9 9 0 1.1 878.6 1.0X
-RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3803.4 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1500.3 0.6X
+In-memory 9 11 1 1.1 936.1 1.0X
+RocksDB (trackTotalNumberOfRows: true) 39 41 1 0.3 3942.4 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1530.1 0.6X
+
+
+================================================================================================
+merge rows
+================================================================================================
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (10000 rows to overwrite - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 525 538 6 0.0 52516.4 1.0X
+RocksDB (trackTotalNumberOfRows: false) 170 177 4 0.1 16960.4 3.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (5000 rows to overwrite - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 514 528 6 0.0 51351.9 1.0X
+RocksDB (trackTotalNumberOfRows: false) 168 174 4 0.1 16794.0 3.1X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (1000 rows to overwrite - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 500 513 6 0.0 49955.1 1.0X
+RocksDB (trackTotalNumberOfRows: false) 169 174 2 0.1 16867.1 3.0X
+
+OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
+AMD EPYC 7763 64-Core Processor
+merging 10000 rows with 10 values per key (0 rows to overwrite - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------------------------------------
+RocksDB (trackTotalNumberOfRows: true) 492 508 8 0.0 49225.8 1.0X
+RocksDB (trackTotalNumberOfRows: false) 168 173 3 0.1 16757.2 2.9X
================================================================================================
@@ -43,33 +76,33 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(10000 rows are non-existing - rate 100): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 0 1 0 26.5 37.8 1.0X
-RocksDB (trackTotalNumberOfRows: true) 38 39 1 0.3 3779.3 0.0X
-RocksDB (trackTotalNumberOfRows: false) 15 15 1 0.7 1462.9 0.0X
+In-memory 0 1 0 26.1 38.3 1.0X
+RocksDB (trackTotalNumberOfRows: true) 38 40 1 0.3 3835.6 0.0X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1455.7 0.0X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(5000 rows are non-existing - rate 50): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 7 8 0 1.3 742.4 1.0X
-RocksDB (trackTotalNumberOfRows: true) 39 40 1 0.3 3913.8 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1461.1 0.5X
+In-memory 8 9 1 1.3 793.9 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4018.1 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 0 0.7 1505.6 0.5X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(1000 rows are non-existing - rate 10): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 8 0 1.3 794.8 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.2 4012.8 0.2X
-RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1461.5 0.5X
+In-memory 8 10 1 1.2 837.2 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4073.9 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 16 1 0.7 1470.6 0.6X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
trying to delete 10000 rows from 10000 rows(0 rows are non-existing - rate 0): Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 9 0 1.2 809.7 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 42 1 0.2 4043.4 0.2X
-RocksDB (trackTotalNumberOfRows: false) 14 15 1 0.7 1445.2 0.6X
+In-memory 8 9 0 1.2 843.6 1.0X
+RocksDB (trackTotalNumberOfRows: true) 41 42 1 0.2 4088.7 0.2X
+RocksDB (trackTotalNumberOfRows: false) 15 15 0 0.7 1466.1 0.6X
================================================================================================
@@ -80,32 +113,30 @@ OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 10000 rows (maxTimestampToEvictInMillis: 9999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 9 0 1.2 806.2 1.0X
-RocksDB (trackTotalNumberOfRows: true) 40 41 1 0.3 3980.0 0.2X
-RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1599.4 0.5X
+In-memory 8 9 0 1.2 833.5 1.0X
+RocksDB (trackTotalNumberOfRows: true) 40 41 0 0.3 3976.5 0.2X
+RocksDB (trackTotalNumberOfRows: false) 16 16 0 0.6 1588.1 0.5X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 5000 rows (maxTimestampToEvictInMillis: 4999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 8 8 0 1.3 752.2 1.0X
-RocksDB (trackTotalNumberOfRows: true) 22 22 0 0.5 2170.8 0.3X
-RocksDB (trackTotalNumberOfRows: false) 10 10 0 1.0 967.7 0.8X
+In-memory 8 8 0 1.3 784.3 1.0X
+RocksDB (trackTotalNumberOfRows: true) 22 22 0 0.5 2155.1 0.4X
+RocksDB (trackTotalNumberOfRows: false) 10 10 0 1.0 986.9 0.8X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 1000 rows (maxTimestampToEvictInMillis: 999) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-----------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 7 7 0 1.4 694.9 1.0X
-RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.4 700.1 1.0X
-RocksDB (trackTotalNumberOfRows: false) 5 5 0 2.2 465.0 1.5X
+In-memory 7 8 0 1.4 722.3 1.0X
+RocksDB (trackTotalNumberOfRows: true) 7 7 0 1.4 718.8 1.0X
+RocksDB (trackTotalNumberOfRows: false) 5 5 0 2.0 488.7 1.5X
OpenJDK 64-Bit Server VM 17.0.10+7-LTS on Linux 6.5.0-1016-azure
AMD EPYC 7763 64-Core Processor
evicting 0 rows (maxTimestampToEvictInMillis: -1) from 10000 rows: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------------------------------------------------------------
-In-memory 1 1 0 19.7 50.7 1.0X
-RocksDB (trackTotalNumberOfRows: true) 3 3 0 3.0 332.1 0.2X
-RocksDB (trackTotalNumberOfRows: false) 3 3 0 3.0 331.7 0.2X
-
-
+In-memory 0 1 0 21.3 46.9 1.0X
+RocksDB (trackTotalNumberOfRows: true) 4 4 0 2.8 358.9 0.1X
+RocksDB (trackTotalNumberOfRows: false) 4 4 0 2.8 358.7 0.1X
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
index 7cce599040189..f949dbf71a371 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala
@@ -24,9 +24,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, SinglePartition}
+import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, KeyGroupedShuffleSpec, Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.connector.read._
import org.apache.spark.util.ArrayImplicits._
@@ -164,6 +165,18 @@ case class BatchScanExec(
(groupedParts, expressions)
}
+ // Also re-group the partitions if we are reducing compatible partition expressions
+ val finalGroupedPartitions = spjParams.reducers match {
+ case Some(reducers) =>
+ val result = groupedPartitions.groupBy { case (row, _) =>
+ KeyGroupedShuffleSpec.reducePartitionValue(row, partExpressions, reducers)
+ }.map { case (wrapper, splits) => (wrapper.row, splits.flatMap(_._2)) }.toSeq
+ val rowOrdering = RowOrdering.createNaturalAscendingOrdering(
+ partExpressions.map(_.dataType))
+ result.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+ case _ => groupedPartitions
+ }
+
// When partially clustered, the input partitions are not grouped by partition
// values. Here we'll need to check `commonPartitionValues` and decide how to group
// and replicate splits within a partition.
@@ -174,7 +187,7 @@ case class BatchScanExec(
.get
.map(t => (InternalRowComparableWrapper(t._1, partExpressions), t._2))
.toMap
- val nestGroupedPartitions = groupedPartitions.map { case (partValue, splits) =>
+ val nestGroupedPartitions = finalGroupedPartitions.map { case (partValue, splits) =>
// `commonPartValuesMap` should contain the part value since it's the super set.
val numSplits = commonPartValuesMap
.get(InternalRowComparableWrapper(partValue, partExpressions))
@@ -207,7 +220,7 @@ case class BatchScanExec(
} else {
// either `commonPartitionValues` is not defined, or it is defined but
// `applyPartialClustering` is false.
- val partitionMapping = groupedPartitions.map { case (partValue, splits) =>
+ val partitionMapping = finalGroupedPartitions.map { case (partValue, splits) =>
InternalRowComparableWrapper(partValue, partExpressions) -> splits
}.toMap
@@ -259,6 +272,7 @@ case class StoragePartitionJoinParams(
keyGroupedPartitioning: Option[Seq[Expression]] = None,
joinKeyPositions: Option[Seq[Int]] = None,
commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+ reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
applyPartialClustering: Boolean = false,
replicatePartitions: Boolean = false) {
override def equals(other: Any): Boolean = other match {
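The extra grouping pass added in BatchScanExec can be pictured with plain values: input partitions already keyed by, say, bucket(4) values are re-grouped by their reduced bucket(2) value and their splits concatenated, so each reduced partition value ends up as one task. A simplified sketch (Ints standing in for InternalRow, modulo standing in for a hypothetical bucket reducer):

    object RegroupByReducedValueSketch {
      def main(args: Array[String]): Unit = {
        val groupedBy4: Seq[(Int, Seq[String])] =
          Seq(0 -> Seq("split-a"), 1 -> Seq("split-b"), 2 -> Seq("split-c"), 3 -> Seq("split-d"))
        val reduce: Int => Int = _ % 2 // hypothetical bucket(4) -> bucket(2) reducer
        val groupedBy2 = groupedBy4
          .groupBy { case (bucketId, _) => reduce(bucketId) }
          .map { case (reducedId, parts) => reducedId -> parts.flatMap(_._2) }
          .toSeq.sortBy(_._1)
        println(groupedBy2) // reduced bucket 0 gets splits a and c, reduced bucket 1 gets b and d
      }
    }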
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 2a7c1206bb410..a0f74ef6c3d02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
+import org.apache.spark.sql.connector.catalog.functions.Reducer
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
@@ -505,11 +506,28 @@ case class EnsureRequirements(
}
}
- // Now we need to push-down the common partition key to the scan in each child
- newLeft = populatePartitionValues(left, mergedPartValues, leftSpec.joinKeyPositions,
- applyPartialClustering, replicateLeftSide)
- newRight = populatePartitionValues(right, mergedPartValues, rightSpec.joinKeyPositions,
- applyPartialClustering, replicateRightSide)
+ // in case of compatible but not identical partition expressions, we apply 'reduce'
+ // transforms to group one side's partitions as well as the common partition values
+ val leftReducers = leftSpec.reducers(rightSpec)
+ val rightReducers = rightSpec.reducers(leftSpec)
+
+ if (leftReducers.isDefined || rightReducers.isDefined) {
+ mergedPartValues = reduceCommonPartValues(mergedPartValues,
+ leftSpec.partitioning.expressions,
+ leftReducers)
+ mergedPartValues = reduceCommonPartValues(mergedPartValues,
+ rightSpec.partitioning.expressions,
+ rightReducers)
+ val rowOrdering = RowOrdering
+ .createNaturalAscendingOrdering(partitionExprs.map(_.dataType))
+ mergedPartValues = mergedPartValues.sorted(rowOrdering.on((t: (InternalRow, _)) => t._1))
+ }
+
+ // Now we need to push-down the common partition information to the scan in each child
+ newLeft = populateCommonPartitionInfo(left, mergedPartValues, leftSpec.joinKeyPositions,
+ leftReducers, applyPartialClustering, replicateLeftSide)
+ newRight = populateCommonPartitionInfo(right, mergedPartValues, rightSpec.joinKeyPositions,
+ rightReducers, applyPartialClustering, replicateRightSide)
}
}
@@ -527,11 +545,12 @@ case class EnsureRequirements(
joinType == LeftAnti || joinType == LeftOuter
}
- // Populate the common partition values down to the scan nodes
- private def populatePartitionValues(
+ // Populate the common partition information down to the scan nodes
+ private def populateCommonPartitionInfo(
plan: SparkPlan,
values: Seq[(InternalRow, Int)],
joinKeyPositions: Option[Seq[Int]],
+ reducers: Option[Seq[Option[Reducer[_, _]]]],
applyPartialClustering: Boolean,
replicatePartitions: Boolean): SparkPlan = plan match {
case scan: BatchScanExec =>
@@ -539,13 +558,26 @@ case class EnsureRequirements(
spjParams = scan.spjParams.copy(
commonPartitionValues = Some(values),
joinKeyPositions = joinKeyPositions,
+ reducers = reducers,
applyPartialClustering = applyPartialClustering,
replicatePartitions = replicatePartitions
)
)
case node =>
- node.mapChildren(child => populatePartitionValues(
- child, values, joinKeyPositions, applyPartialClustering, replicatePartitions))
+ node.mapChildren(child => populateCommonPartitionInfo(
+ child, values, joinKeyPositions, reducers, applyPartialClustering, replicatePartitions))
+ }
+
+ private def reduceCommonPartValues(
+ commonPartValues: Seq[(InternalRow, Int)],
+ expressions: Seq[Expression],
+ reducers: Option[Seq[Option[Reducer[_, _]]]]) = {
+ reducers match {
+ case Some(reducers) => commonPartValues.groupBy { case (row, _) =>
+ KeyGroupedShuffleSpec.reducePartitionValue(row, expressions, reducers)
+ }.map{ case(wrapper, splits) => (wrapper.row, splits.map(_._2).sum) }.toSeq
+ case _ => commonPartValues
+ }
}
/**
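reduceCommonPartValues above works on (partition value, split count) pairs: values that reduce to the same key have their counts summed, and the result is re-sorted before being pushed down to both scans. A simplified sketch of that step (Ints for partition values, a hypothetical modulo reducer):

    object ReduceCommonPartValuesSketch {
      def main(args: Array[String]): Unit = {
        // (partition value on the finer-grained side, number of splits for that value)
        val commonPartValues: Seq[(Int, Int)] = Seq(0 -> 1, 1 -> 2, 2 -> 1, 3 -> 1)
        val reduce: Int => Int = _ % 2 // hypothetical bucket(4) -> bucket(2) reducer
        val reduced = commonPartValues
          .groupBy { case (value, _) => reduce(value) }
          .map { case (value, entries) => value -> entries.map(_._2).sum }
          .toSeq.sortBy(_._1)
        println(reduced) // reduced value 0 carries 2 splits, reduced value 1 carries 3
      }
    }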
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
index 7fdc703007c2c..ec275fe101fd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala
@@ -63,11 +63,17 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
Collections.emptyMap[String, String]
}
private val table: String = "tbl"
+
private val columns: Array[Column] = Array(
Column.create("id", IntegerType),
Column.create("data", StringType),
Column.create("ts", TimestampType))
+ private val columns2: Array[Column] = Array(
+ Column.create("store_id", IntegerType),
+ Column.create("dept_id", IntegerType),
+ Column.create("data", StringType))
+
test("clustered distribution: output partitioning should be KeyGroupedPartitioning") {
val partitions: Array[Transform] = Array(Expressions.years("ts"))
@@ -1309,6 +1315,474 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
}
}
+ test("SPARK-47094: Support compatible buckets") {
+ val table1 = "table1"
+ val table2 = "table2"
+
+ Seq(
+ ((2, 4), (4, 2)),
+ ((4, 2), (2, 4)),
+ ((2, 2), (4, 6)),
+ ((6, 2), (2, 2))).foreach {
+ case ((table1buckets1, table1buckets2), (table2buckets1, table2buckets2)) =>
+ catalog.clearTables()
+
+ val partition1 = Array(bucket(table1buckets1, "store_id"),
+ bucket(table1buckets2, "dept_id"))
+ val partition2 = Array(bucket(table2buckets1, "store_id"),
+ bucket(table2buckets2, "dept_id"))
+
+ Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+ createTable(tab, columns2, part)
+ val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+ "(0, 0, 'aa'), " +
+ "(0, 0, 'ab'), " + // duplicate partition key
+ "(0, 1, 'ac'), " +
+ "(0, 2, 'ad'), " +
+ "(0, 3, 'ae'), " +
+ "(0, 4, 'af'), " +
+ "(0, 5, 'ag'), " +
+ "(1, 0, 'ah'), " +
+ "(1, 0, 'ai'), " + // duplicate partition key
+ "(1, 1, 'aj'), " +
+ "(1, 2, 'ak'), " +
+ "(1, 3, 'al'), " +
+ "(1, 4, 'am'), " +
+ "(1, 5, 'an'), " +
+ "(2, 0, 'ao'), " +
+ "(2, 0, 'ap'), " + // duplicate partition key
+ "(2, 1, 'aq'), " +
+ "(2, 2, 'ar'), " +
+ "(2, 3, 'as'), " +
+ "(2, 4, 'at'), " +
+ "(2, 5, 'au'), " +
+ "(3, 0, 'av'), " +
+ "(3, 0, 'aw'), " + // duplicate partition key
+ "(3, 1, 'ax'), " +
+ "(3, 2, 'ay'), " +
+ "(3, 3, 'az'), " +
+ "(3, 4, 'ba'), " +
+ "(3, 5, 'bb'), " +
+ "(4, 0, 'bc'), " +
+ "(4, 0, 'bd'), " + // duplicate partition key
+ "(4, 1, 'be'), " +
+ "(4, 2, 'bf'), " +
+ "(4, 3, 'bg'), " +
+ "(4, 4, 'bh'), " +
+ "(4, 5, 'bi'), " +
+ "(5, 0, 'bj'), " +
+ "(5, 0, 'bk'), " + // duplicate partition key
+ "(5, 1, 'bl'), " +
+ "(5, 2, 'bm'), " +
+ "(5, 3, 'bn'), " +
+ "(5, 4, 'bo'), " +
+ "(5, 5, 'bp')"
+
+ // additional unmatched partitions to test push down
+ val finalStr = if (tab == table1) {
+ insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+ } else {
+ insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+ }
+
+ sql(finalStr)
+ }
+
+ Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+ SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+ allowJoinKeysSubsetOfPartitionKeys.toString,
+ SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+ val df = sql(
+ s"""
+ |${selectWithMergeJoinHint("t1", "t2")}
+ |t1.store_id, t1.dept_id, t1.data, t2.data
+ |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+ |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+ |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+ |""".stripMargin)
+
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.isEmpty, "SPJ should be triggered")
+
+ val scans = collectScans(df.queryExecution.executedPlan)
+ .map(_.inputRDD.partitions.length)
+ val expectedBuckets = Math.min(table1buckets1, table2buckets1) *
+ Math.min(table1buckets2, table2buckets2)
+ assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+ checkAnswer(df, Seq(
+ Row(0, 0, "aa", "aa"),
+ Row(0, 0, "aa", "ab"),
+ Row(0, 0, "ab", "aa"),
+ Row(0, 0, "ab", "ab"),
+ Row(0, 1, "ac", "ac"),
+ Row(0, 2, "ad", "ad"),
+ Row(0, 3, "ae", "ae"),
+ Row(0, 4, "af", "af"),
+ Row(0, 5, "ag", "ag"),
+ Row(1, 0, "ah", "ah"),
+ Row(1, 0, "ah", "ai"),
+ Row(1, 0, "ai", "ah"),
+ Row(1, 0, "ai", "ai"),
+ Row(1, 1, "aj", "aj"),
+ Row(1, 2, "ak", "ak"),
+ Row(1, 3, "al", "al"),
+ Row(1, 4, "am", "am"),
+ Row(1, 5, "an", "an"),
+ Row(2, 0, "ao", "ao"),
+ Row(2, 0, "ao", "ap"),
+ Row(2, 0, "ap", "ao"),
+ Row(2, 0, "ap", "ap"),
+ Row(2, 1, "aq", "aq"),
+ Row(2, 2, "ar", "ar"),
+ Row(2, 3, "as", "as"),
+ Row(2, 4, "at", "at"),
+ Row(2, 5, "au", "au"),
+ Row(3, 0, "av", "av"),
+ Row(3, 0, "av", "aw"),
+ Row(3, 0, "aw", "av"),
+ Row(3, 0, "aw", "aw"),
+ Row(3, 1, "ax", "ax"),
+ Row(3, 2, "ay", "ay"),
+ Row(3, 3, "az", "az"),
+ Row(3, 4, "ba", "ba"),
+ Row(3, 5, "bb", "bb"),
+ Row(4, 0, "bc", "bc"),
+ Row(4, 0, "bc", "bd"),
+ Row(4, 0, "bd", "bc"),
+ Row(4, 0, "bd", "bd"),
+ Row(4, 1, "be", "be"),
+ Row(4, 2, "bf", "bf"),
+ Row(4, 3, "bg", "bg"),
+ Row(4, 4, "bh", "bh"),
+ Row(4, 5, "bi", "bi"),
+ Row(5, 0, "bj", "bj"),
+ Row(5, 0, "bj", "bk"),
+ Row(5, 0, "bk", "bj"),
+ Row(5, 0, "bk", "bk"),
+ Row(5, 1, "bl", "bl"),
+ Row(5, 2, "bm", "bm"),
+ Row(5, 3, "bn", "bn"),
+ Row(5, 4, "bo", "bo"),
+ Row(5, 5, "bp", "bp")
+ ))
+ }
+ }
+ }
+ }
+
+ test("SPARK-47094: Support compatible buckets with common divisor") {
+ val table1 = "table1"
+ val table2 = "table2"
+
+ Seq(
+ ((6, 4), (4, 6)),
+ ((6, 6), (4, 4)),
+ ((4, 4), (6, 6)),
+ ((4, 6), (6, 4))).foreach {
+ case ((table1buckets1, table1buckets2), (table2buckets1, table2buckets2)) =>
+ catalog.clearTables()
+
+ val partition1 = Array(bucket(table1buckets1, "store_id"),
+ bucket(table1buckets2, "dept_id"))
+ val partition2 = Array(bucket(table2buckets1, "store_id"),
+ bucket(table2buckets2, "dept_id"))
+
+ Seq((table1, partition1), (table2, partition2)).foreach { case (tab, part) =>
+ createTable(tab, columns2, part)
+ val insertStr = s"INSERT INTO testcat.ns.$tab VALUES " +
+ "(0, 0, 'aa'), " +
+ "(0, 0, 'ab'), " + // duplicate partition key
+ "(0, 1, 'ac'), " +
+ "(0, 2, 'ad'), " +
+ "(0, 3, 'ae'), " +
+ "(0, 4, 'af'), " +
+ "(0, 5, 'ag'), " +
+ "(1, 0, 'ah'), " +
+ "(1, 0, 'ai'), " + // duplicate partition key
+ "(1, 1, 'aj'), " +
+ "(1, 2, 'ak'), " +
+ "(1, 3, 'al'), " +
+ "(1, 4, 'am'), " +
+ "(1, 5, 'an'), " +
+ "(2, 0, 'ao'), " +
+ "(2, 0, 'ap'), " + // duplicate partition key
+ "(2, 1, 'aq'), " +
+ "(2, 2, 'ar'), " +
+ "(2, 3, 'as'), " +
+ "(2, 4, 'at'), " +
+ "(2, 5, 'au'), " +
+ "(3, 0, 'av'), " +
+ "(3, 0, 'aw'), " + // duplicate partition key
+ "(3, 1, 'ax'), " +
+ "(3, 2, 'ay'), " +
+ "(3, 3, 'az'), " +
+ "(3, 4, 'ba'), " +
+ "(3, 5, 'bb'), " +
+ "(4, 0, 'bc'), " +
+ "(4, 0, 'bd'), " + // duplicate partition key
+ "(4, 1, 'be'), " +
+ "(4, 2, 'bf'), " +
+ "(4, 3, 'bg'), " +
+ "(4, 4, 'bh'), " +
+ "(4, 5, 'bi'), " +
+ "(5, 0, 'bj'), " +
+ "(5, 0, 'bk'), " + // duplicate partition key
+ "(5, 1, 'bl'), " +
+ "(5, 2, 'bm'), " +
+ "(5, 3, 'bn'), " +
+ "(5, 4, 'bo'), " +
+ "(5, 5, 'bp')"
+
+ // additional unmatched partitions to test push down
+ val finalStr = if (tab == table1) {
+ insertStr ++ ", (8, 0, 'xa'), (8, 8, 'xx')"
+ } else {
+ insertStr ++ ", (9, 0, 'ya'), (9, 9, 'yy')"
+ }
+
+ sql(finalStr)
+ }
+
+ Seq(true, false).foreach { allowJoinKeysSubsetOfPartitionKeys =>
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+ SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key ->
+ allowJoinKeysSubsetOfPartitionKeys.toString,
+ SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+ val df = sql(
+ s"""
+ |${selectWithMergeJoinHint("t1", "t2")}
+ |t1.store_id, t1.dept_id, t1.data, t2.data
+ |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+ |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+ |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+ |""".stripMargin)
+
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.isEmpty, "SPJ should be triggered")
+
+ val scans = collectScans(df.queryExecution.executedPlan)
+ .map(_.inputRDD.partitions.length)
+
+ def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
+ val expectedBuckets = gcd(table1buckets1, table2buckets1) *
+ gcd(table1buckets2, table2buckets2)
+ assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+ checkAnswer(df, Seq(
+ Row(0, 0, "aa", "aa"),
+ Row(0, 0, "aa", "ab"),
+ Row(0, 0, "ab", "aa"),
+ Row(0, 0, "ab", "ab"),
+ Row(0, 1, "ac", "ac"),
+ Row(0, 2, "ad", "ad"),
+ Row(0, 3, "ae", "ae"),
+ Row(0, 4, "af", "af"),
+ Row(0, 5, "ag", "ag"),
+ Row(1, 0, "ah", "ah"),
+ Row(1, 0, "ah", "ai"),
+ Row(1, 0, "ai", "ah"),
+ Row(1, 0, "ai", "ai"),
+ Row(1, 1, "aj", "aj"),
+ Row(1, 2, "ak", "ak"),
+ Row(1, 3, "al", "al"),
+ Row(1, 4, "am", "am"),
+ Row(1, 5, "an", "an"),
+ Row(2, 0, "ao", "ao"),
+ Row(2, 0, "ao", "ap"),
+ Row(2, 0, "ap", "ao"),
+ Row(2, 0, "ap", "ap"),
+ Row(2, 1, "aq", "aq"),
+ Row(2, 2, "ar", "ar"),
+ Row(2, 3, "as", "as"),
+ Row(2, 4, "at", "at"),
+ Row(2, 5, "au", "au"),
+ Row(3, 0, "av", "av"),
+ Row(3, 0, "av", "aw"),
+ Row(3, 0, "aw", "av"),
+ Row(3, 0, "aw", "aw"),
+ Row(3, 1, "ax", "ax"),
+ Row(3, 2, "ay", "ay"),
+ Row(3, 3, "az", "az"),
+ Row(3, 4, "ba", "ba"),
+ Row(3, 5, "bb", "bb"),
+ Row(4, 0, "bc", "bc"),
+ Row(4, 0, "bc", "bd"),
+ Row(4, 0, "bd", "bc"),
+ Row(4, 0, "bd", "bd"),
+ Row(4, 1, "be", "be"),
+ Row(4, 2, "bf", "bf"),
+ Row(4, 3, "bg", "bg"),
+ Row(4, 4, "bh", "bh"),
+ Row(4, 5, "bi", "bi"),
+ Row(5, 0, "bj", "bj"),
+ Row(5, 0, "bj", "bk"),
+ Row(5, 0, "bk", "bj"),
+ Row(5, 0, "bk", "bk"),
+ Row(5, 1, "bl", "bl"),
+ Row(5, 2, "bm", "bm"),
+ Row(5, 3, "bn", "bn"),
+ Row(5, 4, "bo", "bo"),
+ Row(5, 5, "bp", "bp")
+ ))
+ }
+ }
+ }
+ }
+
+ test("SPARK-47094: Support compatible buckets with less join keys than partition keys") {
+ val table1 = "table1"
+ val table2 = "table2"
+
+ Seq((2, 4), (4, 2), (2, 6), (6, 2)).foreach {
+ case (table1buckets, table2buckets) =>
+ catalog.clearTables()
+
+ val partition1 = Array(identity("data"),
+ bucket(table1buckets, "dept_id"))
+ val partition2 = Array(bucket(3, "store_id"),
+ bucket(table2buckets, "dept_id"))
+
+ createTable(table1, columns2, partition1)
+ sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+ "(0, 0, 'aa'), " +
+ "(1, 0, 'ab'), " +
+ "(2, 1, 'ac'), " +
+ "(3, 2, 'ad'), " +
+ "(4, 3, 'ae'), " +
+ "(5, 4, 'af'), " +
+ "(6, 5, 'ag'), " +
+
+ // value without other side match
+ "(6, 6, 'xx')"
+ )
+
+ createTable(table2, columns2, partition2)
+ sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+ "(6, 0, '01'), " +
+ "(5, 1, '02'), " + // duplicate partition key
+ "(5, 1, '03'), " +
+ "(4, 2, '04'), " +
+ "(3, 3, '05'), " +
+ "(2, 4, '06'), " +
+ "(1, 5, '07'), " +
+
+ // value without other side match
+ "(7, 7, '99')"
+ )
+
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> "true",
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key -> "false",
+ SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+ SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+ val df = sql(
+ s"""
+ |${selectWithMergeJoinHint("t1", "t2")}
+ |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+ |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+ |ON t1.dept_id = t2.dept_id
+ |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+ |""".stripMargin)
+
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ assert(shuffles.isEmpty, "SPJ should be triggered")
+
+ val scans = collectScans(df.queryExecution.executedPlan)
+ .map(_.inputRDD.partitions.length)
+
+ val expectedBuckets = Math.min(table1buckets, table2buckets)
+
+ assert(scans == Seq(expectedBuckets, expectedBuckets))
+
+ checkAnswer(df, Seq(
+ Row(0, 6, 0, 0, "aa", "01"),
+ Row(1, 6, 0, 0, "ab", "01"),
+ Row(2, 5, 1, 1, "ac", "02"),
+ Row(2, 5, 1, 1, "ac", "03"),
+ Row(3, 4, 2, 2, "ad", "04"),
+ Row(4, 3, 3, 3, "ae", "05"),
+ Row(5, 2, 4, 4, "af", "06"),
+ Row(6, 1, 5, 5, "ag", "07")
+ ))
+ }
+ }
+ }
+
+ test("SPARK-47094: Compatible buckets does not support SPJ with " +
+ "push-down values or partially-clustered") {
+ val table1 = "table1"
+ val table2 = "table2"
+
+ val partition1 = Array(bucket(4, "store_id"),
+ bucket(2, "dept_id"))
+ val partition2 = Array(bucket(2, "store_id"),
+ bucket(2, "dept_id"))
+
+ createTable(table1, columns2, partition1)
+ sql(s"INSERT INTO testcat.ns.$table1 VALUES " +
+ "(0, 0, 'aa'), " +
+ "(1, 1, 'bb'), " +
+ "(2, 2, 'cc')"
+ )
+
+ createTable(table2, columns2, partition2)
+ sql(s"INSERT INTO testcat.ns.$table2 VALUES " +
+ "(0, 0, 'aa'), " +
+ "(1, 1, 'bb'), " +
+ "(2, 2, 'cc')"
+ )
+
+ Seq(true, false).foreach { allowPushDown =>
+ Seq(true, false).foreach { partiallyClustered =>
+ withSQLConf(
+ SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION.key -> "false",
+ SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> allowPushDown.toString,
+ SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+ partiallyClustered.toString,
+ SQLConf.V2_BUCKETING_ALLOW_JOIN_KEYS_SUBSET_OF_PARTITION_KEYS.key -> "true",
+ SQLConf.V2_BUCKETING_ALLOW_COMPATIBLE_TRANSFORMS.key -> "true") {
+ val df = sql(
+ s"""
+ |${selectWithMergeJoinHint("t1", "t2")}
+ |t1.store_id, t2.store_id, t1.dept_id, t2.dept_id, t1.data, t2.data
+ |FROM testcat.ns.$table1 t1 JOIN testcat.ns.$table2 t2
+ |ON t1.store_id = t2.store_id AND t1.dept_id = t2.dept_id
+ |ORDER BY t1.store_id, t1.dept_id, t1.data, t2.data
+ |""".stripMargin)
+
+ val shuffles = collectShuffles(df.queryExecution.executedPlan)
+ val scans = collectScans(df.queryExecution.executedPlan)
+ .map(_.inputRDD.partitions.length)
+
+ (allowPushDown, partiallyClustered) match {
+ case (true, false) =>
+ assert(shuffles.isEmpty, "SPJ should be triggered")
+ assert(scans == Seq(2, 2))
+ case (_, _) =>
+ assert(shuffles.nonEmpty, "SPJ should not be triggered")
+ assert(scans == Seq(3, 2))
+ }
+
+ checkAnswer(df, Seq(
+ Row(0, 0, 0, 0, "aa", "aa"),
+ Row(1, 1, 1, 1, "bb", "bb"),
+ Row(2, 2, 2, 2, "cc", "cc")
+ ))
+ }
+ }
+ }
+ }
+
test("SPARK-44647: test join key is the second cluster key") {
val table1 = "tab1e1"
val table2 = "table2"
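For a quick sanity check of the partition counts asserted in the SPARK-47094 tests above (the object and method names below are hypothetical, not part of the suite): each bucketed join column ends up with gcd(n1, n2) reduced buckets, and the first test's bucket pairs are chosen so that one count always divides the other, which is why Math.min and gcd agree there while the "common divisor" test exercises the general gcd case.

object ExpectedBucketCounts {
  private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt

  // Reduced scan partition count for two tables bucketed on (store_id, dept_id)
  // with (t1._1, t1._2) and (t2._1, t2._2) buckets respectively.
  def expectedPartitions(t1: (Int, Int), t2: (Int, Int)): Int =
    gcd(t1._1, t2._1) * gcd(t1._2, t2._2)

  def main(args: Array[String]): Unit = {
    // "Support compatible buckets": one count divides the other, so gcd == min
    // and the expected scan partitions are min * min = 4.
    assert(expectedPartitions((2, 4), (4, 2)) == 4)
    // "Support compatible buckets with common divisor": gcd(6, 4) * gcd(4, 6) = 2 * 2.
    assert(expectedPartitions((6, 4), (4, 6)) == 4)
  }
}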
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
index 61895d49c4a2a..5cdb900901056 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/catalog/functions/transformFunctions.scala
@@ -76,7 +76,7 @@ object UnboundBucketFunction extends UnboundFunction {
override def name(): String = "bucket"
}
-object BucketFunction extends ScalarFunction[Int] {
+object BucketFunction extends ScalarFunction[Int] with ReducibleFunction[Int, Int] {
override def inputTypes(): Array[DataType] = Array(IntegerType, LongType)
override def resultType(): DataType = IntegerType
override def name(): String = "bucket"
@@ -85,6 +85,26 @@ object BucketFunction extends ScalarFunction[Int] {
override def produceResult(input: InternalRow): Int = {
(input.getLong(1) % input.getInt(0)).toInt
}
+
+ override def reducer(
+ thisNumBuckets: Int,
+ otherFunc: ReducibleFunction[_, _],
+ otherNumBuckets: Int): Reducer[Int, Int] = {
+
+ if (otherFunc == BucketFunction) {
+ val gcd = this.gcd(thisNumBuckets, otherNumBuckets)
+ if (gcd != thisNumBuckets) {
+ return BucketReducer(thisNumBuckets, gcd)
+ }
+ }
+ null
+ }
+
+ private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt
+}
+
+case class BucketReducer(thisNumBuckets: Int, divisor: Int) extends Reducer[Int, Int] {
+ override def reduce(bucket: Int): Int = bucket % divisor
}
object UnboundStringSelfFunction extends UnboundFunction {
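As a side note on why the modulo-based BucketReducer above preserves co-partitioning, here is a standalone sketch with names of my own (it mirrors the test BucketFunction's value % numBuckets arithmetic, not the connector API): reducing each side's bucket id modulo g = gcd(n1, n2) sends equal join keys to the same reduced bucket, because (v % n) % g == v % g whenever g divides n. When gcd equals a side's own bucket count, reducer() above returns null and that side is left unreduced.

object BucketReducerSketch {
  // Same arithmetic as the test BucketFunction for non-negative values.
  private def bucket(value: Int, numBuckets: Int): Int = value % numBuckets
  private def gcd(a: Int, b: Int): Int = BigInt(a).gcd(BigInt(b)).toInt

  def main(args: Array[String]): Unit = {
    val (n1, n2) = (4, 6)
    val g = gcd(n1, n2) // 2
    // Equal keys land in the same reduced bucket on both sides.
    (0 until 100).foreach { key =>
      assert(bucket(key, n1) % g == bucket(key, n2) % g,
        s"key $key would not be co-located")
    }
  }
}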
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
index a5c393ac0567f..36035e35ee258 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/StateStoreBasicOperationsBenchmark.scala
@@ -38,7 +38,8 @@ import org.apache.spark.util.Utils
* 2. build/sbt "sql/Test/runMain