Merge branch 'apache:master' into SPARK-45926
ahshahid authored Apr 6, 2024
2 parents a3f47f1 + 11abc64 commit d78bde0
Showing 63 changed files with 1,384 additions and 262 deletions.
@@ -21,52 +21,75 @@ package org.apache.spark.internal
* All structured logging keys should be defined here for standardization.
*/
object LogKey extends Enumeration {
val ACCUMULATOR_ID = Value
val APP_DESC = Value
val APP_ID = Value
val APP_STATE = Value
val BLOCK_ID = Value
val BLOCK_MANAGER_ID = Value
val BROADCAST_ID = Value
val BUCKET = Value
val BYTECODE_SIZE = Value
val CATEGORICAL_FEATURES = Value
val CLASS_LOADER = Value
val CLASS_NAME = Value
val COMMAND = Value
val COMMAND_OUTPUT = Value
val COMPONENT = Value
val CONFIG = Value
val CONFIG2 = Value
val CONTAINER_ID = Value
val COUNT = Value
val DRIVER_ID = Value
val END_POINT = Value
val ERROR = Value
val EVENT_LOOP = Value
val EVENT_QUEUE = Value
val EXECUTOR_ID = Value
val EXECUTOR_STATE_CHANGED = Value
val EXECUTOR_STATE = Value
val EXIT_CODE = Value
val FAILURES = Value
val HOST = Value
val JOB_ID = Value
val JOIN_CONDITION = Value
val LEARNING_RATE = Value
val LINE = Value
val LINE_NUM = Value
val LISTENER = Value
val LOG_TYPE = Value
val MASTER_URL = Value
val MAX_ATTEMPTS = Value
val MAX_CATEGORIES = Value
val MAX_EXECUTOR_FAILURES = Value
val MAX_SIZE = Value
val MERGE_DIR_NAME = Value
val METHOD_NAME = Value
val MIN_SIZE = Value
val NUM_ITERATIONS = Value
val OBJECT_ID = Value
val OLD_BLOCK_MANAGER_ID = Value
val OPTIMIZER_CLASS_NAME = Value
val PARTITION_ID = Value
val PATH = Value
val PATHS = Value
val POD_ID = Value
val PORT = Value
val QUERY_PLAN = Value
val RANGE = Value
val RDD_ID = Value
val REASON = Value
val REDUCE_ID = Value
val REMOTE_ADDRESS = Value
val RETRY_COUNT = Value
val RPC_ADDRESS = Value
val RULE_BATCH_NAME = Value
val RULE_NAME = Value
val RULE_NUMBER_OF_RUNS = Value
val SHUFFLE_BLOCK_INFO = Value
val SHUFFLE_ID = Value
val SHUFFLE_MERGE_ID = Value
val SIZE = Value
val SLEEP_TIME_SECONDS = Value
val STAGE_ID = Value
val SUBMISSION_ID = Value
val SUBSAMPLING_RATE = Value
@@ -75,9 +98,16 @@ object LogKey extends Enumeration {
val TASK_NAME = Value
val TASK_SET_NAME = Value
val TASK_STATE = Value
val THREAD = Value
val THREAD_NAME = Value
val TID = Value
val TIMEOUT = Value
val TIME_UNITS = Value
val URI = Value
val USER_NAME = Value
val WATERMARK_CONSTRAINT = Value
val WORKER_URL = Value
val XSD_PATH = Value

type LogKey = Value
}
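
The keys defined above are referenced through the MDC wrapper inside the log"..." interpolator, which is exactly the pattern the file diffs below migrate to. A minimal sketch of that usage, with a hypothetical class and arbitrarily chosen keys (not part of this commit):

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{EXECUTOR_ID, REASON}

// Hypothetical component, shown only to illustrate the LogKey/MDC pattern used below.
private class ExecutorLossReporter extends Logging {
  def reportLoss(executorId: String, reason: String, cause: Throwable): Unit = {
    // Each interpolated value is tagged with a LogKey so it is captured in the MDC map.
    logError(log"Executor ${MDC(EXECUTOR_ID, executorId)} lost: ${MDC(REASON, reason)}", cause)
  }
}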
@@ -49,6 +49,8 @@ case class MessageWithContext(message: String, context: java.util.HashMap[String
resultMap.putAll(mdc.context)
MessageWithContext(message + mdc.message, resultMap)
}

def stripMargin: MessageWithContext = copy(message = message.stripMargin)
}

/**
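The new stripMargin above simply re-wraps the stripped message, so the attached context map is preserved. A hedged sketch of a caller combining it with the log interpolator (hypothetical class; keys taken from the LogKey list above):

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CONFIG, TID}

// Hypothetical caller: stripMargin now returns a MessageWithContext, so the
// MDC entries survive the margin stripping of a multi-line message.
private class TaskFailureReporter extends Logging {
  def reportFailure(taskId: Long, e: Throwable): Unit = {
    logError(log"""Task ${MDC(TID, taskId)} failed;
                  |check ${MDC(CONFIG, "spark.task.maxFailures")} before retrying.""".stripMargin, e)
  }
}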
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -24,7 +24,8 @@ import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Scheduled
import scala.jdk.CollectionConverters._

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{ACCUMULATOR_ID, BROADCAST_ID, LISTENER, RDD_ID, SHUFFLE_ID}
import org.apache.spark.internal.config._
import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
import org.apache.spark.scheduler.SparkListener
@@ -226,7 +227,7 @@ private[spark] class ContextCleaner(
listeners.asScala.foreach(_.rddCleaned(rddId))
logDebug("Cleaned RDD " + rddId)
} catch {
case e: Exception => logError("Error cleaning RDD " + rddId, e)
case e: Exception => logError(log"Error cleaning RDD ${MDC(RDD_ID, rddId)}", e)
}
}

@@ -245,7 +246,7 @@
logDebug("Asked to cleanup non-existent shuffle (maybe it was already removed)")
}
} catch {
case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
case e: Exception => logError(log"Error cleaning shuffle ${MDC(SHUFFLE_ID, shuffleId)}", e)
}
}

@@ -257,7 +258,8 @@
listeners.asScala.foreach(_.broadcastCleaned(broadcastId))
logDebug(s"Cleaned broadcast $broadcastId")
} catch {
case e: Exception => logError("Error cleaning broadcast " + broadcastId, e)
case e: Exception =>
logError(log"Error cleaning broadcast ${MDC(BROADCAST_ID, broadcastId)}", e)
}
}

@@ -269,7 +271,8 @@
listeners.asScala.foreach(_.accumCleaned(accId))
logDebug("Cleaned accumulator " + accId)
} catch {
case e: Exception => logError("Error cleaning accumulator " + accId, e)
case e: Exception =>
logError(log"Error cleaning accumulator ${MDC(ACCUMULATOR_ID, accId)}", e)
}
}

@@ -285,7 +288,8 @@
logDebug("Cleaned rdd checkpoint data " + rddId)
}
catch {
case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
case e: Exception =>
logError(log"Error cleaning rdd checkpoint data ${MDC(RDD_ID, rddId)}", e)
}
}

@@ -295,7 +299,8 @@
sc.listenerBus.removeListener(listener)
logDebug(s"Cleaned Spark listener $listener")
} catch {
case e: Exception => logError(s"Error cleaning Spark listener $listener", e)
case e: Exception =>
logError(log"Error cleaning Spark listener ${MDC(LISTENER, listener)}", e)
}
}

@@ -781,7 +781,7 @@ private[spark] class MapOutputTrackerMaster(
.getOrElse(Seq.empty[BlockManagerId]))
}
} catch {
case NonFatal(e) => logError(e.getMessage, e)
case NonFatal(e) => logError(log"${MDC(ERROR, e.getMessage)}", e)
}
}
} catch {
15 changes: 10 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -46,7 +46,8 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{Executor, ExecutorMetrics, ExecutorMetricsSource}
import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
@@ -748,7 +749,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
} catch {
case e: Exception =>
logError(s"Exception getting thread dump from executor $executorId", e)
logError(
log"Exception getting thread dump from executor ${MDC(LogKey.EXECUTOR_ID, executorId)}",
e)
None
}
}
@@ -778,7 +781,9 @@
}
} catch {
case e: Exception =>
logError(s"Exception getting heap histogram from executor $executorId", e)
logError(
log"Exception getting heap histogram from " +
log"executor ${MDC(LogKey.EXECUTOR_ID, executorId)}", e)
None
}
}
@@ -2140,7 +2145,7 @@ class SparkContext(config: SparkConf) extends Logging {
Seq(env.rpcEnv.fileServer.addJar(file))
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e)
Nil
}
}
@@ -2161,7 +2166,7 @@
Seq(path)
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
logError(log"Failed to add ${MDC(LogKey.PATH, path)} to Spark environment", e)
Nil
}
} else {
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -24,7 +24,8 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKey.LISTENER
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
@@ -246,7 +247,7 @@ private[spark] class TaskContextImpl(
}
}
listenerExceptions += e
logError(s"Error in $name", e)
logError(log"Error in ${MDC(LISTENER, name)}", e)
}
}
if (listenerExceptions.nonEmpty) {
@@ -26,7 +26,8 @@ import io.netty.handler.timeout.ReadTimeoutException

import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.api.r.SerDe._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{METHOD_NAME, OBJECT_ID}
import org.apache.spark.internal.config.R._
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._
@@ -76,7 +77,7 @@ private[r] class RBackendHandler(server: RBackend)
writeObject(dos, null, server.jvmObjectTracker)
} catch {
case e: Exception =>
logError(s"Removing $objId failed", e)
logError(log"Removing ${MDC(OBJECT_ID, objId)} failed", e)
writeInt(dos, -1)
writeString(dos, s"Removing $objId failed: ${e.getMessage}")
}
@@ -192,7 +193,7 @@ private[r] class RBackendHandler(server: RBackend)
}
} catch {
case e: Exception =>
logError(s"$methodName on $objId failed", e)
logError(log"${MDC(METHOD_NAME, methodName)} on ${MDC(OBJECT_ID, objId)} failed", e)
writeInt(dos, -1)
// Writing the error message of the cause for the exception. This will be returned
// to user in the R process.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.LogKey.{DRIVER_ID, RPC_ADDRESS}
import org.apache.spark.internal.LogKey.{DRIVER_ID, ERROR, RPC_ADDRESS}
import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
import org.apache.spark.resource.ResourceUtils
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
@@ -61,7 +61,7 @@ private class ClientEndpoint(
t => t match {
case ie: InterruptedException => // Exit normally
case e: Throwable =>
logError(e.getMessage, e)
logError(log"${MDC(ERROR, e.getMessage)}", e)
System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
})

@@ -27,7 +27,8 @@ import org.json4s.{DefaultFormats, Extraction, Formats}
import org.json4s.jackson.JsonMethods.{compact, render}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.COMPONENT
import org.apache.spark.resource.{ResourceAllocation, ResourceID, ResourceInformation, ResourceRequirement}
import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
@@ -103,9 +104,10 @@ private[spark] object StandaloneResourceUtils extends Logging {
writeResourceAllocationJson(allocations, tmpFile)
} catch {
case NonFatal(e) =>
val errMsg = s"Exception threw while preparing resource file for $compShortName"
val errMsg =
log"Exception threw while preparing resource file for ${MDC(COMPONENT, compShortName)}"
logError(errMsg, e)
throw new SparkException(errMsg, e)
throw new SparkException(errMsg.message, e)
}
val resourcesFile = File.createTempFile(s"resource-$compShortName-", ".json", dir)
tmpFile.renameTo(resourcesFile)
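In the StandaloneResourceUtils hunk above, errMsg is now a MessageWithContext: the structured entry goes to logError unchanged, while SparkException receives only the rendered text via errMsg.message. A small sketch of that build-once, use-twice pattern under the same assumptions (hypothetical helper, not part of this commit):

import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.PATH

// Hypothetical helper mirroring the hunk above: build the structured message once,
// log it with its MDC context, then reuse the plain text for the thrown exception.
private object ResourceFileErrors extends Logging {
  def failPreparing(path: String, cause: Throwable): Nothing = {
    val errMsg = log"Failed to prepare resource file at ${MDC(PATH, path)}"
    logError(errMsg, cause)
    throw new SparkException(errMsg.message, cause)
  }
}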
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/Utils.scala
@@ -22,7 +22,8 @@ import java.io.File
import jakarta.servlet.http.HttpServletRequest

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{LOG_TYPE, PATH}
import org.apache.spark.ui.JettyUtils.createServletHandler
import org.apache.spark.ui.WebUI
import org.apache.spark.util.Utils.{getFileLength, offsetBytes}
@@ -95,7 +96,8 @@ private[deploy] object Utils extends Logging {
(logText, startIndex, endIndex, totalLength)
} catch {
case e: Exception =>
logError(s"Error getting $logType logs from directory $logDirectory", e)
logError(log"Error getting ${MDC(LOG_TYPE, logType)} logs from " +
log"directory ${MDC(PATH, logDirectory)}", e)
("Error getting logs due to exception: " + e.getMessage, 0, 0, 0)
}
}
@@ -37,7 +37,8 @@ import org.apache.hadoop.security.AccessControlException

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.PATH
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
@@ -920,7 +921,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case e: AccessControlException =>
logWarning(s"Insufficient permission while compacting log for $rootPath", e)
case e: Exception =>
logError(s"Exception while compacting log for $rootPath", e)
logError(log"Exception while compacting log for ${MDC(PATH, rootPath)}", e)
} finally {
endProcessing(rootPath)
}
@@ -1402,7 +1403,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
case _: AccessControlException =>
logInfo(s"No permission to delete $log, ignoring.")
case ioe: IOException =>
logError(s"IOException in cleaning $log", ioe)
logError(log"IOException in cleaning ${MDC(PATH, log)}", ioe)
}
}
deleted